實作PySpark與Pandas程式的數據分析_Spark DataFrame與SparkSQL常見操作


findspark :用於定位 Spark 安裝。
pandas :被導入以進行數據處理。


初始化具有指定應用程式名稱的 Spark Session。
SparkSession.builder.appName("COVID-19 Data Analysis").getOrCreate()

第一階段.檢測Spark Session是否成功啟動

import findspark  # This helps us find and use Apache Spark
findspark.init()  # Initialize findspark to locate Spark
from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType
import pandas as pd  
# Initialize a Spark Session
spark = SparkSession \
    .builder \
    .appName("COVID-19 Data Analysis") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \

# Check if the Spark Session is active
if 'spark' in locals() and isinstance(spark, SparkSession):
    print("SparkSession is active and ready to use.")
    print("SparkSession is not active. Please create a SparkSession.")

Data on COVID-19 (coronavirus) by Our World in Data


從提供的網址讀取 COVID-19 數據。
定義一個名為 columns_to_display 的清單,其包含的欄位名稱為:['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']。分別表示洲別、總病例、總死亡、總疫苗接種、人口。

使用 vaccination_data[columns_to_display].head(),我們可以篩選 DataFrame,只顯示指定的欄位,並再次顯示該子集的前五條記錄。


# Read the COVID-19 data from the provided URL
vaccination_data = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv')

print("Displaying the first 5 records of the vaccination data:")
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
# Show the first 5 records

第三階段.將 Pandas DataFrame 轉換為 Spark DataFrame

# Convert to Spark DataFrame directly
# Define the schema
schema = StructType([
    StructField("continent", StringType(), True),
    StructField("total_cases", LongType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_vaccinations", LongType(), True),
    StructField("population", LongType(), True)

# Convert the columns to the appropriate data types
vaccination_data['continent'] = vaccination_data['continent'].astype(str)  # Ensures continent is a string
vaccination_data['total_cases'] = vaccination_data['total_cases'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['total_deaths'] = vaccination_data['total_deaths'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['total_vaccinations'] = vaccination_data['total_vaccinations'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['population'] = vaccination_data['population'].fillna(0).astype('int64')  # Fill NaNs and convert to int

spark_df = spark.createDataFrame(vaccination_data[schema.fieldNames()])  # Use only the specified fields
# Show the Spark DataFrame
緊接著要含我們 COVID-19 疫苗接種數據的 Pandas DataFrame 轉換為 Spark DataFrame。
此轉換過程至關重要,因為它攸關我們能否利用 Spark 的分佈式計算能力,使我們能夠處理更大的數據集並以更高效的方式執行操作。

continent 欄位被明確轉換為字串。
數值欄位(總病例、總死亡、總疫苗接種、人口)則用零填充 NaN 值,然後轉換為 int64(這與 Spark 中的 LongType 兼容)。
使用 fillna(0) 可確保在創建 Spark DataFrame 時,NaN 值不會引起類型問題。

可以用 spark_df.printSchema() 來確認Spark DataFrame的schema。

對 Spark DataFrame 進行基本的數探索與局部擷取

print("Schema of the Spark DataFrame:")
# Print the structure of the DataFrame (columns and types)

# List the names of the columns you want to display
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
# Display the first 5 records of the specified columns

print("Displaying the 'continent' and 'total_cases' columns:")
# Show only the 'continent' and 'total_cases' columns
spark_df.select('continent', 'total_cases').show(5)


print("Filtering records where 'total_cases' is greater than 1,000,000:")
 # Show records with more than 1 million total cases
spark_df.filter(spark_df['total_cases'] > 1000000).show(5) 


from pyspark.sql import functions as F

spark_df_with_percentage = spark_df.withColumn(
    (spark_df['total_deaths'] / spark_df['population']) * 100
spark_df_with_percentage = spark_df_with_percentage.withColumn(
        # Format to 2 decimal places
        F.format_number(spark_df_with_percentage['death_percentage'], 2), 
        # Append the percentage symbol 
columns_to_display = ['total_deaths', 'population', 'death_percentage', 'continent', 'total_vaccinations', 'total_cases']

將創建一個名為 death_percentage 的新列,計算 COVID-19 大流行期間的死亡率。這個計算是基於我們的 Spark DataFrame 中的 total_deaths(死亡人數)和 population(總人口)兩列。這個新指標將提供有價值的見解,幫助了解 COVID-19 在不同地區的影響。

將 death_percentage 橫列的數值格式化為小數點後兩位,使用 F.format_number(),並使用 F.concat() 和 F.lit('%') 來連接百分比符號。

定義一個名為 columns_to_display 的列表,其中含蓋了'total_deaths'、'population'、
'death_percentage'、'continent'、'total_vaccinations' 和 'total_cases'。

最後,通過調用 spark_df_with_percentage.select(columns_to_display).show(5) 來顯示修改後的 DataFrame 的前五條記錄。

print("Calculating the total deaths per continent:")
# Group by continent and sum total death rates
spark_df.groupby(['continent']).agg({"total_deaths": "SUM"}).show()

spark_df.groupby(['continent']) 方法根據洲別列將數據分組。

agg({"total_deaths": "SUM"}) 函數用於指定聚合操作。

PySpark 中的UDF (User-Defined Function)的使用

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
# Function definition
def convert_total_deaths(total_deaths):
    return total_deaths * 2
# Here you can define any transformation you want
# Register the UDF with Spark
spark.udf.register("convert_total_deaths", convert_total_deaths, IntegerType())

PySpark 中的UDF (User-Defined Function)允許我們創建可應用於 DataFrame 中各個列的自定義函數。此功能提供了更高的靈活性和自訂性,使我們能夠定義通過內置函數無法實現的特定轉換或計算。

spark.udf.register("convert_total_deaths", convert_total_deaths, IntegerType()) 
這行代表將 UDF 註冊到 Spark,表明該函數返回一個整數。
使我們可以在 Spark SQL 查詢和 DataFrame 操作中使用它。

Spark SQL 使我們能夠直接在 DataFrame 上執行 SQL 查詢。

# Drop the existing temporary view if it exists
spark.sql("DROP VIEW IF EXISTS data_v")

# Create a new temporary view

# Execute the SQL query using the UDF
spark.sql('SELECT continent, total_deaths, convert_total_deaths(total_deaths) as converted_total_deaths FROM data_v').show()

#Displaying All Records
spark.sql('SELECT * FROM data_v').show()

convert_total_deaths() 接受一個參數 total_deaths,並返回其值的兩倍。

以下查詢使用 SQL 命令 SELECT * FROM data_v 從臨時檢視數據中檢索所有記錄。
調用 show() 方法以表格格式顯示結果。


這裡使用的SQL命令是 SELECT continent FROM data_v WHERE total_vaccinations > 1000000。show() 方法再次被用來顯示結果,具體列出符合過濾條件的洲。

Q1.Which data sources can be utilized with Apache Spark SQL?

(O)Parquet files
Custom file formats
External APIs

Spark SQL supports reading and writing data from Parquet files while preserving the data schema.

Q2.How can you create a Global Temporary view in Spark SQL?

Use the createTempView function with a “Global” prefix
(O)Use the createGlobalTempView function
Use the createGlobalView function
Use the createView function with a “Global” prefix



