連假過一半了,有點感傷...直接開始正題吧...
如果要將多執行緒的概念放到Observable當中,那就會用到Scheduler,如果有用過GCD的話,Scheduler的實作方式就是基於GCD然後再封裝,可以在各種operator在串接時,切換到不同的Scheduler,Scheduler可以較常使用有以下四種:
MainScheduler
封裝GCD的MainQueue,任務會執行在MainThread,通常用於更新UI的程式
SerialDispatchQueueScheduler
封装GCD的SerialQueue,任務會依序執行,可用於將任務放置在背景依序執行,在使用observeOn時,Rx對此進行多項優化。
ConcurrentDispatchQueueScheduler
封裝GCD的ConcurrentQueue,任務會同時被執行,可用於任務需要較長時間執行的狀況。
OperationQueueScheduler
封裝NSOperationQueue,跟ConcurrentDispatchQueueScheduler類似,但可以控制最大Concurrent的任務數量。
值得注意的是,Scheduler 不等於Thread,它們不是一對一的關係,除了MainScheduler是在Main Thread被處理之外,你不會知道你的Scheduler在哪個Thread被處理。
前面講過,Observable 只有在被訂閱的時候,才會開始發送元素給他的訂閱者,無論他串了多少個 map, filter, flatMap … 等多個 operator 也一樣,那他發出元素的那個起點,要在哪個 Scheduler 執行,這個就是 subscribeOn。
因為 Functional programming 的特性就是可以串很多個 operator,在串的過程中,我也可以決定要在哪個 Scheduler 執行,這時候就用 observeOn。
我們先寫一個 func,它能告訴我現在運行的 Thread 是哪個。
func getThreadName() -> String {
if Thread.current.isMainThread {
return "Main Thread"
} else if let name = Thread.current.name {
if name == "" {"
return "Anonymous Thread"
}
return name
} else {
return "Unknown Thread"
}
}
接著,我們建立一個 Observable,並且經過一次 map operator,接著訂閱它,我們看看在一般的狀況下,執行結果會是如何
Observable<Int>.create { observer in
print("subscription code: \(self.getThreadName())")
observer.onNext(1)
observer.onCompleted()
return Disposables.create()
}
.map { number -> Int in
print("1: \(self.getThreadName()) / \(number)")
return number * 2
}
.subscribe(onNext: { number in
print("2: \(self.getThreadName()) / \(number)")
})
.disposed(by: disposeBag)
執行結果
subscription code: Main Thread
1: Main Thread / 1
2: Main Thread / 2
Observable<Int>.create { observer in
print("subscription code: \(self.getThreadName())")
observer.onNext(1)
observer.onCompleted()
return Disposables.create()
}
.subscribeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))
.map { number -> Int in
print("1: \(self.getThreadName()) / \(number)")
return number * 2
}
.subscribe(onNext: { number in
print("2: \(self.getThreadName()) / \(number)")
})
.disposed(by: disposeBag)
執行結果
subscription code: Anonymous Thread
1: Anonymous Thread / 1
2: Anonymous Thread / 2
subscribeOn 個人覺得有幾個重點
Observable<Int>.create { observer in
print("subscription code: \(self.getThreadName())")
observer.onNext(1)
observer.onCompleted()
return Disposables.create()
}
.subscribeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))
.observeOn(MainScheduler.instance)
.map { number -> Int in
print("1: \(self.getThreadName()) / \(number)")
return number * 2
}
.observeOn(SerialDispatchQueueScheduler.init(qos: .background))
.subscribe(onNext: { number in
print("2: \(self.getThreadName()) / \(number)")
})
.disposed(by: disposeBag)
observeOn 個人覺得有一個重點
如果沒指定 subscribeOn,就看當前 Observable create 的產生元素的 Scheduler 在哪就在哪
Observable<Int>.create { observer in
print("subscription code: \(self.getThreadName())")
observer.onNext(1)
observer.onCompleted()
return Disposables.create()
}
.observeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))
.map { number -> Int in
print("1: \(self.getThreadName()) / \(number)")
return number * 2
}
.observeOn(MainScheduler.instance)
.subscribe(onNext: { number in
print("2: \(self.getThreadName()) / \(number)")
})
.disposed(by: disposeBag)
執行結果
subscription code: Main Thread
1: Anonymous Thread / 1
2: Main Thread / 2
如果沒指定observeOn,就看當前subscribeOn的Scheduler在哪就在哪,也就是說,只要控制subscribeOn的 Scheduler,後續沒特別切換Scheduler,後續的stream都會是在同一個Scheduler處理。
今天講完Scheduler,到此為止,Rx的基礎應該是都帶過了,明天開始希望能帶一些實作,也對應之前所講的總總,就這樣,掰掰