Grab 是一家總部位于新加坡的東南亞網約車和送餐平台公司,業務遍及東南亞大部分地區,爲 8 個國家的 350 多座城市的 1.87 億多用戶提供服務。Grab 當前提供包括網約車、送餐、酒店預訂、網上銀行、移動支付和保險服務。是東南亞的“美團”。Grab Engineering 分享了他們對搜索索引進行優化的方法與心得,InfoQ 中文站翻譯並分享。
當今的應用程序通常使用各種數據庫引擎,每個引擎服務于特定的需求。對于 Grab Deliveries,MySQL 數據庫是用來存儲典型數據格式的,而 Elasticsearch 則提供高級搜索功能。MySQL 是原始數據的主要數據存儲,而 Elasticsearch 是派生存儲。
搜索數據流
對于 MySQL 和 Elasticsearch 之間的數據同步進行了很多工作。本文介紹了如何優化增量搜索數據索引的一系列技術。
背景
從主數據存儲到派生數據存儲的數據同步是由數據同步平台(Data Synchronisation Platform,DSP)Food-Puxian 處理的。就搜索服務而言,它是 MySQL 和 Elasticsearch 之間的數據同步。
當 MySQL 的每一次實時數據更新時觸發數據同步過程,它將向 Kafka 傳遞更新的數據。數據同步平台使用 Kafka 流列表,並在 Elasticsearch 中增量更新相應的搜索索引。此過程也稱爲增量同步。
Kafka 到數據同步平台
利用 Kafka 流,數據同步平台實現增量同步。“流”是一種沒有邊界的、持續更新的數據集,它是有序的、可重放的和容錯的。
利用 Kafaka 的數據同步過程
上圖描述了使用 Kafka 進行數據同步的過程。數據生産器爲 MySQL 上的每一個操作創建一個 Kafka 流,並實時將其發送到 Kafka。數據同步平台爲每個 Kafka 流創建一個流消費器,消費器從各自的 Kafka 流中讀取數據更新,並將其同步到 Elasticsearch。
MySQL 到 Elasticsearch
Elasticsearch 中的索引與 MySQL 表對應。MySQL 的數據存儲在表中,而 Elasticsearch 的數據則存儲在索引中。多個 MySQL 表被連接起來,形成一個 Elasticsearch 索引。以下代碼段展示了 MySQL 和 Elasticsearch 中的實體-關系映射。實體 A 與實體 B 有一對多的關系。實體 A 在 MySQL 中有多個相關的表,即表 A1 和 A2,它們被連接成一個 Elasticsearch 索引 A。
MySQL 和 Elasticsearch 中的 ER 映射
有時,一個搜索索引同時包含實體 A 和實體 B。對于該索引的關鍵字搜索查詢,例如“Burger”,實體 A 和實體 B 中名稱包含“Burger”的對象都會在搜索響應中返回。
原始增量同步
原始 Kafaka 流
在上面所示的 ER 圖中,數據生産器爲每個 MySQL 表都會創建一個 Kafaka 流。每當 MySQL 發生插入、更新或刪除操作時,執行操作之後的數據副本會被發送到其 Kafka 流中。對于每個 Kafaka 流,數據同步平台都會創建不同的流消費器(Stream Consumer),因爲它們具有不同的數據結構。
流消費器基礎設施
流消費器由 3 個組件組成。
- 事件調度器(Event Dispatcher):監聽並從 Kafka 流中獲取事件,將它們推送到事件緩沖區,並啓動一個 goroutine,在事件緩沖區中爲不存在 ID 的每個事件運行事件處理器。
- 事件緩沖區(Event Buffer):事件通過主鍵(aID、bID 等)緩存在內存中。一個事件被緩存在緩沖區中,直到它被一個 goroutine 選中,或者當一個具有相同主鍵的新事件被推入緩沖區時被替換。
- 事件處理器(Event Handler):從事件緩沖區中讀取事件,由事件調度器啓動的 goroutine 會對其進行處理。
流消費器基礎設施
事件緩沖區過程
事件緩沖區由許多子緩沖區組成,每個子緩沖區具有一個唯一的 ID,該 ID 是緩沖區中事件的主鍵。一個子緩沖區的最大尺寸爲 1。這樣,事件緩沖區就可以重複處理緩沖區中具有相同 ID 的事件。
下圖展示了將事件推送到事件緩沖區的過程。在將新事件推送到緩沖區時,將替換共享相同 ID 的舊事件。結果,被替換的事件不會被處理。
將事件推送到事件緩沖區
事件處理器過程
下面的流程圖顯示了由事件處理器執行的程序。其中包括公共處理器流程(白色),以及針對對象 B 事件的附加過程(綠色)。當通過從數據庫中加載的數據創建一個新的 Elasticsearch 文檔時,它會從 Elasticsearch 獲取原始文檔,比較是否有更改字段,並決定是否需要向 Elasticsearch 發送新文檔。
在處理對象 B 事件時,它還根據公共處理器級聯更新到 Elasticsearch 索引中的相關對象 A。我們將這種操作命名爲“級聯更新”(Cascade Update)。
事件處理器執行的過程
原始基礎設施存在的問題
Elasticsearch 索引中的數據可以來自多個 MySQL 表,如下所示。
Elasticsearch 索引中的數據
原始基礎設施存在一些問題。
- 繁重的數據庫負載:消費器從 Kafka 流中讀取數據,將流事件視爲通知,然後使用 ID 從數據庫中加載數據,創建新的 Elasticsearch 文檔。流事件中的數據並沒有得到很好的利用。每次從數據庫加載數據,然後創建新的 Elasticsearch 文檔,都會導致大量的數據庫流量。數據庫成爲一個瓶頸。
- 數據丟失:生産器在應用程序代碼中向 Kafka 發送數據副本。通過 MySQL 命令行工具(command-line tool,CLT)或其他數據庫管理工具進行的數據更改會丟失。
- 與 MySQL 表結構的緊密耦合:如果生産器在 MySQL 中的現有表中添加了一個新的列,並且這個列需要同步到 Elasticsearch,那麽數據同步平台就無法捕捉到這個列的數據變化,直到生産器進行代碼修改並將這個列添加到相關的 Kafka 流中。
- 冗余的 Elasticsearch 更新:Elasticsearch 數據是 MySQL 數據的一個子集。生産器將數據發布到 Kafka 流中,即使對與 Elasticsearch 無關的字段進行了修改。這些與 Elasticsearch 無關的流事件仍會被拾取。
- 重複的級聯更新:考慮一種情況,即搜索索引同時包含對象 A 和對象 B,在很短的時間內對對象 B 産生大量的更新。所有的更新將被級聯到同時包含對象 A 和 B 的索引,這會爲數據庫帶來大量流量。
優化增量同步
MySQL 二進制日志
MySQL 二進制日志(Binlog)是一組日志文件,其中包含對 MySQL 服務器實例進行的數據修改信息。它包含所有更新數據的語句。二進制日志有兩種類型。
- 基于語句的日志記錄:事件包含産生數據更改(插入、更新、刪除)的 SQL 語句。
- 基于行的日志記錄:事件描述了單個行的更改。
Grab Caspian 團隊(Data Tech)構建了一個基于 MySQL 基于行的二進制日志的變更數據捕獲(Change Data Capture,CDC)系統。它能夠捕獲所有 MySQL 表的所有數據修改。
當前 Kafaka 流
二進制日志流事件定義是一種普通的數據結構,包含三個主要字段:Operation、PayloadBefore 和 PayloadAfter。Operation 的枚舉是創建、刪除和更新。Payload 是 JSON 字符串格式的數據。所有二進制日志流都遵循相同的流事件定義。利用二進制日志事件中的 PayloadBefore 和 PayloadAfter,在數據同步平台上對增量同步進行優化成爲可能。
二進制日志流事件主要字段
流消費器優化
事件處理器優化
優化 1
請記住,上面提到過 Elasticsearch 存在冗余更新問題,Elasticsearch 數據是 MySQL 數據的一個子集。第一個優化是通過檢查 PayloadBefore 和 PayloadAfter 之間的不同字段是否位于 Elasticsearch 數據子集中,從而過濾掉無關的流事件。
二進制日志事件中的 Payload 是 JSON 字符串,所以定義了一個數據結構來解析 PayloadBefore 和 PayloadAfter,其中僅包含 Elasticsearch 數據中存在的字段。對比解析後的 Payload,我們很容易知道這個更改是否與 Elasticsearch 相關。
下圖顯示了經過優化的事件處理器流。從藍色流程可以看出,在處理事件時,首先對 PayloadBefore 和 PayloadAfter 進行比較。僅在 PayloadBefore 和 PayloadAfter 之間存在差異時,才處理該事件。因爲無關的事件已經被過濾掉,所以沒有必要從 Elasticsearch 中獲取原始文件。
事件處理器優化 1
成效
- 沒有數據丟失。使用 MySQL CLT 或其他數據庫管理工具進行的更改可以被捕獲。
- 對 MySQL 表的定義沒有依賴性。所有的數據都是 JSON 字符串格式。
- 不存在多余的 Elasticsearch 更新和數據庫讀取。
- Elasticsearch 讀取流量減少 90%。
- 不再需要從 Elasticsearch 獲取原始文檔與新創建的文檔進行比較。
- 過濾掉 55% 的不相關流事件。
- 數據庫負載降低 55%。
針對優化 1 的 Elasticsearch 事件更新
優化 2
事件中的 PayloadAfter 提供了更新的數據。因此,我們開始思考是否需要一種全新的從多個 MySQL 表讀取的 Elasticsearch 文檔。第二個優化是利用二進制日志事件的數據差異,改爲部分更新。
下圖展示了部分更新的事件處理程序流程。如紅色流所示,沒有爲每個事件創建一個新的 Elasticsearch 文檔,而是首先檢查該文檔是否存在。加入文檔存在(大部分時間都存在),則在此事件中更改數據,只要 PayloadBefore 和 PayloadAfter 之間的比較就會更新到現有的 Elasticsearch 文檔。
事件處理器優化 2
成效
- 將大部分 Elasticsearch 相關事件更改爲部分更新:使用流事件中的數據來更新 Elasticsearch。
- Elasticsearch 負載減少:只將 Elasticsearch 發送修改的字段。
- 數據庫負載減少:基于優化 1,數據庫負載減少 80%。
事件緩沖區優化
在把新事件推送到事件緩沖區的時候,我們不會替換舊事件,而會把新事件和舊事件合並。
事件緩沖區中每個子緩沖區的尺寸爲 1。在這種優化中,流事件不再被視爲通知。我們使用事件中的 Payload 來執行部分更新。替換舊事件的舊過程已經不再適用于二進制日志流。
當事件調度器將一個新的事件推送到事件緩沖區的一個非空的子緩沖區時,它會將把子緩沖區中的事件 A 和新的事件 B 合並成一個新的二進制日志事件 C,其 PayloadBefore 來自事件 A,而 PayloadAfter 來自事件 B。
合並事件緩沖區優化的操作
級聯更新優化
優化
我們使用一個新的流來處理級聯更新事件。當生産器發送數據到 Kafka 流時,共享相同 ID 的數據將被存儲在同一個分區上。每一個數據同步平台服務實例只有一個流消費器。在消費器消費 Kafaka 流時,一個分區僅由一個消費器消費。因此,共享相同 ID 的級聯更新事件將由同一個 EC2 實例上的一個流消費器所消費。有了這種特殊的機制,內存中的事件緩沖區能夠重複使用大部分共享相同 ID 的級聯更新事件。
以下流程圖展示了優化後的事件處理程序。綠色顯示的是原始流,而紫色顯示的是當前流,帶有級聯更新事件。在處理對象 B 的事件時,事件處理器不會直接級聯更新相關對象 A,而是發送一個級聯更新事件到新的流。這個新流的消費器將處理級聯更新事件,並將對象 A 的數據同步到 Elasticsearch 中。
帶有級聯更新的事件處理器
成效
- 級聯更新事件消除了 80% 的重複數據。
- 級聯更新引入的數據庫負載減少。
級聯更新事件
總結
本文介紹了四種不同的數據同步平台優化方法。在改用 Coban 團隊提供的 MySQL 二進制日志流並對流消費器進行優化後,數據同步平台節省了約 91% 的數據庫讀取和 90% 的 Elasticsearch 讀取,流消費器處理的流流量的平均查詢次數(Queries Per Second,QPS)從 200 次增加到 800 次。高峰時段的平均查詢次數最大可達到 1000 次以上。隨著平均查詢次數的提高,處理數據的時間和從 MySQL 到 Elasticsearch 的數據同步的延遲都有所減少。經過優化,數據同步平台的數據同步能力得到顯著的提高。