摘要:如果到齊了,就可以開始統(tǒng)計出這個時間窗口內的指標了。這種里會遇到兩個難題多個流的速度不一樣,如何判斷一個時間窗口內的都到齊了。
在本文發(fā)出之后不久,老外就寫了一篇類似內容的。人家比我寫得好,推薦大家讀這篇
http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101....
流式統(tǒng)計聽著挺容易的一個事情,說到底不就是數(shù)數(shù)嘛,每個告警系統(tǒng)里基本上都有一個簡單的流式統(tǒng)計模塊。但是當時基于storm做的時候,這幾個問題還是困擾了我很長時間的。沒有用過spark streaming/flink,不知道下面這些問題在spark streaming/flink里是不是都已經(jīng)解決得很好了。
時間窗口切分問題做流式統(tǒng)計首要的問題是把一個時間窗口內的數(shù)據(jù)統(tǒng)計到一起。問題是,什么是時間窗口?有兩種選擇
日志時間(event timestamp)
墻上時間(wall clock)
最簡單的時間窗口統(tǒng)計的是基于“墻上時間”的,每過1分鐘就切分出一個新窗口出來。比如statsd,它的窗口切分就是這樣的。這種基于“墻上時間”的統(tǒng)計有一個非常嚴重的問題是不能回放數(shù)據(jù)流。當數(shù)據(jù)流是實時產(chǎn)生的時候,“墻上時間”的一分鐘也就只會有一分鐘的event被產(chǎn)生出來。但是如果統(tǒng)計的數(shù)據(jù)流是基于歷史event的,那么一分鐘可以產(chǎn)生消費的event數(shù)量只受限于數(shù)據(jù)處理速度。另外event在分布式采集的時候也遇到有快有慢的問題,一分鐘內產(chǎn)生的event未必可以在一分鐘內精確到達統(tǒng)計端,這樣就會因為采集的延遲波動影響統(tǒng)計數(shù)據(jù)的準確性。實際上基于“墻上時間”統(tǒng)計需要
collection latency = wall clock - event timestamp
基于“墻上時間”的統(tǒng)計需要采集延遲非常小,波動也很小才可以工作良好。大部分時候更現(xiàn)實的選擇是需要基于“日志時間”來進行窗口統(tǒng)計的。
使用“日志時間”就會引入數(shù)據(jù)亂序的問題,對于一個實時event stream流,其每個event的timestamp未必是嚴格遞增的。這種亂序有兩種因素引入:
event產(chǎn)生的機器的時鐘不完全同步(NTP有100ms左右的不同步)
event從采集到到達kafka的速度不均衡(不同的網(wǎng)絡線路有快有慢)
我們希望的流式統(tǒng)計是這樣的:
但是實際上數(shù)據(jù)只是基本有序的,也就是在時間窗口的邊緣會有一些event需要跨到另外一個窗口去:
最簡單的分發(fā)event到時間窗口代碼是這樣的
window index = event timestamp / window size
對1分鐘的時間窗口 window size 就是60,timestamp除以60為相同window index的event就是在同一個時間窗口的。問題的關鍵是,什么時候我可以確信這個時間窗口內的event都已經(jīng)到齊了。如果到齊了,就可以開始統(tǒng)計出這個時間窗口內的指標了。然后突然又有一個落后于大伙的event落到這個已經(jīng)被計算過的時間窗口如何處理?
對于大部分統(tǒng)計而言,一個時間窗口統(tǒng)計出多條結果存入db并不是什么大的問題,從db里查詢的時候把多條結果再合并就可以了。
對于一些類型的統(tǒng)計(非monad),比如平均值,時間窗口內的event分為兩批統(tǒng)計出來的結果是沒有辦法被再次匯總的。
實時類的計算對時間敏感,來晚了的數(shù)據(jù)就沒有意義了。比如告警,一個時間窗過去了就沒有必要再理會這個時間窗口了。
所以對于來晚了的數(shù)據(jù)就兩種策略:要么再統(tǒng)計一條結果出來,要么直接丟棄。要確定什么時候一個時間窗口內的event已經(jīng)到齊了,有幾種策略:
sleep 等待一段時間(墻上時間)
event timestamp超過了時間窗口一點點不關閉當前時間窗口,而是要等event timestamp大幅超出時間窗口的時候才關閉窗口。比如12:05:30秒的event到了才關閉12:04:00 ~ 12:05:00的時間窗口。
一兩個event超出了時間窗口不關閉,只有當“大量”的event超出時間窗口才關閉。比如1個event超過12:05分不關閉,如果有100個event超過了12:05的時間窗口就關閉它。
三種策略其實都是“等”,只是等的依據(jù)不同。實踐中,第二種策略也就是根據(jù)“日志時間”的等待是最容易實現(xiàn)的。如果對于過期的event不是丟棄,而是要再次統(tǒng)計一條結果出來,那么過期的窗口要重新打開,又要經(jīng)過一輪“等待”去判斷這個過去的窗口什么時候再被關閉。
在spark上已經(jīng)有人做類似的嘗試了:Building Big Data Operational Intelligence platform with Apache Spark - Eric Carr (Guavus)
多流合并的問題一個kafka的partition就是一個流,一個kafka topic的多個partition就是多個獨立的流(offset彼此獨立增長)。多個kafka topic顯然是多個獨立的流。流式統(tǒng)計經(jīng)常需要把多個流合并統(tǒng)計到一起。這種里會遇到兩個難題
多個流的速度不一樣,如何判斷一個時間窗口內的event都到齊了。如果按照前面的等待策略,可能處理一個流內部的基本有序局部亂序是有效的,但是對于多個流速差異很大的流就無能為力了。一個很快的流很容易把時間窗口往后推得很遠,把其他流遠遠跑到后面。
流速不均不能靠下游兜著,下游的內存是有限的。根本上是需要一種“背壓”的機制,讓下游通知流速過快的上游,你慢點產(chǎn)生新的event,等等其他人。
舉一個具體的例子:
spout 1 emit 12:05 spout 1 emit 12:06 spout 2 emit 12:04 spout 1 emit 12:07 spout 2 emit 12:05 // this is when 12:05 is ready
要想知道12:05這個時間窗的event都到齊了,首先要知道相關的流有幾個(在這例子里是spout1和spout2兩個流),然后要知道什么時候spout1產(chǎn)生了12:05的數(shù)據(jù),什么時候spout2產(chǎn)生了12:05的數(shù)據(jù),最后才可以判斷出來12:05的數(shù)據(jù)是到齊了的。在某個地方要存一份這樣的流速的數(shù)據(jù)去跟蹤,在窗口內數(shù)據(jù)到齊之后發(fā)出信號讓相關的下游往前推動時間窗口。考慮到一個分布式的系統(tǒng),這個跟蹤要放在哪個地方做,怎么去通知所有的相關方。
極端一些的例子
spout 1 emit 13:05 spout 2 emit 12:31 spout 1 emit 13:06 spout 2 emit 12:32
多個流的流速可能會相差到半個小時以上。考慮到如果用歷史的數(shù)據(jù)匯入到實時統(tǒng)計系統(tǒng)里時,很容易因為計算速度不同導致不同節(jié)點之間的處理進度不一致。要計算出正確的結果,下游需要緩存這些差異的半個小時內的所有數(shù)據(jù),這樣很容易爆內存。但是上游如何感知到下游要處理不過來了呢?多個上游之間又如何感知彼此之間的速度差異呢?又有誰來仲裁誰應該流慢一些呢?
一個相對簡單的做法是在整個流式統(tǒng)計的分布式系統(tǒng)里引入一個coordinator的角色。它負責跟蹤不同流的流速,在時間窗口的數(shù)據(jù)到齊之后通知下游flush,在一些上游流速過快的時候(比如最快的流相比最慢的流差距大于10分鐘)由coordinator發(fā)送backoff指令給流速過快的上游,然后接到指令之后sleep一段時間。一段基本堪用的跟蹤不同流流速的代碼:https://gist.github.com/taowen/2d0b3bcc0a4bfaecd404
數(shù)據(jù)一致性問題低檔一些的說法是這樣的。假設統(tǒng)計出來的曲線是這樣的:
如果中間,比如08:35左右重啟了統(tǒng)計程序,那么曲線能否還是連續(xù)的?
高檔一些的說法是,可以把流式統(tǒng)計理解為主數(shù)據(jù)庫與分析數(shù)據(jù)庫之間通過kafka消息隊列進行異步同步。主數(shù)據(jù)庫與分析數(shù)據(jù)庫之間應該保持eventual consistency。
要保證數(shù)據(jù)不重不丟,就要做到生產(chǎn)到kafka的時候,在主數(shù)據(jù)庫和kafka消息隊列之間保持一個事務一致性。舉一個簡單的例子:
用戶下了一個訂單 主數(shù)據(jù)庫里插入了一條訂單的數(shù)據(jù)記錄 kafka消息隊列里多了一條OrderPlaced的event
這個流程中一個問題就是,主數(shù)據(jù)插入成功了之后,可能往kafka消息隊列里enqueue event失敗。如果把這個操作反過來
用戶下了一個訂單 kafka消息隊列里多了一條OrderPlaced的event 主數(shù)據(jù)庫里插入了一條訂單的數(shù)據(jù)記錄
又可能出現(xiàn)kafka消息隊列里enqueue了,但是主數(shù)據(jù)庫插入失敗的情況。就kafka隊列的目前的設計而言,對這個問題是無解的。一旦enqueue的event,除非過期是無法刪除的。
在消費端,當我們從kafka里取出數(shù)據(jù)之后,去更新分析數(shù)據(jù)庫的過程也要保持一個分布式事務的一致性。
取出下一條OrderPlaced evnet(指向的offset+1) 當前時間窗的統(tǒng)計值+1 重復以上過程,直到窗口被關閉,數(shù)據(jù)寫入到分析數(shù)據(jù)庫
kafka的數(shù)據(jù)是可以重放的,只要指定offset就可以把這個offset以及之后的數(shù)據(jù)讀取出來。所謂消費的過程就是把客戶端保存的offset值加1的過程。問題是,這個offset指針保存在哪里的問題。常規(guī)的做法是把消費的offset保存到zookeeper里。那么這就有一個分布式的一致性問題了,zookeeper里offset+1了,但是分析數(shù)據(jù)庫并沒有實際把值統(tǒng)計進去??紤]到統(tǒng)計一般不是每條輸入的event都會更新分析數(shù)據(jù)庫,而是把中間狀態(tài)緩存在內存中的。那么就有可能消費了成千上萬個event,狀態(tài)都在內存里,然后“啪”的一下機器掉電了。如果每次讀取event都移動offset的話,這些event就丟掉了。如果不是每次都移動offset的話,又可能在重啟的時候導致重復統(tǒng)計。
搞統(tǒng)計的人在乎這么一兩條數(shù)據(jù)嗎?其實大部分人是不在乎的。不少團隊壓根連offset都不保存,每次開始統(tǒng)計直接seek到隊列的尾部開始。實時計算嘛,實時最重要了。準確計算?重放歷史?這個讓hadoop搞定就好了。但是如果就是要較這個真呢?或者我們不追求嚴格的強一致,只要求重啟之后曲線不斷開那么難看就好了。
別的流式計算框架不清楚,storm的ack機制是毫無幫助的。
storm的ack機制是基于每個message來做的。這就要求如果做一個每分鐘100萬個event的統(tǒng)計,一分鐘就要跟蹤100萬個message id。就算是100萬個int,也是一筆相當可觀的內存開銷。要知道,從kafka里讀出來的event都是順序offset的,處理也是順序,只要記錄一個offset就可以跟蹤整個流的消費進度了。1個int,相比100萬個int,storm的per message ack的機制對于流式處理的進度跟蹤來說,沒有利用消息處理的有序性(storm根本上假設message之間是彼此獨立處理的),而變得效率低下。
要做到強一致是很困難的,它需要把
更新保存的offset
更新插入分析數(shù)據(jù)庫
變成一個原子事務來完成。大部分分析數(shù)據(jù)庫都沒有原子性事務的能力,連插入三條數(shù)據(jù)都不能保持同時變?yōu)榭梢?,且不說還要用它來記錄offset了。考慮到kafka在生產(chǎn)端都無法提供分布式事務,event從生產(chǎn)出來就不是完全一致的(多產(chǎn)生了或者少產(chǎn)生了),真正高一致的計費場景還是用其他的技術棧。所以值得解決的問題是,如何在重啟之后,把之前重啟的時候丟棄掉的內存狀態(tài)重新恢復出來,使得統(tǒng)計出來的曲線仍然是連續(xù)的。
解決思路有三點:
上游備份策略:重啟的時候重放kafka的歷史數(shù)據(jù),恢復內存狀態(tài)
中間狀態(tài)持久化:把統(tǒng)計的狀態(tài)放到外部的持久的數(shù)據(jù)庫里,不放內存里
同時跑兩份:同時有兩個完全一樣的統(tǒng)計任務,重啟一個,另外一個還能正常運行。
內存狀態(tài)管理的問題做流式統(tǒng)計的有兩種做法:
依賴于外部存儲管理狀態(tài):比如沒收到一個event,就往redis里發(fā)incr增1
純內存統(tǒng)計:在內存里設置一個counter,每收到一個event就+1
基于外部存儲會把整個壓力全部壓到數(shù)據(jù)庫上。一般來說流式統(tǒng)計的流速是很快的,遠大于普通的關系型數(shù)據(jù)庫,甚至可能會超過單臺redis的承載。這就使得基于純內存的統(tǒng)計非常有吸引力。大部分的時候都是在更新時間窗口內的內存狀態(tài),只有當時間窗口關閉的時候才把數(shù)據(jù)刷到分析數(shù)據(jù)庫里去。刷數(shù)據(jù)出去的同時記錄一下當前流消費到的位置(offset)。
這種純內存的狀態(tài)相對來說容易管理一些。計算直接是基于這個內存狀態(tài)做的。如果重啟丟失了,重放一段歷史數(shù)據(jù)就可以重建出來。
但是內存的問題是它總是不夠用的。當統(tǒng)計的維度組合特別多的時候,比如其中某個字段是用戶的id,那么很快這個內存狀態(tài)就會超過單機的內存上限。這種情況有兩種辦法:
利用partition把輸入的input分割,一個流分成多個流,每個統(tǒng)計程序需要跟蹤的維度組合就變少了
把存儲移到外邊去
簡單地在流式統(tǒng)計程序里開關數(shù)據(jù)庫連接是可以解決這個容量問題的:
但是這種對外部數(shù)據(jù)庫使用不小心就會導致兩個問題:
處理速度慢。不用一些批量的操作,數(shù)據(jù)庫操作很快就會變成瓶頸
數(shù)據(jù)庫的狀態(tài)不一直。內存的狀態(tài)重啟了就丟失了,外部的狀態(tài)重啟之后不丟失。重放數(shù)據(jù)流就可能導致數(shù)據(jù)的重復統(tǒng)計
但是這種把窗口統(tǒng)計的中間狀態(tài)落地的好處也是顯而易見的。重啟之后不用通過重算來恢復內存狀態(tài)。如果一個時間窗口有24小時,重算24小時的歷史數(shù)據(jù)可能是很昂貴的操作。
版本跟蹤,批量等都不應該是具體的統(tǒng)計邏輯的實現(xiàn)者的責任。理論上框架應該負責把冷熱數(shù)據(jù)分離,自動把冷數(shù)據(jù)下沉到外部的存儲,以把本地內存空閑出來。同時每次小批量處理event的時候都要記錄處理的offset,而不是要等到窗口關閉等待時候。
數(shù)據(jù)庫狀態(tài)和內存狀態(tài)要變成一個緊密結合的整體??梢园褍烧叩年P系想象成操作系統(tǒng)的filesystem page cache。用mmap把狀態(tài)映射到內存里,由框架負責什么時候把內存里的變更持久化到外部存儲里。
總結基于storm做流式統(tǒng)計缺乏對以下四個基本問題的成熟解決方案。其trident框架可能可以提供一些答案,但是實踐中好像使用的人并不多,資料也太少了??梢员容^自信的說,不僅僅是storm,對于大多數(shù)流式計算平臺都是如此。
時間窗口切分的問題
多流合并的問題
數(shù)據(jù)一致性問題(重啟之后曲線斷開的問題)
內存狀態(tài)管理問題
這些問題要好好解決,還是需要一番功夫的。新一代的流式計算框架比如spark streaming/flink應該有很多改進。即便底層框架提供了支持,從這四個角度去考察一下它們是如何支持的也是非常有裨益的事情。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://m.hztianpu.com/yun/17474.html
摘要:為了適應流式渲染技術對網(wǎng)絡高吞吐零緩沖的特點,可能需要對現(xiàn)有網(wǎng)絡協(xié)議進行改造主要針對。視頻基于的,視頻在客戶端的播放會相對較為容易。輸入信號各自隔離處理即可,瀏覽器端對常見的輸入信號幾乎都有支持。 本文首發(fā)于我的博客(點此查看),歡迎關注。 流式渲染技術,不同于傳統(tǒng)意義上前端領域的服務端渲染(即 SSR),指的是云端性能強勁的機器進行畫面渲染,將渲染完成的數(shù)據(jù)傳送至客戶端,客戶端只負責...
閱讀 958·2023-04-25 19:40
閱讀 3583·2023-04-25 17:41
閱讀 3067·2021-11-11 11:01
閱讀 2739·2019-08-30 15:55
閱讀 3284·2019-08-30 15:44
閱讀 1427·2019-08-29 14:07
閱讀 534·2019-08-29 11:23
閱讀 1384·2019-08-27 10:54