在學習技術時,總會有一個簡單程序 Demo 帶著我們入門,所以參考著官網例子,帶大家快速熟悉 Flink 的 Hello World~
說明一下,項目運行的環境如下:
OS : Mac
Flink Version : 1.9
IDE : IDEA
Java Version : 1.8
下面來講下關于環境准備,如果是 Windows 的用戶,請參照每個步驟,找到適應自己的安裝 or 啓動方法。
1 環境准備
首先我們默認已經安裝了 Jdk 1.8 和編碼工具 IDEA,下面來講如何安裝 Flink 和建立腳手架。下面展示的項目代碼已經放入了 Github,可以下載進行本地運行
1.1 安裝 Flink
$ brew install apache-flink
檢查安裝是否成功以及版本號
$ flink --version
Version: 1.9.0, Commit ID: 9c32ed9
接著以單機集群模式啓動 Flink
$ sh /usr/local/Cellar/apache-flink/1.9.0/libexec/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host yejingqideMBP-c510.
Starting taskexecutor daemon on host yejingqideMBP-c510.
然後訪問 localhost:8081 監控界面(1.9 版本更新了 UI):
1.2 創建項目
這裏推薦的是使用 maven 進行構建,在命令行中輸入如下內容(# 號後面是說明,請不要輸入):
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \ # flink 的 group.id
-DarchetypeArtifactId=flink-quickstart-java \ # flink 的 artifact.id
-DarchetypeVersion=1.9.0 \ # flink 的 version,以上三個請不要修改,按照默認即可
-DgroupId=wiki-edits \ # 項目的 group.id
-DartifactId=wiki-edits \ # 項目的 artifact.id
-Dversion=0.1 \ # 項目的 version.id
-Dpackage=wikiedits \ # 項目的基礎包名
-DinteractiveMode=false # 是否需要和用戶交互以獲得輸入,由于上面已經自己寫了項目的參數,所以禁用了。反之請刪掉 上面項目的配置,將交互模式設爲 true
如果按照官方的例子填寫,那麽你將得到如下的項目結構:
$ tree wiki-edits
wiki-edits/
├── pom.xml
└── src
└── main
├── java
│ └── wikiedits
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j.properties
如果是自己自定義的,包結構會不一致,但是通過腳手架創立的,pom 文件中預置的依賴都將一致,引入了 Flink 基礎開發相關的 API,然後通過 IDEA 打開該項目目錄,就可以開始我們的 Hello world。
2 開始項目
首先交代一下待會的流程,編寫程序代碼,啓動 netcat 命令來監聽 9000 端口,啓動或提交 Flink 程序,最後監聽日志輸出信息。
2.1 項目代碼
Demo 的代碼作用是監聽 netcat 輸入的字符,然後進行聚合操作,最後輸出字符統計
public class SocketTextStreamWordCount {
public static void main(String[] args) throws Exception {
String hostName = "127.0.0.1";
int port = 9000;
// 設置運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 獲取數據源
DataStreamSource<String> stream = env.socketTextStream(hostName, port);
// 計數
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream
.flatMap((new LineSplitter()))
.keyBy(0)
.sum(1);
// 輸出
sum.print();
// 提交任務
env.execute("Java Word from SocketTextStream Example");
}
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] tokens = s.toLowerCase().split("\W+");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}
簡單說明一下,上面出現了 SocketTextStream 套接字字符 數據源(Source),接著是 算子(Operator): FlatMap(一個輸入源,可以輸出零個或多個結果)、KeyBy(按照某字段或者 tuple 元組中某個下標進行分類) 和 sum(跟翻譯一樣,就是進行聚合彙總) ,最後輸出
2.2 開啓 tcp 長鏈接
爲了模擬流數據,我們造的場景是不斷往 9000 端口輸入字符,Flink 程序添加的數據源是 SocketTextStream (套接字字符流)。
在你的終端中輸入以下命令
$ nc -l 9000
有關 netcat 命令的用法,請看參考資料第二條,這裏的作用就是打開 TCP 長鏈接,監聽 9000 端口
2.3 啓動 Flink 程序
剛才第一個步驟中,已經編輯好了程序代碼,第二個步驟也已經啓動了一個 TCP 客戶端,啓動 Flink 程序有兩種方法:
2.3.1 本地調試
使用 IDEA 的好處很多,代碼補全,語法檢查和快捷鍵之類的。我經常使用的調試方法就是添加一個 psvm 的 main 方法,在裏面寫執行代碼,最後點擊綠色的啓動按鈕~
如果不需要調試,想直接看結果,選擇第一個 Run,但有時不確定代碼執行過程和出錯的具體原因,可以通過第二個選項 Debug 進行調試。
這是本地開發經常使用的方法,進行結果的驗證。
2.3.2 提交到 JobManager
前面我們啓動的是單機集群版,啓動了一個 JobManager 和 TaskWorker,打開的 localhost:8081 就是 JobManager 的監控面板,所以我們要通過下面的方式,將 Flink 程序提交到 JobManager。
這裏教一個簡單的方法,我們通過 mvn clean package 進行打包後,可以在 IDEA 集成的終端標簽欄下提交我們的程序:
由于每個人的絕對路徑都不一樣,所以我們通過 IDEA 的終端,它會自動定位到項目的路徑,然後執行時填寫相對路徑的 jar 包名字即可
$ flink run -c cn.sevenyuan.wordcount.SocketTextStreamWordCount target/flink-quick-start-1.0-SNAPSHOT.jar
-c 參數是指定運行的主程序入口,接著我們去查看監控面板,可以發現任務狀態已經處于監控中:
頂部信息講的是運行程序名字、時間、時間線、配置參數等信息,底下 Name 一欄,說明該程序邏輯步驟(讀取數據源,進行映射處理,使用 keyBy 和聚合運算,最後輸出到【打印 sink】)
2.4 輸入數據 & 驗證結果
前面驗證了程序正常啓動,接下來我們來驗證輸入和輸出
先來監聽輸出,進入 Flink 的日志目錄,接著通過 tail 命令監聽任務執行者 TaskWorkder(默認會啓動一個任務執行者,所以編碼爲 0) 的日志輸出
$ usr/local/Cellar/apache-flink/1.9.0/libexec/log
$ tail -400f flink*-taskexecutor-0*.out
接著,在 nc -l 9000 對應的終端窗口中輸入如下數據:
$ nc -l 9000
hello world
test world
test hello
hello my world
最後就能夠看到以下輸出結果:
(hello,1)
(world,1)
(test,1)
(world,2)
(test,2)
(hello,2)
(hello,3)
(my,1)
(world,3)
每行字符以空格進行分割,然後分別進行彙總統計,得到的輸出結果一致。
3 擴展閱讀
如果你在官網閱覽,應該也曾看到過 TimeWindow 時間窗口的例子,下面是 Demo 代碼
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// the port to connect to
String hostName = "127.0.0.1";
int port = 9000;
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.getWord(), a.getCount() + b.getCount());
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
}
這裏的程序代碼核心點在于,比之前的多了一個算子 timeWindow,並且有兩個參數,分別是時間窗口大小以及滑動窗口大小(Time size, Time slide),下面是簡單的輸入和輸出示意圖:
由于滑動窗口大小是 1s,窗口是有重合的部分,然後每秒統計自己所在窗口的數據(5s 內傳輸過來的數據),可以看到第 6s 時,已經舍棄掉第 0s 輸入的字符串數據。
小夥伴們也可以修改一下時間窗口大小和滑動窗口大小,然後輸入自定義的數據,進行不同參數的設置,看下輸出效果如何,是否有達到自己的預期。
這裏先初步接觸一下 時間(Time)和窗口(Window)概念,之後慢慢接觸逐步加深理解吧。
4 總結
本文基于 Mac 系統、 Apache Flink 1.9 版本進行了項目搭建和 Demo 編寫,介紹了 Suorce -> Transformation -> Sink 的流程。簡單的實現了一個字符計數器,往套接字數據源 SocketTextStream,源源不斷的輸入,然後進行統計出現的次數,如有疑惑或不對之處請與我討論~