PySpark_命令與RDD使用(Transformation及Action)及Spark DataFrame使用筆記

 


Parallel Programming using Resilient Distributed Datasets
  • 彈性分散式數據集,也稱為RDD,是Spark的主要數據抽象。
  • 彈性分散式數據集是一個容錯元素的集合,分布在集群的節點上,能夠接受並行操作。
  • 彈性分散式數據集是不可變的,這意味著一旦創建,這些數據集就無法改變。 
  • 每個Spark應用程式都包含一個驅動程序,該程序執行用戶的主要功能並在集群上執行多個並行操作。 
  • RDD支援文本、序列文件、Avro、Parquet和Hadoop輸入格式文件類型。 
  • RDD還支持本地、Cassandra、HBase、HDFS、Amazon S3等文件格式,以及大量的關係型和NoSQL數據庫。 
  • 可以使用來自Hadoop支持的文件系統(如HDFS、Cassandra、HBase或Amazon S3)的外部或本地文件創建RDD。
  • 將parallelize函數應用於驅動程序中的現有集合。
  • Spark為集群的每個分區運行一個任務。 通常,希望每個CPU在集群中有兩到四個分區。
  • 在操作過程中將數據集持久化或緩存到內存中。 緩存是容錯的,並且始終可以恢復,因為RDD是不變的。
  • 當持久化一個RDD時,每個節點將節點計算的分區存儲在內存中 並在該數據集或從第一個RDD派生的後續數據集上的其他操作中重用相同的分區。 
RDD Supported File Types
  • Text
  • SequenceFiles
  • Avro
  • Parquet
  • Hadoop input formats

RDD Supported File formats
  • Local
  • Cassandra
  • HBase
  • HDFS
  • Amazon S3
  • 或其他



which pyspark
可確認是否有安裝pyspark,並且系統環境變數路徑有配置,可識別到。

可透過pyspark指令先打印當前版本跟進入到命令模式

上傳測試用data.txt到hdfs
hdfs dfs -put data.txt
hdfs dfs -ls /user/cloudera/data.txt


去察看幫助文檔
pyspark --help




sc
進到core spark入口

dir(sir)
查看sc能用的所有方法字典
help(sc)
則可更詳盡去查閱每個方法用的方式


sc.textFile("/user/cloudera/data.txt")
Lazy延遲加載無實用,上面這個指令並沒有真正執行,需要等到進行action操作時才真正執行。
sc.textFile('文字檔')將檔案內容以\n當作分隔符號,每一筆資料稱為element


RDD1 = sc.textFile("/user/cloudera/data.txt")
讀文字檔內容並賦值、生成RDD1。
textFile將每行對應為一個RDD元素,上述是針對單一檔案作業的操作。
它其實可以針對多個檔案作業


RDD1.collect()
將RDD內容(elements)傳給Spark Driver
collect() 屬於一種 action操作,讀所有資料回來,前綴u代表unciode編碼。



RDD1.count()
將RDD內容的元素個數計算後傳給Spark Driver

dir(RDD1)
查看RDD型態變數,能用的所有方法字典。


>>> type(RDD1)
<class 'pyspark.rdd.RDD'>
>>> type(sc)
<class 'pyspark.context.SparkContext'>


對RDD1的每個record進行函數操作,生成RDD1_upper
RDD1_upper = RDD1.map(lambda x: x.upper())
RDD1_upper.collect()


一般map(transformation)所執行的函數必須事先定義
在python或scala語言中,則可以使用匿名函數(單行函數: lambda function)


以下是先定義具名函數,再將該函數當作transformation變數傳入運行效果。


>>> def f_upper(x):
...     return x.upper()
...
>>>
>>> print(f_upper('Hello World'))
HELLO WORLD
>>> print('Hello World'.upper())
HELLO WORLD
>>> RDD_upper2 = RDD1.map(f_upper)
>>> RDD_upper2.collect()
[u'HELLO WORLD', u'HELLO HADOOP', u'HELLO SPARK']
>>>

雖然data.txt僅僅只有一個HDFS block
預設spark cluster還是會將RDD分為2個partition






去Hadoop Web UI查看

透過cat 合併預覽查看HDFS上固定part前綴的拆分的block文字檔內文
hdfs dfs -cat /user/cloudera/dataupper/part-*

透過hdfs dfs -getmerge 去從HDFS下指定目錄下所有block拆分檔案給合併載到本地端
並且重命名為dataupper.txt
hdfs dfs -getmerge /user/cloudera/dataupper dataupper.txt



spark中的DataFrame使用方式


dataDF=spark.read.text('/user/cloudera/data.txt')
dir(spark)
help(spark)


dir(dataDF)


dataDF.show()
show()是DataFrame才有的action,等同RDD.collect()

dataDF.select(dataDF.value).show()
單抓value欄位

dataDF.select(upper(dataDF.value)).show()
在value欄位之上套用upper()函數




YARN WEB UI介面上可去看到常駐的PySpark應用程式資源




在Web UI上可去監管、確認資源使用狀況,比方像是CPU、MEMORY。

留言

這個網誌中的熱門文章

何謂淨重(Net Weight)、皮重(Tare Weight)與毛重(Gross Weight)

Architecture(架構) 和 Framework(框架) 有何不同?_軟體設計前的事前規劃的藍圖概念

(2021年度)駕訓學科筆試準備題庫歸納分析_法規是非題