原Greenplum集群在進行10億*1億的關聯查詢時達到極限,無法支撐更大數據量級的關聯查詢。使用DorisDB替換Greenplum構建新的集群,在進行736億*15億的超大量級數據關聯查詢時,不僅可以順利完成,并且耗時很短,對業務的整體提升巨大。
一、使用背景
1.1選用原因
我司原有業務查詢使用的數據庫為Greenplum,在數據源變更后,數據量從原來的日增千萬級別(近百G)暴增至日增千億(10T)級別,原有的12臺GP集群在數據量增長后存在以下痛點:
1、數據導入
原有的數據導入借助于gpload的工具,在有索引的情況下,數據導入隨著數據量的增加會變慢,在千億級日增情況下,有索引的表根本無法導入。即使使用先導入數據,后建索引的方式,導入過程還是不理想,建索引的時間會由于數據量的增長而增長,由于機器資源在現有的基礎上增加的的可能性不是很大,使用該方式做數據導入,整個流程耗時相當長,無法滿足業務需求。
2、數據存儲
GP在數據存儲這一塊,如果使用heap表的方式創建表,數據來說是不做任何壓縮進行存儲,比較占用存儲資源。如果采用列存表的方式,需要手動指定壓縮等級和字段,但是使用者在不清楚數據重復的具體情況下設置該參數就只能是想當然去做,然后在生產中根據數據實際情況進行更改,在查詢時,cpu會進行解壓縮操作,增加了cpu的計算耗時。
3、數據計算
計算瓶頸其實是我們數據量增長之后主要的痛點。在原有的使用過程中,針對于業務A的整體運行時長,從客戶觸發到最終顯示,需要大概100分鐘左右,數據量增長后,業務A的基本跑不動。其次,在日常的etl過程中,一些定時表關聯在原GP的處理過程中只能是對事實表按照時間粒度做切分,小部分小部分的進行關聯,然后再進行合并處理,數據量增長之后的關聯,在現有資源下也無法實現。
在GP無法承受如此巨大的數據量,滿足不了業務的需求時,我們將目光轉向其他解決方案,在測試了DorisDB,clickhouse以及其他olap產品后,結合自身的業務特點和使用上的易用性,最終選用了DorisDB作為MPP的解決方案。此文檔也是基于DorisDB進行詳細的業務測試過程中整理的文檔。
1.2集群配置
此次測試使用的機器資源如下所示(只部署了DorisDB的環境):
機器數量:10臺
機器系統:centos7.6
機器內存:256G
機器磁盤:7200轉機械硬盤,每臺機器為8T*4,做了raid0
網絡帶寬:內網萬兆光遷
CPU:2*12 core
此次部署的DorisDB的集群詳情如下(未使用spark load,沒有安裝spark的客戶端):
fe數量:3臺(1 master+2 follower)
be數量:10臺
broker數量:10臺
1.3集群配置參數
針對自身業務特點,修改了以下參數:
fe:
broker load的參數
1.允許運行的最大的broker數量
max_broker_concurrency=10
2.每個be處理的數據量
max_bytes_per_broker_scanner=32212254720
上述兩個參數影響broker load導入時be處理數據的并發數量和單個be處理的數據量
be:
文件合并的參數
1.be節點base compaction線程數量
base_compaction_num_threads_per_disk=4
2.base compaction時寫磁盤的限速,單位為M
base_compaction_write_mbytes_per_sec=20
3.be節點cumulative compaction的線程數量
cumulative_compaction_num_threads_per_disk=8
4.be節點cumulative compactiond寫磁盤的限速,單位為M
cumulative_compaction_write_mbytes_per_sec=300
5.be節點cumulative compactiond線程輪詢的間隔
cumulative_compaction_check_interval_seconds=2
上述五個參數主要控制DorisDB對于文件合并的效率,可以根據自身的硬件性能和實際業務情況調整該參數。大量數據導入到DorisDB中時,DorisDB需要根據排序key做排序,根據字段的值做壓縮合并的操作,此時會占用磁盤性能,調整該參數(業務閑時)可以加速這一過程,使DorisDB專注于計算。
以上參數僅提供參考,請根據自身資源和實際情況酌情調整
二、數據導入
由于數據源的特殊性,數據存放在文件中,原始文件為壓縮文件,因此在實際測試過程中,我們主要對以下幾種導入進行了測試(spark load未測試成功),最終選取了broker load的方式作為最終的數據導入的方案。該方案能夠實現單任務200W+/s的導入速度,并且支持并行的方式,進一步提高數據導入速度。
2.1 stream load
剛開始使用DorisDB時,我們使用的導入方式即為stream load的方式測試小批量的數據,但是在數據量增大的情況下,大概數據單次導入到100G時,發現這種微批導入的方式有數據膨脹的情況,導入前的數據和入庫后的數據量對比差異明顯(導入前100G左右,導入后DorisDB在250G左右,并且磁盤IO占用高居不下),遂放棄。
2.2 datax
datax主要是由于有豐富的使用經驗,其次是datax在對于數據接入過程中很靈活,可以增加很多豐富的transformer插件來減輕后續的數據清洗的壓力。datax使用時我們主要使用的是mysqlWriter和利用stream load實現的DorisWriter(此處艾特社區張懷北同學)。前者在我們測試時,DorisDB文檔中還未增加doriswriter的內容,利用的是mysql的jdbc連接實現數據的導入。后者則是社區利用stream load的api實現的數據導入。在使用過程中,發現導入速度并不理想(10臺一起跑,導入速度在60W/s),滿足不了我們每天的增量數據的導入的要求。也放棄datax的導入方案。(如果有對這種方式感興趣的同學可以在社區留言)。文章末尾附件有該writer實現的核心代碼。
2.3 broker load
broker load的導入方式是我們最終采用的方案。原本對spark load的方式抱有很大希望,因為我們業務中的數據另一個導入方向為hbase,使用的導入方式為bulkload的方式,利用spark合成Hfile的方式寫入hbase,該方式能夠將待導入的數據進行排序后,形成hbase底層需要的hfile的格式寫入到hdfs,hbase可以不用再將數據在內存中排序后再落盤,在進行合并形成hfile,能夠借助于spark計算集群減輕hbase排序和文件合并的壓力,使得hbase專注于業務。我們猜想DorisDB的spark load是否也采用了類似的思想,利用spark處理數據后直接生成DorisDB所需要的底層存儲文件后寫入DorisDB,但是在經過咨詢后,現有的spark load不具備這種提前排序生成底層存儲文件的導入功能,但是在未來會開發。后續開發完成后,對于DorisDB的導入應該提升很大(個人臆想_)。
broker load的時候我們測試了分別從hdfs load csv文件和parquet文件,最終發現使用parquet導入比csv性能高出兩倍到三倍的樣子(相同數據條數,字段),也剛好是parquet文件和csv文件實際存儲相差的樣子。同時在導入時,可以先將待導入的表的副本設為1,可以減少導入過程中的數據clone,加快導入速度。最終測的的導入速度大概在(300W/s左右)
2.4 insert into
insert into的方式主要應用于日常關聯后的結果數據導入到新表的操作,為了測試insert into操作的速度以及影響,我們在一個時間段內,連續大批量的導入到另外一張表來發現問題。最終發現,insert into的速度大概在780W/s,但是連續大批量的insert之后,大概連續導入了四批次,每次一百二十億的數據后(insert語句為:insert into tableA select*from tableB),cpu一段時間內占用會比較高,可能是內部數據的合并操作導致的cpu使用上升。
三、數據查詢
下面主要選取了業務測試中比較重要的場景A作為測試,該測試主要測試日常事實和維度之間的關聯性能和面向業務的單表聚合查詢的性能。
3.1表模型選取
此次測試結合實際業務,我們主要測試的是明細表模型。DUPLICATE KEY選用的也是業務上常用來作為過濾條件的字段,采用的是按照天創建動態分區的方式建表,分布鍵根據業務特點的關系,基本上所有的表的分布鍵都是用一個字段。
3.2表創建方式
example:
create table ods.table1(
col1 datetime not null comment"time1",
col2 varchar(128)not null comment"str1",
col3 varchar(64)not null comment"str2",
col4 TINYINT not null comment"0,1,2",
col5 varchar(128)not null comment"str3",
col6 datetime not null comment"time2",
col7 TINYINT not null comment"-1,0,1,2,3"
)
DUPLICATE KEY(col1,col2,col3)
PARTITION BY RANGE(col1)(
PARTITION p20210101 values less THAN("2021-01-02 00:00:00"),
PARTITION p20210102 values less THAN("2021-01-03 00:00:00"),.
PARTITION p20210330 values less THAN("2021-03-31 00:00:00"),
PARTITION p20210331 values less THAN("2021-04-01 00:00:00")
)
DISTRIBUTED BY HASH(col3)BUCKETS 128
PROPERTIES(
"replication_num"="1",
"dynamic_partition.enable"="true",
"dynamic_partition.time_unit"="DAY",
"dynamic_partition.start"="-110",
"dynamic_partition.end"="2",
"dynamic_partition.prefix"="p",
"dynamic_partition.buckets"="128"
);
其余的表的創建方式類似,字段不同。
在后續的join過程中,由于預先沒有給表設置屬性Colocation Group,因此我們使用的alter方式修改每個表的Colocation Group屬性。如下:
ALTER TABLE table1 SET("colocate_with"="cg_col3");
3.3查詢測試
3.3.1關聯測試
1.hash join
默認的join的方式為hash join,會使用JOIN(BROADCAST)的方式
2.shuffle join
在join的后邊顯示的指定[shuffle]的方式,會不采用廣播,而是用shuffle的方式進行join。
如果某些情況下使用默認的join時,右表數據量較大,廣播到多個be節點時會造成不可忽略的性能開銷,或者查詢直接oom導致be掛掉,可以嘗試使用此方式進行查詢優化。
3.colocation join
如果待關聯的兩張表的分布鍵和buckets數量一致,同時join的key是分布鍵,那么可以使用colocation join的方式進行本地join。
由于數據會根據分布鍵進行hash分布,相同分布鍵的數據處于同一個機器上,在join的時候數據只會在本地進行join,避免跨網絡IO。
4.性能對比(全數據join之后的count)(大概的均值)
DorisDB:
左表數量:736億右表數量:15億
默認的join:oom
shuffle join:90S
colocation join:60S
GP(極限是不到十億join不到一億,耗時近1800s):
跑不動!!!
3.3.2單表查詢測試
DorisDB:
group by的字段為DUPLICATE KEY中的部分或者全部字段。單表數據量為736億。
邏輯為:全數據量下的select count(a.col1)as num from(select col1 as col1,col2 as col2 from table1 group by col1,col2)a;去重后的col1數據量為125億
耗時:600s
3.4參數優化
以下參數在使用過程中需要根據實際情況進行具體調整:
1.exec_mem_limit
該參數影響的地方很多,導入,查詢oom時可加大。建議可以設為機器內存資源的70%-80%(只有doris進程情況下)
2.is_report_success
該參數設為true后可以比較方便的查看物理執行計劃
3.parallel_fragment_exec_instance_num
該參數影響查詢時的并行度,建議為機器core數的一半,查詢并發小的情況下可以酌情增加
4.query_timeout
查詢或者insert的超時時間,數據量大的情況下可以增加該參數
5.disable_storage_page_cache
在內存資源充足的情況下,可以開啟page cache,啟用DorisDB自己維護的page cache,加速查詢
6.storage_page_cache_limit
開啟page cache占用的內存大小,酌情設置。
在經過測試后,DorisDB能夠滿足我司替換原有greenplum集群,解決原有業務。
作者:劉志亮,就職于西安某安全公司,負責大數據相關內容,專注于大數據方向研究
行業資訊、企業動態、業界觀點、峰會活動可發送郵件至news#citmt.cn(把#換成@)。
海報生成中...