快速入门Flink(8)——Flink中的流式处理Transformation操作

   日期:2020-09-28     浏览:101    评论:0    
核心提示:        上篇博客给大家讲解了DataSource与DataSink本篇文章准备给大家讲解下Stream中的最长用的几种Transformation操作(收藏,收藏,收藏重要事情说三遍)。一、KeyBy逻辑上将一个流分成不相交的分区,每个分区包含相同键的元素。在内部,这是通过散 列分区来实现的import org.apache.flink.streaming.api.scala._/** * @author.


        上篇博客给大家讲解了DataSource与DataSink本篇文章准备给大家讲解下Stream中的最长用的几种Transformation操作(收藏,收藏,收藏重要事情说三遍)。

一、KeyBy

逻辑上将一个流分成不相交的分区,每个分区包含相同键的元素。在内部,这是通过散 列分区来实现的


import org.apache.flink.streaming.api.scala._


object StreamKeyBy { 
  def main(args: Array[String]): Unit = { 
    //1.构建流处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2.使用socket构建数据源
    val socketDataSource = env.socketTextStream("node01", 9999)
    //3.处理数据
    val keyBy = socketDataSource.flatMap(_.split(" ")).map((_, 1)).keyBy(0)
    //4.输出
    keyBy.print("StreamKeyBy")
    //5.任务执行
    env.execute("StreamKeyBy")
  }
}

二、Connect

用来将两个 dataStream
组装成一个 ConnectedStreams 而且这个 connectedStream 的组成结构就是保留原有的 dataStream 的结构体;这样我们 就可以把不同的数据组装成同一个结构


import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._


object StreamConnect { 
  def main(args: Array[String]): Unit = { 
    //1.构建批处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2.构建2个数据流
    val source1 = env.addSource(new MyNoParallelSource).setParallelism(1)
    val source2 = env.addSource(new MyNoParallelSource).setParallelism(1)
    //3.使用合并流
    val connectStream = source1.connect(source2)
    val result = connectStream.map(function1 => { 
      "function1" + function1
    }, function2 => { 
      "function2" + function2
    })
    //4.输出
    result.print()
    //5.任务启动
    env.execute("StreamConnect")
  }

  class MyNoParallelSource() extends SourceFunction[Long] { 
    var count = 1L
    var isRunning = true

    override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = { 
      while (isRunning) { 
        sourceContext.collect(count)
        count += 1
        Thread.sleep(1000)
        if (count > 5) { 
          cancel()
        }
      }
    }

    override def cancel(): Unit = { 
      isRunning = false
    }
  }
}

三、Split 和 select


Split 就是将一个 DataStream 分成两个或者多个 DataStream Select 就是获取分流后对应的数据
需求: 给出数据 1, 2, 3, 4, 5, 6, 7
请使用 split 和 select 把数据中的奇偶数分开,并打印出奇数


import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment


object StreamSplit { 
  def main(args: Array[String]): Unit = { 
    //1.构建流处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2.构建数据集
    val source = env.generateSequence(1, 10)
    //3.使用split将数据进行切分
    val splitStream = source.split(x => { 
      (x % 2) match { 
        case 0 => List("偶数")
        case 1 => List("奇数")
      }
    })
    //4.获取奇数并打印
    val result = splitStream.select("奇数")
    result.print()
    //5.任务执行
    env.execute("StreamSplit")
  }
}

 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服