golang 的 chan select 實在太方便,其實任何提供了協程的語言都能很好且方便的支持 chan 和 select,因爲經常寫 typescript 腳本,於是我把這兩個組件實現到了一個 typescript,你可以直接使用我的庫來得到 chan 和 select,本文後續是實現代碼的分析,你也可以參照分析去任何支持協程的語言中把golang的特性發揚光大
緊跟上篇文章 我們已經實現了 Reader 和 Writer 可以用來創建和通知讀寫任務了
但 Reader 只能讀 Writer 只能寫, chan 是可以讀寫的,所以我們現在來創建一個新的 class RW ,它可讀寫並帶有一個可選的讀寫緩衝。
RW 要實現可讀寫直接包含一個 Reader 和 Writer 的實例即可,要實現緩衝直接使用一個數組或鏈表即可,只要能滿足數組先進先出即可
class RW<T>
{
// 讀寫緩存
private list: Array<T> | undefined
// 可取器
private r_ = new Reader()
// 寫入器
private w_ = new Writer()
}
緩存需要先進先出,數組刪除第一個元素會很慢,鏈表則快很多,但是我們緩存長度是固定的數組可以一次分配足夠內存,鏈表則需要頻繁申請和釋放內存,爲此我使用原始數組實現了一個先進先出的 Ring 隊列來充當緩存,其定義如下
class Ring<T> {
private offset_ = 0
private size_ = 0
constructor(private readonly arrs: Array<T>) {
}
get length(): number {
return this.size_
}
get capacity(): number {
return this.arrs.length
}
// 在隊列某位壓入一個數據,如果隊列已慢就返回 false
push(val: T): boolean {
const arrs = this.arrs
const size = this.size_
if (size == arrs.length) {
return false
}
arrs[(this.offset_ + size) % arrs.length] = val
this.size_++
return true
}
// 如果隊列不爲空 就將其第一個元素 彈出,否則返回迭代器 {done:true}
pop(): IteratorResult<T> {
const size = this.size_
if (size == 0) {
return noResult
}
const val = this.arrs[this.offset_++]
if (this.offset_ == this.arrs.length) {
this.offset_ = 0
}
this.size_--
return {
value: val,
}
}
}
所以我的 RW 定義如下
class RW<T>
{
// 讀寫緩存
private list: Ring<T> | undefined
// 可取器
private r_ = new Reader()
// 寫入器
private w_ = new Writer()
constructor(buf: number) {
if (buf > 0) { // 如果構造傳入了緩存大小就創建一個緩存
this.list = new Ring<T>(new Array<T>(buf))
}
}
}
我們的 class RW 包含了 Reader Writer 屬性和一個緩存,我們先看下要如何讀取,
爲此定義了兩個函數 tryRead 和 read, tryRead 完成了步驟 1,2,3 如果成功了就不必執行 read 了,read 則單獨完成了步驟 4,下面是實現代碼
class RW<T>{
tryRead(): IteratorResult<any> | undefined {
const list = this.list
if (list) { // 緩存存在
const result = list.pop()
if (!result.done) { // 緩存中有值,將緩存的值讀出來傳給調用者
return result
}
// 緩存爲空 執行後續讀取代碼
}
// 已經關閉 返回一個 undefined 作爲 關閉標記
if (this.isClosed) {
return
}
// 從 writer 獲取值
const w = this.w_
if (w.isEmpty) { // writer 爲空沒有寫入任務
return noResult //返回讀取失敗
}
return {
value: w.invoke(), // writer 有寫入任務,則讓 writer 完成一個任務並把寫入的值傳遞給 讀取者
}
}
read(callback: ReadCallback): ReadValue {
// 直接註冊一個讀取任務,所以 read 之前必須要先調用 tryRead 以確定沒有值可讀
return this.r_.connect(callback)
}
}
我們還是先看下要如何寫入,
爲此定義了兩個函數 tryWrite 和 write, tryWrite 完成了步驟 1,2,3 如果成功了就不必執行 write 了,write 則單獨完成了步驟 4,下面是實現代碼
class RW<T>{
tryWrite(val: T): boolean | undefined {
// 如果已經關閉 就返回一個 undefined 作爲關閉標記
if (this.isClosed) {
return
}
const r = this.r_
if (r.isEmpty) { // 沒有讀取任務
// 將數據寫入緩存
return this.list?.push(val) ?? false // 緩存滿了 push 會返回失敗
}
// 存在讀取任務,則用值通知 讀取任務完成
r.invoke({
value: val,
})
return true // 返回寫入成功
}
write(callback: WriteCallback, reject: RejectCallback, val: T): WirteValue {
// 直接註冊一個寫入任務,所以 write 之前必須要先調用 tryWrite 確定沒有值可寫
return this.w_.connect(callback, reject, val)
}
}
下面是 class RW 的完整代碼
class RW<T>{
private list: Ring<T> | undefined
constructor(buf: number) {
if (buf > 0) {
this.list = new Ring<T>(new Array<T>(buf))
}
}
private r_ = new Reader()
private w_ = new Writer()
tryRead(): IteratorResult<any> | undefined {
// 讀取緩存
const list = this.list
if (list) {
const result = list.pop()
if (!result.done) {
return result
}
}
// 是否關閉
if (this.isClosed) {
return
}
// 讀取 writer
const w = this.w_
if (w.isEmpty) { // 沒有寫入者
return noResult
}
return {
value: w.invoke(),
}
}
read(callback: ReadCallback): ReadValue {
// 設置待讀
return this.r_.connect(callback)
}
tryWrite(val: T): boolean | undefined {
if (this.isClosed) {
return
}
const r = this.r_
if (r.isEmpty) { // 沒有讀取者
// 寫入緩存
return this.list?.push(val) ?? false
}
r.invoke({
value: val,
})
return true
}
write(callback: WriteCallback, reject: RejectCallback, val: T): WirteValue {
// 設置待寫
return this.w_.connect(callback, reject, val)
}
close(): boolean {
if (this.isClosed) {
return false
}
this.isClosed = true
this.w_.close()
this.r_.close()
const closed = this.closed_
if (closed) {
this.closed_ = undefined
closed.resolve()
}
return true
}
wait(): undefined | Promise<void> {
if (this.isClosed) {
return
}
let closed = this.closed_
if (closed) {
return closed.promise
}
closed = new Completer<void>()
this.closed_ = closed
return closed.promise
}
private closed_: Completer<void> | undefined
isClosed = false
get length(): number {
return this.list?.length ?? 0
}
get capacity(): number {
return this.list?.capacity ?? 0
}
}
到此我們其實就已經實現好了 chan 的功能,只是現在是 RW 用回調函數來通知 寫入和讀取完成,對於支持協程的語言我們只需要 使用一個 class Chan 來包裝下然後以 協程 的形式來等待而非回調函數來等待通知即可,因爲很簡單基本上是對 RW 的調用所以直接貼關鍵代碼了
export class Chan<T> implements ReadChannel<T>, WriteChannel<T> {
// 存儲了底層的讀寫實現
private rw_: RW<T>
get rw(): RW<T> {
return this.rw_
}
constructor(buf = 0) {
// 依據參數創建是否帶緩存的底層讀寫器
this.rw_ = new RW<T>(Math.floor(buf))
}
// golang <-chan 的 chan 讀取實現
read(): IteratorResult<T> | Promise<IteratorResult<T>> {
const rw = this.rw_
const val = rw.tryRead()
if (val === undefined) {
// chan 已經關閉
return noResult
} else if (!val.done) {
// 返回讀取到的值
return val
}
// 使用 js 的 Promise 來等待
return new Promise((resolve) => {
rw.read(resolve) // 通知 Promise 完成
})
}
// golang 沒有直接提供這個函數但可以使用 select default 來實現,這裏直接提供了
tryRead(): IteratorResult<T> | undefined {
const rw = this.rw_
const val = rw.tryRead()
if (val === undefined) {
// chan 已經關閉
return noResult
} else if (!val.done) {
// 返回讀取到的值
return val
}
return undefined
}
// golang chan<- 的 chan 寫入實現
write(val: T, exception?: boolean): boolean | Promise<boolean> {
const rw = this.rw_
const result = rw.tryWrite(val)
if (result === undefined) {
// chan 已經關閉
if (exception) { // 依據調用參數(使用者自己決定) 是要拋出異常還是返回 false
throw errChannelClosed
}
return false
} else if (result) {
// 寫入 chan 成功
return true
}
// 使用 js 的 Promise 來等待
return new Promise((resolve, reject) => {
rw.write(resolve, exception ? reject : undefined, val) // 通知 Promise 成功或失敗
})
}
// golang 沒有直接提供這個函數但可以使用 select default 來實現,這裏直接提供了
tryWrite(val: T, exception?: boolean): boolean {
const rw = this.rw_
const result = rw.tryWrite(val)
if (result === undefined) {
// chan 已經關閉
if (exception) {
throw errChannelClosed
}
return false
} else if (result) {
// 寫入 chan 成功
return true
}
// 目前不可寫
return false
}
// 這個返回已經緩存了多少個數據
get length(): number {
return this.rw.length
}
// 這裏返回緩衝區容量
get capacity(): number {
return this.rw.capacity
}
// 這裏實現 js 的迭代器,就可以使用 原生的 await for of 來讀取 chan 了
async *[Symbol.asyncIterator](): AsyncGenerator<T> {
while (true) {
const val = await this.read()
if (val.done) {
break
}
yield val.value
}
}
}
所有工具都其實都已經完備了,下篇文章將分析如何實現 select 來等待多個 chan