補充完了必要的知識後,現在再回頭看一下之前遇到的問題吧!
整合完 Firebase 之後發現了兩個問題:
針對問題一,有了之前補充的知識,馬上就能知道這個問題就是 Backpressure 所導致的,所以我們可以使用 RxJava 所提供的 operator 來解決這問題。至於問題二,既然我們無法使用同步的資料,那就完全使用本地資料來更新吧!如此一來在移動便利貼的時候就不會卡了。
所以我們有兩邊的資料需要做更新,一邊是上傳到雲端,其他一起編輯的使用者就可以看到我正在移動便利貼,另一個是本地端,讓本地端的使用者可以做到即時的位置更新。 那如何將一個資料流分流到這二邊呢?沒錯,這邊就會用到上一篇所說的 Multicasting!使用 Multicasting 就能將原本一個 Observable 的資料流,分出來給其他兩個 Observable 做使用,請看以下圖示:
在原本 NoteRepository 的實作中, putNote()
會將所有的資料一個不漏的上傳到 Firebase 中,但是因為這樣會造成塞車,所以我們就可以使用 RxJava 所提供的 operator: throttle
,來減少事件的數量,而throttle
呢是一個很常用來解決 Backpressure 問題的 operator,上圖可以看到原本輸入事件的頻率較高,一條箭頭中有兩個事件(橘色圓點),而在經過 throttle
之後,一個箭頭中減少到只剩一個事件,請注意這邊的數量差距比例並不是重點(比例不會是 2:1),上圖的重點是事件數量有減少。事實上,throttle
的運作機制是以時間來當作丟掉事件的基準,而不是事件的數量。
接著來看到下方的箭頭,這邊的事件數量會跟外面輸入的數量是一樣的,我們將用它來更新本地端的資料。然後這邊上下兩個箭頭呢,都是使用同一個 Observable 來當作事件的來源,只是後面再用不同的方式將源頭轉換成兩個不一樣的 Observable。在這邊我們將採取的方式是使用 Subject 來當作源頭:
private val updatingNoteSubject = BehaviorSubject.createDefault(Optional.empty<Note>())
override fun putNote(note: Note) {
updatingNoteSubject.onNext(Optional.of(note))
}
一個 Subject 可以同時給很多不同的 Observer 給 subscribe,也不會有重複執行同一個 function 的問題( Multicasting 的問題請看上一篇),這邊到目前為止應該沒什麼大問題,但這邊為什麼還要再包一個 Optional
呢?請繼續往下看:
init {
updatingNoteSubject
.throttleLast(300, TimeUnit.MILLISECONDS)
.toIO()
.subscribe { optNote ->
optNote.ifPresent { setNoteDocument(it) }
}
}
private fun setNoteDocument(note: Note) {
val noteData = hashMapOf(
FIELD_TEXT to note.text,
FIELD_COLOR to note.color.color,
FIELD_POSITION_X to note.position.x.toString(),
FIELD_POSITION_Y to note.position.y.toString()
)
firestore.collection(COLLECTION_NOTES)
.document(note.id)
.set(noteData)
}
上傳到 Firebase 的部分,在經過了 throttleLast
之後,事件的數量就減少為每 300 毫秒才有一個,並且將 300 毫秒內的最後一個事件,使用 firestore 的 API 來覆寫資料。
雖然說我們把資料分流了,但是對於 EditorViewModel 來說,他認識的只有 NoteRepository 的 getAllNotes()
所回傳的 Observable ,所以我們還是要將資料整合起來,給 EditorViewModel 當作唯一的資料來源,在 NoteRepository 的實作中所做的任何事情 EditorViewModel 一率都不需要關心。
還記得嗎?Firebase 在更新完資料後,會藉由 SnapshotListener
來通知最新的結果,這邊的結果包含了其他使用者所做的任何操作,像是改變顏色拉,改變其他便利貼的位置等等,所以從 Firebase 獲取資料是一個必要的動作。另外一方面,本地端的更新也是必要的資料,兩邊的任何事件都不能有遺漏。所以這時候就有一個很適合這種情況的 opeartor 就要登場了,它就是 combineLatest
。
// allNotesSubject 是從 firebase 來的資料
override fun getAllNotes(): Observable<List<Note>> {
return Observables.combineLatest(updatingNoteSubject, allNotesSubject)
.map { (optNote, allNotes) ->
optNote.map { note ->
val noteIndex = allNotes.indexOfFirst { it.id == note.id }
allNotes.subList(0, noteIndex) + note + allNotes.subList(noteIndex + 1, allNotes.size)
}.orElseGet { allNotes }
}
}
匯集這兩個地方的資料的時候,會有一種資料不一致的情況:就是正在移動中的便利貼,跟從 Firebase 上面的便利貼,在同一個 id 的情況下他們的位置一定是不一樣的,那這時候要選哪一個呢?答案很明顯的當然是要選擇本地的那一份資料,不然 ViewModel 拿到的就是過去的資料而無法即時拖拉便利貼了,所以上面的程式碼大致上就是在做這一件事。
但是還有一個問題,萬一我已經完成編輯有一段時間了,其他人在拖曳我上次編輯過的便利貼會因為這個機制而無法即時看到更新,因為 updatingNoteSubject
自從上次更新完位置後就沒有改變過資料內容了,現在的這套機永遠會以 updatingNoteSubject
中的內容為優先,就算使用者已經沒有要編輯他了也一樣。
為了解決這個問題,我設計了一個機制,當使用者有一段時間沒有編輯了,就將 updatingNoteSubject
中的內容給清空,如此一來,就可以順利的解決上述的問題了:
init {
updatingNoteSubject
.filter { it.isPresent }
.debounce(300, TimeUnit.MILLISECONDS) // debounce 也是其中一種解決 backpressure 問題的 operator
.subscribe {
updatingNoteSubject.onNext(Optional.empty<Note>())
}
}
上述的說明就是我使用 Optional
的目的,他可以解決我使用 combineLatest
時所遇到的問題。以下是本階段完整的程式碼:
class FirebaseNoteRepository: NoteRepository {
private val firestore = FirebaseFirestore.getInstance()
private val allNotesSubject = BehaviorSubject.create<List<Note>>()
private val updatingNoteSubject = BehaviorSubject.createDefault(Optional.empty<Note>())
private val query = firestore.collection(COLLECTION_NOTES)
.limit(100)
init {
query.addSnapshotListener { result, e ->
result?.let { onSnapshotUpdated(it) }
}
updatingNoteSubject
.throttleLast(300, TimeUnit.MILLISECONDS)
.toIO()
.subscribe { optNote ->
optNote.ifPresent { setNoteDocument(it) }
}
updatingNoteSubject
.filter { it.isPresent }
.debounce(300, TimeUnit.MILLISECONDS)
.subscribe {
updatingNoteSubject.onNext(Optional.empty<Note>())
}
}
override fun getAllNotes(): Observable<List<Note>> {
return Observables.combineLatest(updatingNoteSubject, allNotesSubject)
.map { (optNote, allNotes) ->
optNote.map { note ->
val noteIndex = allNotes.indexOfFirst { it.id == note.id }
allNotes.subList(0, noteIndex) + note + allNotes.subList(noteIndex + 1, allNotes.size)
}.orElseGet { allNotes }
}
}
override fun putNote(note: Note) {
updatingNoteSubject.onNext(Optional.of(note))
}
private fun onSnapshotUpdated(snapshot: QuerySnapshot) {
val allNotes = snapshot
.map { document -> documentToNotes(document) }
allNotesSubject.onNext(allNotes)
}
private fun setNoteDocument(note: Note) {
val noteData = hashMapOf(
FIELD_TEXT to note.text,
FIELD_COLOR to note.color.color,
FIELD_POSITION_X to note.position.x.toString(),
FIELD_POSITION_Y to note.position.y.toString()
)
firestore.collection(COLLECTION_NOTES)
.document(note.id)
.set(noteData)
}
private fun documentToNotes(document: QueryDocumentSnapshot): Note {
val data: Map<String, Any> = document.data
val text = data[FIELD_TEXT] as String
val color = YBColor(data[FIELD_COLOR] as Long)
val positionX = data[FIELD_POSITION_X] as String? ?: "0"
val positionY = data[FIELD_POSITION_Y] as String? ?: "0"
val position = Position(positionX.toFloat(), positionY.toFloat())
return Note(document.id, text, position, color)
}
companion object {
const val COLLECTION_NOTES = "Notes"
const val FIELD_TEXT = "text"
const val FIELD_COLOR = "color"
const val FIELD_POSITION_X = "positionX"
const val FIELD_POSITION_Y = "positionY"
}
}
改完了程式碼之後,再重新建置運行在手機上,就會發現之前的問題已經解決了!
不知道你有沒有發現到,上述的程式碼中沒有用到之前介紹的 Flowable,那這邊為什麼使用 Observable 還是可以用 throttleLast
解決卡頓的問題呢?還有在 throttleLast
之後為什麼型別沒有轉成 Flowable 呢?這個問題留給大家去想想,明天再來解答!