Java 8
Java 8引入了一些新的语言特性,旨在更快更清晰的编码。 最重要的功能是所谓的“Lambda表达式”,Java 8为功能编程打开了大门。 Lambda表达式允许以直接的方式实现和传递函数,而不必声明额外的(匿名)类。
Flink的最新版本支持对Java API的所有运算符使用Lambda表达式。 本文档显示了如何使用Lambda表达式。
示例
以下示例说明如何实现一个简单的 map() 函数,它使用Lambda表达式对其输入进行平方。 不需要声明map()函数的输入i和输出参数的类型,因为它们由Java 8编译器推断出来。
env.fromElements(1, 2, 3)
// returns the squared i
.map(i -> i*i)
.print();
接下来的两个示例显示了使用Collector进行输出函数的不同实现。 函数(如flatMap())需要为Collector定义一个输出类型(在这种情况下为String)才能保证类型安全。 如果Collector类型不能从周围的上下文中推断出来,则需要手动在Lambda表达式的参数列表中声明。否则输出将被视为类型Object,这可能会导致异常。
DataSet<Integer> input = env.fromElements(1, 2, 3);
// collector type must be declared
input.flatMap((Integer number, Collector<String> out) -> {
StringBuilder builder = new StringBuilder();
for(int i = 0; i < number; i++) {
builder.append("a");
out.collect(builder.toString());
}
})
// returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
.print();
DataSet<Integer> input = env.fromElements(1, 2, 3);
// collector type must not be declared, it is inferred from the type of the dataset
DataSet<String> manyALetters = input.flatMap((number, out) -> {
StringBuilder builder = new StringBuilder();
for(int i = 0; i < number; i++) {
builder.append("a");
out.collect(builder.toString());
}
});
// returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
manyALetters.print();
以下代码展示如何使用Lambda表达式实现WordCount:
DataSet<String> input = env.fromElements("Please count", "the words", "but not this");
// filter out strings that contain "not"
input.filter(line -> !line.contains("not"))
// split each line by space
.map(line -> line.split(" "))
// emit a pair <word,1> for each array element
.flatMap((String[] wordArray, Collector<Tuple2<String, Integer>> out)
-> Arrays.stream(wordArray).forEach(t -> out.collect(new Tuple2<>(t, 1)))
)
// group and sum up
.groupBy(0).sum(1)
// print
.print();
注:剩余文档介绍如何让Eclipse支持代码转换为Lambda表达式,及如何在Eclipse中编译、Debug Flink程序,略。