iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 5
1
Big Data

資料科學:使用 Clojure系列 第 5

Day 05 - Clojure 基礎知識(五)

core.async

core.async 這個庫的目的是允許工程師在程式中建立多個互相不需要乎相關聯(先後順序)的函數組。core.async 是從 Go 語言中的通道(channel)的概念衍伸而來。這裡會討論到通信方法、使用 parking / blocking 的技術管理線程,使用 alts!! 及處理序列(queues),最後討論處理如何 callbacks。

在 Clojure 設計者們的思考中,他們嘗試把事物的本質定義為:它們能識別的事件與它們對這些事件如何進行處置的集合體。(define every thing's essence as the set of the events it recognizes and how it responds)。

首先建立一個有 core.async 的新專案

lein new app playsync

修改 project.clj:dependencies(最新版參照 Github,不過這邊維持原作者的版本)

[[org.clojure/clojure "1.7.0"]
 [org.clojure/core.async "0.1.346.0-17112a-alpha"]]

通過修改 core.clj 的 namespace 引用,就會在當前的命名空間引入 core.async 的工具了:

(ns playsync.core
  (:require [clojure.core.async
             :as a
             :refer [>! <! >!! <!! go chan buffer close! thread
                     alts! alts!! timeout]]))

>!<!>!!<!! 不是比較喔,是 core.async 的 macro 的名稱。接著來看操作範例:

(def echo-chan (chan))
(go (println (<! echo-chan)))
(>!! echo-chan "ketchup")
; => true
; => ketchup

解釋:

  • 第一行建立了一個叫做 echo-chan 的 channel。可以把消息放入,也可以取出。可以想像在放置、取出過程中將會有短暫的暫停。
  • 第二行使用 go 創建了一個新 list。在 go 裡面的所有東西(稱為 go block)都併發的運行在一個單獨的線程上。注意:go block 會把函數放在一個線程池(thread pool)裡面,線程數是 CPU 數量的兩倍。<! 的意思是,當有消息進入了 chan,就把消息取出並顯示。
  • 第三行 >!! 則把字串 "ketchup" 放到 echo-chan 中,返回 true 表示成功。

Buffering

上面的通道容量只有一,換句話說,若通道內的訊息沒有被提取,是沒辦法繼續放入新的訊息的。可以為通道指定容量。

(def echo-buffer (chan 2))
(>!! echo-buffer "ketchup")
; => true
(>!! echo-buffer "ketchup")
; => true
(>!! echo-buffer "ketchup")
; This blocks because the channel buffer is full

第三個 "ketchup" 輸入時,REPL 會阻塞。如果我們添加額外參數到通道上,參數就有另外的性質。sliding-buffer 「滑動緩衝」會按照先進先出的概念丟棄通道內容,而 dropping-buffer 「丟棄緩衝」則按照後進先出的方式丟棄通道內容。使用方法如下:

(def echo-buffer (sliding-buffer 2))

Parking & Blocking

Action Inside go block Outside go block
Put >! or >!! >!!
Take <! or <!! <!!

差別在於,停泊的時候,當前線程會把等待中的任務「放到一邊」,所以線程可以空出來做其他沒有等待中任務的程式段。而阻塞則會持續占用該線程。當有非常多通道的時候,停泊會在堪用的線程中切換任務,而阻塞(兩個驚嘆號)則會開啟新線程繼續處理任務。

thread

假如程序將占用線程很長一段時間,可以改用 future 的通道版本 thread。怎麼說呢?例如,我們使用四個線程執行下載。當下載進行中,線程並不能把當前任務「停泊」(停泊只能夠在通道有訊息在裡面,然而無存取的狀態才行)。這時候,線程池其實是處於被占用狀態。對於這種不可停泊的任務,可使用 thread 在線程池外另外創建線程。


上一篇
Day 04 - Clojure 基礎知識(四)
下一篇
Day 06 - 單元一:描述統計(一)
系列文
資料科學:使用 Clojure30

尚未有邦友留言

立即登入留言