iT邦幫忙

2021 iThome 鐵人賽

DAY 8
1

本篇是用來補充 RxJava 的基礎知識跟 functional programming 的應用,最後將會說明上一篇沒有解釋到的實作細節。

Observable Creation

建立 Observable 的各種 factory method。

Observable.just

非常直覺簡單,看下面的範例就知道怎麼用了:

val a = Observable.just(1, 2, 4)
a.subscribe { number -> println(number) }

// 1 
// 2 
// 4

Observable.create

它的特性是,只有在呼叫 subscribe 的時候,lambda 裡面的內容才會被執行,也就是說是 lazy 的行為。而 emitter 呢,有三個主要會被使用的方法: onNext, onComplete, onError,onNext 可以發出新的資料,呼叫 onComplete 代表這個 Observable 已經結束了,不會有新的內容,onError 則是發出一個例外錯誤,錯誤發生之後也依樣不會有新的內容。

val a = Observable.create<Int> { emitter: ObservableEmitter ->
    println("start")
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onComplete()
    emitter.onNext(4) // This one never be sent
    println("end")
}
println("before subscribe")
a.subscribe { number -> println(number) }

// before subscribe
// start
// 1
// 2
// end

Basic operation

map

用來做資料轉換

val a = Observable.just(1, 2, 4)
val b = a.map { it * 2 }
b.subscribe { number -> println(number) }

// 2
// 4
// 8

filter

用來做資料過濾

val a = Observable.just(1, 2, 4)
val b = a.filter { it % 2 == 0 }
val c = a.filter { it % 2 != 0 }
println("subscribe b")
b.subscribe { number -> println(number) }
println("subscribe c")
c.subscribe { number -> println(number) }

// subscribe b
// 2
// 4
// subscribe c
// 1

reduce

用來做資料整合。

val a = Observable.just(1, 2, 4)
val b = a.reduce { t1, t2 ->
    println("t1: $t1, t2: $t2")
    t1 * t2
}

b.subscribe { number -> println("result: $number") }

// t1: 1, t2: 2
// t1: 2, t2: 4
// result: 8

scan

與 reduce 類似,但差別是每一次的結果都會再發送出來,但是 reduce 只會發送最後一個

val a = Observable.just(1, 2, 4)
val b = a.scan { t1, t2 ->
    println("t1: $t1, t2: $t2")
    t1 * t2
}

b.subscribe { number -> println("result: $number") }

// result: 1
// t1: 1, t2: 2
// result: 2
// t1: 2, t2: 4
// result: 8

Subject

Subject 一種比較特別的 Observable,他同時是 Observable 也是 Observer,也就是說他可以透過 subscribe 其他 Observable 來獲得資料:

val a = BehaviorSubject.create<Int>()// BehaviorSubject 是其中一種 Subject 的實作
val b = Observable.just(1, 2, 3)

a.subscribe { number -> println("result: $number") }
b.subscribe(a)

// result: 1
// result: 2
// result: 3

在執行 b.subscribe(a) 之後,所有在 b 這個 Observable 中的元素將會傳給 a 這個 Subject ,然後因為 a 在前面已經有被別人 subscribe 了,所以會執行裡面的 println(),其運作機制如下圖所示:

Screen Shot 2021-08-27 at 2.41.33 PM.png

也可以直接透過 Subject 來發送資料,如下面範例:

val a = BehaviorSubject.create<Int>()

a.subscribe { number -> println("result: $number") }

a.onNext(1)
a.onNext(2)
a.onComplete()
a.onNext(4)

// result: 1
// result: 2

三種不同類型的 Subject

  • BehaviorSubject:不管是任何 Observer 在任何時間 subscribe,一開始都會收到最新的一筆資料,跟 LiveData 的行為一樣。
  • PublishSubject:永遠只會收到當下傳送的資料,在 subscribe 之前的任何資料都是不會收到的。
  • ReplaySubject:記得所有傳輸過的資料,每一個新的 Observer 都會完整的收到歷史資料,但是使用上要特別注意,記憶體的量不是無限的!

下面的範例示範了這三個 Subject 的差別,建議大家自己動手做實驗,實際下去玩玩看有助於你理解喔!

val behavior = BehaviorSubject.create<Int>()
val publish = PublishSubject.create<Int>()
val replay = ReplaySubject.create<Int>()

behavior.onNext(1)
publish.onNext(1)
replay.onNext(1)
behavior.onNext(2)
publish.onNext(2)
replay.onNext(2)

behavior.subscribe { number -> println("behavior: $number") }
publish.subscribe { number -> println("publish: $number") }
replay.subscribe { number -> println("replay: $number") }

behavior.onNext(4)
publish.onNext(4)
replay.onNext(4)

// behavior: 2
// replay: 1
// replay: 2
// behavior: 4
// publish: 4
// replay: 4

Merge Observable

Observable 有很多不一樣的組合方式,在這裡只會提到幾個常用的,跟本專案有用到的其中幾個,其他就要靠讀者自己看文件了。

CombineLatest

只要任何一個 Observable 有新的值產生出來,就會被觸發,並且與其他的 Observable 上一次發送出來的值一起被送出來,如下方範例,a 在一開始連續送出了兩個資料,但是由於 b 還沒有任何資料產生,所以 a 的第一個資料在這邊就沒有任何用處。

val a = PublishSubject.create<Int>()
val b = PublishSubject.create<Int>()

Observables.combineLatest(a, b)
    .map { (first, second) ->
        println("first: $first, second: $second")
        first + second
    }
    .subscribe { number -> println("result: $number") }

a.onNext(1)
a.onNext(2)
b.onNext(1)
b.onNext(3)
a.onNext(5)
b.onNext(8)

// first: 2, second: 1
// result: 3
// first: 2, second: 3
// result: 5
// first: 5, second: 3
// result: 8
// first: 5, second: 8
// result: 13

Zip

跟 CombineLatest 用法類似,但是不一樣的是,他是組合出所有的 Observable 發出第一個資料,第二個資料,第三個等等,是一個在意 index 一致時用的 operator。

val a = PublishSubject.create<Int>()
val b = PublishSubject.create<Int>()

Observables.zip(a, b)
    .map { (first, second) ->
        println("first: $first, second: $second")
        first + second
    }
    .subscribe { number -> println("result: $number") }

a.onNext(1) // [1] []
a.onNext(2) // [1, 2] []
b.onNext(1) // [2] [] emit (1, 1)
b.onNext(3) // [] [] emit (2, 3)
a.onNext(5) // [5] []
b.onNext(8) // [] [] emit (5, 8)

// first: 1, second: 1
// result: 2
// first: 2, second: 3
// result: 5
// first: 5, second: 8
// result: 13

withLatestFrom

這個其實算是比較難理解的一個 operator ,但是它很有用!因為他的特性是讓其中一個 Observable 是處於“被動”狀態,所以不會發送太多不想要的資訊,請看下方範例:

val a = PublishSubject.create<Int>()
val b = PublishSubject.create<Int>()

a.withLatestFrom(b) { first, second ->
    println("first: $first, second: $second")
    first + second
}
    .subscribe { number -> println("result: $number") }

a.onNext(1) // b 沒有值,所以不會觸發
a.onNext(2) // b 沒有值,所以不會觸發
b.onNext(1) // b 有值了,但是主動權是在 a 身上,所以一樣不會觸發
a.onNext(3) // b 有值了,由 a 主動觸發,得到 3+1 = 4
a.onNext(5) // b 有值了,由 a 主動觸發,而且會拿到一樣的 b,得到 5+1 = 6
b.onNext(8) // 只是更新 b ,不會有新的資訊產生出來

// first: 3, second: 1
// result: 4
// first: 5, second: 1
// result: 6

Optional

在 Java 中有一個很有用的類別: Optional ,他存在的目的就是為了能夠妥善的處理 null 的這個狀態,而且跟 List 一樣,是一個有泛型行為的類別。在 Functional programming 中,像這種可以使用“泛型”這種形式,而且還可以使用 map , flatmap, filter 這些高階函式的類別,稱之為 Type class 。為什麼 Optional 很有用呢?因為有了它,可以讓我們在忽略 Null 的情況下進行邏輯運算,請看下面的範例:

fun main() {
  val a: String? = null
  val b: String? = "Hihi"
  val optA = Optional.ofNullable(a)
  val optB = Optional.ofNullable(b)

  printOptStringLength(optA)
  printOptStringLength(optB)
}

private fun printOptStringLength(optString: Optional<String>) {
  optString.map { it.length }.ifPresent { length ->
      println("length: $length")
  }
}

a 跟 b 的型別一樣都是 nullable ,如果我現在有個需求是想知道這字串的長度,要怎麼做呢?使用 Kotlin 的作法可能是用 ?. 這種語法來做安全的呼叫,但是使用 Optional 的話,就可以去除掉使用 ?. 這樣的語法,取而代之的是 map

看到這邊,應該有人很想改程式碼了,這樣看起來沒有比較好啊!程式碼反而更長了不是嗎?的確,這個案例不是很需要用到 Optional ,但是為了讓大家做初步的理解,請再看看下面這個範例:

fun main() {
    multiplyNumberAndStringLen(4, null)
    multiplyNumberAndStringLen(null, "Hihi")
    multiplyNumberAndStringLen(4, "Hihi")
}

private fun multiplyNumberAndStringLen(a: Int?, b: String?) {
    val optA = Optional.ofNullable(a)
    val optB = Optional.ofNullable(b)

    optA.flatMap { number -> optB.map { str -> number * str.length } }
        .ifPresent { number ->
            println("Multiply result: $number")
        }
}

// Multiply result: 16

遇到多個 nullable 這種案例的時候,使用 Optional 就會相對簡潔。如果不喜歡 Java 的這個 Optional (因為 gradle 需要做另外的設定 Android Studio 才不會顯示紅字),其實你也可以自己做出一個這樣的 Type class,實作並不難,還有想知道更多關於 Functional programming 的用法的話,可以參考我去年寫的文章:https://ithelp.ithome.com.tw/articles/10240335

BoardViewModel moveNote 的實作

終於補充完所有的知識了,我們再回來看一下 moveNote 的實作是長什麼樣吧!

fun moveNote(noteId: String, delta: Position) {
    Observable.just(Pair(noteId, delta))
        .withLatestFrom(allNotes) { (noteId, delta), notes ->
            val currentNote = notes.find { it.id == noteId }
            Optional.ofNullable(currentNote?.copy(position = currentNote.position + delta))
        }
        .mapOptional { it }
        .subscribe { newNote ->
            noteRepository.putNote(newNote)
        }
        .addTo(disposableBag)
}

這邊使用到了 withLatestFrom ,根據本篇前面的解說,withLatestFrom 中的 Observable 是被動的,這代表什麼呢?

一個主動的便利貼移動事件 + 一個被動的的全部便利貼的最新狀態

這樣不就剛好能組合出最新的便利貼應該要放在什麼位置了嗎?因為我能從全部便利貼 - notes 中,由 Id 找到相對應的便利貼,然後與 delta 的資料結合起來計算出新的位置,最後再丟給 noteRepository 去更新,問題解決!萬一我現在用其他的 operator 會發生什麼事呢?例如 combineLatest 呢?

Observable.combineLatest(Observable.just(Pair(noteId, delta)), allNotes)

如果使用 combineLatest ,因為在 noteRepository 中 putNote 的更新將會觸發 allNote 的更新,所以這個 combineLatest 的版本還會再一次的觸發,因此 putNote 會再執行一次,把新的值藉由塞入 noteRepository ,產生了一個永無止境的無窮迴圈,最後 app 將會因為 ANR 而閃退,所以 withLatestFrom 在這邊就發揮了極大的作用,避免掉了這個無窮迴圈。

而這邊為什麼會用到 Optional 呢?因為 RxJava 不接受任何 Null 的值被傳出去,所以這邊使用 Optional 來做包裝,然後下面的 mapOptional 將會解開這樣的包裝,如果 Optional 裡面的值是空的話,就不會繼續往下走。

NoteRepository putNote 的實作

class InMemoryNoteRepository(): NoteRepository {

    private val notesSubject = BehaviorSubject.create<List<Note>>()
    private val noteMap = ConcurrentHashMap<String, Note>()

    init {
        val initNote = Note.createRandomNote()
        noteMap[initNote.id] = initNote
        notesSubject.onNext(noteMap.elements().toList())
    }

    override fun getAll(): Observable<List<Note>> {
        return notesSubject.hide()
    }

    override fun putNote(newNote: Note) {
        noteMap[newNote.id] = newNote
        notesSubject.onNext(noteMap.elements().toList())
    }
}

相信大家在看到這麼多使用 Subject 的範例後,理解上面的程式碼就不是困難的事了,這邊我就不多加解說了。

小結

今天看到了 merge observable 在這專案中的其中一個應用,如果沒有 withLatestFrom,使用 RP 將會困難重重,一不小心就會造成無窮迴圈,或是因為不熟悉 RP 而多了很多額外變數,讓程式碼變得難以閱讀,如果額外的變數的同時又會造成 side effect。在寫 RP 時,應該盡量避免 side effect,才能好好地確保程式執行的正確性。

補充 side effect(又置入了一篇文章XD): https://ithelp.ithome.com.tw/articles/10236884


上一篇
便利貼中的手勢操作
下一篇
MultiThreading and Custom extension function.
系列文
Jetpack Compose X Android Architecture X Functional Reactive Programming30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言