Counting the words in a text file with the Flink Streaming API

Published: 2026/6/26 1:56:14

1. A first look at the Flink Streaming API: processing a bounded data stream by reading data from a file.

    package com.ycl;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class WordCountStreamDemo {
        public static void main(String[] args) throws Exception {
            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 2. Read the data
            DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");

            // 3. Process the data: split, transform, group, aggregate
            // 3.1 Split and transform
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS =
                    lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                            // Split the line on spaces
                            String[] words = value.split(" ");
                            for (String word : words) {
                                // Convert each word into a 2-tuple (word, 1)
                                Tuple2<String, Integer> wordAndOne = Tuple2.of(word, 1);
                                // Send the tuple downstream through the collector
                                out.collect(wordAndOne);
                            }
                        }
                    });

            // 3.2 Group by the word (field f0 of the tuple)
            KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS =
                    wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                        @Override
                        public String getKey(Tuple2<String, Integer> value) throws Exception {
                            return value.f0;
                        }
                    });

            // 3.3 Aggregate: sum the field at position 1 (the count)
            SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);

            // 4. Output the data
            sumDS.print();

            // 5. Execute: similar to the final ssc.start() in Spark Streaming
            env.execute();
        }
    }

    /**
     * Given an interface A with a single method a():
     *
     * The usual approach is to define a class B that implements A and its method a(),
     * then instantiate it:
     *     B b = new B();
     *
     * An anonymous implementation class does the same thing inline:
     *     new A() {
     *         a() {}
     *     }
     */

In the printed output, the number prefixed to each record is the index of the parallel subtask (thread) that emitted it.

Differences from the DataSet API:

Grouping: the DataSet API groups with groupBy; the streaming API groups with keyBy.

Execution environment: the DataSet API uses ExecutionEnvironment; the streaming API uses StreamExecutionEnvironment.

Triggering execution: in the DataStream API, env.execute() must be called, otherwise the job never runs. In the DataSet API, a job that ends with print() does not need an explicit env.execute(), because print() triggers execution itself.
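The comment at the end of the listing describes anonymous implementation classes, which is the pattern the demo uses for FlatMapFunction and KeySelector. A minimal plain-Java sketch of the same idea follows. It does not use Flink; the names AnonymousClassDemo, A, B and the returned strings are hypothetical and exist only for illustration.

```java
public class AnonymousClassDemo {

    // Hypothetical interface standing in for "interface A" with one method a().
    interface A {
        String a();
    }

    // Usual approach: a named class B that implements A.
    static class B implements A {
        @Override
        public String a() {
            return "named class B";
        }
    }

    public static void main(String[] args) {
        // Instantiate the named implementation.
        A named = new B();

        // Anonymous implementation: the class body is declared inline,
        // at the point where the instance is created.
        A anonymous = new A() {
            @Override
            public String a() {
                return "anonymous class";
            }
        };

        System.out.println(named.a());
        System.out.println(anonymous.a());
    }
}
```

Both variables hold an object that implements A; the anonymous form simply avoids declaring a separate named class when the implementation is used in one place only, as with the function objects passed to flatMap and keyBy.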
