筆者將整份報(bào)告按四大模塊梳理為:
第一部分,流計(jì)算平臺(tái)的發(fā)展歷程——從2014年到現(xiàn)在,4年多的發(fā)展歷程中,蘇寧經(jīng)歷storm->spark streaming->flink的轉(zhuǎn)變,目前還在轉(zhuǎn)變中。形成storm(4000~虛機(jī)節(jié)點(diǎn)),flink&spark streaming(200+物理節(jié)點(diǎn),on yarn模式)的規(guī)模,同時(shí)介紹了各引擎發(fā)展過程中的問題以及解決路徑;
第二部分,storm及spark streaming的缺點(diǎn),從兼顧吞吐量和延時(shí)、高效的狀態(tài)管理、Exactly-Once的保證及Event-Time等要點(diǎn)闡述了蘇寧選擇flink的理由;
第三部分,蘇寧基于flink框架所做的具體工作。(1)平臺(tái)層功能豐富:sql語法豐富(distinct,流表join),算子自動(dòng)擴(kuò)縮容,connector(mysql, hbase,kafka1.0)以及sink降速(2)工具層:統(tǒng)一日志收集及展示、平臺(tái)層和業(yè)務(wù)層的統(tǒng)一監(jiān)控管理平臺(tái)(3)服務(wù)層:Dlink 一站式開發(fā)平臺(tái);
第四部分是在數(shù)據(jù)集成、機(jī)器學(xué)習(xí)和CEP等方面,談?wù)勌K寧對(duì)未來的展望。
目前,陳豐主要負(fù)責(zé)蘇寧易購集團(tuán)大數(shù)據(jù)流計(jì)算平臺(tái)建設(shè),包括Storm、SparkStreaming、Flink等組件,經(jīng)歷了流計(jì)算從組件化到平臺(tái)服務(wù)化到智能化的發(fā)展過程。對(duì)于大數(shù)據(jù)開源框架有較為豐富的經(jīng)驗(yàn),在分布式計(jì)算架構(gòu)設(shè)計(jì)和系統(tǒng)優(yōu)化方面有自己的思考和領(lǐng)悟:
既然說到前世今生,首先介紹一下流計(jì)算平臺(tái)在蘇寧的整個(gè)發(fā)展歷程,怎么從Storm到目前很火的Flink,以及它的現(xiàn)狀,談?wù)務(wù)w的架構(gòu)以及它的整體集群規(guī)模。2018年上半年,蘇寧把主要精力都投向了Flink。
首先看一下平臺(tái)的發(fā)展歷程。
最早2014年蘇寧上線了第一個(gè)Storm的大屏展示任務(wù),同年Storm整體的孵化平臺(tái)上線。到了2015年因?yàn)閷?duì)于SQL開發(fā)的需求蘇寧還是比較多的,蘇寧自研了一套基于安踏做SQL的平臺(tái)。2016年基于吞吐量的上線,有了spark streaming,同年考慮到性能和流計(jì)算的痛點(diǎn),把目光投向了Flink。到了2018年,F(xiàn)link是蘇寧流計(jì)算基礎(chǔ)平臺(tái)重要的目標(biāo)項(xiàng)目,將業(yè)務(wù)推到Flink上做,比如說Flink的開發(fā)平臺(tái)、管理平臺(tái)等等一系列配套的業(yè)務(wù)上線。
再看流計(jì)算在蘇寧的配套。Storm2014年就用了,整體規(guī)模和占比比較多50%,物理機(jī)1000多,虛擬機(jī)4000+,任務(wù)數(shù)1500+。蘇寧做Flink起步較晚,但調(diào)研時(shí)間比較長,目前占比占到15%,計(jì)劃未來1-2年都會(huì)把流計(jì)算底層平臺(tái)所有的都投入到Flink上。
為什么選擇Flink?從蘇寧業(yè)務(wù)層面來看,首先Storm和2.0的spark streaming都使用的是processing time,它處理的時(shí)間遠(yuǎn)晚于數(shù)據(jù)產(chǎn)生的時(shí)間,產(chǎn)生大量的數(shù)據(jù)再1或2小時(shí)堆積后,數(shù)據(jù)是錯(cuò)誤的,沒辦法接受的。第二個(gè)就是容錯(cuò)能力,Storm只能做到 Exacly once。第三個(gè)就是中間狀態(tài)的維護(hù),Storm維護(hù)不提供state的東西,做中間狀態(tài)的維護(hù)只能依靠第三方來做,那么業(yè)務(wù)開發(fā)的時(shí)候成本相對(duì)高一些,會(huì)寫很多的代碼,效果也不是很好,因?yàn)樗玫谌浇M件的時(shí)候,有可能出現(xiàn)一致性問題,或重啟后計(jì)算結(jié)果不準(zhǔn)確等等。從蘇寧的平臺(tái)來看,兩者都沒有辦法兼顧高吞吐、低延時(shí),兩個(gè)性能互補(bǔ),但不能兼顧。
調(diào)研階段,對(duì)Flink的各個(gè)優(yōu)勢(shì)做過簡單的列表,F(xiàn)link是一個(gè)設(shè)計(jì)的比較優(yōu)雅的流計(jì)算框架,它能兼顧到低延時(shí)和高吞吐,同時(shí)支持Exacly once。
談?wù)勗诠δ軘U(kuò)展、服務(wù)平臺(tái)開發(fā)以及運(yùn)行時(shí)管理系統(tǒng)方面的經(jīng)驗(yàn)分享。
首先說一下功能擴(kuò)展。Flink sql從它出來就比較火,為什么,因?yàn)楹芎唵危琒QL對(duì)于程序員來說非常熟悉,開發(fā)成本非常低,同時(shí)由于SQL是一個(gè)統(tǒng)一的標(biāo)準(zhǔn),它的遷移成本非常低的,如果今天用了subeg SQL,明天出的新的組件,可以非常輕松的遷移到其它的組件上,它是通用的語法。所以蘇寧FlinkSQL上做了一系列的語法擴(kuò)展。另外Connectors,可以打通不同組件的聯(lián)系。
最后結(jié)合業(yè)務(wù)痛點(diǎn),聊一下在運(yùn)行時(shí)的它的算子動(dòng)態(tài)擴(kuò)容縮容,以及Checkpoint動(dòng)態(tài)調(diào)整,我們?cè)趺磳?shí)現(xiàn)怎么把它做出來的。
(1)首先看一下語法擴(kuò)展。因?yàn)槲覀儚腟tormSQL開始就做了純SQL的開發(fā),純SQL開發(fā)起碼要支持DDL和DML,但是Flink社區(qū)明確的講述它不會(huì)做DDL的事情,這件事由我們自己做出來。然后是DML語言,對(duì)于電商領(lǐng)域來說很典型的事情就是統(tǒng)計(jì)UV,對(duì)于這種聚合也做了大量支持,支持on? group by,over? window,group by window。同時(shí)Flink版本有它的局限性,它的流數(shù)據(jù)和靜態(tài)數(shù)據(jù)是沒有辦法去做互相操作的,然后最后說一個(gè)batch window,后面會(huì)具體的說說。
count distinct,這個(gè)當(dāng)時(shí)基于0.003做的,當(dāng)時(shí)社區(qū)沒有提供,我個(gè)人認(rèn)為是由于整體代碼的抽象上的問題,它沒有去做指導(dǎo),只能1.7去實(shí)現(xiàn)了,方式和現(xiàn)代社區(qū)幾乎是基本上一致的。
多介紹一下Approx count distinct,它的目的其實(shí)和count distinct語法是一樣,也是去重復(fù)的結(jié)構(gòu),但是它的目的是用較小的計(jì)算精度的誤差換取巨大的計(jì)算資源的節(jié)省,比如說內(nèi)存。同時(shí)這個(gè)語法符合Calcite標(biāo)準(zhǔn)的,也就是說是通用的語法,我們可以遷移到其它的引擎上。
這邊只能粗略的講,看它怎么工作的。首先一條SQL進(jìn)來進(jìn)入Calcite做語法解析、變換。然后轉(zhuǎn)到Data program的時(shí)候,我們做定制化的基數(shù)和函數(shù),基數(shù)和函數(shù)不擴(kuò)展講了,因?yàn)槠鋵?shí)涉及的算法不少,我們實(shí)現(xiàn)了一系列的基數(shù)的函數(shù),讓用戶選擇相對(duì)應(yīng)的精度,然后對(duì)應(yīng)它的資源消耗,讓用戶自己去做選擇。然后回到我們Data program這一層,進(jìn)而轉(zhuǎn)化成用戶選擇的基礎(chǔ)方程。到了下一層,每條數(shù)據(jù)進(jìn)來的時(shí)候?qū)⑦M(jìn)行累加,輸出時(shí)可以把數(shù)據(jù)向下一層的sink進(jìn)行觸發(fā)計(jì)算,可以提供相對(duì)完美的容錯(cuò)能力。
(2)另外一個(gè)SQL Batch window,這個(gè)是蘇寧特色的一個(gè)名詞,我們看一下它業(yè)務(wù)需求的case,它需要統(tǒng)計(jì)每日PV、UV,我們?cè)诰€計(jì)算要求延時(shí)盡可能低,不可能等到每天結(jié)束的時(shí)候零點(diǎn)再看到結(jié)果,這個(gè)不能接受的。業(yè)務(wù)的需求是每秒都能實(shí)時(shí)的檢測(cè)到PV、UV的變化,這個(gè)從開始到第一秒第二秒第三秒都能看到結(jié)果,這個(gè)是業(yè)務(wù)能夠接受的case,這個(gè)是輸出的頻率,這個(gè)頻率是可以定制化的。直到這個(gè)窗口的結(jié)束,我們的結(jié)果會(huì)被reset,重新開始被計(jì)算,這個(gè)是蘇寧常用的Batch window。
怎么用SQL實(shí)現(xiàn)Batch window?這點(diǎn)不難,但怎么體現(xiàn)到SQL語法,又不能破壞標(biāo)準(zhǔn)的SQL語法呢?滑動(dòng)窗口它是可以做到很短的輸出,但是不能固定窗口,窗口滑動(dòng)到下一秒。第三條就是定制trigger。第四條就是Cascading window,它的窗口是固定的,10點(diǎn)到11點(diǎn),數(shù)據(jù)沒有超出的時(shí)候是不會(huì)被滑動(dòng)的,同時(shí)做到及早的輸出,但是它的問題是每條數(shù)據(jù)都輸出,不能控制輸出頻率的。第二點(diǎn)如果TPS非常高,一秒一萬,一秒十萬DB吃不下,會(huì)造成瓶頸。
所以我們?cè)趺磳?shí)現(xiàn)蘇寧的Batch window?我們用最后講的這個(gè),加上DDL,定義輸出的間隔,然后再使用我們自己實(shí)現(xiàn)的Periodical sink,它主要的目的是把每一條進(jìn)來的數(shù)據(jù)都進(jìn)行緩存,并且能夠根據(jù)輸出的頻率和數(shù)量的閥值進(jìn)行定量的輸出,整個(gè)鏈路進(jìn)行的數(shù)據(jù)都會(huì)觸發(fā)計(jì)算,每個(gè)數(shù)據(jù)出來之后進(jìn)行緩存,舊值被新值覆蓋,直到task輸出的時(shí)候,首先滿足定性定量的輸出,第二個(gè)不會(huì)對(duì)于下層造成太多的壓力,因?yàn)槎c(diǎn)定時(shí)輸出,TBS只有2000左右。這個(gè)時(shí)間我覺得還是有進(jìn)一步擴(kuò)展或者是優(yōu)化的空間,比如說其實(shí)這個(gè)的話只在sink層面解決了需求,如果我們?cè)贐atch window里面不把數(shù)據(jù)進(jìn)行一條條處理,而是進(jìn)行批處理,我覺得計(jì)算的效果效率會(huì)真正提升,這個(gè)可能我們后面會(huì)去做這件事情。
剛才非常簡單的舉了幾個(gè)例子,說了一下SQL的擴(kuò)展。說一下Connectors。這個(gè)我不會(huì)多說,因?yàn)閮?nèi)容不多,我會(huì)舉例子來說一說。
HBase Sink實(shí)現(xiàn)兩種模式,主要是考慮它的容錯(cuò)性,現(xiàn)在不會(huì)只滿足于端到端正的容錯(cuò),我們還希望它能做到Flink和組件之間的容錯(cuò)。于是我們針對(duì)不同的業(yè)務(wù)場景做了冪的插入模式,一種是mini? wbatch,容錯(cuò),有可能會(huì)Failover,要求Failover后業(yè)務(wù)重發(fā)的數(shù)據(jù)與Fail前完全一致,同時(shí)我要求table是單版本的,這么一個(gè)sink。同時(shí)考慮到效率和實(shí)時(shí)性,我們也做了兩種寫入模式,一個(gè)是one by one的同步寫入,效率比較高。還有mini batch,異步寫入的,它的演時(shí)比較高,但是可以做到定時(shí)和定量。
剛才講的是冪的插入模式,現(xiàn)在講非冪等插入模式。Failover后寫HBase結(jié)果與fail之前不同使用的WAL機(jī)制。我們用Checkpoint時(shí),將mini batch寫入外部文件系統(tǒng)。Checkpoint完成,將mini batch寫HBase。
下面來說一說業(yè)務(wù)上也經(jīng)常面對(duì)的這么一個(gè)問題,就是擴(kuò)容縮容的問題。我們看一下流程的分析,首先業(yè)務(wù)開發(fā)兩種模式,一種是寫SQL,還有一種是寫Flink SQL,還有一種是用Datastream API而進(jìn)行開發(fā)。上線之后發(fā)現(xiàn)并行不夠,需要擴(kuò)容,擴(kuò)容的話對(duì)于SQL來說,我能做到的是什么,我可以用原生的Flink提供的去進(jìn)行工作,把鏈路上的節(jié)點(diǎn)都進(jìn)行擴(kuò)容或者縮容,同時(shí)對(duì)于重新打包發(fā)布,然后再去重新上線的那些,這兩種開發(fā)模式都有問題,SQL的開發(fā)面對(duì)雙十一零點(diǎn)的大促,我們需要改代碼,并且還需要有高的延遲,業(yè)務(wù)才能上線,這個(gè)我們不能接受。總體對(duì)于SQL開發(fā)它的擴(kuò)容是任務(wù)級(jí)別的,而對(duì)于Datastream成本太高了。
我們做了Operator,我們一開始考慮是從wrong time考慮這個(gè)事情的。如果說我們要從這一層做的話,首先對(duì)元碼改動(dòng)比較多,第二個(gè)任務(wù)相對(duì)比較復(fù)雜,我們需要重新生成不同的job,同時(shí)還要有我們自己運(yùn)行時(shí)的管理服務(wù)系統(tǒng),我們會(huì)把某個(gè)需要去RESCALE的 job拿過來進(jìn)行修改,再把提交新的JOB graph,做真正擴(kuò)縮容的事情。這邊著重說一下這個(gè)DO RESCALE會(huì)再領(lǐng)任務(wù),資源不會(huì)釋放的,資源部釋放意味著響應(yīng)的時(shí)間非??欤覀円沧鲞^實(shí)驗(yàn),基本上到達(dá)秒級(jí)別甚至百毫秒級(jí)別做到擴(kuò)容縮容,這個(gè)就是我們的解決方案。
剛才介紹了基礎(chǔ)的組建的擴(kuò)展或者優(yōu)化,現(xiàn)在來聊聊平臺(tái)服務(wù)化。首先看一下流計(jì)算平臺(tái)的架構(gòu),從左往右看,這邊是數(shù)據(jù)元,底層進(jìn)來之后有Storm,然后是Flink streaming和spark streaming,上面有我們運(yùn)行時(shí)管理系統(tǒng),主要的作用是對(duì)業(yè)務(wù)進(jìn)行監(jiān)控、運(yùn)維、報(bào)警一系列的事情。再往上一層是自己開發(fā)的一個(gè)開發(fā)者平臺(tái)或者工具層,對(duì)于Storm來說有Storm SQL LIBRO,還有Stream SQL 還有,可視化流程開發(fā),Datastream。再網(wǎng)上就是支持我們的業(yè)務(wù),體育、易購、風(fēng)控、物流、BI等業(yè)務(wù)層。
我是做平臺(tái)的,主要介紹一下平臺(tái)層,也是工具層,下面運(yùn)維的這么一個(gè)系統(tǒng)。
平臺(tái)服務(wù)首先是Stream SQL開發(fā)平臺(tái),還有就是這個(gè)可視化流程開發(fā)平臺(tái)。
我們的Stream SQL是元數(shù)據(jù)處理,通過拖拉拽動(dòng)態(tài)的生成我們的語句,可以支持整個(gè)的流程開發(fā),從編寫到測(cè)試到業(yè)務(wù)上線,都可以這個(gè)平臺(tái)去做,業(yè)務(wù)完全不用寫代碼,直接寫SQL,在上面做就行了。
第二個(gè)可視化流程開發(fā),把功能拽上來,建立它們之間的關(guān)系就可以了,同樣可以做到流程生命周期的事情,都能涵蓋。
最后任務(wù)提交,我們對(duì)于這個(gè)Flink底層的元碼也做了修改,也是覺得它很多的關(guān)于Checkpoint很多的行為要通過代碼體現(xiàn)的,我們覺得這個(gè)非常不靈活,所以我們對(duì)于底層做了相應(yīng)的修改,只需要在提交的時(shí)候?qū)τ谶M(jìn)行配置,就能做到動(dòng)態(tài)的去設(shè)置和修改它的相對(duì)應(yīng)的行為,只要一鍵提交就可以了。
下面看一下運(yùn)行時(shí)管理。運(yùn)行時(shí)管理主要解決了一下這些事情。解決了Flink運(yùn)行時(shí)以及歷史日志的問題,我們做平臺(tái)的時(shí)候,F(xiàn)link的運(yùn)行時(shí)日志可以通過原生的UI看的,但是在使用過程中去做歷史日志就相當(dāng)有問題了,它往往要通過YARN日志查看,所以業(yè)務(wù)用的時(shí)候非常頭痛。針對(duì)這一點(diǎn)我們提供了統(tǒng)一的日志解決方案,同時(shí)還有一些子代的Metric的查詢,我們也弄出來做了統(tǒng)計(jì)和展示。同時(shí)我們也把一些比較重要的事件也從我們的APP里截出來,比如說交互啟停的動(dòng)作做了展示和通集。其次就是剛才描述的運(yùn)行時(shí)的運(yùn)行調(diào)整,比如說調(diào)整Operator并行度,還有在線調(diào)整。最后還有告警。
日志查看,通過任務(wù)名查,也可以通過關(guān)鍵字搜索。Metrics監(jiān)控也是類似的,可以卡時(shí)間范圍,也可以不同維度查詢,并且做了一系列的聚合,為用戶提供相對(duì)有效的信息,提供給用戶比較有用的信息。
對(duì)于事件的接觸我們也做了相對(duì)的統(tǒng)計(jì),左邊可以看到備壓等等一系列事件的統(tǒng)計(jì),我們可以統(tǒng)計(jì)Checkpoint成功率,以及Checkpoint它的打下分布等等一些事情。動(dòng)態(tài)修改Checkpoint并行度。
最后簡單的展望一下未來,2019工作計(jì)劃。首先我們可能會(huì)考慮一下做機(jī)器學(xué)習(xí),據(jù)官方所稱,對(duì)于迭代計(jì)算, Flink應(yīng)該是比spark還要快的,看有沒有辦法實(shí)現(xiàn)流處理的機(jī)器學(xué)習(xí)的算法模型。第二點(diǎn)就是去做通用的數(shù)據(jù)集成,因?yàn)镕link首先實(shí)時(shí)計(jì)算,同時(shí)它也提供了很多sink或souser,把組件連接起來。第三個(gè)就是智能動(dòng)態(tài)擴(kuò)容,現(xiàn)在的擴(kuò)容都是手動(dòng)的,如果有可能的話可以用STM做一些算法。最后一個(gè)是CEP的事情。