摘要:所有不相關(guān)的數(shù)據(jù)會立即丟棄,由于查詢都是在一個無限的數(shù)據(jù)流中,這樣的優(yōu)勢顯而易見。基于這些監(jiān)控事件數(shù)據(jù)流,我們想要檢測出可能要過熱的機架,從而調(diào)整負載和降溫。
原文鏈接
正文隨著傳感網(wǎng)絡(luò)的普及,智能設(shè)備持續(xù)收集著越來越多的數(shù)據(jù),分析近乎實時,不斷增長的數(shù)據(jù)流是一個巨大的挑戰(zhàn)??焖賾?yīng)對變化趨勢、交付最新的 BI 應(yīng)用會成為一個公司成敗的關(guān)鍵因素。其中關(guān)鍵問題就是數(shù)據(jù)流的事件模型檢測。
Complex event processing (CEP) 要處理的就是在持續(xù)事件中匹配模式的問題。匹配結(jié)果通常就是:從輸入事件中提取的復(fù)雜事件。傳統(tǒng) DBMSs 在固定數(shù)據(jù)上執(zhí)行查詢,而 CEP 在存儲的 query 上執(zhí)行(譯者注:某個范圍)。所有不相關(guān)的數(shù)據(jù)會立即丟棄,由于 CEP 查詢都是在一個無限的數(shù)據(jù)流中,這樣的優(yōu)勢顯而易見。更重要的是,輸入實時被處理,系統(tǒng)一旦收到某一個序列的所有數(shù)據(jù),結(jié)果就會被輸出。CEP 因此有著非常高效的實時分析能力。
由此,CEP 的處理范式吸引了很多技術(shù)人員興趣,有著廣泛的應(yīng)用場景。值得注意的是,CEP 現(xiàn)在用在了金融應(yīng)用,例如:股票市場趨勢、信用卡欺詐檢測。還有基于 RFID 的追蹤和監(jiān)控,例如:庫房小偷檢測。CEP 還可以被用于基于用戶可疑行為的網(wǎng)絡(luò)入侵檢測。
Apache Flink 有著天生的真正的流處理能力,具有低延遲、高吞吐量的特性,和 CEP 簡直絕配。因此,F(xiàn)link 社區(qū)在 Flink 1.0 引入了第一個版本的 CEP library。接下來我們會使用一個數(shù)據(jù)中心監(jiān)控的案例介紹其使用。
假設(shè)這樣一個場景:數(shù)據(jù)中心有很多機架,每一個機架都有功率和溫度監(jiān)控。監(jiān)控設(shè)備會不斷產(chǎn)生功率和溫度事件。基于這些監(jiān)控事件數(shù)據(jù)流,我們想要檢測出可能要過熱的機架,從而調(diào)整負載和降溫。
針對這種場景,我們采取兩階段處理方法。首先,監(jiān)控溫度事件,當(dāng)檢測到連續(xù)兩個超過閾值的溫度事件,即生成一個當(dāng)前平均溫度的警告(warning),溫度報警不一定意味著過熱。但是如果看到兩個連續(xù)的升溫警告事件,則生成機架過熱報警(alert)。此時,需要采取措施冷卻機架。
首先,定義來源的監(jiān)控事件流,每一個 message 都包含來源 rack ID(機架 ID)。溫度事件包含當(dāng)前溫度,功率事件包含當(dāng)前電壓。我們把事件模型定義為 POJOs.
public abstract class MonitoringEvent { private int rackID; ... } public class TemperatureEvent extends MonitoringEvent { private double temperature; ... } public class PowerEvent extends MonitoringEvent { private double voltage; ... }
我們可以使用 Flink 的 connector(比如:Kafka, RabbitMQ 等),生成 DataStream
每個模式都包含了一個可以定義過濾 (filter) 條件的事件序列。模式 (pattern) 的第一個事件通常都命名為"First Event"。
Pattern.begin("First Event");
這句話會匹配每一個輸入的監(jiān)控事件(monitoring event),而我們只需要溫度大于一定閾值的溫度事件(TemperatureEvents),所以我們需要添加 subtype 和 where 語句限制。
Pattern.begin("First Event") .subtype(TemperatureEvent.class) .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD);
之前說:對于同一個機架,當(dāng)看到兩個連續(xù)的高溫事件(超過閾值)就產(chǎn)生一個溫度報警(TemperatureWarning),Pattern API 提供了 next 調(diào)用方法,來添加事件到模式定義中。next 添加的事件發(fā)生時間必須緊跟著第一個匹配事件之后,才能觸發(fā)整個模式的匹配。
PatternwarningPattern = Pattern. begin("First Event") .subtype(TemperatureEvent.class) .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD) .next("Second Event") .subtype(TemperatureEvent.class) .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD) .within(Time.seconds(10));
最后模式的定義包含有一個 within 的 API 調(diào)用,用來定義兩個連續(xù) TemperatureEvents 必須在 10s 內(nèi)發(fā)生才能匹配。時間基于 time characteristic 設(shè)置,可以是:處理時間、輸入時間或者事件時間。(譯者注 Event Time / Processing Time / Ingestion Time 解釋)
定義好事件模型之后,可以將其應(yīng)用到輸入數(shù)據(jù)流中。
PatternStreamtempPatternStream = CEP.pattern( inputEventStream.keyBy("rackID"), warningPattern);
由于告警是針對單個機架的告警,必須使用 keyBy 通過 rackID 字段對輸入事件流分流。即匹配出的事件都是同一個機架的。
PatternStream
public class TemperatureWarning { private int rackID; private double averageTemperature; ... } DataStreamwarnings = tempPatternStream.select( (Map pattern) -> { TemperatureEvent first = (TemperatureEvent) pattern.get("First Event"); TemperatureEvent second = (TemperatureEvent) pattern.get("Second Event"); return new TemperatureWarning( first.getRackID(), (first.getTemperature() + second.getTemperature()) / 2); } );
現(xiàn)在我們從原始監(jiān)控事件流(monitoring event stream)生成了一個復(fù)雜事件流 DataStream
public class TemperatureAlert { private int rackID; ... }
首先定義報警事件
PatternalertPattern = Pattern. begin("First Event") .next("Second Event") .within(Time.seconds(20));
定義描述了在 20s 內(nèi)有兩個 TemperatureWarnings 事件,并且第一個事件名稱為 "First Event",緊接著的第二個為 “Second Event”。這來了個事件都沒有 where 語句,因為需要訪問兩個事件才能判斷溫度時候增長。因此,下面我們需要在 select 語句中使用 filter 條件來提取。這里我們只是生成了 PatternStream。
PatternStreamalertPatternStream = CEP.pattern( warnings.keyBy("rackID"), alertPattern);
同樣,我們需要 keyBy 對輸入的告警數(shù)據(jù)流針對同一個機架進行分流。然后使用 flatSelect 方法訪問匹配的事件序列,當(dāng)判斷溫度上升時生成 TemperatureAlert 告警。
DataStreamalerts = alertPatternStream.flatSelect( (Map pattern, Collector out) -> { TemperatureWarning first = pattern.get("First Event"); TemperatureWarning second = pattern.get("Second Event"); if (first.getAverageTemperature() < second.getAverageTemperature()) { out.collect(new TemperatureAlert(first.getRackID())); } });
DataStream
本文描述了使用 Flink CEP library 可以很容易處理事件流。我們通過數(shù)據(jù)中心的監(jiān)控和報警案例,完成了服務(wù)器機架過熱報警的小程序。
未來 Flink 社區(qū)會持續(xù)擴展 CEP library 的功能和表述能力。接下來的 road map 是支持類正則表達式的模式實現(xiàn),包括 *, 上下限制和否定。此外,還計劃允許 where 語句訪問之前匹配的事件字段。這個特性可以讓我們提前刪除不需要的事件序列。
本內(nèi)容為譯者添加
官網(wǎng):Apache Flink
概念:Event Time / Processing Time / Ingestion Time
案例:Apache Flink example CEP program to monitor data center temperatures
API 介紹:FlinkCEP - Complex event processing for Flink
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/35812.html
摘要:它的設(shè)計使得即使是大型團隊也能以高度隔離的方式應(yīng)對功能變更。獲取數(shù)據(jù)數(shù)據(jù)變更性能,都是讓人頭痛的問題。通過維護組件與數(shù)據(jù)間的依賴在依賴的數(shù)據(jù)就緒前組件不會被渲染為開發(fā)者提供更加可預(yù)測的開發(fā)環(huán)境。這杜絕了隱式的數(shù)據(jù)依賴導(dǎo)致的潛在。 關(guān)于Relay與GraphQL的介紹 原文:Introducing Relay and GraphQL 視頻地址(強烈建議觀看):https://www.y...
摘要:在前面時序列數(shù)據(jù)庫武斗大會之名錄我們已經(jīng)介紹了一些常見的,這里我們再對剩余的一些做些簡單介紹。是一個多租戶的時間序列和資源數(shù)據(jù)庫。是基于的時序列數(shù)據(jù)庫。 【編者按】劉斌,OneAPM后端研發(fā)工程師,擁有10多年編程經(jīng)驗,參與過大型金融、通信以及Android手機操作系的開發(fā),熟悉Linux及后臺開發(fā)技術(shù)。曾參與翻譯過《第一本Docker書》、《GitHub入門與實踐》、《Web應(yīng)用安全...
閱讀 1811·2023-04-26 02:30
閱讀 1130·2021-11-10 11:36
閱讀 1481·2021-10-08 10:14
閱讀 3604·2021-09-28 09:35
閱讀 1618·2021-08-23 09:47
閱讀 2639·2019-08-30 15:56
閱讀 1546·2019-08-30 15:44
閱讀 1883·2019-08-30 13:59