Menu
快讀
  • 旅遊
  • 生活
    • 美食
    • 寵物
    • 養生
    • 親子
  • 娛樂
    • 動漫
  • 時尚
  • 社會
  • 探索
  • 故事
  • 科技
  • 軍事
  • 国际
快讀

Flink 基礎學習(二)搭建一個 “Hello World” 程序

2020 年 1 月 9 日 科技伍小黑

在學習技術時,總會有一個簡單程序 Demo 帶著我們入門,所以參考著官網例子,帶大家快速熟悉 Flink 的 Hello World~

說明一下,項目運行的環境如下:

OS : Mac

Flink Version : 1.9

IDE : IDEA

Java Version : 1.8

下面來講下關于環境准備,如果是 Windows 的用戶,請參照每個步驟,找到適應自己的安裝 or 啓動方法。

1 環境准備

首先我們默認已經安裝了 Jdk 1.8 和編碼工具 IDEA,下面來講如何安裝 Flink 和建立腳手架。下面展示的項目代碼已經放入了 Github,可以下載進行本地運行

1.1 安裝 Flink

$ brew install apache-flink

檢查安裝是否成功以及版本號

$ flink --version
Version: 1.9.0, Commit ID: 9c32ed9

接著以單機集群模式啓動 Flink

$ sh /usr/local/Cellar/apache-flink/1.9.0/libexec/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host yejingqideMBP-c510.
Starting taskexecutor daemon on host yejingqideMBP-c510.

然後訪問 localhost:8081 監控界面(1.9 版本更新了 UI):

Flink 基礎學習(二)搭建一個

1.2 創建項目

這裏推薦的是使用 maven 進行構建,在命令行中輸入如下內容(# 號後面是說明,請不要輸入):

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \               # flink 的 group.id
    -DarchetypeArtifactId=flink-quickstart-java \       # flink 的 artifact.id
    -DarchetypeVersion=1.9.0 \                          # flink 的 version,以上三個請不要修改,按照默認即可
    -DgroupId=wiki-edits \                              # 項目的 group.id
    -DartifactId=wiki-edits \                           # 項目的 artifact.id
    -Dversion=0.1 \                                     # 項目的 version.id
    -Dpackage=wikiedits \                               # 項目的基礎包名
    -DinteractiveMode=false                             # 是否需要和用戶交互以獲得輸入,由于上面已經自己寫了項目的參數,所以禁用了。反之請刪掉 上面項目的配置,將交互模式設爲 true

如果按照官方的例子填寫,那麽你將得到如下的項目結構:

$ tree wiki-edits
wiki-edits/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── wikiedits
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties

如果是自己自定義的,包結構會不一致,但是通過腳手架創立的,pom 文件中預置的依賴都將一致,引入了 Flink 基礎開發相關的 API,然後通過 IDEA 打開該項目目錄,就可以開始我們的 Hello world。

2 開始項目

首先交代一下待會的流程,編寫程序代碼,啓動 netcat 命令來監聽 9000 端口,啓動或提交 Flink 程序,最後監聽日志輸出信息。

2.1 項目代碼

Demo 的代碼作用是監聽 netcat 輸入的字符,然後進行聚合操作,最後輸出字符統計

public class SocketTextStreamWordCount {

    public static void main(String[] args) throws Exception {
        String hostName = "127.0.0.1";
        int port = 9000;
        // 設置運行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 獲取數據源
        DataStreamSource<String> stream = env.socketTextStream(hostName, port);
        // 計數
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream
            .flatMap((new LineSplitter()))
            .keyBy(0)
            .sum(1);
        // 輸出
        sum.print();
        // 提交任務
        env.execute("Java Word from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] tokens = s.toLowerCase().split("\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

Flink 基礎學習(二)搭建一個

簡單說明一下,上面出現了 SocketTextStream 套接字字符 數據源(Source),接著是 算子(Operator): FlatMap(一個輸入源,可以輸出零個或多個結果)、KeyBy(按照某字段或者 tuple 元組中某個下標進行分類) 和 sum(跟翻譯一樣,就是進行聚合彙總) ,最後輸出

2.2 開啓 tcp 長鏈接

爲了模擬流數據,我們造的場景是不斷往 9000 端口輸入字符,Flink 程序添加的數據源是 SocketTextStream (套接字字符流)。

在你的終端中輸入以下命令

$ nc -l 9000

有關 netcat 命令的用法,請看參考資料第二條,這裏的作用就是打開 TCP 長鏈接,監聽 9000 端口

2.3 啓動 Flink 程序

剛才第一個步驟中,已經編輯好了程序代碼,第二個步驟也已經啓動了一個 TCP 客戶端,啓動 Flink 程序有兩種方法:

2.3.1 本地調試

使用 IDEA 的好處很多,代碼補全,語法檢查和快捷鍵之類的。我經常使用的調試方法就是添加一個 psvm 的 main 方法,在裏面寫執行代碼,最後點擊綠色的啓動按鈕~

Flink 基礎學習(二)搭建一個

如果不需要調試,想直接看結果,選擇第一個 Run,但有時不確定代碼執行過程和出錯的具體原因,可以通過第二個選項 Debug 進行調試。

這是本地開發經常使用的方法,進行結果的驗證。

2.3.2 提交到 JobManager

前面我們啓動的是單機集群版,啓動了一個 JobManager 和 TaskWorker,打開的 localhost:8081 就是 JobManager 的監控面板,所以我們要通過下面的方式,將 Flink 程序提交到 JobManager。

這裏教一個簡單的方法,我們通過 mvn clean package 進行打包後,可以在 IDEA 集成的終端標簽欄下提交我們的程序:

Flink 基礎學習(二)搭建一個

由于每個人的絕對路徑都不一樣,所以我們通過 IDEA 的終端,它會自動定位到項目的路徑,然後執行時填寫相對路徑的 jar 包名字即可

$ flink run -c cn.sevenyuan.wordcount.SocketTextStreamWordCount target/flink-quick-start-1.0-SNAPSHOT.jar

-c 參數是指定運行的主程序入口,接著我們去查看監控面板,可以發現任務狀態已經處于監控中:

Flink 基礎學習(二)搭建一個

頂部信息講的是運行程序名字、時間、時間線、配置參數等信息,底下 Name 一欄,說明該程序邏輯步驟(讀取數據源,進行映射處理,使用 keyBy 和聚合運算,最後輸出到【打印 sink】)

2.4 輸入數據 & 驗證結果

前面驗證了程序正常啓動,接下來我們來驗證輸入和輸出

先來監聽輸出,進入 Flink 的日志目錄,接著通過 tail 命令監聽任務執行者 TaskWorkder(默認會啓動一個任務執行者,所以編碼爲 0) 的日志輸出

$ usr/local/Cellar/apache-flink/1.9.0/libexec/log
$ tail -400f flink*-taskexecutor-0*.out

接著,在 nc -l 9000 對應的終端窗口中輸入如下數據:

$ nc -l 9000
hello world
test world
test hello
hello my world

最後就能夠看到以下輸出結果:

(hello,1)
(world,1)
(test,1)
(world,2)
(test,2)
(hello,2)
(hello,3)
(my,1)
(world,3)

每行字符以空格進行分割,然後分別進行彙總統計,得到的輸出結果一致。

3 擴展閱讀

如果你在官網閱覽,應該也曾看到過 TimeWindow 時間窗口的例子,下面是 Demo 代碼

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the port to connect to
        String hostName = "127.0.0.1";
        int port = 9000;

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.getWord(), a.getCount() + b.getCount());
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}

這裏的程序代碼核心點在于,比之前的多了一個算子 timeWindow,並且有兩個參數,分別是時間窗口大小以及滑動窗口大小(Time size, Time slide),下面是簡單的輸入和輸出示意圖:

Flink 基礎學習(二)搭建一個

由于滑動窗口大小是 1s,窗口是有重合的部分,然後每秒統計自己所在窗口的數據(5s 內傳輸過來的數據),可以看到第 6s 時,已經舍棄掉第 0s 輸入的字符串數據。

小夥伴們也可以修改一下時間窗口大小和滑動窗口大小,然後輸入自定義的數據,進行不同參數的設置,看下輸出效果如何,是否有達到自己的預期。

這裏先初步接觸一下 時間(Time)和窗口(Window)概念,之後慢慢接觸逐步加深理解吧。


4 總結

本文基于 Mac 系統、 Apache Flink 1.9 版本進行了項目搭建和 Demo 編寫,介紹了 Suorce -> Transformation -> Sink 的流程。簡單的實現了一個字符計數器,往套接字數據源 SocketTextStream,源源不斷的輸入,然後進行統計出現的次數,如有疑惑或不對之處請與我討論~

相關文章:

  • Microsoft 70-465 Demo Download Sale Online Sites - 70-465…
  • [NEW Online Training] < 300-075 Exam Materials Passing…
  • 【周末去哪】送票!動物園門票半價,8折張亮麻辣燙,半仙豆夫買1送1,$1吃生蚝,1折家電、Hello Kitty涮涮鍋
  • Mock.js入門
  • 戰友和對手感謝聲不斷,80歲老領導:Hello,我還沒退出政壇
  • Hello Kitty主題下午茶&Staycation!住一晚還可以帶回家超多萌物
科技

發佈留言 取消回覆

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

©2025 快讀 | 服務協議 | DMCA | 聯繫我們