一、前言
隨著大數據技術的飛速發展,海量數據存儲和計算的解決方案層出不窮,生産環境和大數據環境的交互日益密切。數據倉庫作爲海量數據落地和扭轉的重要載體,承擔著數據從生産環境到大數據環境、經由大數據環境計算處理回饋生産應用或支持決策的重要角色。
數據倉庫的主題覆蓋度、性能、易用性、可擴展性及數據質量都是衡量數據倉庫解決方案好壞的重要指標。攜程機票部門數據倉庫也在不斷摸索向著這些目標砥砺前行。
二、攜程機票數據倉庫技術棧
攜程機票部門的數據倉庫建設主要基于公司公共部門的大數據基礎環境及數據調度平台,輔以部分自運維的開源存儲引擎和基于開源組件二次開發的數據同步工具和運維工具。
2.1 數倉技術演進曆史
機票部門的數據倉庫源于 2008 年,當時生産環境數據落地主要使用 SQLServer,數據倉庫處理的目標數據體量不大,因此選擇的 SQLServer、Informaticas、Kettle 這樣的數據倉庫方案,數據模型設計及報表定制使用 SAP 的商用平台 BO。
隨著機票業務系統的日益複雜,特別是生産環境引入消息中間件 Kafka 存儲日志數據後,這套方案不可擴展性的缺點日趨明顯,SQLServer 的存儲和計算能力很大程度上限制了數倉數據的主題覆蓋度及性能。
在 2014 年,公司公共部門 hadoop 集群部署上線,並且引入了 zeus 調度平台及 DataX 同步工具,各個 BU 的數據倉庫開始逐步轉爲基于 Hive 建設。
隨著生産業務對實時監控、流量回放的需求增強,2016 年機票部門部署了 ElasticSearch,用以實時落地從 Kafka 同步的各個主流程服務日志,並通過統一的交易標識 (transactionID) 串聯用戶的一次完整的搜索、下單等行爲,用于生産排障和流量回放。基于 Hive 的搜索性能一直被廣泛诟病,特別是針對 adhoc 查詢,機票部門在 2016 年調研並部署了 Facebook 開源的基于內存和 Pipeline 的查詢引擎 Presto,在沒有享受到 local 數據獲取的前提下,查詢性能較原生的 Hive 引擎或者 Spark 引擎都有很大的提升。
在 2018 年,爲了支持數倉數據的可視化運營平台,我們先後引入了 ClickHouse 和 CrateDB 作爲後台的存儲和查詢引擎,特別是引入 CrateDB 以後,億級體量的表四個維度的聚合耗時 P90 下降到了 4 秒。
實時數據處理技術也經過了 Esper,Storm,Spark Streaming 和 Flink 的叠代,並慢慢收斂到 Flink。總體的技術演進曆史如圖 1 所示。
2.2 當前技術棧
生産環境的數據可以大致分成三類:
1)業務數據,主要存儲在 MySQL 和 SQLServer,在這些關系型數據庫裏面有數以萬計的表承接著各種生産服務的業務數據寫入;
2)基礎數據,也是存儲在 MySQL 和 SQLServer 中,生産應用時一般會建立一層中心化緩存(如 Redis)或者本地緩存;
3)日志數據,這類數據的特點是”append only”,對已經生成的數據不會有更新的操作,考慮到這類數據的高吞吐量,生産環境一般會用消息隊列 Kafka 暫存;
數據倉庫在實施數據同步時,會根據需求在實時、近實時以及 T+1 天等不同的頻率執行數據同步,並且在大數據環境會用不同的載體承接不同頻率同步過來的數據。在攜程機票,實時同步的目標載體是 ElasticSearch、CrateDB 或者 HBase,近實時(一般 T+1 小時)或者 T+1 天的目標載體是 Hive。
從生産的數據載體來講,主要包括 DB 和消息隊列,他們的數據同步方案主要是:
1)生産 DB 到 Hive 的同步使用 taobao 開源的 DataX,DataX 由網站運營中心 DP 團隊做了很多擴展開發,目前支持了多種數據源之間的數據同步。實時同步的場景主要在 MySQL,使用 DBA 部門使用 Canal 解析並寫入至消息隊列的 bin log。
2)從 Kafka 到 Hive 同步使用 Camus,但是由于 Camus 的性能問題及消費記錄和消費過期較難監控的問題,我們基于 spark-sql-kafka 開發了 hamal,用于新建的 Kafka 到 Hive 的同步;Kafka 實時同步的載體主要是 ElasticSearch 或者 CrateDB,主要通過 Flink 實施。
生産數據被同步數據倉庫後,會在數倉內完成數據清洗、信息整合、聚合計算等數據扭轉流程,最終數據出倉導入到其它載體,這一系列的流程調度由公司 DP 團隊運維的調度平台 Zeus 完成。
2.3 實時 VS 離線
當前機票部門的數據倉庫建設主要基于離線數據,一方面跟 OTA 銷售産品不屬于快消品相關,實時當前並不是剛需;另一方面實時處理場景下需要對計算資源、存儲資源穩定性有更高的要求,保持數據一致性的代價很大。結合兩方面,如果業務對實時需求不高就鋪開做實時數倉,ROI 很難達標。
當然,隨著攜程業務體量的增長,數據使用方對數據實時性要求日益增高,我們團隊在 2020 年也會探索實時數據倉庫的實施方案,並在一兩個重要的數據主題域上先行試點。
三、數據倉庫建設時涉及的共性問題
從團隊職能上來講,數據倉庫團隊需要負責從生産環境同步數據,在內部完成各層級的扭轉計算,參與所有數倉流程及報表的運維,並基于數倉公共數據層和應用數據層數據開發相關應用。
3.1 數據同步
爲了保持數倉數據主題覆蓋足夠全面,我們部門幾乎將所有生産表和 Kafka topics 都同步到了 Hive。以下會對同步最常見的兩種場景 DB->Hive 和 Kafka->Hive 相關的實踐做介紹。
3.1.1 DB 同步到 Hive
特別對生産表到 Hive 的同步,人工配置腳本的方式顯然不能處理數以萬計的表,因此需要一個自動化的同步方案。自動同步方案需要不僅僅要解決自動創建表腳本、創建對應的同步腳本問題,還需要在當表結構發生變更的時候,能夠自動地感知表結構的變化,並且修改表結構和對應的同步腳本。
DB 到 Hive 同步需要依賴兩個數據源,1)Schema 表的元數據信息,簡單地包括各個字段信息、字段類型及主鍵定義;2)統計數據,它主要描述的是這個表在數據産生後有沒有 UPDATE 和 DELETE,這個決定著後續表的分區方式。
對業務型數據,一條數據生成後可能會有 Update,因爲在數倉裏絕大部分場景需要用到數據的最新狀態,所以我們會用一個分區存放所有曆史數據的最新狀態,這類表我們稱之爲曆史切片表。對日志型數據,生産上數據産生後就不會有任何修改,我們會選擇使用增量分區,每個分區會放當天的增量數據。對基礎數據,整個表的數據增加、更新的頻率都非常低,在 ods 層我們會每天全量同步一份到最新數據分區,並且會建立一個無分區的下遊維表,將數據狀態爲有效的數據放到這張下遊無分區維表中方便流程使用。
有了上述這兩個數據源以後,我們會根據 DBA Schema 服務返回的元數據信息生成 Hive 表的腳本,並調度執行生成新的 Hive 表,再依據統計數據決定表的分區方式,進而生成對應新建表的同步腳本。當表創建或者表結構發生變更的時候,通過 Schema 服務兩天輸出的比對,我們會發現表結構的變更並映射到對應 Hive 表結構變更,同時可以改變對應的同步腳本。還有一種思路是可以通過 DB 發布系統的日志,獲知每天 DB 創建、表創建以及表結構變化的增量。
有一個坑點就是生産物理刪除,如果出現了物理刪除並且需要在 Hive 表裏將刪除數據識別並標記出來,當前可能需要通過全量同步的方法(考慮到從生産環境取數的代價,全量同步業務主鍵字段即可)解決,特別對 SQLServer。因此可以跟生産的開發協商盡量使用邏輯刪除,這樣數倉對刪除數據的感知代價會小很多。
3.1.2 Kafka 同步到 Hive
當前我們非實時同步主要在使用 Linkedin 很久以前的一個工具 Camus,當然 DP 團隊經過優化和企業本地化二次開發。但從使用感受來看,Camus 會有如下可能不足的地方:
1)基于 mapreduce,mapreduce 在 yarn 集群上搶占資源的能力較弱,在資源競爭高峰會有同步變慢的情況發生;
2)消費記錄存儲在 HDFS 各個文件裏,這樣對消費記錄的獲取和針對消費過期的監控都很不方便;
3)Kafka Topic 和 Hive 表的血緣關系獲取不方便;
因此,我們基于 spark-sql-kafka 開發 hamal,旨在解決如上痛點並且讓配置更加的簡潔。實現的過程大概包括,spark-sql-kafka 會根據輸入的任務從 Kafka 各個 Partition 消費出 payload 數據,對每條 payload 執行解編碼、解壓、magic code 等操作,此時會將 payload 數據轉化成 json 字符串,這個 json 字符串可以直接作爲一個字段寫入到 Hive 表裏,也可以根據事先配置提取出對應的節點和值作爲列和列值寫入到 Hive 中,甚至可以通過 Json 的 Schema 推斷出 Hive 表結構,並將 Json 各節點對應寫到 Hive 表的各列中。
如果選擇推斷的模式,實現的時候可以使用 sampling 的方式,類似 spark jsonRDD 第二個參數,比如說 0.001,Hamal 可以直接指定采樣數據條數,從 Kafka topic 中拉取出來,通過 jsonRDD 推斷出 StructType,並映射成 Hive 建表語句。對于建好的表,通過表的字段匹配獲取數據,最終寫入 Hive 表,最後會提交消費記錄到一張 Hive 的 ConsumerRecord 表裏面。這樣其實基于這個表,我們既可以獲取 Kafka topic 和 Hive 表的血緣,也可以方便地監控每次同步的數據量。
3.2 數倉分層
分層設計主要參考公司推行的數據規範,將數據倉庫的流程分成了生産鏡像層 (ods)、中間層 (edw)、公共數據層 (cdm) 及應用數據層 (adm)。在中間層對 ods 表做異常數據剔除、NULL 值處理、枚舉值統一等數據清理和綁定維表信息工作,在公共數據層對中間層表進行進一步的整合,豐富表主題的維度和度量,一般以寬表的形式呈現,用以後續的 adhoc 取數、報表。
根據機票本身的業務特點,我們將數據劃分成流量、産量、收益、生産 KPI、業務考核等幾大主題域,對數據表的業務分類和有效管理有重要意義。
3.3 數據解析
數據在同步至數據 ods 層後,産品經常會提的一個需求是將 ods 層某個含報文字段的表按照字段設計展開,如果要支持此類需求,數據開發就需要了解生産上這個表各個字段含義及報文字段的契約定義,而這些對應表的寫入開發非常熟悉。因此,爲了提高整體的工作效率,我們開發了一套數據解析框架,對業務開發封裝了大數據組件的 API 調用及相關參數調整,讓業務開發更高效地完成熟悉的單條數據解析開發。
3.4 數倉運維工具
數據倉庫擁有所有生産表的鏡像表、數以萬計的生産數據同步流程、數據扭轉流程以及後續報表,對如此規模的數倉實體的管理和運維需要一個不斷叠代的系統支持,從而可以大幅度提高數據工程師的效率。
我們根據數倉建設中遇到的一些費力度較高且需要重複做的操作,開發了一套運維工具集合,目前還在持續叠代中。運維工具集功能主要包括數據實體通用搜索,報表收件人批量變更,維表導入,Oncall 錄入,腳本模板生成,序列化與反序列化等等。工具開發難度不大,但對提高效率的幫助很大。
四、數據質量體系
對龐大的數據倉庫實體建設完善的數據質量監控體系,光靠人工 one by one 設置檢驗規則是不夠的,需要對幾乎所有的實體建立相應的監控,並且不能給大數據集群帶來很多額外的計算代價。當這樣的覆蓋面很廣的監控完善後,配合著元數據信息,就有可能在故障的 Root Cause 點第一時間發現故障,並可以清晰地知曉故障的影響範圍以及故障恢複的流程優先級調度。
因此,建立完善的數據質量體系需要完善元數據管理,建立輕量的覆蓋面廣的質量監控,並且對特別重要的流程,需要增加額外的業務相關校驗。
4.1 元數據管理
在生産環境和大數據環境存在多種實體,這些實體包括應用、各類表 (如 SQLServer、MySQL、MongoDB 的表等)、消息隊列 topic、ElasticSearch 的 index、Hive 的表等等,這些實體相互關聯,共同支撐著線上的系統,線下的分析。對這些信息的治理,實體的元數據管理至關重要。
在數倉體系中,元數據主要包含基礎信息、血緣關系以及標簽。基礎信息跟數據表相關,具體包括表的字段、存儲、分區類型等;血緣會涉及到各類的實體,表、流程、報表、郵件推送等,這些實體之間存在著上下遊調用與被調用關系,成體系地管理好這些實體之間的關系,可以清晰地了解到數倉邊界,使得對故障的 Root Cause 追溯以及該 Root Cause 帶來的影響面評估非常便捷。標簽是對實體的分類描述,如層級是屬于哪一層,安全是否有涉密,重要等級,是否有非常重要的流程在上面,業務標簽是屬于訂單、前端還是訂後。
4.2 數據質量相關因素
數據質量的問題其實一般可以在流程執行的日志中看出端倪,因爲人工排查故障的時候,除了常規通過 SQL 查詢驗證表的增量、業務主鍵、某些字段值是否正常,另外一個有效手段就是分析運行日志。
從運行日志中可以獲取以下信息,流程的開始時間、截止時間流程執行時間、完成狀態、每天增量的字節數、增量條數,引擎執行的參數,在用 Spark 或者 MapReduce 執行時消耗資源的情況等等一系列特征。通過對各類計算引擎産生日志的分析,可以獲得各類引擎下記錄日志數據的 pattern,從而提取出相關的特征信息。遇到特殊的流程或者引擎,可以借用其他手段補齊特征數據,如用 SQL,用 Hadoop 的命令。
這是我們簡單的一個日志輸出,第一張是 Spark 的執行日志,下面一張是 MapReduce 的執行日志。
有了數據質量特征提取的邏輯,實時流程異常發現可以如下實施:我們可以將質量特征數據計算分成兩塊,一塊是實時的針對單個流程日志的解析出相關特征,一塊是離線的基于曆史特征數據的統計。我們從消息隊列中消費實時獲取執行完成的流程 id 和 actionid,通過運維團隊提供的詳情日志查詢接口獲取完整日志,通過特征解析邏輯,解析出實時的流程質量相關特征,匹配曆史數據,應用規則。當滿足異常規則,可以通過元數據信息中的血緣判斷影響的範圍,推送告警信息。
五、應用案例
攜程作爲平台方,對機票價格沒有定價權,價格由産品提供方來提供。在每年航班計劃換季的時候,産品提供方會有一小部分概率將價格錄入錯。錯誤的運價,特別是很低的錯誤運價會讓航司或供應商蒙受超大的損失。本著公平交易的原則,攜程作爲銷售平台,做了機票價格監控系統。上線至今,發現了數十起價格異常事件。
在生産的消息隊列中,我們落地了用戶查詢返回的所有航班組合和價格信息,數據倉庫完成近實時同步,將數據解析處理成異常價格相關特征集,每天的增量在百億級別。我們從 Kafka 實時消費兩類日志數據,一類是查詢日志,一類是下單日志,建立匹配,建立規則集發現可疑的低價交易標識,並且進一步監控跟交易標識是否進入下單流程。當某個疑似異常特征帶來的訂單超過一定阈值時,系統會對這疑似異常特征對應的查詢進行自動禁售。
六、小結
一套完整的數據倉庫實施方案應該包括但不局限于上面介紹的數據同步方案、數據存儲方案、數據規範、元數據建設、數據質量體系、運維工具等,每個實施團隊應該根據面臨的實際情況選擇針對每個點的具體技術方案。
攜程機票數據倉庫團隊也正朝著建設全面、規範、易用、高效、精准的數倉路上探索前行,當前在數據同步、數倉數據扭轉以及出倉應用方面的實踐方案還在隨著需求的變化而叠代。接下來,我們團隊會著重在數據倉庫規範徹底落地以及實時數倉實施這些方向上努力。
本文轉載自公衆號攜程技術(ID:ctriptech)。