DataStream API概述

Flink中的DataStream程序是对数据流进行转换的常规操作(例如,过滤,更新状态,定义窗口,聚合)。

数据流最初是从各种来源(例如,消息队列,套接字流,文件)创建的。 通过接收器返回结果,例如将数据写入文件或标准输出(例如命令行终端)。

Flink程序以各种上下文运行,独立或嵌入其他程序中。 Flink程序运行在本地JVM或许多机器的集群上。

示例代码

流式处理WordCount的代码:

Java:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

Scala:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print

    env.execute("Window Stream WordCount")
  }
}

运行方法:

开一个netcat窗口,并输入一些字符:

nc -lk 9999

正在运行的WordCount程序会接收到端口9999的消息并计算。

数据流转换

注:由于GitBook表格中不能贴代码,所以以下内容没有代码示例。省略内容请参考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html

转换 描述
Map DataStream->DataStream,将一个元素转化成另一个元素
FlatMap DataStream->DataStream,可以将一个元素转化为一个或多个元素,例如,可以将句子转化成单个单词
Filter DataStream->DataStream,过滤
KeyBy DataStream->KeyedStream,将数据按key分区
Reduce KeyedStream->DataStream,合并数据
Fold KeyedStream->DataStream,另一种迭代函数
Aggragations KeyedStream->DataStream,各种聚合函数
Window KeyedStream->WindowedStream,KeyedStream数据类型时间窗口内的计算
WindowAll DataStream->AllWindowedStream,对所有数据按窗口分组
Window Apply WindowedStream/AllWindowedStream->DataStream,可以在窗口内定义迭代函数
Window Reduce WindowedStream->DataStream,对WindowedStream类型进行Reduce
Window Fold WindowedStream->DataStream,窗口内的增量迭代
Aggregations on windows WindowedStream->DataStream,窗口内的聚合计算:sum,min,max...
Union DataStream * N ->DataStream,多个DataStream合并
Window Join DataStream,DataStream->DataStream,类似db的join
Window GoGroup DataStream,DataStream->DataStream,类似db的group
Connect DataStream,DataStream->ConnectedStreams 合并流
CoMap,CoFlatMap ConnectedStreams中的Map和FlatMap
Split DataStream->SplitStream,根据条件切割流
Select SplitStream->DataStream,从分割出来的流中选择
Iterate 迭代反馈
Extract TimeStamps 从流数据中抽取时间戳

物理分区

Flink提供了一系列分区API,如下:

转换 描述
Custom partitioning 用户自定义分区
Random partitioning 随机均匀分布
Rebalancing (Round-Robin partitioning) 分区元素循环,为每个分区创建相等的负载。 在数据偏移的情况下用于性能优化。
Rescaling 按照上下游操作来自动分配操作,如下图,上游有两个操作,下游有6个操作,那么会自动平衡分配下游的操作
Broadcasting 广播到每一个分区

任务链和资源组

链接两个后续转换意味着将它们定位在同一个线程中,以获得更好的性能。 Flink会按照默认链接方式链接两个操作。可以通过API对链接进行粒度的控制。

如果要在整个作业中禁用链接,请使用StreamExecutionEnvironment.disableOperatorChaining()。 对于更细粒度的控制,可以使用以下功能。 请注意,这些函数只能在DataStream转换后才可以使用,因为它们的操作对象是之前的转换。 例如,您可以使用someStream.map(...).startNewChain(),但不能使用someStream.startNewChain()。

转换 描述
start new chain someStream.filter(...).map(...).startNewChain().map(...);
disable chaining someStream.map(...).disableChaining();
set slot sharing group someStream.filter(...).slotSharingGroup("name");

资源组是Flink中的一个插槽,请参见插槽。

Flink Data Sources

数据源是Flink读取数据的来源。你可以通过StreamExecutionEnvironment.addSource(sourceFunction) 获取一个数据源。Flink读取数据源的方式有如下几种:

基于文件:

  • readTextFile(path)
  • readFile(fileInputFormat, path)
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

基于Socket:

  • socketTextStream

基于集合:

  • fromCollection(Collection)
  • fromCollection(Iterator, Class)
  • fromElements(T ...)
  • fromParallelCollection(SplittableIterator, Class)
  • generateSequence(from, to)

常用:

  • addSource 例如获取kafka数据源:addSource(new FlinkKafkaConsumer08<>(...)) 在Connector中会有更多介绍。

Flink Data Sinks

Flink提供如下方式保存数据:

  • writeAsText() / TextOutputFormat
  • writeAsCsv(...) / CsvOutputFormat
  • print() / printToErr()
  • writeUsingOutputFormat() / FileOutputFormat
  • writeToSocket
  • addSink

results matching ""

    No results matching ""