大数据Flink进阶(六):Flink入门案例
Flink入门案例
需求:读取本地数据文件,统计文件中每个单词出现的次数。
一、IDEA Project创建及配置
本案例编写Flink代码选择语言为Java和Scala,所以这里我们通过IntelliJ IDEA创建一个目录,其中包括Java项目模块和Scala项目模块,将Flink Java api和Flink Scala api分别在不同项目模块中实现。步骤如下:
1、打开IDEA,创建空项目
2、在IntelliJ IDEA 中安装Scala插件
使用IntelliJ IDEA开发Flink,如果使用Scala api 那么还需在IntelliJ IDEA中安装Scala的插件,如果已经安装可以忽略此步骤,下图为以安装Scala插件。
【资料图】
3、打开Structure,创建项目新模块
创建Java模块:
继续点击"+",创建Scala模块:
创建好"FlinkScalaCode"模块后,右键该模块添加Scala框架支持,并修改该模块中的"java"src源为"scala":
在"FlinkScalaCode"模块Maven pom.xml中引入Scala依赖包,这里使用的Scala版本为2.12.10。
org.scala-lang scala-library 2.12.10 org.scala-lang scala-compiler 2.12.10 org.scala-lang scala-reflect 2.12.10
4、Log4j日志配置
为了方便查看项目运行过程中的日志,需要在两个项目模块中配置log4j.properties配置文件,并放在各自项目src/main/resources资源目录下,没有resources资源目录需要手动创建并设置成资源目录。log4j.properties配置文件内容如下:
log4j.rootLogger=ERROR, consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.outlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %p %c{2}: %m%n
复制
并在两个项目中的Maven pom.xml中添加对应的log4j需要的依赖包,使代码运行时能正常打印结果:
org.slf4j slf4j-log4j12 1.7.36 org.apache.logging.log4j log4j-to-slf4j 2.17.2
5、分别在两个项目模块中导入Flink Maven依赖
"FlinkJavaCode"模块导入Flink Maven依赖如下:
UTF-8 1.8 1.8 1.16.0 1.7.36 2.17.2 org.apache.flink flink-clients ${flink.version} org.slf4j slf4j-log4j12 ${slf4j.version} org.apache.logging.log4j log4j-to-slf4j ${log4j.version}
"FlinkScalaCode"模块导入Flink Maven依赖如下:
UTF-8 1.8 1.8 1.16.0 1.7.31 2.17.1 2.12.10 2.12 org.apache.flink flink-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-clients ${flink.version} org.scala-lang scala-library ${scala.version} org.scala-lang scala-compiler ${scala.version} org.scala-lang scala-reflect ${scala.version} org.slf4j slf4j-log4j12 ${slf4j.version} org.apache.logging.log4j log4j-to-slf4j ${log4j.version}
注意:在后续实现WordCount需求时,Flink Java Api只需要在Maven中导入"flink-clients"依赖包即可,而Flink Scala Api 需要导入以下三个依赖包:
flink-scala_${scala.binary.version}flink-streaming-scala_${scala.binary.version}flink-clients
主要是因为在Flink1.15版本后,Flink添加对opting-out(排除)Scala的支持,如果你只使用Flink的Java api,导入包不必包含scala后缀,如果使用Flink的Scala api,需要选择匹配的Scala版本。
二、案例数据准备
在项目"MyFlinkCode"中创建"data"目录,在目录中创建"words.txt"文件,向文件中写入以下内容,方便后续使用Flink编写WordCount实现代码。
hello Flinkhello MapReducehello Sparkhello Flinkhello Flinkhello Flinkhello Flinkhello Javahello Scalahello Flinkhello Javahello Flinkhello Scalahello Flinkhello Flinkhello Flink
三、案例实现
数据源分为有界和无界之分,有界数据源可以编写批处理程序,无界数据源可以编写流式程序。DataSet API用于批处理,DataStream API用于流式处理。
批处理使用ExecutionEnvironment和DataSet,流式处理使用StreamingExecutionEnvironment和DataStream。DataSet和DataStream是Flink中表示数据的特殊类,DataSet处理的数据是有界的,DataStream处理的数据是无界的,这两个类都是不可变的,一旦创建出来就无法添加或者删除数据元。
1、Flink 批数据处理案例
Java版本WordCount使用Flink Java Dataset api实现WordCount具体代码如下:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//1.读取文件DataSource linesDS = env.readTextFile("./data/words.txt");//2.切分单词FlatMapOperator wordsDS = linesDS.flatMap((String lines, Collector collector) -> { String[] arr = lines.split(" "); for (String word : arr) { collector.collect(word); }}).returns(Types.STRING);//3.将单词转换成Tuple2 KV 类型MapOperator> kvWordsDS = wordsDS.map(word -> new Tuple2<>(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.按照key 进行分组处理得到最后结果并打印kvWordsDS.groupBy(0).sum(1).print();
Scala版本WordCount
使用Flink Scala Dataset api实现WordCount具体代码如下:
//1.准备环境,注意是Scala中对应的Flink环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.api.scala._//3.读取数据文件val linesDS: DataSet[String] = env.readTextFile("./data/words.txt")//4.进行 WordCount 统计并打印linesDS.flatMap(line => { line.split(" ")}) .map((_, 1)) .groupBy(0) .sum(1) .print()
以上无论是Java api 或者是Scala api 输出结果如下,显示的最终结果是统计好的单词个数。
(hello,15)(Spark,1)(Scala,2)(Java,2)(MapReduce,1)(Flink,10)
2、Flink流式数据处理案例
Java版本WordCount使用Flink Java DataStream api实现WordCount具体代码如下:
//1.创建流式处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.读取文件数据DataStreamSource lines = env.readTextFile("./data/words.txt");//3.切分单词,设置KV格式数据SingleOutputStreamOperator> kvWordsDS = lines.flatMap((String line, Collector> collector) -> { String[] words = line.split(" "); for (String word : words) { collector.collect(Tuple2.of(word, 1L)); }}).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.分组统计获取 WordCount 结果kvWordsDS.keyBy(tp->tp.f0).sum(1).print();//5.流式计算中需要最后执行execute方法env.execute();
Scala版本WordCount使用Flink Scala DataStream api实现WordCount具体代码如下:
//1.创建环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.streaming.api.scala._//3.读取文件val ds: DataStream[String] = env.readTextFile("./data/words.txt")//4.进行wordCount统计ds.flatMap(line=>{line.split(" ")}) .map((_,1)) .keyBy(_._1) .sum(1) .print()//5.最后使用execute 方法触发执行env.execute()
以上输出结果开头展示的是处理当前数据的线程,一个Flink应用程序执行时默认的线程数与当前节点cpu的总线程数有关。
3、DataStream BATCH模式
下面使用Java代码使用DataStream API 的Batch 模式来处理批WordCount代码,方式如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置批运行模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);DataStreamSource linesDS = env.readTextFile("./data/words.txt");SingleOutputStreamOperator> wordsDS = linesDS.flatMap(new FlatMapFunction>() { @Override public void flatMap(String lines, Collector> out) throws Exception { String[] words = lines.split(" "); for (String word : words) { out.collect(new Tuple2<>(word, 1L)); } }});wordsDS.keyBy(tp -> tp.f0).sum(1).print();env.execute();
以上代码运行完成之后结果如下,可以看到结果与批处理结果类似,只是多了对应的处理线程号。
3> (hello,15)8> (Flink,10)8> (Spark,1)7> (Java,2)7> (Scala,2)7> (MapReduce,1)
此外,Stream API 中除了可以设置Batch批处理模式之外,还可以设置 AUTOMATIC、STREAMING模式,STREAMING 模式是流模式,AUTOMATIC模式会根据数据是有界流/无界流自动决定采用BATCH/STREAMING模式来读取数据,设置方式如下:
//BATCH 设置批处理模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);//AUTOMATIC 会根据有界流/无界流自动决定采用BATCH/STREAMING模式env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//STREAMING 设置流处理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
除了在代码中设置处理模式外,还可以在Flink配置文件(flink-conf.yaml)中设置execution.runtime-mode参数来指定对应的模式,也可以在集群中提交Flink任务时指定execution.runtime-mode来指定,Flink官方建议在提交Flink任务时指定执行模式,这样减少了代码配置给Flink Application提供了更大的灵活性,提交任务指定参数如下:
$FLINK_HOME/bin/flink run -Dexecution.runtime-mode=BATCH -c xxx xxx.jar
-
大数据Flink进阶(六):Flink入门案例
腾讯云 2023-03-21
-
开心的反义词_冷静的反义词
互联网 2023-03-21
-
每日热点:亚太股份:3月20日融券卖出3.02万股,融资融券余额1.26亿元
证券之星 2023-03-21
-
环球时讯:双重生男主挽回女主的虐文_双重生男主挽回女主
互联网 2023-03-21
-
天丝棉是什么面料_天丝棉-天天关注
元宇宙网 2023-03-21
-
兴发集团(600141):湖北兴发化工集团股份有限公司关于召开2023年第二次临时股东大会,审议关于修订《公司章程》的议案等议案
自选股智能写手 2023-03-20
-
三岁小孩眼睛散光可以治好吗_眼睛散光可以治好吗
互联网 2023-03-20
-
天天快看点丨山西省财政加力支持全省经济良好开局
山西新闻网 2023-03-20
-
3月LPR“按兵不动” 银行净息差承压或限制调降空间
新华财经 2023-03-20
-
“文心一言”率先上车,长安汽车要抢占智能化下半场制高点 世界快看点
澎湃新闻 2023-03-20
-
大数据Flink进阶(六):Flink入门案例
腾讯云 2023-03-21
-
开心的反义词_冷静的反义词
互联网 2023-03-21
-
每日热点:亚太股份:3月20日融券卖出3.02万股,融资融券余额1.26亿元
证券之星 2023-03-21
-
环球时讯:双重生男主挽回女主的虐文_双重生男主挽回女主
互联网 2023-03-21
-
天丝棉是什么面料_天丝棉-天天关注
元宇宙网 2023-03-21
-
兴发集团(600141):湖北兴发化工集团股份有限公司关于召开2023年第二次临时股东大会,审议关于修订《公司章程》的议案等议案
自选股智能写手 2023-03-20
-
三岁小孩眼睛散光可以治好吗_眼睛散光可以治好吗
互联网 2023-03-20
-
天天快看点丨山西省财政加力支持全省经济良好开局
山西新闻网 2023-03-20
-
3月LPR“按兵不动” 银行净息差承压或限制调降空间
新华财经 2023-03-20
-
“文心一言”率先上车,长安汽车要抢占智能化下半场制高点 世界快看点
澎湃新闻 2023-03-20
-
如果说《炎龙骑士团》是最好玩的战棋游戏,大家没什么意见吧!
街机时代 2023-03-20
-
和讯个股快报:2023年03月20日 长鸿高科 (605008),该股股价成功突破年线压力位-当前消息
自选股智能写手 2023-03-20
-
设备接地线怎么接图片_地线怎么接图片
元宇宙网 2023-03-20
-
有容乃大_无欲则刚的意思 环球微资讯
科学教育网 2023-03-20
-
峰平谷电价怎么计算_峰平谷电费计算表-天天速递
互联网 2023-03-19
-
全球球精选!rna复制酶_rna复制
科学教育网 2023-03-19
-
世界今日讯!漫威:钢铁侠是电影宇宙最重要,并非漫画最重要,漫威亲儿子是他
娱乐知长街 2023-03-19
-
连续6周净买入!
中国基金报 2023-03-19
-
焦点简讯:网络优化工程师有多累_网络优化工程师
互联网 2023-03-19
-
全球微头条丨台湾电价大涨 舆论批当局错误能源政策冲击产业和民生
中国网 2023-03-19
-
【全球新要闻】水浒传108位好汉的名字和绰号及事件_水浒传108位好汉的名字和绰号
互联网 2023-03-18
-
面试失败后我才明白,有些离职理由不能瞎说 全球热推荐
企业管理 2023-03-18
-
男过女人关唯一一位通关者_男过女-今日报
互联网 2023-03-18
-
口碑和销量都是一流的 静态体验逸动
汽车之家 2023-03-18
-
全球快看:新增一条车道!西苑医院西门前的路变畅通啦
北京海淀 2023-03-18
-
3月17日基金净值:中银顺泽回报一年持有期混合A最新净值0.9028,跌0.06%
证券之星 2023-03-18
-
世界视讯!美的集团:接受中信证券等机构调研
每日经济新闻 2023-03-17
-
英国蓝皮书:英国发展报告_2020-2021_关于英国蓝皮书:英国发展报告_2020-2021的简介
互联网 2023-03-17
-
鸭子的吃法大全(鸭子怎么做好吃法大全)-即时看
乐拇指 2023-03-17
-
天天快资讯:博创科技(300548)3月17日主力资金净买入255.64万元
证券之星 2023-03-17