拜 摩爾定律 所賜,現在的計算資源越來越強,有越來越多的程式可以分散地、平行化 (parallelism) 的運行,平行運行下的程式勢必會遇到諸如 race condition、deadlock 或難以測試的問題,之後幾天我們會使用 pure function 來建立支援平行化和異步計算的 library,除此之外,我們還可以學習從 functional programming 的角度應對平行運行時的設計思維,
pure function 的好處之一就是易於組合和模組化,所以我們會維持一貫主題 分離關注點,直到真的 ''運行" 之前,把所有計算、轉換當成某種表達式來 "組合" 起來。
就讓我們從簡單的使用案例開始吧!
假設我們有個 sum
function 加總 Seq,
def sum(ints: IndexedSeq[Int]) =
ints.foldLeft(0)(_ + _)
如果我們不想依序的加總,我們可以用 divide-and-conquer (分治) 算法來加速計算,
def sum(ints: IndexedSeq[Int]): Int =
if ints.size <= 1 then
ints.headOption.getOrElse(0)
else
val (l, r) = ints.splitAt(ints.size / 2)
sum(l) + sum(r)
sum(l) 和 sum(r) 是我們可以進行平行運算的地方,所以我們可以定義 Par[A]
這個容器型態來表示它可能會在不同執行緒上運行,然後將欲回傳的屬性表示 A,以此處來說就是 Int,最後我們得取得運行結果,所以我們需要以下 2 個 function:
def unit[A](a: => A): Par[A]
接受還沒 evaluate (運行) 的 call-by-name 表達式參數,然後回傳 Par,表示它可能會在不同執行緒上運行。
def get[A](a: Par[A]): A
從平行化運行的 Par 中取得結果。
根據我們定義的 type 修改後的程式如下。
def sum(ints: IndexedSeq[Int]): Int =
if ints.size <= 1 then
ints.headOption.getOrElse(0)
else
val (l, r) = ints.splitAt(ints.size / 2)
val sumL: Par[Int] = Par.unit(sum(l))
val sumR: Par[Int] = Par.unit(sum(r))
Par.get(sumL) + Par.get(sumR)
為什麼我們不使用 java.lang.Thraed 呢?最大的原因是 Thread 的 start 和 join 並沒有回傳有意義的值,所以若我們要從 Runnable 取得結果時,勢必會發生 side effect。
public interface Runnable { public abstract void run(); } public class Thread implements Runnable { public synchronized void start() public final void join() }
在這裡,如果我們使用 Substitution Model 把 sumL 和 sumR 替換掉的話,雖然結果還是正確,但它失去平行化功能了,
Par.get(Par.unit(sum(l))) + Par.get(Par.unit(sum(r)))
代表 unit 在給 get 當參數用時,有著 side effect,我們不能直接內嵌 unit 進去,因為 get 得等待 Par 運行完成然後取得結果,
所以看起來我們要避免調用 get,或者減少調用次數,在最終階段才調用,且我們也想要 Par 型態是有能力組合異步計算,而不用等待執行緒完成,
或許我們可定義一個新 function map2 來嘗試解決,
def sum(ints: IndexedSeq[Int]): Par[Int] =
if ints.size <= 1 then
Par.unit(ints.headOption.getOrElse(0))
else
val (l, r) = ints.splitAt(ints.size / 2)
Par.map2(sum(l), sum(r))(_ + _)
但此時又有新的問題了,如果我們細部拆解調用順序,如下圖,
可以發現因為 scala function 預設是 strict 的關係,evaluate 參數的順序是由左到右,所以 sum(r) 不會馬上執行,而是要等到 sum(l) 做完才會輪到 sum(r),看起來我們得讓 map2 lazy,而且能立即把 2 個參數平行化運行。
Par.map2(Par.unit(sum(l)), Par.unit(sum(r)))(_ + _)
但這樣真的好嗎?如果是以下程式平行化運行好像沒什麼太意義,我們真的需要分隔 logical thread (邏輯執行緒) 來運行嗎?
Par.map2(Par.unit(1), Par.unit(1))(_ + _)
這也點出了另一個問題,我們沒有選項,能明確讓程式知道我們真的要把平行化運行從主執行緒 fork 出來,因此我們可以在定義一個 fork function 表達 fork 的決心,
def fork[A](a: => Par[A]): Par[A]
有了 fork,我們可以讓 map2 保持 strict,最後我們的 sum 會長的下面這樣。
def sum(ints: IndexedSeq[Int]): Par[Int] =
if ints.size <= 1 then
Par.unit(ints.headOption.getOrElse(0))
else
val (l, r) = ints.splitAt(ints.size / 2)
Par.map2(Par.fork(sum(l)), Par.fork(sum(r)))(_ + _)
現在來看一下之前的 unit function 要 strict 還是 lazy,因為有 fork function 的關係,我們可以讓 unit 變為 strict,然後使用 fork 和 unit 來實作 lazyUnit function,
def unit[A](a: A): Par[A]
def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
最後來看一下 fork 該如何實現,因為 Par 只是個容器型態,表示我們會用多執行緒運行程式,
如果 fork 的實作會立即建立多執行緒,我們會失去控制平行化策略的彈性,從 基於介面而非實現開發 這個觀點來看,我們需要一個 function 來負責啟動,倘若不立即建立多執行緒,那我們就更需要一個 function 來啟動,
所以我們可以把 get
改名為 run
,來命令我們的 library 開始運行,並取回運行結果。
def run[A](a: Par[A]): A
我們的 library 需要哪些核心 API 已經推導的差不多了,明天繼續吧!