文章目录
-
- 一.简介
- 二.窗口Join
-
- 2.1 翻滚窗口(Tumbling Window Join)
- 2.2 滑动窗口Join(Sliding Window Join)
-
- 2.3 会话窗口Join(Session Window Join)
- 2.4.小结
- 三.间隔Join
- 四.示例
-
- 4.1 间隔Join
- 4.2 窗口Join
一.简介
Flink DataStream API中内置有两个可以根据实际条件对数据流进行Join算子:基于间隔的Join和基于窗口的Join。
语义注意事项
- 创建两个流元素的成对组合的行为类似内连接,如果来自一个流的元素与另一个流没有相对应要连接的元素,则不会发出该元素。
- 结合在一起的那些元素将其时间戳设置为位于各自窗口中最大时间戳。例如:以[5,10]为边界的窗口将产生连接的元素的时间戳为9。
二.窗口Join
2.1 翻滚窗口(Tumbling Window Join)
执行滚动窗口连接(Tumbling Window Join)时,具有公共Key和公共Tumbling Window的所有元素都以成对组合形式进行连接,并传递给JoinFunction或FlatJoinFunction。因为这就像一个内连接,在滚动窗口中没有来自另一个流的元素的流的元素不会被输出。
如图所示,我们定义了一个大小为2毫秒的滚动窗口,其结果为[0,1],[2,3], …。该图像显示了每个窗口中所有元素的成对组合,这些元素将传递给JoinFunction。注意,在翻滚窗口[6,7]中没有发出任何内容,因为在绿色流中没有元素与橙色元素⑥、⑦连接。
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
.where(elem => )
.equalTo(elem => )
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply { (e1, e2) => e1 + "," + e2 }
2.2 滑动窗口Join(Sliding Window Join)
在执行滑动窗口连接(Sliding Window Join)时,具有公共Key和公共滑动窗口(Sliding Window )的所有元素都作为成对组合进行连接,并传递给JoinFunction或FlatJoinFunction。当前滑动窗口中没有来自另一个流的元素的流的元素不会被发出。
注意,有些元素可能会在一个滑动窗口中连接,但不会在另一个窗口中连接!
在本例中,我们使用的滑动窗口大小为2毫秒,滑动1毫秒,滑动窗口结果[1,0],[0,1],[1,2],[2、3],… x轴以下是每个滑动窗口的Join结果将被传递给JoinFunction的元素。在这里你还可以看到橙②与绿色③窗口Join(2、3),但不与任何窗口Join[1,2]。
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
.where(elem => )
.equalTo(elem => )
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) , Time.milliseconds(1) ))
.apply { (e1, e2) => e1 + "," + e2 }
2.3 会话窗口Join(Session Window Join)
在执行会话窗口连接时,具有相同键的所有元素(当“组合”时满足会话条件)都以成对的组合进行连接,并传递给JoinFunction或FlatJoinFunction。再次执行内部连接,因此如果会话窗口只包含来自一个流的元素,则不会发出任何输出。
在这里,定义一个会话窗口连接,其中每个会话被至少1ms的间隔所分割。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三次会话中绿色流没有元素,所以⑧⑨不会Join。
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
.where(elem => )
.equalTo(elem => )
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.apply { (e1, e2) => e1 + "," + e2 }
2.4.小结
除了对窗口中两条流进行Join,你还可以对它们进行Cogroup,只需将算子定义开始位置的Join()改为coGroup()即可,Join和Cogroup的总体逻辑相同。
二者区别:Join会为两侧输入中每个事件对调用JoinFunction;而Cogroup中CoGroupFunction会以两个输入的元素遍历器为参数,只在每个窗口中被调用一次。
三.间隔Join
interval join用一个公共Key连接两个流的元素(将它们称为A & B),其中流B的元素的时间戳具有相对于流A中的元素的时间戳。 这也可以更正式地表示为b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
其中a和b是A和B中共享一个公钥的元素。下界和上界都可以是负的或正的,只要下界小于或等于上界。interval连接目前只执行内部连接。
当将一对元素传递给ProcessJoinFunction时,它们将给两个元素分配更大的时间戳(可以通过ProcessJoinFunction.Context访问)。
注意:间隔连接目前只支持事件时间。
在上面的示例中,我们将“橙色”和“绿色”两个流连接起来,它们的下界为-2毫秒,上界为+1毫秒。默认情况下,这些是包含边界的,但是可以通过.lowerboundexclusive()和. upperboundexclusive()进行设置。
再用更正式的符号来表示angeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound 如三角形所示。
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream
.keyBy(elem => )
.intervalJoin(greenStream.keyBy(elem => ))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process(new ProcessJoinFunction[Integer, Integer, String] {
override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
out.collect(left + "," + right);
}
});
});
四.示例
4.1 间隔Join
package com.lm.flink.datastream.join
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
object IntervalJoin {
def main(args: Array[String]): Unit = {
//设置至少一次或仅此一次语义
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置至少一次或仅此一次语义
env.enableCheckpointing(20000,CheckpointingMode.EXACTLY_ONCE)
//设置
env.getCheckpointConfig
.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,50000))
env.setParallelism(1)
val dataStream1 = env.socketTextStream("localhost",9999)
val dataStream2 = env.socketTextStream("localhost",9998)
import org.apache.flink.api.scala._
val dataStreamMap1 = dataStream1.map(f=>{
val tokens = f.split(",")
StockTransaction(tokens(0),tokens(1),tokens(2).toDouble)
}).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockTransaction]{
var currentTimestamp = 0L
val maxOutOfOrderness = 1000L
override def getCurrentWatermark: Watermark = {
val tmpTimestamp = currentTimestamp - maxOutOfOrderness
println(s"wall clock is ${System.currentTimeMillis()} new watermark ${tmpTimestamp}")
new Watermark(tmpTimestamp)
}
override def extractTimestamp(element: StockTransaction, previousElementTimestamp: Long): Long = {
val timestamp = element.txTime.toLong
currentTimestamp = Math.max(timestamp,currentTimestamp)
println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
currentTimestamp
}
})
val dataStreamMap2 = dataStream2.map(f=>{
val tokens = f.split(",")
StockSnapshot(tokens(0),tokens(1),tokens(2).toDouble)
}).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockSnapshot]{
var currentTimestamp = 0L
val maxOutOfOrderness = 1000L
override def getCurrentWatermark: Watermark = {
val tmpTimestamp = currentTimestamp - maxOutOfOrderness
println(s"wall clock is ${System.currentTimeMillis()} new watermark ${tmpTimestamp}")
new Watermark(tmpTimestamp)
}
override def extractTimestamp(element: StockSnapshot, previousElementTimestamp: Long): Long = {
val timestamp = element.mdTime.toLong
currentTimestamp = Math.max(timestamp,currentTimestamp)
println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
currentTimestamp
}
})
dataStreamMap1.print("dataStreamMap1")
dataStreamMap2.print("dataStreamMap2")
dataStreamMap1.keyBy(_.txCode)
.intervalJoin(dataStreamMap2.keyBy(_.mdCode))
.between(Time.minutes(-10),Time.seconds(0))
.process(new ProcessJoinFunction[StockTransaction,StockSnapshot,String] {
override def processElement(left: StockTransaction, right: StockSnapshot, ctx: ProcessJoinFunction[StockTransaction, StockSnapshot, String]#Context, out: Collector[String]): Unit = {
out.collect(left.toString +" =Interval Join=> "+right.toString)
}
}).print()
env.execute("IntervalJoin")
}
case class StockTransaction(txTime:String,txCode:String,txValue:Double) extends Serializable{
override def toString: String = txTime +"#"+txCode+"#"+txValue
}
case class StockSnapshot(mdTime:String,mdCode:String,mdValue:Double) extends Serializable {
override def toString: String = mdTime +"#"+mdCode+"#"+mdValue
}
}
结果
get timestamp is 1603708942 currentMaxTimestamp 1603708942
dataStreamMap1> 1603708942#000001#10.4
get timestamp is 1603708942 currentMaxTimestamp 1603708942
dataStreamMap2> 1603708942#000001#10.4
1603708942#000001#10.4 =Interval Join=> 1603708942#000001#10.4
4.2 窗口Join
package com.lm.flink.datastream.join
import java.lang
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
object InnerLeftRightJoinTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//每9秒发出一个watermark
env.setParallelism(1)
env.getConfig.setAutoWatermarkInterval(9000)
val dataStream1 = env.socketTextStream("localhost", 9999)
val dataStream2 = env.socketTextStream("localhost", 9998)
import org.apache.flink.api.scala._
val dataStreamMap1 = dataStream1
.map(f => {
val tokens = f.split(",")
StockTransaction(tokens(0), tokens(1), tokens(2).toDouble)
}).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockTransaction] {
var currentTimestamp = 0L
val maxOutOfOrderness = 1000L
override def getCurrentWatermark: Watermark = {
val tmpTimestamp = currentTimestamp - maxOutOfOrderness
println(s"wall clock is ${System.currentTimeMillis()} new watermark ${tmpTimestamp}")
new Watermark(tmpTimestamp)
}
override def extractTimestamp(element: StockTransaction, previousElementTimestamp: Long): Long = {
val timestamp = element.txTime.toLong
currentTimestamp = Math.max(timestamp, currentTimestamp)
println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
currentTimestamp
}
})
val dataStreamMap2 = dataStream2
.map(f => {
val tokens = f.split(",")
StockSnapshot(tokens(0), tokens(1), tokens(2).toDouble)
}).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockSnapshot] {
var currentTimestamp = 0L
val maxOutOfOrderness = 1000L
override def getCurrentWatermark: Watermark = {
val tmpTimestamp = currentTimestamp - maxOutOfOrderness
println(s"wall clock is ${System.currentTimeMillis()} new watermark ${tmpTimestamp}")
new Watermark(tmpTimestamp)
}
override def extractTimestamp(element: StockSnapshot, previousElementTimestamp: Long): Long = {
val timestamp = element.mdTime.toLong
currentTimestamp = Math.max(timestamp, currentTimestamp)
println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
currentTimestamp
}
})
dataStreamMap1.print("dataStreamMap1")
dataStreamMap2.print("dataStreamMap2")
val joinedStream = dataStreamMap1.coGroup(dataStreamMap2)
.where(_.txCode)
.equalTo(_.mdCode)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
val innerJoinedStream = joinedStream.apply(new InnerJoinFunction)
val leftJoinedStream = joinedStream.apply(new LeftJoinFunction)
val rightJoinedStream = joinedStream.apply(new RightJoinFunction)
innerJoinedStream.name("InnerJoinedStream").print()
leftJoinedStream.name("LeftJoinedStream").print()
rightJoinedStream.name("RightJoinedStream").print()
env.execute("InnerLeftRightJoinTest")
}
class InnerJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
override def coGroup(first: lang.Iterable[StockTransaction], second: lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = {
import scala.collection.JavaConverters._
val scalaT1 = first.asScala.toList
val scalaT2 = second.asScala.toList
println(scalaT1.size)
println(scalaT2.size)
if (scalaT1.nonEmpty && scalaT2.nonEmpty) {
for (transaction <- scalaT1) {
for (snapshot <- scalaT2) {
out.collect(transaction.txCode, transaction.txTime, snapshot.mdTime, transaction.txValue, snapshot.mdValue, "Inner Join Test")
}
}
}
}
}
class LeftJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = {
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
if (scalaT1.nonEmpty && scalaT2.isEmpty) {
for (transaction <- scalaT1) {
out.collect(transaction.txCode, transaction.txTime, "", transaction.txValue, 0, "Left Join Test")
}
}
}
}
class RightJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = {
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
if (scalaT1.isEmpty && scalaT2.nonEmpty) {
for (snapshot <- scalaT2) {
out.collect(snapshot.mdCode, "", snapshot.mdTime, 0, snapshot.mdValue, "Right Join Test")
}
}
}
}
case class StockTransaction(txTime: String, txCode: String, txValue: Double)
case class StockSnapshot(mdTime: String, mdCode: String, mdValue: Double)
}
参考
https://www.jianshu.com/p/ba19e4d1d802
公众号
名称:大数据计算
微信号:bigdata_limeng