在上一篇中的最後我問了一個問題:“為什麼沒有使用 Flowable 而是繼續用 Observable 來解決 Backpressure?”,不知道大家的心目中有沒有答案了呢?這邊先給大家答案,其實上次的實作是有問題的!
為了模擬我們在 FirebaseNoteRepository
中的實作,照慣例來用簡單的程式碼來做實驗吧!我想做的實驗如下:如果上游持續不斷的發送新資料,在有使用 throttleLast
的情況下,接收方還是有可能會塞車嗎?
val timer = Observable.interval(5, TimeUnit.MILLISECONDS)
timer
.throttleLast(30, TimeUnit.MILLISECONDS)
.doOnNext { println("New item from throttleLast --------- $it") }
.observeOn(Schedulers.io())
.subscribe{
Thread.sleep(1000)
println("New item for subscribe --------- $it")
}
Thread.sleep(10000)
這邊我使用了 Observable 的 interval
來模擬持續不斷發送的事件,每五毫秒就送出一個新事件,接著再對這個 observable 使用 throttleLast
,每 30 毫秒就只留一個事件,並且最後切換執行緒再進行 subscribe 。在 subscribe 這邊,為了模擬長時間的任務,畢竟我們是在寫入網路資料,在每次執行的時候會停止該執行緒一秒鐘。這邊幾乎完整還原了上一篇最後所使用的方式,接下來看看輸出是長怎樣吧!
...
New item from throttleLast --------- 191
New item from throttleLast --------- 197
New item from throttleLast --------- 203
New item for subscribe --------- 5
New item from throttleLast --------- 209
New item from throttleLast --------- 215
New item from throttleLast --------- 221
...
...
...
New item from throttleLast --------- 1805
New item from throttleLast --------- 1811
New item for subscribe --------- 54
New item from throttleLast --------- 1817
New item from throttleLast --------- 1823
New item from throttleLast --------- 1829
從執行結果來看,subscribe 中接收到的事件慢了好幾拍!而且差距只會越來越遠,所以就算使用了 throttleLast
,只要還是 Observable ,就無法解決 Backpressure 的問題!所以只要我們換成 Flowable 就沒問題了嗎?
val timer = Observable.interval(5, TimeUnit.MILLISECONDS)
timer
**.toFlowable(BackpressureStrategy.LATEST)** // 切換為 Flowable
.doOnNext { println("New item from flowable --------- $it") }
.observeOn(Schedulers.io())
.subscribe{
Thread.sleep(1000)
println("New item for subscribe --------- $it")
}
Thread.sleep(10000)
執行結果如下:
New item from flowable --------- 0
New item from flowable --------- 1
New item from flowable --------- 2
New item from flowable --------- 3
New item from flowable --------- 4
...
...
New item from flowable --------- 126
New item from flowable --------- 127
New item for subscribe --------- 0
New item for subscribe --------- 1
New item for subscribe --------- 2
New item for subscribe --------- 3
New item for subscribe --------- 4
New item for subscribe --------- 5
New item for subscribe --------- 6
New item for subscribe --------- 7
New item for subscribe --------- 8
Flowable 執行到 127 就停了,接著 subscribe 這裡才慢慢的一個一個拿出來,所以這裡顯示的資料是 Flowable 的 buffer 給我們的(還記得 Flowable 預設的 BUFFEER_SIZE 是 128 嗎?)。但是,這並不是我們想要的結果!我們預期要拿到的結果是現在此時此刻的最新結果,而不是過去所累積的資料,現在資料很明顯的卡在某個地方!
這時候請看到 observeOn
這邊,點進去實作後我們發現了:
Flowable 的 obserbeOn
還有另一個有 buffer size 的實作!看來我們接近答案了,如果將 buffer size 的數值調成 1 的話會發生什麼事呢?
val timer = Observable.interval(5, TimeUnit.MILLISECONDS)
timer
.toFlowable(BackpressureStrategy.LATEST)
.doOnNext { println("New item from flowable --------- $it") }
**.observeOn(Schedulers.io(), false, 1)** // 使用有 buffer size 的 observeOn
.subscribe{
Thread.sleep(1000)
println("New item for subscribe --------- $it")
}
Thread.sleep(10000)
結果如下:
New item from flowable --------- 0
New item for subscribe --------- 0
New item from flowable --------- 201
New item for subscribe --------- 201
New item from flowable --------- 402
New item for subscribe --------- 402
New item from flowable --------- 603
New item for subscribe --------- 603
New item from flowable --------- 803
New item for subscribe --------- 803
...
看起來很棒! subscribe 拿到的永遠是最新的值了!
有時候我們在解決問題時,這邊試一試,那邊試一試,從網路上複製別人的解法貼到自己的程式碼中,在這過程中有可能就試出了沒問題的版本,雖然不是很懂裡面的運作機制,但是也沒太放在心上就直接 release 了。但是在這時候就有可能悄悄的埋下一顆未爆彈,等到爆炸的時候,沒人知道為什麼爆炸,可能得要花上一整天的時間來 Debug 才能找出原因。
所以要怎麼辦呢?事實上我也犯了很多次這樣的錯,如果專案時間很趕的話我們也沒有時間慢慢研究原理不是嗎?暫時可以動就好拉!沒錯,我們很多時候是要跟時間妥協的,所以在這時候至少養成一個好習慣,就是當你覺得某個程式碼片段你不太確定其中運作的原理的時候,就加個 TODO 註解吧!並且確實的放到 backlog 中,至少大家都會意識到它的存在。
說到這,其實上一篇的程式碼還有另一個問題,我們在設定新資料時所使用的 firebase api 是同步的還是非同步呢?如果是非同步,不就表示還是有可能造成塞車嗎?那我們來回頭看看是怎麼設定新資料的:
private fun setNoteDocument(note: Note) {
val noteData = hashMapOf(
FIELD_TEXT to note.text,
FIELD_COLOR to note.color.color,
FIELD_POSITION_X to note.position.x.toString(),
FIELD_POSITION_Y to note.position.y.toString()
)
firestore.collection(COLLECTION_NOTES)
.document(note.id)
.set(noteData)
}
// Firebase - DocumentReference
/**
* Overwrites the document referred to by this {@code DocumentReference}. If the document does not
* yet exist, it will be created. If a document already exists, it will be overwritten.
*
* @param data The data to write to the document (e.g. a Map or a POJO containing the desired
* document contents).
* @return A Task that will be resolved when the write finishes.
*/
@NonNull
public Task<Void> set(@NonNull Object data) {
return set(data, SetOptions.OVERWRITE);
}
點進去 set 實作後發現,原來這會回傳一個 Task
,那這表示我們使用的是一個非同步的作法,那這時候該怎麼辦呢?而且根據官方文件,他們似乎沒提供同步的作法(我沒有找到,如果讀者有找到的話也請跟我講),同時官方文件也建議了每秒鐘不應該有超過一個寫入的請求:
看起來我們回到了原點了,花了這麼多時間結果才發現用 Flowable 也無法完美解決我們在 firestore 上遇到的問題,不過這是一個很棒的經驗,至少學習到了 Backpressure 更正確的用法,還有更加確定我們的實作是如何運行的,因此在這邊我決定遵循文件的建議,將寫入的頻率調整到每秒鐘一次:
updatingNoteSubject
.throttleLast(1000, TimeUnit.MILLISECONDS) // 每秒只執行寫入一次
.observeOn(AndroidSchedulers.mainThread())
.subscribe { optNote ->
optNote.ifPresent { setNoteDocument(it) }
}
終於完成手勢移動便利貼的這個功能了,但是我們的 App 還有很多功能要做呢!接下來我們將完成:
由於這些最終都會操作到 NoteRepository ,所以在本篇的最後,我們就把所有需要用到的 CRUD 功能都開一開吧!
interface NoteRepository {
fun getAllNotes(): Observable<List<Note>>
fun getNoteById(id: String): Observable<Note>
fun putNote(note: Note)
fun createNote(note: Note)
fun deleteNote(noteId: String)
}
class FirebaseNoteRepository: NoteRepository {
...
...
override fun getNoteById(id: String): Observable<Note> {
return allNotesSubject.map { notes ->
Optional.ofNullable(notes.find { note -> note.id == id })
}.mapOptional { it }
}
override fun createNote(note: Note) {
setNoteDocument(note)
}
override fun deleteNote(noteId: String) {
firestore.collection(COLLECTION_NOTES)
.document(noteId)
.delete()
}
}
由於實作相對容易,就不直逐行解說了,明天將會跟大家一起來完成這些新的功能!