PySpark_命令與RDD使用(Transformation及Action)_深入筆記(一)_Lineage(譜系)與RDD Types_parallelize與collect方法及其他常用轉換方法使用
Lineage(譜系)
代表一組資訊,紀錄RDD如何產生的。
比方:由textfile毒入後或由某個RDD經過map轉換而產生,也會間接描述RDD之間的相依性。
在 Apache Spark 中,Resilient Distributed Dataset(RDD)是最基本的數據結構,用於在多個節點之間分佈數據和進行並行計算。
RDD操作共分成兩大類
1.Transformation
對於已存在的RDD進行某種操作,當完成操作後再返回一個新的RDD。
所有transformation操作都屬於延遲操作(不會立刻執行)。
2.Action
把action操作後結果,返回給driver program或寫到外部儲存體之中。
所有action操作都是屬於會立即執行的。
RDD Types
基本上分為如下兩種
Regular RDD
可以存儲任何類型的物件(如整數、字符串、自定義物件等)。這種類型的 RDD 用於普通的並行數據處理,不涉及鍵值對操作。
可以存儲任何類型的物件(如整數、字符串、自定義物件等)。這種類型的 RDD 用於普通的並行數據處理,不涉及鍵值對操作。
# 創建一個 Regular RDD
numbers = sc.parallelize([1, 2, 3, 4, 5])
# 使用 reduce 操作計算總和
sum = numbers.reduce(lambda a, b: a + b)
sum # 輸出將是 15
Pair RDD
是包含鍵值對(key-value pairs)的 RDD,每個元素都是一個二元組(tuple),第一個元素作為鍵(key),第二個元素作為值(value)。這種類型的 RDD 常用於需要進行分組、聚合或按鍵排序等操作的情況。
假設我們有一組銷售數據,需要按產品類別對銷售額進行聚合。
# 創建一個 Pair RDD
sales_data = sc.parallelize([("Books", 100), ("Books", 200), ("Electronics", 300)])
# 使用 reduceByKey 操作來聚合同一類別的銷售額
sales_total = sales_data.reduceByKey(lambda a, b: a + b)
sales_total.collect()# 收集結果並打印
在 Apache Spark 中,parallelize 方法是用於創建 RDD 的基本方式之一,它可以將本地 Python 集合(如列表)轉換為分佈式的 RDD。
>>> sc.parallelize([2,3,4])
ParallelCollectionRDD[15] at parallelize at PythonRDD.scala:195
使用 SparkContext 的 parallelize 方法創建了一個 Regular RDD。
這個 RDD 包含三個元素:2、3 和 4。
這些元素將被分佈在集群的不同節點上以便進行並行處理。
#該 RDD 被分為兩個分區。
>>> sc.parallelize([2,3,4]).getNumPartitions()
2
調用 getNumPartitions() 方法,該方法返回 RDD 的分區數量。
分區數量是 Spark 在分佈式環境中進行數據處理時
將數據分割的塊數,這直接影響到並行處理的程度。
分區的數量可以由 Spark 的配置和環境決定,或者可以在調用 parallelize 方法時手動指定。
collect() 方法是一個動作(action),用於將 RDD 中的所有元素以列表的形式返回到驅動程序。
>>> sc.parallelize([2,3,4]).collect()
[2, 3, 4]
其他常用轉換方法
在常見的Transformation方法中,若該方法名稱沒有特別加上以捨麼為單位(比方有partition字眼就是屬於Partiton-Based)就算是Element-Based的Transformation方法。
map(function)
把來源RDD中各個元素經過function處理後再返回一個新的RDD元素。
map()將父RDD每個element當作function輸入,執行函數的回傳值
當作子RDD的element(不論回傳值的型態為何)
flatMap(function)
把來源RDD中各個元素經過function處理後再返回一個新的RDD元素,但每個輸入元素會被展開變平坦化,可能輸出0~多個元素。flatMap()將父RDD每個element當作function輸入,執行函數的回傳值若為陣列(列表),則將每個陣列元素當作子RDD的element
filter(function)
將父RDD每個element當作function輸入,讓函數回傳值為TRUE的element才會當作子RDD的element
以下用python的range方法,含頭不含尾產生依序數值元素。
Using Python version 2.7.5 (default, Apr 2 2020 13:16:51)
SparkSession available as 'spark'.
>>> sc
<SparkContext master=yarn appName=PySparkShell>
>>> sc.parallelize([2,3,4]).collect()
[2, 3, 4]
>>> sc.parallelize([2,3,4]).map(lambda x: range(1,x)).collect()
[[1], [1, 2], [1, 2, 3]]
每個元素依序是python range函數的運行輸出結果
range(1,2),range(1,3),range(1,4)
>>> sc.parallelize([2,3,4]).flatMap(lambda x: range(1,x)).collect()
[1, 1, 2, 1, 2, 3]
>>> sc.parallelize([2,3,4]).map(lambda x: range(1,x)).filter(lambda x: len(x)>1).collect()
[[1, 2], [1, 2, 3]]
union(otherRDD)
取聯集不去重,結果類似SQL的union all。
>>> rdd1 = sc.parallelize([1,2,5])
>>> rdd2 = sc.parallelize(range(1,5))
>>> rdd1.collect()
[1, 2, 5]
>>> rdd2.collect()
[1, 2, 3, 4]
>>> rdd1.union(rdd2).collect()
[1, 2, 5, 1, 2, 3, 4]
>>> rdd1.union(rdd2).distinct().collect()
[4, 1, 5, 2, 3]
>>>
distinct([num Tasks])
對來源RDD元素單獨化,去除重複。
>>> rdd1.subtract(rdd2).collect()
[5]
>>> rdd2.subtract(rdd1).collect()
[4, 3]
>>>
把來源RDD跟輸入RDD做交集,輸出為新的RDD。
同時出現在rdd1與rdd2元素
>>> rdd1.intersection(rdd2).collect()
[1, 2]
zip(otherRDD)
zip可以用來產生tuple rdd(左邊rdd的元素為key,右邊rdd的元素為value),但兩個rdd的元素個數必須相同
用於將兩個 RDD 合併成一個新的 RDD,其中每個元素是由兩個原始 RDD 中相對應位置的元素組成的對(tuple)。
錯誤原因是rdd1與rdd2的元素個數不相同
org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
>>> rdd3 = sc.parallelize(range(5,9))
>>> rdd3
ParallelCollectionRDD[61] at parallelize at PythonRDD.scala:195
>>> rdd3.collect()
[5, 6, 7, 8]
>>> rdd2.collect()
[1, 2, 3, 4]
>>> rdd3.zip(rdd2).collect()
[(5, 1), (6, 2), (7, 3), (8, 4)]
>>>
元素個數不相同,可以使用cartesian笛卡爾乘積。
>>> rdd1.collect()
[1, 2, 5]
>>> rdd2.collect()
[1, 2, 3, 4]
>>> rdd1.cartesian(rdd2).collect()
[(1, 1), (1, 2), (1, 3), (1, 4), (2, 1), (2, 2), (5, 1), (5, 2), (2, 3), (2, 4), (5, 3), (5, 4)]
>>>
留言
張貼留言