基础API
Flink程序可以执行分布式集合转换(例如,filtering, mapping, updating state, joining, grouping, defining windows, aggregating)。集合最初是从资源创建的(例如,通过从文件读取,kafka主题,或从本地,内存中集合)。通过接收器返回结果,例如将数据写入(分布式)文件或标准输出(例如,命令行终端)。 Flink程序以各种上下文运行,独立或嵌入其他程序中。可以执行在本地JVM或许多机器的集群上。
根据数据源的类型,即有界或无界源,用户可以编写批处理程序或流式程序,其中DataSet API用于批处理,DataStream API用于流式传输。本指南将介绍两种API通用的基本概念,但请参阅“流数据指南和批数据指南”,了解有关使用每个API编写程序的具体信息。
注意:当显示如何使用API的实际示例时,我们将使用StreamingExecutionEnvironment和DataStream API。这些概念在DataSet API中完全相同,只需由ExecutionEnvironment和DataSet替换。
DataSet 和 DataStream
Flink用特殊的类DataSet和DataStream来表示程序中的数据。 可以将它们视为不可变的数据集合,它们可以包含重复的数据。 在DataSet中,数据是有限的,而对于DataStream,元素的数量可以是无限制的。
这些集合在一些关键方面与常规Java集合不同。 首先,它们是不可变的,这意味着一旦创建它们就不能添加或删除元素。 你也不能简单地检查里面的元素。
可以在Flink程序中添加一个源来创建一个集合,并且通过使用API方法(如map,filter等)来转换它们,从这些集合中导出新集合。
Flink 程序解析
Flink程序程序看起来像转换数据集合的常规程序。 每个程序由相同的基本部分组成:
- 获取执行环境
- 加载/创建初始数据
- 指定对此数据的转换
- 指定计算结果的位置
- 触发程序执行
接下来是具体的程序剖析。
注意:以下是Flink的Java和Scala API位置
Java | Scala | |
---|---|---|
DataSet API | org.apache.flink.api.java | org.apache.flink.api.scala |
DataStream API | org.apache.flink.streaming.api | org.apache.flink.streaming.api.scala |
Flink程序需要StreamExecutionEnvironment来执行,获取方法有如下几种:
Java:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
Scala:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
通常情况下,只需要使用getExecutionEnvironment(),因为这将根据上下文执行正确的操作:如果正在IDE中执行程序或作为常规Java程序,它将创建一个本地执行环境。 如果从程序中创建了一个JAR文件,并通过命令行调用它,Flink集群管理器将执行main方法,而getExecutionEnvironment()将返回一个在集群上的执行环境。
为了指定数据源,执行环境有几种从文件中读取的方法:可以逐行获取数据,如CSV文件,或使用完全自定义的数据输入格式。 要只读一个文本文件作为一系列行,可以使用:
Java:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
Scala:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
这将返回一个DataStream,然后可以应用转换来创建新的数据流。
可以通过使用转换函数调用DataStream上的方法来应用转换。 例如,Map变换如下所示:
Java:
DataStream<String> input = ...;
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
Scala:
val input: DataSet[String] = ...
val mapped = input.map { x => x.toInt }
这里返回的是一个经过Map转换的DataStream,每一个String都被转换成了Integer。
一旦你有一个包含最终结果的DataStream,可以通过创建一个sink来将它写入外部系统。 这些只是创建接收器的一些示例方法:
writeAsText(String path)
print()
一旦指定了完整的程序,需要通过调用StreamExecutionEnvironment上的execute()来触发程序执行。 根据ExecutionEnvironment的类型,执行将在本地机器上触发或提交程序以在集群上执行。
execute()方法返回一个JobExecutionResult,它包含执行时间和累加器结果。
指定Key
某些变换操作(join, coGroup, keyBy, groupBy)需要指定Key,另外一些操作(Reduce, GroupReduce, Aggregate, Windows)可以根据Key做聚合。
在DataSet中指定key:
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
在DataStream中指定key:
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);
Flink 中的数据结构并不是必须按照<key,value>的结构存储,所以不必刻意将数据转化成这样的结构。
为Tuple指定Key
最简单的情况是在元组的一个或多个字段上分组,根据第一个字段分组:
Java:
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
Scala:
val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)
根据第一、第二个字段分组:
Java:
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
Scala:
val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)
如果是嵌套Tuple,比如下面这种:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
使用keyBy(0)的话,会直接指定Tuple2作为key。要想使用Tuple2中的某一项作为key,需要使用下列方法。
使用Field表达式定义Key
可以使用Field表达式来定义key,比如下面的WC这个POJO,有两个field分别是word和count,为了根据word进行分组,只需要将它的name值传递给keybBy()函数:
Java::
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
/*
Java Field表达式语法:
1.按其字段名称选择POJO字段。 例如,“用户”是指POJO类型的“用户”字段。
2.通过字段名称或字段索引选择元组字段。 例如,“f0”和“5”分别指Java元组类型的第一和第六字段。
3.可以在POJO和元组中选择嵌套字段。 例如,“user.zip”是指存储在POJO类型的“用户”字段中的POJO的“zip”字段。
支持POJO和Tuples的任意嵌套和混合,如“f1.user.zip”或“user.f3.1.zip”。
4.可以使用“*”通配符表达式选择完整类型。 这也适用于不是元组或POJO类型的类型。
*/
Scala:
// some ordinary POJO (Plain old Scala Object)
class WC(var word: String, var count: Int) {
def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
/*
Scala Field表达式语法:
1.按其字段名称选择POJO字段。 例如,“user”是指POJO类型的“user”字段。
2.通过其偏移字段名称或索引选择元组字段。 例如,“_1”和“_5”分别指Scala元组类型的第一个和第六个字段。
3.可以在POJO和元组中选择嵌套字段。 例如,“user.zip”是指存储在POJO类型的“user”字段中的POJO的“zip”字段。
支持POJO和Tuples的任意嵌套和混合,例如“_2.user.zip”或“user._4.1.zip”。
4.可以使用“_”通配符表达式选择完整类型。 这也适用于不是元组或POJO类型的类型。
*/
Field表达式举例
Java:
public static class WC {
public ComplexNestedClass complex; //nested POJO
private int count;
// getter / setter for private field (count)
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
/*
"count": 类WC中的count字段.
"complex": 递归选择POJO类型ComplexNestedClass的字段中的所有字段.
"complex.word.f2": 选择嵌套的Tuple3的最后一个字段.
"complex.hadoopCitizen": 选择hadoopCitizen.
*/
Scala:
class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}
class ComplexNestedClass(
var someNumber: Int,
someFloat: Float,
word: (Long, Long, String),
hadoopCitizen: IntWritable) {
def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}
/*
"complex.word._3": 选择Tuple中的最后一个字段.
*/
使用Key Selector函数选择Key
Java:
// some ordinary POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> kyed = words
.keyBy(new KeySelector<WC, String>() {
public String getKey(WC wc) { return wc.word; }
});
Scala:
// some ordinary case class
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )
指定转换函数
大多数情况下的转换操作都需要用户自定义,下面给出Java和Scala的几种方法。
针对Java
实现接口
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
});
data.map(new MyMapFunction());
匿名类
data.map(new MapFunction<String, Integer> () {
public Integer map(String value) { return Integer.parseInt(value); }
});
Java 8 Lambda表达式
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
完整函数
//定义函数
class MyMapFunction extends RichMapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
});
//使用
data.map(new MyMapFunction());
//完整函数也可以作为匿名类使用
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
针对Scala
Lambda
example 1:
val data: DataSet[String] = // [...]
data.filter { _.startsWith("http://") }
example 2:
val data: DataSet[Int] = // [...]
data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }
完整函数
//lambda原始代码
data.map { x => x.toInt }
//使用完整函数
class MyMapFunction extends RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})
data.map(new MyMapFunction())
//使用匿名类
data.map (new RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})
完整函数除了用户定义的功能(map,reduce等)外,还提供了四种方法:open,close,getRuntimeContext和setRuntimeContext。 这些功能可用于参数化功能(参见传递函数),创建和完成本地状态,访问广播变量(请参阅广播变量)以及访问运行时信息(如累加器和计数器)以及进行迭代(参见迭代)。
支持的数据类型
- Java Tuples/Scala Case/Classes
- Java POJOs
- Primitive Types(Integer,String,Double)
- Regular Classes
- Values(org.apache.flinktypes.Value)
- Hadoop Writables
- Special Types
计数器
略