Flink入門(mén)教程——DataStream編程(1)
1.1 輸入流
源是程序從中讀取輸入的位置,可以使用以下方法將源附加到您的程序:
StreamExecutionEnvironment.addSource(sourceFunction) 。
Flink附帶了許多預(yù)先實(shí)現(xiàn)的源函數(shù),但您可以通過(guò)實(shí)現(xiàn) SourceFunction 非并行源,或通過(guò)實(shí)現(xiàn)
ParallelSourceFunction 接口或擴(kuò)展 RichParallelSourceFunction for parallel源來(lái)編寫(xiě)自己的自
定義源。
有幾個(gè)預(yù)定義的流源可從以下位置訪問(wèn) StreamExecutionEnvironment :
基于文件:
readTextFile(path) - TextInputFormat 逐行讀取文本文件,即符合規(guī)范的文件,并將它們作
為字符串返回。
readFile(fileInputFormat, path) - 按指定的文件輸入格式指定讀?。ㄒ淮危┪募?。
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 這
是前兩個(gè)內(nèi)部調(diào)用的方法。它 path 根據(jù)給定的內(nèi)容讀取文件 fileInputFormat 。根據(jù)提供的內(nèi)
容 watchType ,此源可以定期監(jiān)視(每 interval ms)新數(shù)據(jù)
( FilePROCessingMode.PROCESS_CONTINUOUSLY )的路徑,或者處理當(dāng)前在路徑中的數(shù)據(jù)并退
出( FilePROCessingMode.PROCESS_ONCE )。使用 pathFilter ,用戶可以進(jìn)一步排除正在處
理的文件。
實(shí)現(xiàn):
Flink將文件讀取過(guò)程分為兩個(gè)子任務(wù),即目錄監(jiān)控和數(shù)據(jù)讀取。這些子任務(wù)中的每一個(gè)都由單獨(dú)
的實(shí)體實(shí)現(xiàn)。監(jiān)視由單個(gè)非并行(并行性= 1)任務(wù)實(shí)現(xiàn),而讀取由并行運(yùn)行的多個(gè)任務(wù)執(zhí)行。后
者的并行性等于工作并行性。單個(gè)監(jiān)視任務(wù)的作用是掃描目錄(定期或僅一次,具體取決于
watchType ),找到要處理的文件,將它們分成分割,并將這些拆分分配給下游讀者。讀者是那
些將閱讀實(shí)際數(shù)據(jù)的人。每個(gè)分割僅由一個(gè)讀取器讀取,而讀取器可以逐個(gè)讀取多個(gè)分割。
重要筆記:
如果 watchType 設(shè)置為 FilePROCessingMode.PROCESS_CONTINUOUSLY ,則在修改文件時(shí),將完全重新處理其內(nèi)容。這可以打破“完全一次”的語(yǔ)義,因?yàn)樵谖募┪哺郊訑?shù)據(jù)將導(dǎo)致其所有內(nèi)容被重新處理。
如果 watchType 設(shè)置為 FilePROCessingMode.PROCESS_ONCE ,則源掃描路徑一次并退出,而不等待讀者完成讀取文件內(nèi)容。當(dāng)然讀者將繼續(xù)閱讀,直到讀取所有文件內(nèi)容。在該點(diǎn)之后關(guān)閉源將導(dǎo)致不再有檢查點(diǎn)。這可能會(huì)導(dǎo)致節(jié)點(diǎn)故障后恢復(fù)速度變慢,因?yàn)樽鳂I(yè)將從上一個(gè)檢查點(diǎn)恢復(fù)讀取。
基于Socket:
socketTextStream - 從Socket中讀取,元素可以用分隔符分隔。
基于集合:
fromCollection(Collection) - 從Java Java.util.Collection創(chuàng)建數(shù)據(jù)流。集合中的所有元素必須
屬于同一類型。
fromCollection(Iterator, Class) - 從迭代器創(chuàng)建數(shù)據(jù)流。該類指定迭代器返回的元素的數(shù)
據(jù)類型。
fromElements(T ...) - 從給定的對(duì)象序列創(chuàng)建數(shù)據(jù)流。所有對(duì)象必須屬于同一類型。
fromParallelCollection(SplittableIterator, Class) - 并行地從迭代器創(chuàng)建數(shù)據(jù)流。該類
指定迭代器返回的元素的數(shù)據(jù)類型。
generateSequence(from, to) - 并行生成給定間隔中的數(shù)字序列。
自定義:
addSource - 附加新的源功能。例如,要從Apache Kafka讀取,可以使用 addSource(new
FlinkKafkaConsumer08<>(...)) 。
今天先分享到這兒,明天為大家分享DataStream編程——數(shù)據(jù)流轉(zhuǎn)換。