Hands-on Data Analysis with PySpark and Pandas - Common Spark DataFrame and Spark SQL Operations
findspark: used to locate the Spark installation.
pandas: imported for data processing.
A SparkSession is essential for working with PySpark. It lets us create DataFrames, load data, and perform all kinds of operations.
SparkSession.builder.appName("COVID-19 Data Analysis").getOrCreate()
Stage 1. Check whether the Spark Session has started successfully
import findspark  # This helps us find and use Apache Spark
findspark.init()  # Initialize findspark to locate Spark

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType
import pandas as pd

# Initialize a Spark Session
spark = SparkSession \
    .builder \
    .appName("COVID-19 Data Analysis") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

# Check if the Spark Session is active
if 'spark' in locals() and isinstance(spark, SparkSession):
    print("SparkSession is active and ready to use.")
else:
    print("SparkSession is not active. Please create a SparkSession.")
Data on COVID-19 (coronavirus) by Our World in Data
https://docs.owid.io/projects/covid/en/latest/dataset.html
https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv
Read the COVID-19 data from the URL provided above.
Define a list named columns_to_display containing the column names ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population'], which represent the continent, total cases, total deaths, total vaccinations, and population, respectively.
Using vaccination_data[columns_to_display].head(), we can filter the DataFrame down to the specified columns and again display the first five records of that subset.
Stage 2. Read the data into memory and grab the columns of interest
# Read the COVID-19 data from the provided URL
vaccination_data = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv')

print("Displaying the first 5 records of the vaccination data:")
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']

# Show the first 5 records
print(vaccination_data[columns_to_display].head())
# Convert to Spark DataFrame directly
# Define the schema
schema = StructType([
    StructField("continent", StringType(), True),
    StructField("total_cases", LongType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_vaccinations", LongType(), True),
    StructField("population", LongType(), True)
])

# Convert the columns to the appropriate data types
vaccination_data['continent'] = vaccination_data['continent'].astype(str)  # Ensures continent is a string
vaccination_data['total_cases'] = vaccination_data['total_cases'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['total_deaths'] = vaccination_data['total_deaths'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['total_vaccinations'] = vaccination_data['total_vaccinations'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['population'] = vaccination_data['population'].fillna(0).astype('int64')  # Fill NaNs and convert to int

spark_df = spark.createDataFrame(vaccination_data[schema.fieldNames()])  # Use only the specified fields

# Show the Spark DataFrame
spark_df.show()
Next, we convert the Pandas DataFrame containing our COVID-19 vaccination data into a Spark DataFrame. This conversion step is crucial because it determines whether we can take advantage of Spark's distributed computing power, allowing us to handle larger datasets and run operations more efficiently.
The continent column is explicitly cast to a string.
The numeric columns (total cases, total deaths, total vaccinations, population) have their NaN values filled with zero and are then cast to int64, which is compatible with Spark's LongType.
Using fillna(0) ensures that NaN values do not cause type problems when the Spark DataFrame is created.
You can call spark_df.printSchema() to confirm the schema of the Spark DataFrame.
print("Schema of the Spark DataFrame:") spark_df.printSchema() # Print the structure of the DataFrame (columns and types) # List the names of the columns you want to display columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population'] # Display the first 5 records of the specified columns spark_df.select(columns_to_display).show(5) print("Displaying the 'continent' and 'total_cases' columns:") # Show only the 'continent' and 'total_cases' columns spark_df.select('continent', 'total_cases').show(5)
print("Filtering records where 'total_cases' is greater than 1,000,000:") # Show records with more than 1 million total cases spark_df.filter(spark_df['total_cases'] > 1000000).show(5)
from pyspark.sql import functions as F

spark_df_with_percentage = spark_df.withColumn(
    'death_percentage',
    (spark_df['total_deaths'] / spark_df['population']) * 100
)

spark_df_with_percentage = spark_df_with_percentage.withColumn(
    'death_percentage',
    F.concat(
        # Format to 2 decimal places
        F.format_number(spark_df_with_percentage['death_percentage'], 2),
        # Append the percentage symbol
        F.lit('%')
    )
)

columns_to_display = ['total_deaths', 'population', 'death_percentage', 'continent', 'total_vaccinations', 'total_cases']
spark_df_with_percentage.select(columns_to_display).show(5)
We will create a new column named death_percentage that calculates the mortality rate during the COVID-19 pandemic. The calculation is based on the total_deaths and population columns of our Spark DataFrame. This new metric offers valuable insight into the impact of COVID-19 across different regions.
Define a list named columns_to_display that covers 'total_deaths', 'population', 'death_percentage', 'continent', 'total_vaccinations', and 'total_cases'.
Finally, call spark_df_with_percentage.select(columns_to_display).show(5) to display the first five records of the modified DataFrame.
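If you also want to sort or filter by the mortality rate, one option is to keep a numeric version of the column and format it only for display. A minimal sketch (the spark_df_numeric and death_percentage_num names are illustrative, not part of the original code):

from pyspark.sql import functions as F

# Keep a numeric mortality-rate column (percentage, rounded to 2 decimal places)
spark_df_numeric = spark_df.withColumn(
    'death_percentage_num',
    F.round(spark_df['total_deaths'] / spark_df['population'] * 100, 2)
)

# A numeric column can still be used for ordering or filtering
spark_df_numeric.orderBy(F.desc('death_percentage_num')) \
    .select('continent', 'total_deaths', 'population', 'death_percentage_num') \
    .show(5)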
Grouping and aggregation - calculating the total deaths per continent
print("Calculating the total deaths per continent:") # Group by continent and sum total death rates spark_df.groupby(['continent']).agg({"total_deaths": "SUM"}).show()
The spark_df.groupby(['continent']) method groups the data by the continent column.
This means that all records belonging to the same continent are placed in the same group.
The agg({"total_deaths": "SUM"}) call specifies the aggregation operation.
In this example, we want to compute the sum of deaths for each continent.
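An equivalent way to write the same aggregation, sketched here with the built-in functions module so the result column gets an explicit alias (the alias name total_deaths_sum is only illustrative):

from pyspark.sql import functions as F

# Same group-by aggregation, with an explicit alias for the summed column
spark_df.groupBy('continent') \
    .agg(F.sum('total_deaths').alias('total_deaths_sum')) \
    .show()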
Using UDFs (User-Defined Functions) in PySpark
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

# Function definition
def convert_total_deaths(total_deaths):
    return total_deaths * 2  # Here you can define any transformation you want

# Register the UDF with Spark
spark.udf.register("convert_total_deaths", convert_total_deaths, IntegerType())
UDFs (User-Defined Functions) in PySpark let us create custom functions that can be applied to individual columns of a DataFrame. This capability provides greater flexibility and customization, allowing us to define specific transformations or calculations that cannot be achieved with the built-in functions alone.
spark.udf.register("convert_total_deaths", convert_total_deaths, IntegerType())
This line registers the UDF with Spark and declares that the function returns an integer.
That registration makes the function available in Spark SQL queries as well as in DataFrame operations.
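Registering with spark.udf.register is what makes the function callable from SQL. To call it directly through the DataFrame API, one common pattern is to wrap the same Python function with pyspark.sql.functions.udf; the sketch below assumes this, and the convert_total_deaths_udf name is only illustrative:

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Wrap the same Python function for use in DataFrame operations
# (IntegerType mirrors the return type used when registering the UDF above)
convert_total_deaths_udf = F.udf(convert_total_deaths, IntegerType())

spark_df.select(
    'continent',
    'total_deaths',
    convert_total_deaths_udf('total_deaths').alias('converted_total_deaths')
).show(5)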
Spark SQL lets us run SQL queries directly against a DataFrame.
# Drop the existing temporary view if it exists
spark.sql("DROP VIEW IF EXISTS data_v")

# Create a new temporary view
spark_df.createTempView('data_v')

# Execute the SQL query using the UDF
spark.sql('SELECT continent, total_deaths, convert_total_deaths(total_deaths) as converted_total_deaths FROM data_v').show()

# Displaying all records
spark.sql('SELECT * FROM data_v').show()
Below, while querying the temporary view, we also make use of the convert_total_deaths UDF.
convert_total_deaths() takes a single argument, total_deaths, and returns twice its value.
The SQL command used here is SELECT continent FROM data_v WHERE total_vaccinations > 1000000. The show() method is used once more to display the result, listing the continents that match the filter condition.
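That filtering query is not included in the code block above; run against the same data_v temporary view, a sketch of it would look like this:

# Continents with more than 1,000,000 total vaccinations
spark.sql('SELECT continent FROM data_v WHERE total_vaccinations > 1000000').show()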
Q1. Which data sources can be utilized with Apache Spark SQL?
MongoDB
(O)Parquet files
Custom file formats
External APIs
Spark SQL supports reading and writing data from Parquet files while preserving the data schema.
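As a brief illustration of that point, a DataFrame can be written to Parquet and read back with its column types intact (the /tmp/covid_parquet path below is hypothetical):

# Write the DataFrame to Parquet and read it back; the schema is preserved
spark_df.write.mode('overwrite').parquet('/tmp/covid_parquet')  # hypothetical output path
parquet_df = spark.read.parquet('/tmp/covid_parquet')
parquet_df.printSchema()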
Q2. How can you create a Global Temporary view in Spark SQL?
Use the createTempView function with a “Global” prefix
(O)Use the createGlobalTempView function
Use the createGlobalView function
Use the createView function with a “Global” prefix
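For reference, a short sketch of creating and querying a global temporary view (the covid_global view name is illustrative); global temporary views live in the reserved global_temp database and are shared across SparkSessions within the same application:

# Create a global temporary view and query it through the global_temp database
spark_df.createGlobalTempView('covid_global')
spark.sql('SELECT continent, total_cases FROM global_temp.covid_global').show(5)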