後來,頭就被我改了一下
type Sequence
chain::Task
end
function (s::Sequence)(f::Function, g::Function)
s.chain = init_task(bridge, s.chain, init_task(f, g))
return s
end
function seq(s)
function start()
produce()
for x in s
produce(Disposible(x))
end
return :done
end
return Sequence(init_task(start))
end
附上尾巴
function to_list(s::Sequence)
result = Vector()
while true
x = consume(s.chain)
println(x)
if x == :done
break
else
append!(result, get(x))
end
end
return result
end
講一講為什麼頭跟尾巴這麼難想,因為後來我發現頭要負責push,也就是把東西丟出去的任務,然後尾巴要負責把東西pull進來,整個前後要一致。
再來最最重要的是需要一個訊號跟尾巴說我們做完了,所以我就傳了:done
作為結束訊號。
不過就要補上以下的程式碼。
function _map(x::Symbol, f::Function)
return x
end
function _filter(x::Symbol, f::Function)
return x
end
function _reduce(x::Symbol, f::Function)
return x
end
如此一來,就能順利把東西往下丟了!
t = seq([1,2,3,4,5])(map, x -> x+5)(map, x -> x*2)
l = to_list(t)
印出
Disposible(1) -> Disposible(6)
Disposible(6) -> Disposible(12)
Disposible(12)
Disposible(2) -> Disposible(7)
Disposible(7) -> Disposible(14)
Disposible(14)
Disposible(3) -> Disposible(8)
Disposible(8) -> Disposible(16)
Disposible(16)
Disposible(4) -> Disposible(9)
Disposible(9) -> Disposible(18)
Disposible(18)
Disposible(5) -> Disposible(10)
Disposible(10) -> Disposible(20)
Disposible(20)
done -> done
done -> done
done
回傳
Any[12,14,16,18,20]
到這邊算是大功告成了!開發reactive programming的任務成功!
接著需要把程式碼整理一下跟擴展功能,這部份就不會寫到文章裡了。