PySpark_命令與RDD使用(Transformation及Action)_深入筆記(一)_Lineage(譜系)與RDD Types_parallelize與collect方法及其他常用轉換方法使用
Lineage(譜系)
代表一組資訊,紀錄RDD如何產生的。
比方:由textfile毒入後或由某個RDD經過map轉換而產生,也會間接描述RDD之間的相依性。
在 Apache Spark 中,Resilient Distributed Dataset(RDD)是最基本的數據結構,用於在多個節點之間分佈數據和進行並行計算。
RDD操作共分成兩大類
1.Transformation
對於已存在的RDD進行某種操作,當完成操作後再返回一個新的RDD。
所有transformation操作都屬於延遲操作(不會立刻執行)。
例如:
map 轉換通過函數傳遞數據集的每個元素,產生新的 RDD。
filter 支持根據函數篩選數據集的元素。
distinct 非重複查找數據集中不同元素。
flatmap 轉換(類似於 map 轉換)可以將每個 input 項映射到零個或更大的 output 項。
2.Action
把action操作後結果,返回給driver program或寫到外部儲存體之中。
所有action操作都是屬於會立即執行的。
例如:
reduce 操作,它聚合了 RDD 的所有元素並返回result 添加到驅動程式中。
reduce,它使用函數 func 聚合 dataset 元素。
take,它返回一個包含前 n 個元素的陣列。
collect,它將數據集的所有元素作為陣列返回。
takeOrdered,它返回按升序排序或由可選的function 參數。
轉換和操作是如何發生的呢?
Spark 使用一種名為有向無環圖directed acyclic graph(稱為 DAG)
和一個關聯的 DAG 計劃程式RDD 操作。
將 DAG 視為由邊和頂點組成的圖形結構。
術語"acyclic"「非迴圈」意味著新邊僅源自現有頂點。
通常頂點和邊會是連續的,頂點表示 RDD,邊表示轉換或操作。
DAG Scheduler 應用圖形結構來運行使用 RDD 執行的任務轉型過程。
為什麼Spark使用DAG?
因為DAG 有助於實現容錯能力。當節點宕機時,Spark 會複製 DAG 並恢復節點。
首先,Spark 在創建 RDD 時創建 DAG。
接下來,Spark 使 DAG Scheduler能夠執行轉換並更新 DAG。
DAG 現在指向新的 RDD。轉換 RDD 的指標將返回到 Spark 驅動程式。
如果存在Action,則調用該Action的驅動程式僅在Spark完成操作後評估 DAG。
RDD Types
基本上分為如下兩種
Regular RDD
可以存儲任何類型的物件(如整數、字符串、自定義物件等)。這種類型的 RDD 用於普通的並行數據處理,不涉及鍵值對操作。
可以存儲任何類型的物件(如整數、字符串、自定義物件等)。這種類型的 RDD 用於普通的並行數據處理,不涉及鍵值對操作。
# 創建一個 Regular RDD
numbers = sc.parallelize([1, 2, 3, 4, 5])
# 使用 reduce 操作計算總和
sum = numbers.reduce(lambda a, b: a + b)
sum # 輸出將是 15
Pair RDD
是包含鍵值對(key-value pairs)的 RDD,每個元素都是一個二元組(tuple),第一個元素作為鍵(key),第二個元素作為值(value)。這種類型的 RDD 常用於需要進行分組、聚合或按鍵排序等操作的情況。
假設我們有一組銷售數據,需要按產品類別對銷售額進行聚合。
# 創建一個 Pair RDD
sales_data = sc.parallelize([("Books", 100), ("Books", 200), ("Electronics", 300)])
# 使用 reduceByKey 操作來聚合同一類別的銷售額
sales_total = sales_data.reduceByKey(lambda a, b: a + b)
sales_total.collect()# 收集結果並打印
在 Apache Spark 中,parallelize 方法是用於創建 RDD 的基本方式之一,它可以將本地 Python 集合(如列表)轉換為分佈式的 RDD。
>>> sc.parallelize([2,3,4])
ParallelCollectionRDD[15] at parallelize at PythonRDD.scala:195
使用 SparkContext 的 parallelize 方法創建了一個 Regular RDD。
這個 RDD 包含三個元素:2、3 和 4。
這些元素將被分佈在集群的不同節點上以便進行並行處理。
#該 RDD 被分為兩個分區。
>>> sc.parallelize([2,3,4]).getNumPartitions()
2
調用 getNumPartitions() 方法,該方法返回 RDD 的分區數量。
分區數量是 Spark 在分佈式環境中進行數據處理時
將數據分割的塊數,這直接影響到並行處理的程度。
分區的數量可以由 Spark 的配置和環境決定,或者可以在調用 parallelize 方法時手動指定。
collect() 方法是一個動作(action),用於將 RDD 中的所有元素以列表的形式返回到驅動程序。
>>> sc.parallelize([2,3,4]).collect()
[2, 3, 4]
其他常用轉換方法
在常見的Transformation方法中,若該方法名稱沒有特別加上以捨麼為單位(比方有partition字眼就是屬於Partiton-Based)就算是Element-Based的Transformation方法。
map(function)
把來源RDD中各個元素經過function處理後再返回一個新的RDD元素。
map()將父RDD每個element當作function輸入,執行函數的回傳值
當作子RDD的element(不論回傳值的型態為何)
flatMap(function)
把來源RDD中各個元素經過function處理後再返回一個新的RDD元素,但每個輸入元素會被展開變平坦化,可能輸出0~多個元素。flatMap()將父RDD每個element當作function輸入,執行函數的回傳值若為陣列(列表),則將每個陣列元素當作子RDD的element
filter(function)
將父RDD每個element當作function輸入,讓函數回傳值為TRUE的element才會當作子RDD的element
以下用python的range方法,含頭不含尾產生依序數值元素。
Using Python version 2.7.5 (default, Apr 2 2020 13:16:51)
SparkSession available as 'spark'.
>>> sc
<SparkContext master=yarn appName=PySparkShell>
>>> sc.parallelize([2,3,4]).collect()
[2, 3, 4]
>>> sc.parallelize([2,3,4]).map(lambda x: range(1,x)).collect()
[[1], [1, 2], [1, 2, 3]]
每個元素依序是python range函數的運行輸出結果
range(1,2),range(1,3),range(1,4)
>>> sc.parallelize([2,3,4]).flatMap(lambda x: range(1,x)).collect()
[1, 1, 2, 1, 2, 3]
>>> sc.parallelize([2,3,4]).map(lambda x: range(1,x)).filter(lambda x: len(x)>1).collect()
[[1, 2], [1, 2, 3]]
union(otherRDD)
取聯集不去重,結果類似SQL的union all。
>>> rdd1 = sc.parallelize([1,2,5])
>>> rdd2 = sc.parallelize(range(1,5))
>>> rdd1.collect()
[1, 2, 5]
>>> rdd2.collect()
[1, 2, 3, 4]
>>> rdd1.union(rdd2).collect()
[1, 2, 5, 1, 2, 3, 4]
>>> rdd1.union(rdd2).distinct().collect()
[4, 1, 5, 2, 3]
>>>
distinct([num Tasks])
對來源RDD元素單獨化,去除重複。
>>> rdd1.subtract(rdd2).collect()
[5]
>>> rdd2.subtract(rdd1).collect()
[4, 3]
>>>
把來源RDD跟輸入RDD做交集,輸出為新的RDD。
同時出現在rdd1與rdd2元素
>>> rdd1.intersection(rdd2).collect()
[1, 2]
zip(otherRDD)
zip可以用來產生tuple rdd(左邊rdd的元素為key,右邊rdd的元素為value),但兩個rdd的元素個數必須相同
用於將兩個 RDD 合併成一個新的 RDD,其中每個元素是由兩個原始 RDD 中相對應位置的元素組成的對(tuple)。
錯誤原因是rdd1與rdd2的元素個數不相同
org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
>>> rdd3 = sc.parallelize(range(5,9))
>>> rdd3
ParallelCollectionRDD[61] at parallelize at PythonRDD.scala:195
>>> rdd3.collect()
[5, 6, 7, 8]
>>> rdd2.collect()
[1, 2, 3, 4]
>>> rdd3.zip(rdd2).collect()
[(5, 1), (6, 2), (7, 3), (8, 4)]
>>>
元素個數不相同,可以使用cartesian笛卡爾乘積。
>>> rdd1.collect()
[1, 2, 5]
>>> rdd2.collect()
[1, 2, 3, 4]
>>> rdd1.cartesian(rdd2).collect()
[(1, 1), (1, 2), (1, 3), (1, 4), (2, 1), (2, 2), (5, 1), (5, 2), (2, 3), (2, 4), (5, 3), (5, 4)]
>>>
Q1.What is the primary goal of data transformation?
To verify that the data originates from a reputable source
To reformat the data for new databases
To change the data into a format useful for business users
(O)To keep only relevant data
Q2.Which of the following statements is true about a directed acyclic graph (DAG)? Select all that apply.
A tabular data structure with rows and columns
(O)A data structure with edges and vertices
(O)A new edge is obtained from an older vertex
(O)In Apache Spark, RDDs are represented by the vertices
Q3.
1.Spark creates a Directed Acyclic Graph (DAG) during the creation of a Resilient Distributed Dataset (RDD).
2.The DAG is associated with the new RDD.
3.If there is an action, the driver program, which invokes calls the action, evaluates the DAG after Spark completes the action.
4.The pointer responsible for transforming the RDD returns to the Spark driver program.
5.Spark utilizes the DAG Scheduler to perform a transformation and updates the DAG accordingly.
Select the sequence that explains RDD transformation and actions.
1 >> 2 >> 3 >> 4 >> 5
(O)1 >> 5 >> 2 >> 4 >> 3
1 >> 3 >> 4 >> 5 >> 2
2 >> 3 >> 4 >> 1 >> 5
This process explains RDD transformation and actions.
Spark creates the DAG when creating an RDD. It then enables the DAG Schedular to perform a transformation and update the DAG. The DAG points to the new RDD. The pointer that transforms RDD is returned to the Spark driver program. If there is an action, the driver program that calls the action evaluates the DAG only after Spark completes the action.
留言
張貼留言