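Writing Your First Java Program (Word Count)
Note: because the official documentation is rather verbose, this section is not a translation of it; it is based on the author's own hands-on practice.
Create a new Maven project
The main configuration in pom.xml: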
<properties>
    <flink.version>1.2.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Note: the dependencies above are sufficient to run the code in this chapter.
The WordCount class
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // Initialize the Flink execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Test data
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );
        // Run the word count
        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new LineSplitter())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);
        // Print the result
        counts.print();
    }
}
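To make the groupBy(0).sum(1) step above more concrete, here is a rough plain-Java analogue that runs without Flink. It treats every word as a key and adds 1 to that key's total for each occurrence, which is the same arithmetic the job performs on its (word,1) tuples. The class name GroupSumSketch, the method sumByKey, and the use of a LinkedHashMap are illustrative assumptions made for this sketch; none of them are part of the Flink API, and the sketch says nothing about how Flink distributes the work.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GroupSumSketch {

    // Plain-Java analogue of groupBy(0).sum(1) on (word, 1) pairs:
    // every word is a key, and each occurrence adds 1 to that key's total.
    static Map<String, Integer> sumByKey(String[] words) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String word : words) {
            totals.merge(word, 1, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Words of the first sample line, already lowercased and split.
        String[] words = {"to", "be", "or", "not", "to", "be"};
        // Prints one (word,count) pair per distinct word, in first-seen order.
        sumByKey(words).forEach((word, count) ->
                System.out.println("(" + word + "," + count + ")"));
    }
}
```

For the six words above this yields a total of 2 for "to" and "be" and 1 for "or" and "not".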
The WordCount class uses a helper class named LineSplitter, which implements the FlatMapFunction interface. Its code is as follows:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // Normalize to lowercase, then split on runs of non-word characters
        String[] tokens = value.toLowerCase().split("\\W+");
        // Emit each non-empty token in the form (word,1)
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
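The token.length() > 0 check in LineSplitter matters because String.split returns an empty leading token when the input starts with a separator. The small sketch below shows this with the same lowercase-and-split logic. The class name SplitDemo and the second input line (which starts with an apostrophe) are made up for illustration; none of the four sample lines in WordCount actually begins with punctuation.

```java
import java.util.Arrays;

public class SplitDemo {

    // Same tokenization as LineSplitter: lowercase, then split on runs of non-word characters.
    static String[] tokenize(String line) {
        return line.toLowerCase().split("\\W+");
    }

    public static void main(String[] args) {
        // The line starts with a word character, so no empty token is produced
        // (trailing empty strings are dropped by split).
        System.out.println(Arrays.toString(tokenize("To be, or not to be,--")));
        // The line starts with an apostrophe, so the first token is the empty string.
        System.out.println(Arrays.toString(tokenize("'Tis nobler in the mind")));
    }
}
```

Without the length check, such an empty string would be emitted as a word of its own and show up in the counts.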
Running the program
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ExecutionEnvironment).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1749327851]
04/25/2017 11:13:41 Job execution switched to status RUNNING.
04/25/2017 11:13:41 CHAIN DataSource (at main(WordCount.java:13) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:22)) -> Combine(SUM(1), at main(WordCount.java:25)(1/1) switched to SCHEDULED
04/25/2017 11:13:41 CHAIN DataSource (at main(WordCount.java:13) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:22)) -> Combine(SUM(1), at main(WordCount.java:25)(1/1) switched to DEPLOYING
04/25/2017 11:13:41 CHAIN DataSource (at main(WordCount.java:13) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:22)) -> Combine(SUM(1), at main(WordCount.java:25)(1/1) switched to RUNNING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(2/8) switched to SCHEDULED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(1/8) switched to SCHEDULED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(3/8) switched to SCHEDULED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(4/8) switched to SCHEDULED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(2/8) switched to DEPLOYING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(5/8) switched to SCHEDULED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(4/8) switched to DEPLOYING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(5/8) switched to DEPLOYING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(6/8) switched to SCHEDULED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(3/8) switched to DEPLOYING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(6/8) switched to DEPLOYING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(1/8) switched to DEPLOYING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(7/8) switched to SCHEDULED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(7/8) switched to DEPLOYING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(8/8) switched to SCHEDULED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(8/8) switched to DEPLOYING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(2/8) switched to RUNNING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(5/8) switched to RUNNING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(1/8) switched to RUNNING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(4/8) switched to RUNNING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(7/8) switched to RUNNING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(8/8) switched to RUNNING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(3/8) switched to RUNNING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(6/8) switched to RUNNING
04/25/2017 11:13:41 CHAIN DataSource (at main(WordCount.java:13) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:22)) -> Combine(SUM(1), at main(WordCount.java:25)(1/1) switched to FINISHED
04/25/2017 11:13:41 DataSink (collect())(1/8) switched to SCHEDULED
04/25/2017 11:13:41 DataSink (collect())(1/8) switched to DEPLOYING
04/25/2017 11:13:41 DataSink (collect())(1/8) switched to RUNNING
04/25/2017 11:13:41 DataSink (collect())(6/8) switched to SCHEDULED
04/25/2017 11:13:41 DataSink (collect())(6/8) switched to DEPLOYING
04/25/2017 11:13:41 DataSink (collect())(6/8) switched to RUNNING
04/25/2017 11:13:41 DataSink (collect())(3/8) switched to SCHEDULED
04/25/2017 11:13:41 DataSink (collect())(3/8) switched to DEPLOYING
04/25/2017 11:13:41 DataSink (collect())(3/8) switched to RUNNING
04/25/2017 11:13:41 DataSink (collect())(8/8) switched to SCHEDULED
04/25/2017 11:13:41 DataSink (collect())(8/8) switched to DEPLOYING
04/25/2017 11:13:41 DataSink (collect())(8/8) switched to RUNNING
04/25/2017 11:13:41 DataSink (collect())(7/8) switched to SCHEDULED
04/25/2017 11:13:41 DataSink (collect())(7/8) switched to DEPLOYING
04/25/2017 11:13:41 DataSink (collect())(7/8) switched to RUNNING
04/25/2017 11:13:41 DataSink (collect())(4/8) switched to SCHEDULED
04/25/2017 11:13:41 DataSink (collect())(4/8) switched to DEPLOYING
04/25/2017 11:13:41 DataSink (collect())(4/8) switched to RUNNING
04/25/2017 11:13:41 DataSink (collect())(2/8) switched to SCHEDULED
04/25/2017 11:13:41 DataSink (collect())(2/8) switched to DEPLOYING
04/25/2017 11:13:41 DataSink (collect())(2/8) switched to RUNNING
04/25/2017 11:13:41 DataSink (collect())(5/8) switched to SCHEDULED
04/25/2017 11:13:41 DataSink (collect())(5/8) switched to DEPLOYING
04/25/2017 11:13:41 DataSink (collect())(5/8) switched to RUNNING
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(8/8) switched to FINISHED
04/25/2017 11:13:41 DataSink (collect())(3/8) switched to FINISHED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(3/8) switched to FINISHED
04/25/2017 11:13:41 DataSink (collect())(8/8) switched to FINISHED
04/25/2017 11:13:41 DataSink (collect())(7/8) switched to FINISHED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(1/8) switched to FINISHED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(7/8) switched to FINISHED
04/25/2017 11:13:41 DataSink (collect())(2/8) switched to FINISHED
04/25/2017 11:13:41 DataSink (collect())(1/8) switched to FINISHED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(2/8) switched to FINISHED
04/25/2017 11:13:41 DataSink (collect())(5/8) switched to FINISHED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(5/8) switched to FINISHED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(6/8) switched to FINISHED
04/25/2017 11:13:41 DataSink (collect())(6/8) switched to FINISHED
04/25/2017 11:13:41 Reduce (SUM(1), at main(WordCount.java:25)(4/8) switched to FINISHED
04/25/2017 11:13:41 DataSink (collect())(4/8) switched to FINISHED
04/25/2017 11:13:41 Job execution switched to status FINISHED.
(is,1)
(a,1)
(in,1)
(mind,1)
(or,2)
(against,1)
(arms,1)
(not,1)
(sea,1)
(the,3)
(troubles,1)
(fortune,1)
(take,1)
(to,4)
(and,1)
(arrows,1)
(be,2)
(nobler,1)
(of,2)
(slings,1)
(suffer,1)
(outrageous,1)
(tis,1)
(whether,1)
(question,1)
(that,1)
Process finished with exit code 0
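Because no logging configuration or log level was set, the console output is verbose. Even so, the Reduce and DataSink stages of the Flink job are clearly visible, and the computed result is printed at the end.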
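If the log4j warnings at the top of the output are unwanted, one option is to put a log4j configuration file on the classpath. The fragment below is a minimal sketch, assuming the project follows the standard Maven layout (so the file would be src/main/resources/log4j.properties) and that log4j 1.x is the active logging backend, as the warning messages suggest. The WARN level, the appender name console, and the output pattern are illustrative choices.

```properties
# Log only warnings and errors, and send them to the console
log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n
```

This only configures log4j. The job status lines ("switched to RUNNING" and so on) appear to be printed to the console by the Flink client itself, so they may still show up.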