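Writing Your First Scala Program (Word Count)
Note: because the official documentation is overly verbose, this section is not a translation of it; it records the author's own hands-on practice instead.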
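Using SBT
For well-known reasons, SBT downloads dependencies extremely slowly (thanks go to Principal Fang and his team for that). This section does not cover downloading with SBT.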
Creating a Maven Project
The pom.xml file looks like this:
<properties>
    <flink.version>1.2.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Note: the dependencies above are all that is needed to run the code in this section.
The WordCount Class
import org.apache.flink.api.scala._

object WordCountScala {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.fromElements(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    // split lines into lowercase words, pair each word with 1,
    // group by the word (tuple field 0) and sum the counts (tuple field 1)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // execute the job and print the result
    counts.print()
  }
}
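To make it clearer what the flatMap / map / groupBy(0) / sum(1) chain computes, here is a minimal sketch of the same logic using only the Scala standard library, with no Flink involved. The names WordCountPlain and countWords are made up for this illustration; the point is only that groupBy(0) groups on the first tuple field (the word) and sum(1) adds up the second field (the count).

```scala
object WordCountPlain {
  // The same four input lines as in the Flink example.
  val lines: Seq[String] = Seq(
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,")

  // Counts words with ordinary Scala collections (hypothetical helper, not a Flink API).
  def countWords(input: Seq[String]): Map[String, Int] =
    input
      .flatMap(_.toLowerCase.split("\\W+")) // split each line on runs of non-word characters
      .map(word => (word, 1))               // pair every word with an initial count of 1
      .groupBy(_._1)                        // group the pairs by the word (tuple field 0)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // add up tuple field 1

  def main(args: Array[String]): Unit =
    countWords(lines).toSeq.sortBy(_._1).foreach(println)
}
```

Running this locally is a quick way to cross-check the counts printed by the Flink job; unlike Flink's output, the result here is sorted by word.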
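Note: the Scala version configured in the IDE must match the Scala suffix of the Flink artifacts, which is 2.11.x here. Otherwise the .map() call will fail with an API conflict.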
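If it is unclear which Scala version the IDE actually puts on the classpath, a small check like the one below may help. It is only a sketch: ScalaVersionCheck and matchesBinaryVersion are hypothetical names, and the expected value "2.11" is an assumption taken from the _2.11 suffix in the pom.xml above. scala.util.Properties.versionNumberString is part of the Scala standard library.

```scala
object ScalaVersionCheck {
  // True when a full version such as "2.11.8" belongs to the binary version "2.11".
  def matchesBinaryVersion(fullVersion: String, binaryVersion: String): Boolean =
    fullVersion == binaryVersion || fullVersion.startsWith(binaryVersion + ".")

  def main(args: Array[String]): Unit = {
    // Version of the scala-library that is actually on the runtime classpath.
    val running = scala.util.Properties.versionNumberString
    // Assumption: must match the _2.11 suffix of the Flink artifacts in pom.xml.
    val expected = "2.11"
    println(s"Running Scala $running, matches $expected: " +
      matchesBinaryVersion(running, expected))
  }
}
```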
Running
/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/bin/java -Didea.launcher.port=7533 "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath "/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/
Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/lib/tools.jar:/Users/zehui/IdeaProjects/learnflink/target/classes:/Users/zehui/.ivy2/cache/org.scala-lang/scala-library/jars/scala-library-2.11.8.jar:/Users/zehui/.ivy2/cache/org.scala-lang/scala-reflect/jars/scala-reflect-2.11.8.jar:/Users/zehui/.m2/repository/org/apache/flink/flink-clients_2.11/1.2.0/flink-clients_2.11-1.2.0.jar:/Users/zehui/.m2/repository/org/apache/flink/flink-core/1.2.0/flink-core-1.2.0.jar:/Users/zehui/.m2/repository/org/apache/flink/flink-annotations/1.2.0/flink-annotations-1.2.0.jar:/Users/zehui/.m2/repository/org/apache/flink/flink-metrics-core/1.2.0/flink-metrics-core-1.2.0.jar:/Users/zehui/.m2/repository/org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar:/Users/zehui/.m2/repository/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar:/Users/zehui/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/Users/zehui/.m2/repository/org/objenesis/objenesis/2.1/objenesis-2.1.jar:/Users/zehui/.m2/repository/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar:/Users/zehui/.m2/repository/org/apache/commons/commons-compress/1.4.1/commons-compress-1.4.1.jar:/Users/zehui/.m2/repository/org/tukaani/xz/1.0/xz-1.0.jar:/Users/zehui/.m2/repository/org/apache/avro/avro/1.7.7/avro-1.7.7.jar:/Users/zehui/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar:/Users/zehui/.m2/repository/org/codehaus/jackson
/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar:/Users/zehui/.m2/repository/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar:/Users/zehui/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar:/Users/zehui/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/Users/zehui/.m2/repository/org/apache/flink/flink-runtime_2.11/1.2.0/flink-runtime_2.11-1.2.0.jar:/Users/zehui/.m2/repository/org/apache/flink/flink-shaded-hadoop2/1.2.0/flink-shaded-hadoop2-1.2.0.jar:/Users/zehui/.m2/repository/xmlenc/xmlenc/0.52/xmlenc-0.52.jar:/Users/zehui/.m2/repository/commons-codec/commons-codec/1.4/commons-codec-1.4.jar:/Users/zehui/.m2/repository/commons-io/commons-io/2.4/commons-io-2.4.jar:/Users/zehui/.m2/repository/commons-net/commons-net/3.1/commons-net-3.1.jar:/Users/zehui/.m2/repository/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar:/Users/zehui/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar:/Users/zehui/.m2/repository/com/sun/jersey/jersey-core/1.9/jersey-core-1.9.jar:/Users/zehui/.m2/repository/commons-el/commons-el/1.0/commons-el-1.0.jar:/Users/zehui/.m2/repository/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar:/Users/zehui/.m2/repository/com/jamesmurty/utils/java-xmlbuilder/0.4/java-xmlbuilder-0.4.jar:/Users/zehui/.m2/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar:/Users/zehui/.m2/repository/commons-configuration/commons-configuration/1.7/commons-configuration-1.7.jar:/Users/zehui/.m2/repository/commons-digester/commons-digester/1.8.1/commons-digester-1.8.1.jar:/Users/zehui/.m2/repository/org/xerial/snappy/snappy-java/1.0.5/snappy-java-1.0.5.jar:/Users/zehui/.m2/repository/com/jcraft/jsch/0.1.42/jsch-0.1.42.jar:/Users/zehui/.m2/repository/commons-beanutils/commons-beanutils-bean-collections/1.8.3/commons-beanutils-bean-collections-1.8.3.jar:/Users/zehui/.m2/repository/commons-daemon/commons-daemon/1.0.13/commons-daemon-1.0.13.jar:/Users/zehui/.m2/repository/javax/xml/bind/jaxb-api/2.2.2
/jaxb-api-2.2.2.jar:/Users/zehui/.m2/repository/javax/xml/stream/stax-api/1.0-2/stax-api-1.0-2.jar:/Users/zehui/.m2/repository/javax/activation/activation/1.1/activation-1.1.jar:/Users/zehui/.m2/repository/io/netty/netty-all/4.0.27.Final/netty-all-4.0.27.Final.jar:/Users/zehui/.m2/repository/org/javassist/javassist/3.18.2-GA/javassist-3.18.2-GA.jar:/Users/zehui/.m2/repository/com/data-artisans/flakka-actor_2.11/2.3-custom/flakka-actor_2.11-2.3-custom.jar:/Users/zehui/.m2/repository/com/typesafe/config/1.2.1/config-1.2.1.jar:/Users/zehui/.m2/repository/com/data-artisans/flakka-remote_2.11/2.3-custom/flakka-remote_2.11-2.3-custom.jar:/Users/zehui/.m2/repository/io/netty/netty/3.8.0.Final/netty-3.8.0.Final.jar:/Users/zehui/.m2/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar:/Users/zehui/.m2/repository/org/uncommons/maths/uncommons-maths/1.2.2a/uncommons-maths-1.2.2a.jar:/Users/zehui/.m2/repository/com/data-artisans/flakka-slf4j_2.11/2.3-custom/flakka-slf4j_2.11-2.3-custom.jar:/Users/zehui/.m2/repository/org/clapper/grizzled-slf4j_2.11/1.0.2/grizzled-slf4j_2.11-1.0.2.jar:/Users/zehui/.m2/repository/com/github/scopt/scopt_2.11/3.2.0/scopt_2.11-3.2.0.jar:/Users/zehui/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.7.4/jackson-core-2.7.4.jar:/Users/zehui/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.7.4/jackson-databind-2.7.4.jar:/Users/zehui/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.7.0/jackson-annotations-2.7.0.jar:/Users/zehui/.m2/repository/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar:/Users/zehui/.m2/repository/jline/jline/0.9.94/jline-0.9.94.jar:/Users/zehui/.m2/repository/junit/junit/3.8.1/junit-3.8.1.jar:/Users/zehui/.m2/repository/com/twitter/chill_2.11/0.7.4/chill_2.11-0.7.4.jar:/Users/zehui/.m2/repository/com/twitter/chill-java/0.7.4/chill-java-0.7.4.jar:/Users/zehui/.m2/repository/org/apache/flink/flink-optimizer_2.11/1.2.0/flink-optimizer_2.11-1.2.0.jar:/Users/zehui/.m2/reposi
tory/org/apache/flink/flink-java/1.2.0/flink-java-1.2.0.jar:/Users/zehui/.m2/repository/org/apache/commons/commons-math3/3.5/commons-math3-3.5.jar:/Users/zehui/.m2/repository/commons-cli/commons-cli/1.3.1/commons-cli-1.3.1.jar:/Users/zehui/.m2/repository/org/slf4j/slf4j-api/1.7.7/slf4j-api-1.7.7.jar:/Users/zehui/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/Users/zehui/.m2/repository/org/apache/flink/force-shading/1.2.0/force-shading-1.2.0.jar:/Users/zehui/.m2/repository/org/apache/flink/flink-scala_2.11/1.2.0/flink-scala_2.11-1.2.0.jar:/Users/zehui/.m2/repository/org/scala-lang/scala-reflect/2.11.7/scala-reflect-2.11.7.jar:/Users/zehui/.m2/repository/org/scala-lang/scala-library/2.11.7/scala-library-2.11.7.jar:/Users/zehui/.m2/repository/org/scala-lang/scala-compiler/2.11.7/scala-compiler-2.11.7.jar:/Users/zehui/.m2/repository/org/scala-lang/modules/scala-xml_2.11/1.0.4/scala-xml_2.11-1.0.4.jar:/Users/zehui/.m2/repository/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.4/scala-parser-combinators_2.11-1.0.4.jar:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar" com.intellij.rt.execution.application.AppMain WordCountScala
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#139297197]
04/25/2017 14:41:06 Job execution switched to status RUNNING.
04/25/2017 14:41:06 CHAIN DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:601) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at WordCountScala$.main(WordCountScala.scala:15)) -> Map (Map at WordCountScala$.main(WordCountScala.scala:16)) -> Combine(SUM(1))(1/1) switched to SCHEDULED
04/25/2017 14:41:07 CHAIN DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:601) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at WordCountScala$.main(WordCountScala.scala:15)) -> Map (Map at WordCountScala$.main(WordCountScala.scala:16)) -> Combine(SUM(1))(1/1) switched to DEPLOYING
04/25/2017 14:41:07 CHAIN DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:601) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at WordCountScala$.main(WordCountScala.scala:15)) -> Map (Map at WordCountScala$.main(WordCountScala.scala:16)) -> Combine(SUM(1))(1/1) switched to RUNNING
04/25/2017 14:41:07 Reduce (SUM(1))(2/8) switched to SCHEDULED
04/25/2017 14:41:07 Reduce (SUM(1))(1/8) switched to SCHEDULED
04/25/2017 14:41:07 Reduce (SUM(1))(3/8) switched to SCHEDULED
04/25/2017 14:41:07 Reduce (SUM(1))(4/8) switched to SCHEDULED
04/25/2017 14:41:07 Reduce (SUM(1))(5/8) switched to SCHEDULED
04/25/2017 14:41:07 Reduce (SUM(1))(6/8) switched to SCHEDULED
04/25/2017 14:41:07 Reduce (SUM(1))(7/8) switched to SCHEDULED
04/25/2017 14:41:07 Reduce (SUM(1))(8/8) switched to SCHEDULED
04/25/2017 14:41:07 Reduce (SUM(1))(2/8) switched to DEPLOYING
04/25/2017 14:41:07 Reduce (SUM(1))(7/8) switched to DEPLOYING
04/25/2017 14:41:07 Reduce (SUM(1))(8/8) switched to DEPLOYING
04/25/2017 14:41:07 Reduce (SUM(1))(6/8) switched to DEPLOYING
04/25/2017 14:41:07 Reduce (SUM(1))(3/8) switched to DEPLOYING
04/25/2017 14:41:07 Reduce (SUM(1))(5/8) switched to DEPLOYING
04/25/2017 14:41:07 Reduce (SUM(1))(4/8) switched to DEPLOYING
04/25/2017 14:41:07 Reduce (SUM(1))(1/8) switched to DEPLOYING
04/25/2017 14:41:07 CHAIN DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:601) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at WordCountScala$.main(WordCountScala.scala:15)) -> Map (Map at WordCountScala$.main(WordCountScala.scala:16)) -> Combine(SUM(1))(1/1) switched to FINISHED
04/25/2017 14:41:07 Reduce (SUM(1))(2/8) switched to RUNNING
04/25/2017 14:41:07 Reduce (SUM(1))(8/8) switched to RUNNING
04/25/2017 14:41:07 Reduce (SUM(1))(1/8) switched to RUNNING
04/25/2017 14:41:07 Reduce (SUM(1))(3/8) switched to RUNNING
04/25/2017 14:41:07 Reduce (SUM(1))(4/8) switched to RUNNING
04/25/2017 14:41:07 Reduce (SUM(1))(5/8) switched to RUNNING
04/25/2017 14:41:07 Reduce (SUM(1))(6/8) switched to RUNNING
04/25/2017 14:41:07 Reduce (SUM(1))(7/8) switched to RUNNING
04/25/2017 14:41:07 DataSink (collect())(7/8) switched to SCHEDULED
04/25/2017 14:41:07 DataSink (collect())(7/8) switched to DEPLOYING
04/25/2017 14:41:07 DataSink (collect())(7/8) switched to RUNNING
04/25/2017 14:41:07 DataSink (collect())(3/8) switched to SCHEDULED
04/25/2017 14:41:07 DataSink (collect())(3/8) switched to DEPLOYING
04/25/2017 14:41:07 DataSink (collect())(3/8) switched to RUNNING
04/25/2017 14:41:07 DataSink (collect())(6/8) switched to SCHEDULED
04/25/2017 14:41:07 DataSink (collect())(6/8) switched to DEPLOYING
04/25/2017 14:41:07 DataSink (collect())(6/8) switched to RUNNING
04/25/2017 14:41:07 DataSink (collect())(4/8) switched to SCHEDULED
04/25/2017 14:41:07 DataSink (collect())(4/8) switched to DEPLOYING
04/25/2017 14:41:07 DataSink (collect())(4/8) switched to RUNNING
04/25/2017 14:41:07 DataSink (collect())(1/8) switched to SCHEDULED
04/25/2017 14:41:07 DataSink (collect())(1/8) switched to DEPLOYING
04/25/2017 14:41:07 DataSink (collect())(1/8) switched to RUNNING
04/25/2017 14:41:07 DataSink (collect())(8/8) switched to SCHEDULED
04/25/2017 14:41:07 DataSink (collect())(8/8) switched to DEPLOYING
04/25/2017 14:41:07 DataSink (collect())(8/8) switched to RUNNING
04/25/2017 14:41:07 DataSink (collect())(5/8) switched to SCHEDULED
04/25/2017 14:41:07 DataSink (collect())(5/8) switched to DEPLOYING
04/25/2017 14:41:07 DataSink (collect())(5/8) switched to RUNNING
04/25/2017 14:41:08 DataSink (collect())(2/8) switched to SCHEDULED
04/25/2017 14:41:08 DataSink (collect())(2/8) switched to DEPLOYING
04/25/2017 14:41:08 DataSink (collect())(2/8) switched to RUNNING
04/25/2017 14:41:08 Reduce (SUM(1))(4/8) switched to FINISHED
04/25/2017 14:41:08 DataSink (collect())(2/8) switched to FINISHED
04/25/2017 14:41:08 Reduce (SUM(1))(6/8) switched to FINISHED
04/25/2017 14:41:08 DataSink (collect())(6/8) switched to FINISHED
04/25/2017 14:41:08 Reduce (SUM(1))(7/8) switched to FINISHED
04/25/2017 14:41:08 Reduce (SUM(1))(2/8) switched to FINISHED
04/25/2017 14:41:08 DataSink (collect())(4/8) switched to FINISHED
04/25/2017 14:41:08 DataSink (collect())(1/8) switched to FINISHED
04/25/2017 14:41:08 DataSink (collect())(5/8) switched to FINISHED
04/25/2017 14:41:08 Reduce (SUM(1))(1/8) switched to FINISHED
04/25/2017 14:41:08 DataSink (collect())(7/8) switched to FINISHED
04/25/2017 14:41:08 DataSink (collect())(3/8) switched to FINISHED
04/25/2017 14:41:08 Reduce (SUM(1))(3/8) switched to FINISHED
04/25/2017 14:41:08 Reduce (SUM(1))(8/8) switched to FINISHED
04/25/2017 14:41:08 DataSink (collect())(8/8) switched to FINISHED
04/25/2017 14:41:08 Reduce (SUM(1))(5/8) switched to FINISHED
04/25/2017 14:41:08 Job execution switched to status FINISHED.
(is,1)
(a,1)
(in,1)
(mind,1)
(or,2)
(against,1)
(arms,1)
(not,1)
(sea,1)
(the,3)
(troubles,1)
(fortune,1)
(take,1)
(to,4)
(and,1)
(arrows,1)
(be,2)
(nobler,1)
(of,2)
(slings,1)
(suffer,1)
(outrageous,1)
(tis,1)
(whether,1)
(question,1)
(that,1)
Process finished with exit code 0
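The result is correct: the counts match the input text (for example, "to" occurs 4 times and "the" 3 times).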
This concludes the Java and Scala versions of the example.