构建第一个Flink应用

编写Flink程序

开发环境准备

JDK 1.8

➜  [/Users/russ/workspace] java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)

maven环境

➜  [/Users/russ/workspace] mvn -version
Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T02:41:47+08:00)
Maven home: /Users/russ/devtools/apache-maven-3.6.0
Java version: 1.8.0_191, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"

IDE推荐使用ItelliJ IDEA。

创建maven项目

使用maven模版创建项目

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.6.1 \
    -DgroupId=com.xzy.demo \
    -DartifactId=flink-hello \
    -Dversion=0.1 \
    -Dpackage=com.xzy.demo \
    -DinteractiveMode=false

关于mvn archetype:generate的相关参数,含义如下:

项目相关参数:

参数 含义
groupId 当前应用程序隶属的Group的ID
artifactId 当前应用程序的ID
package 代码生成时使用的根包的名字,如果没有给出,默认使用archetypeGroupId

原型有关参数:

参数 含义
archetypeGroupId 原型(archetype)的Group ID
archetypeArtifactId 原型(archetype)ID
archetypeVersion 原型(archetype)版本
archetypeRepository 包含原型(archetype)的资源库
archetypeCatalog archetype分类,这里按位置分类有:
‘local’ 本地,通常是本地仓库的archetype-catalog.xml文件
‘remote’ 远程,是maven的中央仓库
file://…’ 直接指定本地文件位置archetype-catalog.xml
http://…’ or ‘https://…’ 网络上的文件位置 archetype-catalog.xml
‘internal’
默认值是remote,local
archetypeVersion 原型(archetype)版本

生成的代码如下:

➜  [/Users/russ/workspace] tree flink-hello
flink-hello
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── xzy
        │           └── demo
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j.properties

编写第一个Flink程序

代码可以参照Flink的官方demo。

链接:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/datastream_api.html

以下是代码示例

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        //获得执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //定义数据源,这里是一个本地的9999端口的sock数据源
        //对数据源做分组、开窗、聚合操作
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);
        //打印流数据 便于调试
        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

在9999端口上启动netcat,准备输入

nc -l 9999

运行WindowWordCount的main方法。

然后输入一些单词,回车,再输入新一行的单词。这些输入将作为示例程序的输入。如果要使得某个单词的计数结果大于1,请在5秒钟内重复输入相同的单词(如果5秒钟输入相同单词对你来说太快,请把示例程序中的窗口大小从5秒调大)。

flink_hello