Spark Streaming使用介紹_DStream API與Structured Streaming

 
https://www.researchgate.net/figure/Spark-Streaming-uses-DStream-to-transform-streaming-data-into-a-series-of-batches_fig7_381900460


https://www.databricks.com/blog/2015/07/30/diving-into-apache-spark-streamings-execution-model.html



DStream API(Discretized Streams)
  • 較舊的流處理模型,主要操作RDD(彈性分佈式數據集)來處理數據流。
  • Dstream也就是離散的stream,也就表示把連續的資料分成一小團一小團,又稱作“microbatching”。每個microbatch 變成一個RDD以便Spark的後續處理。
  • 在每一個batch interval中,每個DStream會有也僅有一個RDD。
  • 可以由StreamingContext透過串流資料產生或其他DStream使用map方法產生(與RDD一樣)。
  • DStream裡面的RDD就是透過某個時間間隔產生的,而且以產生的時間為索引,因此time屬性對DStream而言非常重要。
  • 在訪問DStream的某個RDD時,實際上是訪問它在某個時間點的RDD。


Structured Streaming
  • 自spark2.0 開始, spark 引進了一套新的流式計算模型「Structured Streaming」
  • 進一步降低了處理資料的延遲時間
  • 提供快速,可擴展,容錯,end-to-end exactly-once stream processing (端到端的完全一次性流處理),實踐了"有且僅有一次(Exectly Once)" 語義, 可以保證資料被精準消費。
  • 是一種基於Spark SQL 引擎構建的可擴展且容錯的stream processing engine 。
  • 可以使用Dataset/DataFrame API 來表示streaming aggregations (流聚合), event-time windows (事件時間視窗), stream-to-batch joins (流到批次連接) 等。
  • 結構化流提供了對複雜事件處理和狀態管理的內建支持,使其非常適合需要精細事件時間控制和狀態持久化的應用。
  • 預設內部Structured Streaming 查詢使用微批次引擎(micro-batch processing engine)處理
  • 微批次引擎把流資料當做一系列小批job(small batch jobs)處理,因此延遲低至100 毫秒。

減少畫面訊息,可暫時將console訊息詳細程度設為ERROR

通過修改log4j.properties檔案來調整日誌紀錄等級。一開始,日誌等級設定為INFO,之後被改為ERROR。這表示只有錯誤訊息會被輸出到控制台。
DStream API(Discretized Streams)程式範本

--開啟一個新終端機,連線到本機tcp:12345埠號
$ nc -lk 12345
  暫時不輸入任何資料

--開啟一個新終端機
$ pyspark
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc,2) #每2秒截斷串流一次,將其變成DStream
>>> logs = ssc.socketTextStream('localhost',12345)
>>> userreqs = logs.map(lambda line: (line.split(' ')[2],1)).reduceByKey(lambda v1,v2: v1+v2)
>>> userreqs.pprint()
>>> ssc.start()  

創建了一個流處理上下文(StreamingContext),並且設置每2秒鐘處理一次資料流。
將資料轉換為DStream。
使用socketTextStream從本機的12345端口接收數據,進行單詞計數。
程式會將每行文本按空格分割,然後統計每個單詞的出現次數。

可觀察到即便沒有資料,也每2秒產生micro-batch進行操作1次。

回到nc -lk 12345的終端機,接著輸入下列內容
I've never seen a purple cow.
I never hope to see one;
But I can tell you, anyhow,

再返回spark終端就會觀察到
當我們從另一個終端機使用nc -lk 12345發送文本資料到PySpark應用時,終端的輸出顯示每2秒的時間戳和進行中的單詞計數結果。

發送的文本逐行被分析,並且相關的單詞計數結果在特定時間點顯示。
在此選取每行的第三個單詞,使用 split(' ') 函數按空格分割每一行文本。,從分割後的列表中選取第三個元素(索引為2的元素)。
在特定的時間點顯示(u'seen', 1)、(u'hope', 1)、和(u'can', 1),這表示這些單詞在輸入流中被識別和計數。




Structured Streaming程式範本
使用了 PySpark 的結構化流(Structured Streaming)來實現一個簡單的實時單詞計數程式。這個程序從一個網絡端口(socket)讀取文本數據,然後計算每個單詞的出現次數。







產生一個wordcount_streaming.py 程式如下

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 12345).load()
words = lines.select(explode(split(lines.value," ")).alias("word"))
wordCounts = words.groupBy("word").count()
query = wordCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()

功能解釋:

建立 SparkSession:
SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate() 初始化 SparkSession,這是使用 Spark SQL 和結構化流的入口點。

讀取數據流:
spark.readStream.format("socket").option("host", "localhost").option("port", 12345).load() 這行設定從本機的12345端口讀取數據流,數據源格式設定為“socket”。

處理文本數據:
explode(split(lines.value, " ")) 將每行文本使用空格分割後,使用 explode 函数將分割後的數據展開成多行,每行包含一個單詞。
.alias("word") 將結果列命名為 "word"。

計數和分組:
words.groupBy("word").count() 對單詞進行分組,並計算每個分組的數量。

輸出和啟動查詢:
wordCounts.writeStream.outputMode("complete").format("console").start() 設定輸出模式為“complete”,表示每次批次處理完後都將輸出所有累計的計數結果到控制台。
query.awaitTermination() 啟動流查詢,並等待處理終止(通常是外部中斷)。

spark-submit 命令的作用
spark-submit 是用於提交 Spark 應用程序到集群上執行的命令。
這個命令用來啟動先前創建的結構化流處理程序 wordcount_streaming.py。
此命令允許 Spark 應用程序在集群環境中運行,無論是在本地模式還是分布式模式下。




輸出
Batch: 0:
首個批次時沒有數據處理,因此沒有輸出。

Batch: 1:
第一批數據處理了 "Hello World" 和 "Hello Samuel":
"Hello" 出現了兩次。
"World" 和 "Samuel" 各出現了一次。

Batch: 2:
第二批數據繼續處理了 "Hello Spark":
"Hello" 的總計數增加到三次(前兩次加這一次)。
"Spark" 新增至統計,出現一次。
"World" 和 "Samuel" 的計數保持不變,因為這一批次沒有這兩個單詞。


Ref:
https://www.databricks.com/blog/2015/07/30/diving-into-apache-spark-streamings-execution-model.html
https://vanducng.dev/2020/10/10/Getting-started-with-spark-structure-streaming/
https://www.debadityachakravorty.com/programming/sparkstream/
https://mikolaje.github.io/2016/spark_foreachRDD.html
https://www.cnblogs.com/zwCHAN/p/4274804.html
https://zhenchao125.github.io/bigdata_spark-project_atguigu/di-7-bu-fen-structured-streaming/di-1-zhang-structured-streaming-gai-shu.html
https://cloud.tencent.com/developer/inventory/2478/article/1735668


留言

這個網誌中的熱門文章

何謂淨重(Net Weight)、皮重(Tare Weight)與毛重(Gross Weight)

Architecture(架構) 和 Framework(框架) 有何不同?_軟體設計前的事前規劃的藍圖概念

經得起原始碼資安弱點掃描的程式設計習慣培養(五)_Missing HSTS Header