PySpark_命令與RDD使用(Transformation及Action)_深入筆記(一)_Lineage(譜系)與RDD Types_parallelize與collect方法及其他常用轉換方法使用

Lineage(譜系)
代表一組資訊,紀錄RDD如何產生的。
比方:由textfile毒入後或由某個RDD經過map轉換而產生,也會間接描述RDD之間的相依性。
https://www.researchgate.net/figure/Lineage-graph-for-RDDs-in-Phase-1-of-RDD-Apriori_fig1_334987142

在 Apache Spark 中,Resilient Distributed Dataset(RDD)是最基本的數據結構,用於在多個節點之間分佈數據和進行並行計算。

RDD操作共分成兩大類

1.Transformation
對於已存在的RDD進行某種操作,當完成操作後再返回一個新的RDD。
所有transformation操作都屬於延遲操作(不會立刻執行)。

2.Action
把action操作後結果,返回給driver program或寫到外部儲存體之中。
所有action操作都是屬於會立即執行的。


RDD Types
基本上分為如下兩種
Regular RDD
可以存儲任何類型的物件(如整數、字符串、自定義物件等)。這種類型的 RDD 用於普通的並行數據處理,不涉及鍵值對操作。
# 創建一個 Regular RDD
numbers = sc.parallelize([1, 2, 3, 4, 5])
# 使用 reduce 操作計算總和
sum = numbers.reduce(lambda a, b: a + b)
sum  # 輸出將是 15

Pair RDD
是包含鍵值對(key-value pairs)的 RDD,每個元素都是一個二元組(tuple),第一個元素作為鍵(key),第二個元素作為值(value)。這種類型的 RDD 常用於需要進行分組、聚合或按鍵排序等操作的情況。

假設我們有一組銷售數據,需要按產品類別對銷售額進行聚合。
# 創建一個 Pair RDD
sales_data = sc.parallelize([("Books", 100), ("Books", 200), ("Electronics", 300)])
# 使用 reduceByKey 操作來聚合同一類別的銷售額
sales_total = sales_data.reduceByKey(lambda a, b: a + b)
sales_total.collect()# 收集結果並打印

要特別注意非pair RDD則不能使用partitioner函數,因為partitioner為NONE。


parallelize與collect方法使用
在 Apache Spark 中,parallelize 方法是用於創建 RDD 的基本方式之一,它可以將本地 Python 集合(如列表)轉換為分佈式的 RDD。
>>> sc.parallelize([2,3,4])
ParallelCollectionRDD[15] at parallelize at PythonRDD.scala:195

使用 SparkContext 的 parallelize 方法創建了一個 Regular RDD。
這個 RDD 包含三個元素:2、3 和 4。
這些元素將被分佈在集群的不同節點上以便進行並行處理。
#該 RDD 被分為兩個分區。
>>> sc.parallelize([2,3,4]).getNumPartitions()
調用 getNumPartitions() 方法,該方法返回 RDD 的分區數量。
分區數量是 Spark 在分佈式環境中進行數據處理時
將數據分割的塊數,這直接影響到並行處理的程度。
分區的數量可以由 Spark 的配置和環境決定,或者可以在調用 parallelize 方法時手動指定。

collect() 方法是一個動作(action),用於將 RDD 中的所有元素以列表的形式返回到驅動程序。
>>> sc.parallelize([2,3,4]).collect()
[2, 3, 4]


其他常用轉換方法
在常見的Transformation方法中,若該方法名稱沒有特別加上以捨麼為單位(比方有partition字眼就是屬於Partiton-Based)就算是Element-Based的Transformation方法。

map(function) 
把來源RDD中各個元素經過function處理後再返回一個新的RDD元素。
map()將父RDD每個element當作function輸入,執行函數的回傳值
當作子RDD的element(不論回傳值的型態為何)

flatMap(function)
把來源RDD中各個元素經過function處理後再返回一個新的RDD元素,但每個輸入元素會被展開變平坦化,可能輸出0~多個元素。flatMap()將父RDD每個element當作function輸入,執行函數的回傳值若為陣列(列表),則將每個陣列元素當作子RDD的element

filter(function)
將父RDD每個element當作function輸入,讓函數回傳值為TRUE的element才會當作子RDD的element

以下用python的range方法,含頭不含尾產生依序數值元素。


Using Python version 2.7.5 (default, Apr  2 2020 13:16:51)
SparkSession available as 'spark'.
>>> sc
<SparkContext master=yarn appName=PySparkShell>
>>> sc.parallelize([2,3,4]).collect()
[2, 3, 4]
>>> sc.parallelize([2,3,4]).map(lambda x: range(1,x)).collect()
[[1], [1, 2], [1, 2, 3]]

每個元素依序是python range函數的運行輸出結果
range(1,2),range(1,3),range(1,4)

>>> sc.parallelize([2,3,4]).flatMap(lambda x: range(1,x)).collect()
[1, 1, 2, 1, 2, 3]
>>> sc.parallelize([2,3,4]).map(lambda x: range(1,x)).filter(lambda x: len(x)>1).collect()
[[1, 2], [1, 2, 3]]

union(otherRDD)
取聯集不去重,結果類似SQL的union all。

>>> rdd1 = sc.parallelize([1,2,5])
>>> rdd2 = sc.parallelize(range(1,5))
>>> rdd1.collect()
[1, 2, 5]
>>> rdd2.collect()
[1, 2, 3, 4]
>>> rdd1.union(rdd2).collect()
[1, 2, 5, 1, 2, 3, 4]
>>> rdd1.union(rdd2).distinct().collect()
[4, 1, 5, 2, 3]
>>>

distinct([num Tasks])
對來源RDD元素單獨化,去除重複。

>>> rdd1.subtract(rdd2).collect()
[5]
>>> rdd2.subtract(rdd1).collect()
[4, 3]
>>>


insersection(otherRDD)
把來源RDD跟輸入RDD做交集,輸出為新的RDD。
同時出現在rdd1與rdd2元素
>>> rdd1.intersection(rdd2).collect()
[1, 2]


zip(otherRDD)
zip可以用來產生tuple rdd(左邊rdd的元素為key,右邊rdd的元素為value),但兩個rdd的元素個數必須相同
用於將兩個 RDD 合併成一個新的 RDD,其中每個元素是由兩個原始 RDD 中相對應位置的元素組成的對(tuple)。

錯誤原因是rdd1與rdd2的元素個數不相同
org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition

>>> rdd3 = sc.parallelize(range(5,9))
>>> rdd3
ParallelCollectionRDD[61] at parallelize at PythonRDD.scala:195
>>> rdd3.collect()
[5, 6, 7, 8]
>>> rdd2.collect()
[1, 2, 3, 4]
>>> rdd3.zip(rdd2).collect()
[(5, 1), (6, 2), (7, 3), (8, 4)]
>>>

cartesian(otherRDD)
元素個數不相同,可以使用cartesian笛卡爾乘積。
>>> rdd1.collect()
[1, 2, 5]
>>> rdd2.collect()
[1, 2, 3, 4]
>>> rdd1.cartesian(rdd2).collect()
[(1, 1), (1, 2), (1, 3), (1, 4), (2, 1), (2, 2), (5, 1), (5, 2), (2, 3), (2, 4), (5, 3), (5, 4)]
>>>



留言

這個網誌中的熱門文章

何謂淨重(Net Weight)、皮重(Tare Weight)與毛重(Gross Weight)

Architecture(架構) 和 Framework(框架) 有何不同?_軟體設計前的事前規劃的藍圖概念

經得起原始碼資安弱點掃描的程式設計習慣培養(五)_Missing HSTS Header