iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 19
0
Mobile Development

RxSwift / 30天探索之旅系列 第 19

第 19 天 - Schedulers

  • 分享至 

  • xImage
  •  

連假過一半了,有點感傷...直接開始正題吧...

Scheduler

如果要將多執行緒的概念放到Observable當中,那就會用到Scheduler,如果有用過GCD的話,Scheduler的實作方式就是基於GCD然後再封裝,可以在各種operator在串接時,切換到不同的Scheduler,Scheduler可以較常使用有以下四種:

MainScheduler
封裝GCD的MainQueue,任務會執行在MainThread,通常用於更新UI的程式

SerialDispatchQueueScheduler
封装GCD的SerialQueue,任務會依序執行,可用於將任務放置在背景依序執行,在使用observeOn時,Rx對此進行多項優化。

ConcurrentDispatchQueueScheduler
封裝GCD的ConcurrentQueue,任務會同時被執行,可用於任務需要較長時間執行的狀況。

OperationQueueScheduler
封裝NSOperationQueue,跟ConcurrentDispatchQueueScheduler類似,但可以控制最大Concurrent的任務數量。

值得注意的是,Scheduler 不等於Thread,它們不是一對一的關係,除了MainScheduler是在Main Thread被處理之外,你不會知道你的Scheduler在哪個Thread被處理。

SubscribeOn

前面講過,Observable 只有在被訂閱的時候,才會開始發送元素給他的訂閱者,無論他串了多少個 map, filter, flatMap … 等多個 operator 也一樣,那他發出元素的那個起點,要在哪個 Scheduler 執行,這個就是 subscribeOn。

ObserveOn

因為 Functional programming 的特性就是可以串很多個 operator,在串的過程中,我也可以決定要在哪個 Scheduler 執行,這時候就用 observeOn。

用範例來解釋

我們先寫一個 func,它能告訴我現在運行的 Thread 是哪個。

func getThreadName() -> String {
    if Thread.current.isMainThread {
        return "Main Thread"
    } else if let name = Thread.current.name {
        if name == "" {"
            return "Anonymous Thread"
        }
        return name
    } else {
        return "Unknown Thread"
    }
}

接著,我們建立一個 Observable,並且經過一次 map operator,接著訂閱它,我們看看在一般的狀況下,執行結果會是如何

Observable<Int>.create { observer in
    print("subscription code: \(self.getThreadName())")

    observer.onNext(1)
    observer.onCompleted()

    return Disposables.create()
}
.map { number -> Int in
    print("1: \(self.getThreadName()) / \(number)")
    return number * 2
}
.subscribe(onNext: { number in
    print("2: \(self.getThreadName()) / \(number)")
})
.disposed(by: disposeBag)

執行結果

subscription code: Main Thread
1: Main Thread / 1
2: Main Thread / 2

使用 subscribeOn

Observable<Int>.create { observer in
    print("subscription code: \(self.getThreadName())")

    observer.onNext(1)
    observer.onCompleted()

    return Disposables.create()
}
.subscribeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))
.map { number -> Int in
    print("1: \(self.getThreadName()) / \(number)")
    return number * 2
}
.subscribe(onNext: { number in
    print("2: \(self.getThreadName()) / \(number)")
})
.disposed(by: disposeBag)

執行結果

subscription code: Anonymous Thread
1: Anonymous Thread / 1
2: Anonymous Thread / 2

subscribeOn 個人覺得有幾個重點

  1. 它受用範圍就在 create Observable 的部分,也就是在生產元素的部分
  2. subscribeOn() 不限擺放順序,你也可以放在 map 之後,或是 observeOn 之後,但通常會放在 create Observable 之後,畢竟這比較直觀
  3. subscribeOn() 可以寫多次,但只有第一次設定有用,因為在訂閱的那一刻已經決定好了

使用 observeOn

Observable<Int>.create { observer in
    print("subscription code: \(self.getThreadName())")

    observer.onNext(1)
    observer.onCompleted()

    return Disposables.create()
}
.subscribeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))
.observeOn(MainScheduler.instance)
.map { number -> Int in
    print("1: \(self.getThreadName()) / \(number)")
    return number * 2
}
.observeOn(SerialDispatchQueueScheduler.init(qos: .background))
.subscribe(onNext: { number in
    print("2: \(self.getThreadName()) / \(number)")
})
.disposed(by: disposeBag)

observeOn 個人覺得有一個重點

  1. 它受用為執行 observeOn() 之後的範圍,如果之後再次遇到 observeOn() ,也就是切換到下一個所設置的 Scheduler,所以可以切來切去,因為每一個 operator 之後的 Observable 都是新的,你要在哪個 Scheduler 處理就在哪處理的概念

如果沒指定 subscribeOn 或 observeOn 的話 ...?

如果沒指定 subscribeOn,就看當前 Observable create 的產生元素的 Scheduler 在哪就在哪

Observable<Int>.create { observer in
    print("subscription code: \(self.getThreadName())")

    observer.onNext(1)
    observer.onCompleted()

    return Disposables.create()
}
.observeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))
.map { number -> Int in
    print("1: \(self.getThreadName()) / \(number)")
    return number * 2
}
.observeOn(MainScheduler.instance)
.subscribe(onNext: { number in
    print("2: \(self.getThreadName()) / \(number)")
})
.disposed(by: disposeBag)

執行結果

subscription code: Main Thread
1: Anonymous Thread / 1
2: Main Thread / 2

如果沒指定observeOn,就看當前subscribeOn的Scheduler在哪就在哪,也就是說,只要控制subscribeOn的 Scheduler,後續沒特別切換Scheduler,後續的stream都會是在同一個Scheduler處理。


今天講完Scheduler,到此為止,Rx的基礎應該是都帶過了,明天開始希望能帶一些實作,也對應之前所講的總總,就這樣,掰掰


上一篇
第 18 天 - Error Handling Operators (下)
下一篇
第 20 天 - TableView + Rx 與範例(上)
系列文
RxSwift / 30天探索之旅30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言