PySpark_命令與RDD使用(Transformation及Action)及Spark DataFrame使用筆記
可透過pyspark指令先打印當前版本跟進入到命令模式
上傳測試用data.txt到hdfs
hdfs dfs -put data.txt
hdfs dfs -ls /user/cloudera/data.txt
pyspark --help
sc
進到core spark入口
dir(sir)
查看sc能用的所有方法字典
help(sc)則可更詳盡去查閱每個方法用的方式
sc.textFile("/user/cloudera/data.txt")
Lazy延遲加載無實用,上面這個指令並沒有真正執行,需要等到進行action操作時才真正執行。
sc.textFile('文字檔')將檔案內容以\n當作分隔符號,每一筆資料稱為element
RDD1 = sc.textFile("/user/cloudera/data.txt")
讀文字檔內容並賦值、生成RDD1。
textFile將每行對應為一個RDD元素,上述是針對單一檔案作業的操作。
它其實可以針對多個檔案作業
RDD1.collect()
將RDD內容(elements)傳給Spark Driver
collect() 屬於一種 action操作,讀所有資料回來,前綴u代表unciode編碼。
RDD1.count()
將RDD內容的元素個數計算後傳給Spark Driver
dir(RDD1)
查看RDD型態變數,能用的所有方法字典。
>>> type(RDD1)
<class 'pyspark.rdd.RDD'>
>>> type(sc)
<class 'pyspark.context.SparkContext'>
對RDD1的每個record進行函數操作,生成RDD1_upper
RDD1_upper = RDD1.map(lambda x: x.upper())
RDD1_upper.collect()
一般map(transformation)所執行的函數必須事先定義
在python或scala語言中,則可以使用匿名函數(單行函數: lambda function)
以下是先定義具名函數,再將該函數當作transformation變數傳入運行效果。
>>> def f_upper(x):
... return x.upper()
...
>>>
>>> print(f_upper('Hello World'))
HELLO WORLD
>>> print('Hello World'.upper())
HELLO WORLD
>>> RDD_upper2 = RDD1.map(f_upper)
>>> RDD_upper2.collect()
[u'HELLO WORLD', u'HELLO HADOOP', u'HELLO SPARK']
>>>
雖然data.txt僅僅只有一個HDFS block
預設spark cluster還是會將RDD分為2個partition
去Hadoop Web UI查看
透過cat 合併預覽查看HDFS上固定part前綴的拆分的block文字檔內文
hdfs dfs -cat /user/cloudera/dataupper/part-*透過hdfs dfs -getmerge 去從HDFS下指定目錄下所有block拆分檔案給合併載到本地端
並且重命名為dataupper.txt
dataDF.show()
dataDF.select(dataDF.value).show()
hdfs dfs -getmerge /user/cloudera/dataupper dataupper.txt
spark中的DataFrame使用方式
dataDF=spark.read.text('/user/cloudera/data.txt')
dir(spark)
help(spark)
dir(dataDF)
show()是DataFrame才有的action,等同RDD.collect()
單抓value欄位
在Web UI上可去監管、確認資源使用狀況,比方像是CPU、MEMORY。
留言
張貼留言