成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

Flink實(shí)戰(zhàn)(七) - Time & Windows編程

Meils / 2713人閱讀

摘要:在這種情況下,清除僅指窗口中的數(shù)據(jù)元,而不是窗口元數(shù)據(jù)。紫色圓圈表示流的數(shù)據(jù)元,這些數(shù)據(jù)元由某個(gè)鍵在這種情況下是用戶,用戶和用戶劃分。

0 相關(guān)源碼

掌握Flink中三種常用的Time處理方式,掌握Flink中滾動(dòng)窗口以及滑動(dòng)窗口的使用,了解Flink中的watermark。

Flink 在流處理工程中支持不同的時(shí)間概念。

1 處理時(shí)間(Processing time)

執(zhí)行相應(yīng)算子操作的機(jī)器的系統(tǒng)時(shí)間.

當(dāng)流程序在處理時(shí)間運(yùn)行時(shí),所有基于時(shí)間的 算子操作(如時(shí)間窗口)將使用運(yùn)行相應(yīng)算子的機(jī)器的系統(tǒng)時(shí)鐘。每小時(shí)處理時(shí)間窗口將包括在系統(tǒng)時(shí)鐘指示整個(gè)小時(shí)之間到達(dá)特定算子的所有記錄。

例如,如果應(yīng)用程序在上午9:15開始運(yùn)行,則第一個(gè)每小時(shí)處理時(shí)間窗口將包括在上午9:15到上午10:00之間處理的事件,下一個(gè)窗口將包括在上午10:00到11:00之間處理的事件

處理時(shí)間是最簡單的時(shí)間概念,不需要流和機(jī)器之間的協(xié)調(diào)

它提供最佳性能和最低延遲。但是,在分布式和異步環(huán)境中,處理時(shí)間不提供確定性,因?yàn)樗菀资艿接涗浀竭_(dá)系統(tǒng)的速度(例如從消息隊(duì)列)到記錄在系統(tǒng)內(nèi)的算子之間流動(dòng)的速度的影響。和停電(調(diào)度或其他)。

2 事件時(shí)間(Event time)

每個(gè)多帶帶的事件在其生產(chǎn)設(shè)備上發(fā)生的時(shí)間.

此時(shí)間通常在進(jìn)入Flink之前內(nèi)置在記錄中,并且可以從每個(gè)記錄中提取該事件時(shí)間戳。

在事件時(shí)間,時(shí)間的進(jìn)展取決于數(shù)據(jù),而不是任何掛鐘。

事件時(shí)間程序必須指定如何生成事件時(shí)間水印,這是表示事件時(shí)間進(jìn)度的機(jī)制.

在一個(gè)完美的世界中,事件時(shí)間處理將產(chǎn)生完全一致和確定的結(jié)果,無論事件何時(shí)到達(dá),或者順序.

但是,除非事件已知按順序到達(dá)(按時(shí)間戳),否則事件時(shí)間處理會(huì)在等待無序事件時(shí)產(chǎn)生一些延遲。由于只能等待一段有限的時(shí)間,因此限制了確定性事件時(shí)間應(yīng)用程序的可能性。

假設(shè)所有數(shù)據(jù)都已到達(dá),算子操作將按預(yù)期運(yùn)行,即使在處理無序或延遲事件或重新處理歷史數(shù)據(jù)時(shí)也會(huì)產(chǎn)生正確且一致的結(jié)果。

例如,每小時(shí)事件時(shí)間窗口將包含帶有落入該小時(shí)的事件時(shí)間戳的所有記錄,無論它們到達(dá)的順序如何,或者何時(shí)處理它們。(有關(guān)更多信息,請(qǐng)參閱有關(guān)遲發(fā)事件的部分。)

請(qǐng)注意,有時(shí)當(dāng)事件時(shí)間程序?qū)崟r(shí)處理實(shí)時(shí)數(shù)據(jù)時(shí),它們將使用一些處理時(shí)間 算子操作,以確保它們及時(shí)進(jìn)行。

3 攝取時(shí)間(Ingestion time)

事件進(jìn)入Flink的時(shí)間.

在源算子處,每個(gè)記錄將源的當(dāng)前時(shí)間作為時(shí)間戳,并且基于時(shí)間的算子操作(如時(shí)間窗口)引用該時(shí)間戳。

在概念上位于事件時(shí)間處理時(shí)間之間。

與處理時(shí)間相比 ,它成本稍微高一些,但可以提供更可預(yù)測的結(jié)果。因?yàn)槭褂梅€(wěn)定的時(shí)間戳(在源處分配一次),所以對(duì)記錄的不同窗口 算子操作將引用相同的時(shí)間戳,而在處理時(shí)間中,每個(gè)窗口算子可以將記錄分配給不同的窗口(基于本地系統(tǒng)時(shí)鐘和任何運(yùn)輸延誤)

與事件時(shí)間相比,無法處理任何無序事件或后期數(shù)據(jù),但程序不必指定如何生成水印。

在內(nèi)部,攝取時(shí)間與事件時(shí)間非常相似,但具有自動(dòng)時(shí)間戳分配和自動(dòng)水印生成函數(shù)

4 設(shè)置時(shí)間特性

Flink DataStream程序的第一部分通常設(shè)置基本時(shí)間特性

顯然,在Flink的流式處理環(huán)境中,默認(rèn)使用處理時(shí)間

該設(shè)置定義了數(shù)據(jù)流源的行為方式(例如,它們是否將分配時(shí)間戳),以及窗口 算子操作應(yīng)該使用的時(shí)間概念,比如

KeyedStream.timeWindow(Time.seconds(30))。

以下示例顯示了一個(gè)Flink程序,該程序在每小時(shí)時(shí)間窗口中聚合事件。窗口的行為適應(yīng)時(shí)間特征。

Java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// 可選的:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream stream = env.addSource(new FlinkKafkaConsumer09(topic, schema, props));

stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);

Scala

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))

stream
    .keyBy( _.getUser )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) => a.add(b) )
    .addSink(...)

請(qǐng)注意,為了在事件時(shí)間運(yùn)行此示例,程序需要使用直接為數(shù)據(jù)定義事件時(shí)間的源并自行發(fā)出水印,或者程序必須在源之后注入時(shí)間戳分配器和水印生成器。這些函數(shù)描述了如何訪問事件時(shí)間戳,以及事件流表現(xiàn)出的無序程度。

5 Windows 5.1 簡介

Windows是處理無限流的核心。Windows將流拆分為有限大小的“桶”,我們可以在其上應(yīng)用計(jì)算。我們重點(diǎn)介紹如何在Flink中執(zhí)行窗口,以及程序員如何從其提供的函數(shù)中獲益最大化。

窗口Flink程序的一般結(jié)構(gòu)如下所示

第一個(gè)片段指的是被Keys化流

而第二個(gè)片段指的是非被Keys化流

正如所看到的,唯一的區(qū)別是keyBy(...)呼吁Keys流和window(...)成為windowAll(...)非被Key化的數(shù)據(jù)流。這也將作為頁面其余部分的路線圖。

Keyed Windows

Non-Keyed Windows

在上面,方括號(hào)(...)中的命令是可選的。這表明Flink允許您以多種不同方式自定義窗口邏輯,以便最適合您的需求。

5.2 窗口生命周期

簡而言之,只要應(yīng)該屬于此窗口的第一個(gè)數(shù)據(jù)元到達(dá),就會(huì)創(chuàng)建一個(gè)窗口,當(dāng)時(shí)間(事件或處理時(shí)間)超過其結(jié)束時(shí)間戳加上用戶指定 時(shí),窗口將被完全刪除allowed lateness(請(qǐng)參閱允許的延遲))。Flink保證僅刪除基于時(shí)間的窗口而不是其他類型,例如全局窗口(請(qǐng)參閱窗口分配器)。例如,使用基于事件時(shí)間的窗口策略,每5分鐘創(chuàng)建一個(gè)非重疊(或翻滾)的窗口,并允許延遲1分鐘,F(xiàn)link將創(chuàng)建一個(gè)新窗口,用于間隔12:00和12:05當(dāng)具有落入此間隔的時(shí)間戳的第一個(gè)數(shù)據(jù)元到達(dá)時(shí),當(dāng)水印通過12:06 時(shí)間戳?xí)r它將刪除它。

此外,每個(gè)窗口將具有Trigger和一個(gè)函數(shù)(ProcessWindowFunction,ReduceFunction, AggregateFunction或FoldFunction)連接到它。該函數(shù)將包含要應(yīng)用于窗口內(nèi)容的計(jì)算,而Trigger指定窗口被認(rèn)為準(zhǔn)備好應(yīng)用該函數(shù)的條件。

觸發(fā)策略可能類似于“當(dāng)窗口中的數(shù)據(jù)元數(shù)量大于4”時(shí),或“當(dāng)水印通過窗口結(jié)束時(shí)”。

觸發(fā)器還可以決定在創(chuàng)建和刪除之間的任何時(shí)間清除窗口的內(nèi)容。在這種情況下,清除僅指窗口中的數(shù)據(jù)元,而不是窗口元數(shù)據(jù)。這意味著仍然可以將新數(shù)據(jù)添加到該窗口。

除了上述內(nèi)容之外,您還可以指定一個(gè)Evictor,它可以在觸發(fā)器觸發(fā)后以及應(yīng)用函數(shù)之前和/或之后從窗口中刪除數(shù)據(jù)元。

5.3 被Keys化與非被Keys化Windows

要指定的第一件事是您的流是否應(yīng)該鍵入。必須在定義窗口之前完成此 算子操作。使用the keyBy(...)將您的無限流分成邏輯被Key化的數(shù)據(jù)流。如果keyBy(...)未調(diào)用,則表示您的流不是被Keys化的。

對(duì)于被Key化的數(shù)據(jù)流,可以將傳入事件的任何屬性用作鍵(此處有更多詳細(xì)信息)。擁有被Key化的數(shù)據(jù)流將允許您的窗口計(jì)算由多個(gè)任務(wù)并行執(zhí)行,因?yàn)槊總€(gè)邏輯被Key化的數(shù)據(jù)流可以獨(dú)立于其余任務(wù)進(jìn)行處理。引用相同Keys的所有數(shù)據(jù)元將被發(fā)送到同一個(gè)并行任務(wù)。

在非被Key化的數(shù)據(jù)流的情況下,您的原始流將不會(huì)被拆分為多個(gè)邏輯流,并且所有窗口邏輯將由單個(gè)任務(wù)執(zhí)行,即并行度為1。

6 窗口分配器

指定流是否已鍵入后,下一步是定義一個(gè)窗口分配器.

窗口分配器定義如何將數(shù)據(jù)元分配給窗口,這是通過WindowAssigner 在window(...)(對(duì)于被Keys化流)或windowAll()(對(duì)于非被Keys化流)調(diào)用中指定您的選擇來完成的

WindowAssigner負(fù)責(zé)將每個(gè)傳入數(shù)據(jù)元分配給一個(gè)或多個(gè)窗口

Flink帶有預(yù)定義的窗口分配器,用于最常見的用例,即

滾動(dòng)窗口

滑動(dòng)窗口

會(huì)話窗口

全局窗口

還可以通過擴(kuò)展WindowAssigner類來實(shí)現(xiàn)自定義窗口分配器。所有內(nèi)置窗口分配器(全局窗口除外)都根據(jù)時(shí)間為窗口分配數(shù)據(jù)元,這可以是處理時(shí)間或事件時(shí)間。請(qǐng)查看我們關(guān)于活動(dòng)時(shí)間的部分,了解處理時(shí)間和事件時(shí)間之間的差異以及時(shí)間戳和水印的生成方式。

基于時(shí)間的窗口具有開始時(shí)間戳(包括)和結(jié)束時(shí)間戳(不包括),它們一起描述窗口的大小。

在代碼中,F(xiàn)link在使用TimeWindow基于時(shí)間的窗口時(shí)使用,該窗口具有查詢開始和結(jié)束時(shí)間戳的方法maxTimestamp()返回給定窗口的最大允許時(shí)間戳

下圖顯示了每個(gè)分配者的工作情況。紫色圓圈表示流的數(shù)據(jù)元,這些數(shù)據(jù)元由某個(gè)鍵(在這種情況下是用戶1,用戶2和用戶3)劃分。x軸顯示時(shí)間的進(jìn)度。

6.1 滾動(dòng)窗口

一個(gè)滾動(dòng)窗口分配器的每個(gè)數(shù)據(jù)元分配給指定的窗口的窗口大小。滾動(dòng)窗口具有固定的尺寸,不重疊.

例如,如果指定大小為5分鐘的翻滾窗口,則將評(píng)估當(dāng)前窗口,并且每五分鐘將啟動(dòng)一個(gè)新窗口,如下圖所示

以下代碼段顯示了如何使用滾動(dòng)窗口。

Java

DataStream input = ...;

// tumbling event-time windows
input
    .keyBy()
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .();

// tumbling processing-time windows
input
    .keyBy()
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .();

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy()
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .();

Scala

val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy()
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .()

// tumbling processing-time windows
input
    .keyBy()
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .()

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy()
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .()

Scala

Java

6.2 滑動(dòng)窗口

該滑動(dòng)窗口分配器分配元件以固定長度的窗口。與滾動(dòng)窗口分配器類似,窗口大小由窗口大小參數(shù)配置

附加的窗口滑動(dòng)參數(shù)控制滑動(dòng)窗口的啟動(dòng)頻率。因此,如果幻燈片小于窗口大小,則滑動(dòng)窗口可以重疊。在這種情況下,數(shù)據(jù)元被分配給多個(gè)窗口。

例如,您可以將大小為10分鐘的窗口滑動(dòng)5分鐘。有了這個(gè),你每隔5分鐘就會(huì)得到一個(gè)窗口,其中包含過去10分鐘內(nèi)到達(dá)的事件,如下圖所示。

以下代碼段顯示了如何使用滑動(dòng)窗口

Java

DataStream input = ...;

// 滑動(dòng) 事件時(shí)間 窗口
input
    .keyBy()
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .();

//  滑動(dòng) 處理時(shí)間 窗口
input
    .keyBy()
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .();

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy()
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .();

Scala

val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy()
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .()

// tumbling processing-time windows
input
    .keyBy()
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .()

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy()
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .()
7 窗口函數(shù)

定義窗口分配器后,我們需要指定要在每個(gè)窗口上執(zhí)行的計(jì)算。這是窗口函數(shù)的職責(zé),窗口函數(shù)用于在系統(tǒng)確定窗口準(zhǔn)備好進(jìn)行處理后處理每個(gè)(可能是被Keys化的)窗口的數(shù)據(jù)元

的窗函數(shù)可以是一個(gè)ReduceFunction,AggregateFunction,F(xiàn)oldFunction或ProcessWindowFunction。前兩個(gè)可以更有效地執(zhí)行,因?yàn)镕link可以在每個(gè)窗口到達(dá)時(shí)遞增地聚合它們的數(shù)據(jù)元.

ProcessWindowFunction獲取Iterable窗口中包含的所有數(shù)據(jù)元以及有關(guān)數(shù)據(jù)元所屬窗口的其他元信息。

具有ProcessWindowFunction的窗口轉(zhuǎn)換不能像其他情況一樣有效地執(zhí)行,因?yàn)镕link必須在調(diào)用函數(shù)之前在內(nèi)部緩沖窗口的所有數(shù)據(jù)元。這可以通過組合來減輕ProcessWindowFunction與ReduceFunction,AggregateFunction或FoldFunction以獲得兩個(gè)窗口元件的增量聚合并且該附加元數(shù)據(jù)窗口 ProcessWindowFunction接收。我們將查看每個(gè)變體的示例。

7.1 ReduceFunction

指定如何組合輸入中的兩個(gè)數(shù)據(jù)元以生成相同類型的輸出數(shù)據(jù)元.

Flink使用ReduceFunction來遞增地聚合窗口的數(shù)據(jù)元.

定義和使用

Java

DataStream> input = ...;

input
    .keyBy()
    .window()
    .reduce(new ReduceFunction> {
      public Tuple2 reduce(Tuple2 v1, Tuple2 v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });

Scala

val input: DataStream[(String, Long)] = ...

input
    .keyBy()
    .window()
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

原來傳遞進(jìn)來的數(shù)據(jù)是字符串,此處我們就使用數(shù)值類型,通過數(shù)值類型來演示增量的效果

這里不是等待窗口所有的數(shù)據(jù)進(jìn)行一次性處理,而是數(shù)據(jù)兩兩處理

輸入

增量輸出

Java

7.2 聚合函數(shù)An AggregateFunction是一個(gè)通用版本,ReduceFunction它有三種類型:輸入類型(IN),累加器類型(ACC)和輸出類型(OUT)。輸入類型是輸入流中數(shù)據(jù)元的類型,并且AggregateFunction具有將一個(gè)輸入數(shù)據(jù)元添加到累加器的方法。該接口還具有用于創(chuàng)建初始累加器的方法,用于將兩個(gè)累加器合并到一個(gè)累加器中以及用于OUT從累加器提取輸出(類型)。我們將在下面的示例中看到它的工作原理。

與之相同ReduceFunction,F(xiàn)link將在窗口到達(dá)時(shí)遞增地聚合窗口的輸入數(shù)據(jù)元。

一個(gè)AggregateFunction可以被定義并這樣使用:

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction, Tuple2, Double> {
  @Override
  public Tuple2 createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2 add(Tuple2 value, Tuple2 accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2 accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2 merge(Tuple2 a, Tuple2 b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream> input = ...;

input
    .keyBy()
    .window()
    .aggregate(new AverageAggregate());

Scala

The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator.\_1 + value.\_2, accumulator.\_2 + 1L)

  override def getResult(accumulator: (Long, Long)) = accumulator.\_1 / accumulator.\_2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a.\_1 + b.\_1, a.\_2 + b.\_2)
}

val input: DataStream[(String, Long)] = ...

input
    .keyBy()
    .window()
    .aggregate(new AverageAggregate)
7.3 ProcessWindowFunction

ProcessWindowFunction獲取包含窗口的所有數(shù)據(jù)元的Iterable,以及可訪問時(shí)間和狀態(tài)信息的Context對(duì)象,這使其能夠提供比其他窗口函數(shù)更多的靈活性。這是以性能和資源消耗為代價(jià)的,因?yàn)閿?shù)據(jù)元不能以遞增方式聚合,而是需要在內(nèi)部進(jìn)行緩沖,直到窗口被認(rèn)為已準(zhǔn)備好進(jìn)行處理。

ProcessWindowFunction外觀簽名如下:

public abstract class ProcessWindowFunction implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable elements,
            Collector out) throws Exception;

       /**
        * The context holding window metadata.
        */
       public abstract class Context implements java.io.Serializable {
           /**
            * Returns the window that is being evaluated.
            */
           public abstract W window();

           /** Returns the current processing time. */
           public abstract long currentProcessingTime();

           /** Returns the current event-time watermark. */
           public abstract long currentWatermark();

           /**
            * State accessor for per-key and per-window state.
            *
            * 

NOTE:If you use per-window state you have to ensure that you clean it up * by implementing {@link ProcessWindowFunction#clear(Context)}. */ public abstract KeyedStateStore windowState(); /** * State accessor for per-key global state. */ public abstract KeyedStateStore globalState(); } }

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key      The key for which this window is evaluated.
    * @param context  The context in which the window is being evaluated.
    * @param elements The elements in the window being evaluated.
    * @param out      A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT])

  /**
    * The context holding window metadata
    */
  abstract class Context {
    /**
      * Returns the window that is being evaluated.
      */
    def window: W

    /**
      * Returns the current processing time.
      */
    def currentProcessingTime: Long

    /**
      * Returns the current event-time watermark.
      */
    def currentWatermark: Long

    /**
      * State accessor for per-key and per-window state.
      */
    def windowState: KeyedStateStore

    /**
      * State accessor for per-key global state.
      */
    def globalState: KeyedStateStore
  }

}

該key參數(shù)是通過KeySelector為keyBy()調(diào)用指定的Keys提取的Keys。在元組索引鍵或字符串字段引用的情況下,此鍵類型始終是Tuple,您必須手動(dòng)將其轉(zhuǎn)換為正確大小的元組以提取鍵字段。

A ProcessWindowFunction可以像這樣定義和使用:

DataStream> input = ...;

input
  .keyBy(t -> t.f0)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction
    extends ProcessWindowFunction, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable> input, Collector out) {
    long count = 0;
    for (Tuple2 in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
  }
}
val input: DataStream[(String, Long)] = ...

input
  .keyBy(_._1)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction())

/* ... */

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

該示例顯示了ProcessWindowFunction對(duì)窗口中的數(shù)據(jù)元進(jìn)行計(jì)數(shù)的情況。此外,窗口函數(shù)將有關(guān)窗口的信息添加到輸出。

注意注意,使用ProcessWindowFunction簡單的聚合(例如count)是非常低效的

8 水印

推薦閱讀

Flink流計(jì)算編程--watermark(水位線)簡介

參考

Event Time

Windows

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/75548.html

相關(guān)文章

  • Flink實(shí)戰(zhàn)(六) - Table API &amp; SQL編程

    摘要:每個(gè)在簡潔性和表達(dá)性之間提供不同的權(quán)衡,并針對(duì)不同的用例。在這些中處理的數(shù)據(jù)類型在相應(yīng)的編程語言中表示為類。該是為中心的聲明性表,其可被動(dòng)態(tài)地改變的表表示流時(shí)。這種抽象在語義和表達(dá)方面類似于,但是將程序表示為查詢表達(dá)式。 1 意義 1.1 分層的 APIs & 抽象層次 Flink提供三層API。 每個(gè)API在簡潔性和表達(dá)性之間提供不同的權(quán)衡,并針對(duì)不同的用例。 showImg(ht...

    lifefriend_007 評(píng)論0 收藏0
  • Flink實(shí)戰(zhàn)(八) - Streaming Connectors 編程

    摘要:默認(rèn)情況下,當(dāng)數(shù)據(jù)元到達(dá)時(shí),分段接收器將按當(dāng)前系統(tǒng)時(shí)間拆分,并使用日期時(shí)間模式命名存儲(chǔ)區(qū)。如果需要,可以使用數(shù)據(jù)元或元組的屬性來確定目錄。這將調(diào)用傳入的數(shù)據(jù)元并將它們寫入部分文件,由換行符分隔。消費(fèi)者的消費(fèi)者被稱為或等。 1 概覽 1.1 預(yù)定義的源和接收器 Flink內(nèi)置了一些基本數(shù)據(jù)源和接收器,并且始終可用。該預(yù)定義的數(shù)據(jù)源包括文件,目錄和插socket,并從集合和迭代器攝取數(shù)據(jù)...

    beita 評(píng)論0 收藏0
  • 《從0到1學(xué)習(xí)Flink》—— 介紹Flink中的Stream Windows

    摘要:在每個(gè)事件上,觸發(fā)器都可以決定觸發(fā)即清除刪除窗口并丟棄其內(nèi)容,或者啟動(dòng)并清除窗口。請(qǐng)注意,指定的觸發(fā)器不會(huì)添加其他觸發(fā)條件,但會(huì)替換當(dāng)前觸發(fā)器。結(jié)論對(duì)于現(xiàn)代流處理器來說,支持連續(xù)數(shù)據(jù)流上的各種類型的窗口是必不可少的。 showImg(https://segmentfault.com/img/remote/1460000017892799?w=1280&h=720); 前言 目前有許多數(shù)...

    jifei 評(píng)論0 收藏0
  • Apache Flink 1.9 重大特性提前解讀

    showImg(https://segmentfault.com/img/remote/1460000019961426); 今天在 Apache Flink meetup ·北京站進(jìn)行 Flink 1.9 重大新特性進(jìn)行了講解,兩位講師分別是 戴資力/楊克特,zhisheng 我也從看完了整個(gè) 1.9 特性解讀的直播,預(yù)計(jì) Flink 1.9 版本正式發(fā)布時(shí)間大概是 7 月底 8 月初左右正式發(fā)...

    wall2flower 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<