从零开始构建第一个Flink项目

Posted by 拜占庭 on 2021-02-19

1、从零开始构建第一个Flink项目

1.1、构建项目

Flink项目依赖于Jdk8和maven环境,我们可以通过flink官方提供的maven archetype模板快速生成第一个项目

1
2
3
4
5
6
7
8
9
10
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.11.0 \
-DgroupId=first-flink-project \
-DartifactId=first-flink-project \
-Dversion=0.1 \
-Dpackage=com.first.flink \
-DinteractiveMode=false \
-DarchetypeCatalog=internal

mvn archetype:generate 构建模板项目速度慢得令人发指,增加-X显示debug级别的调试信息
[INFO] Generating project in Batch mode
[DEBUG] Searching for remote catalog: https://repo.maven.apache.org/maven2/archetype-catalog.xml

加上-DarchetypeCatalog=internal 运行参数,archetype-catalog.xml从本地获取就能解决这个问题
通过上诉命令生成的项目结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
192:first-flink-project root$ tree
.
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── first
│ └── flink
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j2.properties

7 directories, 4 files

pom文件中已包含flink项目所需版本的依赖Jar包,打开Intellij IEDA编辑器根据提示导入项目:
在这里插入图片描述

1.2、编写第一个项目

在第一个项目中我们以FLINK提供的wordcount为例,可以学习到Flink 核心API提供的流式Stream和批式Batch编程的基本结构。

1.2.1、批式编程

在目录 src/main/com.first.flink.examples 下创建 WordCountBatchByJava 类,首先创建执行环境ExecutionEnvironment,执行环境提供了续作的方法用于控制任务的执行,比如设置parallelism等

1
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

下一步创建数据源,Flink 提供了DataSet API 用于处理批量数据,

1
DataSet<String> text = env.fromElements("i live flink", "i love java", "i love scala");

接下来将DataSet通过flink提供的各种算子如flatMap、grouby、sum等进行转换

1
DataSet<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter()).groupBy(0).sum(1);

LineSplitter 实现了 Flink 提供的 FlatMapFunction 接口。本例中我们统计每个单词出现的次数,因此需要将输入的字符串解析成单词和次数(Tuple2<String, Integer>),Tuple2 的第一个字段是单词,第二个字段是次数。通过 flatMap 算子来解析是因为输入的每一行字符串可能包含多个单词,字符串按照空格(这里比较简单)进行分解。接着grouBy(0)算子按照第一个字段(0是索引)也就是单词进行分组,sum(1)是按照第二个字段求和统计次数

1
2
3
4
5
6
7
8
9
static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String line,
Collector<Tuple2<String, Integer>> collector) throws Exception {
for (String word : line.split(" ")) {
collector.collect(new Tuple2<>(word, 1));
}
}
}

最后将数据输出到目的地控制台
ds.print();
输出结果如下:
(scala,1)
(flink,1)
(love,2)
(live,1)
(i,3)
(java,1)

1.2.2、流式编程

在目录 src/main/com.first.flink.examples 下创建 WordCountStreamByJava 类,首先创建流式Stream任务的执行环境StreamExecutionEnvironment,执行环境提供了续作的方法用于控制任务的执行,比如设置parallelism、容错机制配置、checkpoint配置等

1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

下一步创建流式数据源,Flink 提供了DataStream API 用于处理流式数据,这里我们设置socket数据源,从9999端口读取数据,每一条数据以 “\n”为结尾

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
       DataStreamSource<String> source = env.socketTextStream("127.0.0.1", 9999, "\n");
// 转化处理数据
DataStream<WordWithCount> dataStream = source
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String line,
Collector<WordWithCount> collector) throws Exception {
for (String word : line.split(" ")) {
collector.collect(new WordWithCount(word, 1));
}
}
}).keyBy(event -> event.getWord())//以key分组统计
.timeWindow(Time.seconds(5))//设置一个窗口函数,模拟数据流动
.sum("count");//计算时间窗口内的词语个数

// 输出数据到目的端
dataStream.print();

// 执行任务操作
env.execute("Flink Streaming Word Count By Java");

public static class WordWithCount {
public String word;
public int count;
//省略getter/setter

keyBy(0)按照WordWithCount类的word字段也就是单词字段进行分组,timeWindow用于设置了一个2S的固定窗口,sum(“count”)聚合函数对单词出现的次数进行求和。
最后,输出的结果是每隔5秒一次性输出5秒内每个单词出现的次数。
如果要运行程序,首先要通过netcat监听9999端口:nc -lk 9999。然后执行main函数后,在监听的端口上输入:

1
i love java

输出如下结果:

1
2
3
2> WordWithCount{word='i', count=1}
6> WordWithCount{word='love', count=1}
2> WordWithCount{word='java', count=1}

个人博客:http://www.geek-make.com