在 PySpark 中為 CSV 檔案進行user-defined schema的步驟
引入所需的庫。
from pyspark.sql.types import StructType, IntegerType, FloatType, StringType, StructField
定義schema架構
比方我們有一個employee.csv檔案,內容如下:
emp_id,emp_name,dept,salary,phone A101,jhon,computer science,1000,+1 (701) 846 958 A102,Peter,Electronics,2000, A103,Micheal,IT,2500,
在Spark中使用 『StructType』 類,為每個欄位創建一個 『StructField』,並提及欄位名稱、數據類型和其他屬性。
“False”表示該列不允許使用 null 值。
from pyspark.sql.types import StructType, IntegerType, FloatType, StringType, StructField schema = StructType([ StructField("Emp_Id", StringType(), False), StructField("Emp_Name", StringType(), False), StructField("Department", StringType(), False), StructField("Salary", IntegerType(), False), StructField("Phone", IntegerType(), True), ])
Step3.使用使用者定義的架構讀取輸入檔。
#create a dataframe on top a csv file df = (spark.read .format("csv") .schema(schema) .option("header", "true") .load("employee.csv") ) # display the dataframe content df.show()
Step4.使用 Spark中的printSchema()顯示 DataFrame 的架構,並確保該架構正確應用於數據。
#create a dataframe on top a csv file df = (spark.read .format("csv") .schema(schema) .option("header", "true") .load("employee.csv") ) df.printSchema()
Ref:
https://blog.devgenius.io/mastering-file-formats-in-pyspark-a-comprehensive-guide-aca0de4ca6fd
https://www.learntospark.com/2020/01/define-schema-to-spark-dataframe.html
https://python.plainenglish.io/how-to-read-and-write-json-data-in-pyspark-2bf638c20eec
https://macxima.medium.com/pyspark-read-csv-file-into-dataframe-6cef1f0edfdc
留言
張貼留言