亚洲成Av人片在线观看不卡|中文字幕在线精品无码一区|国产精品福利午夜h视频|手机看片AV永久免费,91天堂在线视频,最新2021年偷拍精品视频,国产成人一区二区在线视频

您當(dāng)前的位置 :寧夏資訊網(wǎng) > 資訊 >  內(nèi)容正文
投稿

Flink教程:DataStream上的Join操作!

寧夏資訊網(wǎng) 2020-11-19 05:35:10 來源: 閱讀:-

批處理經(jīng)常要解決的問題是將兩個(gè)數(shù)據(jù)源做關(guān)聯(lián)Join操作。比如,很多手機(jī)APP都有一個(gè)用戶數(shù)據(jù)源User,同時(shí)APP會記錄用戶的行為,我們稱之為Behavior,兩個(gè)表按照userId來進(jìn)行Join。在流處理場景下,F(xiàn)link也支持了Join,只不過Flink是在一個(gè)時(shí)間窗口上來進(jìn)行兩個(gè)表的Join。

Flink教程:DataStream上的Join操作

Join示例圖

目前,F(xiàn)link支持了兩種Join:Window Join(窗口連接)和Interval Join(時(shí)間間隔連接。

Window Join

從名字中能猜到,Window Join主要在Flink的窗口上進(jìn)行操作,它將兩個(gè)流中落在相同窗口的元素按照某個(gè)Key進(jìn)行Join。一個(gè)Window Join的大致骨架結(jié)構(gòu)為:

input1.join(input2)    .where()      <- input1使用哪個(gè)字段作為Key    .equalTo()    <- input2使用哪個(gè)字段作為Key    .window()  <- 指定WindowAssigner    [.trigger()]      <- 指定Trigger(可選)    [.evictor()]      <- 指定Evictor(可選)    .apply()     <- 指定JoinFunction

下圖展示了Join的大致過程。兩個(gè)輸入數(shù)據(jù)流先分別按Key進(jìn)行分組,然后將元素劃分到窗口中。窗口的劃分需要使用WindowAssigner來定義,這里可以使用Flink提供的滾動窗口、滑動窗口或會話窗口等默認(rèn)的WindowAssigner。隨后兩個(gè)數(shù)據(jù)流中的元素會被分配到各個(gè)窗口上,也就是說一個(gè)窗口會包含來自兩個(gè)數(shù)據(jù)流的元素。相同窗口內(nèi)的數(shù)據(jù)會以INNER JOIN的語義來相互關(guān)聯(lián),形成一個(gè)數(shù)據(jù)對。當(dāng)窗口的時(shí)間結(jié)束,F(xiàn)link會調(diào)用JoinFunction來對窗口內(nèi)的數(shù)據(jù)對進(jìn)行處理。當(dāng)然,我們也可以使用Trigger或Evictor做一些自定義優(yōu)化,他們的使用方法和普通窗口的使用方法一樣。

Flink教程:DataStream上的Join操作

Join的大致流程

接下來我們重點(diǎn)分析一下兩個(gè)數(shù)據(jù)流是如何INNER JOIN的:

Flink教程:DataStream上的Join操作

窗口內(nèi)數(shù)據(jù)INNER JOIN示意圖

一般滴,INNER JOIN只對兩個(gè)數(shù)據(jù)源都出現(xiàn)的元素做Join,形成一個(gè)數(shù)據(jù)對,即數(shù)據(jù)源input1中的某個(gè)元素與數(shù)據(jù)源input2中的所有元素逐個(gè)配對。當(dāng)數(shù)據(jù)源某個(gè)窗口內(nèi)沒數(shù)據(jù)時(shí),比如圖中的第三個(gè)窗口,Join的結(jié)果也是空的。

class MyJoinFunction extends JoinFunction[(String, Int), (String, Int), String] {  override def join(input1: (String, Int), input2: (String, Int)): String = {    "input 1 :" + input1._2 + ", input 2 :" + input2._2  }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val joinResult = input1.join(input2)      .where(i1 => i1._1)      .equalTo(i2 => i2._1)      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))      .apply(new MyJoinFunction)

上面的代碼自定義了JoinFunction,并將Join結(jié)果打印出來。無論代碼中演示的滾動窗口,還是滑動窗口或會話窗口,其原理都是一樣的。除了JoinFunction,F(xiàn)link還提供了FlatJoinFunction,其功能是輸出零到多個(gè)結(jié)果。

如果INNER JOIN不能滿足我們的需求,CoGroupFunction提供了更多可自定義的功能。需要注意的是,在調(diào)用時(shí),要寫成input1.coGroup(input2).where().equalTo()。

class MyCoGroupFunction extends CoGroupFunction[(String, Int), (String, Int), String] {  // 這里的類型是Java的Iterable,需要引用 collection.JavaConverters._ 并轉(zhuǎn)成Scala  override def coGroup(input1: lang.Iterable[(String, Int)], input2: lang.Iterable[(String, Int)], out: Collector[String]): Unit = {    input1.asScala.foreach(element => out.collect("input1 :" + element.toString()))    input2.asScala.foreach(element => out.collect("input2 :" + element.toString()))  }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val coGroupResult = input1.coGroup(input2)      .where(i1 => i1._1)      .equalTo(i2 => i2._1)      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))      .apply(new MyCoGroupFunction)

Interval Join

與Window Join不同,Interval Join不依賴Flink的WindowAssigner,而是根據(jù)一個(gè)時(shí)間間隔(Interval)界定時(shí)間。Interval需要一個(gè)時(shí)間下界(lower bound)和上界(upper bound),如果我們將input1和input2進(jìn)行Interval Join,input1中的某個(gè)元素為input1.element1,時(shí)間戳為input1.element1.ts,那么一個(gè)Interval就是[input1.element1.ts + lower bound, input1.element1.ts + upper bound],input2中落在這個(gè)時(shí)間段內(nèi)的元素將會和input1.element1組成一個(gè)數(shù)據(jù)對。用數(shù)學(xué)公式表達(dá)為,凡是符合下面公式input1.element1.ts + lower bound <= input2.elementx.ts <=input1.element1.ts + upper bound的元素使用INNER JOIN語義,兩兩組合在一起。上下界可以是正數(shù)也可以是負(fù)數(shù)。

注意,目前Flink(1.9)的Interval Join只支持Event Time語義。

Flink教程:DataStream上的Join操作

Interval Join示意圖

下面的代碼展示了如何對兩個(gè)數(shù)據(jù)流進(jìn)行Interval Join:

class MyProcessFunction extends ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String] {  override def processElement(input1: (String, Long, Int),                              input2: (String, Long, Int),                              context: ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String]#Context,                              out: Collector[String]): Unit = {    out.collect("input 1: " + input1.toString() + ", input 2: " + input2.toString)  }}// 數(shù)據(jù)流有三個(gè)字段:(key, 時(shí)間戳, 數(shù)值)val input1: DataStream[(String, Long, Int)] = ...val input2: DataStream[(String, Long, Int)] = ...val intervalJoinResult = input1.keyBy(_._1)      .intervalJoin(input2.keyBy(_._1))      .between(Time.milliseconds(-5), Time.milliseconds(10))      .process(new MyProcessFunction)

默認(rèn)的時(shí)間間隔是包含上下界的,我們可以使用.lowerBoundExclusive() 和.upperBoundExclusive來確定是否需要包含上下界。

val intervalJoinResult = input1.keyBy(_._1)      .intervalJoin(input2.keyBy(_._1))      .between(Time.milliseconds(-5), Time.milliseconds(10))      .upperBoundExclusive()      .lowerBoundExclusive()      .process(new MyProcessFunction)

Interval Join內(nèi)部是用緩存來存儲所有數(shù)據(jù)的,因此需要注意緩存數(shù)據(jù)不能太大,以免對內(nèi)存造成絕大壓力。

(正文已結(jié)束)

推薦閱讀:大股網(wǎng)

免責(zé)聲明及提醒:此文內(nèi)容為本網(wǎng)所轉(zhuǎn)載企業(yè)宣傳資訊,該相關(guān)信息僅為宣傳及傳遞更多信息之目的,不代表本網(wǎng)站觀點(diǎn),文章真實(shí)性請瀏覽者慎重核實(shí)!任何投資加盟均有風(fēng)險(xiǎn),提醒廣大民眾投資需謹(jǐn)慎!

網(wǎng)站簡介 - 聯(lián)系我們 - 營銷服務(wù) - XML地圖 - 版權(quán)聲明 - 網(wǎng)站地圖TXT
Copyright.2002-2019 寧夏資訊網(wǎng) 版權(quán)所有 本網(wǎng)拒絕一切非法行為 歡迎監(jiān)督舉報(bào) 如有錯(cuò)誤信息 歡迎糾正