PySpark_命令與RDD使用(Transformation及Action)及Spark DataFrame使用筆記
Parallel Programming using Resilient Distributed Datasets
- 彈性分散式數據集,也稱為RDD,是Spark的主要數據抽象。
- 彈性分散式數據集是一個容錯元素的集合,分布在集群的節點上,能夠接受並行操作。
- 彈性分散式數據集是不可變的,這意味著一旦創建,這些數據集就無法改變。
- 每個Spark應用程式都包含一個驅動程序,該程序執行用戶的主要功能並在集群上執行多個並行操作。
- RDD支援文本、序列文件、Avro、Parquet和Hadoop輸入格式文件類型。
- RDD還支持本地、Cassandra、HBase、HDFS、Amazon S3等文件格式,以及大量的關係型和NoSQL數據庫。
- 可以使用來自Hadoop支持的文件系統(如HDFS、Cassandra、HBase或Amazon S3)的外部或本地文件創建RDD。
- 將parallelize函數應用於驅動程序中的現有集合。
- Spark為集群的每個分區運行一個任務。 通常,希望每個CPU在集群中有兩到四個分區。
- 在操作過程中將數據集持久化或緩存到內存中。 緩存是容錯的,並且始終可以恢復,因為RDD是不變的。
- 當持久化一個RDD時,每個節點將節點計算的分區存儲在內存中 並在該數據集或從第一個RDD派生的後續數據集上的其他操作中重用相同的分區。
RDD Supported File Types
- Text
- SequenceFiles
- Avro
- Parquet
- Hadoop input formats
RDD Supported File formats
- Local
- Cassandra
- HBase
- HDFS
- Amazon S3
- 或其他
which pyspark
可確認是否有安裝pyspark,並且系統環境變數路徑有配置,可識別到。
可透過pyspark指令先打印當前版本跟進入到命令模式
上傳測試用data.txt到hdfs
hdfs dfs -put data.txt
hdfs dfs -ls /user/cloudera/data.txt
pyspark --help
sc
進到core spark入口
dir(sir)
查看sc能用的所有方法字典
help(sc)則可更詳盡去查閱每個方法用的方式
sc.textFile("/user/cloudera/data.txt")
Lazy延遲加載無實用,上面這個指令並沒有真正執行,需要等到進行action操作時才真正執行。
sc.textFile('文字檔')將檔案內容以\n當作分隔符號,每一筆資料稱為element
RDD1 = sc.textFile("/user/cloudera/data.txt")
讀文字檔內容並賦值、生成RDD1。
textFile將每行對應為一個RDD元素,上述是針對單一檔案作業的操作。
它其實可以針對多個檔案作業
RDD1.collect()
將RDD內容(elements)傳給Spark Driver
collect() 屬於一種 action操作,讀所有資料回來,前綴u代表unciode編碼。
RDD1.count()
將RDD內容的元素個數計算後傳給Spark Driver
dir(RDD1)
查看RDD型態變數,能用的所有方法字典。
>>> type(RDD1)
<class 'pyspark.rdd.RDD'>
>>> type(sc)
<class 'pyspark.context.SparkContext'>
對RDD1的每個record進行函數操作,生成RDD1_upper
RDD1_upper = RDD1.map(lambda x: x.upper())
RDD1_upper.collect()
一般map(transformation)所執行的函數必須事先定義
在python或scala語言中,則可以使用匿名函數(單行函數: lambda function)
以下是先定義具名函數,再將該函數當作transformation變數傳入運行效果。
>>> def f_upper(x):
... return x.upper()
...
>>>
>>> print(f_upper('Hello World'))
HELLO WORLD
>>> print('Hello World'.upper())
HELLO WORLD
>>> RDD_upper2 = RDD1.map(f_upper)
>>> RDD_upper2.collect()
[u'HELLO WORLD', u'HELLO HADOOP', u'HELLO SPARK']
>>>
雖然data.txt僅僅只有一個HDFS block
預設spark cluster還是會將RDD分為2個partition
去Hadoop Web UI查看
透過cat 合併預覽查看HDFS上固定part前綴的拆分的block文字檔內文
hdfs dfs -cat /user/cloudera/dataupper/part-*透過hdfs dfs -getmerge 去從HDFS下指定目錄下所有block拆分檔案給合併載到本地端
並且重命名為dataupper.txt
dataDF.show()
dataDF.select(dataDF.value).show()
hdfs dfs -getmerge /user/cloudera/dataupper dataupper.txt
spark中的DataFrame使用方式
dataDF=spark.read.text('/user/cloudera/data.txt')
dir(spark)
help(spark)
dir(dataDF)
show()是DataFrame才有的action,等同RDD.collect()
單抓value欄位
在Web UI上可去監管、確認資源使用狀況,比方像是CPU、MEMORY。
留言
張貼留言