實作PySpark與Pandas程式的數據分析_Spark DataFrame與SparkSQL常見操作

 


findspark :用於定位 Spark 安裝。
pandas :被導入以進行數據處理。

SparkSession對於使用PySpark至關重要。它允許創建DataFrame、加載數據和進行各種操作。


初始化具有指定應用程式名稱的 Spark Session。
SparkSession.builder.appName("COVID-19 Data Analysis").getOrCreate()

第一階段.檢測Spark Session是否成功啟動

import findspark  # This helps us find and use Apache Spark
findspark.init()  # Initialize findspark to locate Spark
from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType
import pandas as pd  
# Initialize a Spark Session
spark = SparkSession \
    .builder \
    .appName("COVID-19 Data Analysis") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

# Check if the Spark Session is active
if 'spark' in locals() and isinstance(spark, SparkSession):
    print("SparkSession is active and ready to use.")
else:
    print("SparkSession is not active. Please create a SparkSession.")


Data on COVID-19 (coronavirus) by Our World in Data
https://docs.owid.io/projects/covid/en/latest/dataset.html


https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv

從提供的網址讀取 COVID-19 數據。
定義一個名為 columns_to_display 的清單,其包含的欄位名稱為:['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']。分別表示洲別、總病例、總死亡、總疫苗接種、人口。

使用 vaccination_data[columns_to_display].head(),我們可以篩選 DataFrame,只顯示指定的欄位,並再次顯示該子集的前五條記錄。


第二階段.資料讀取暫存並緊抓感興趣欄位

# Read the COVID-19 data from the provided URL
vaccination_data = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv')

print("Displaying the first 5 records of the vaccination data:")
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
# Show the first 5 records
print(vaccination_data[columns_to_display].head())



第三階段.將 Pandas DataFrame 轉換為 Spark DataFrame

# Read the COVID-19 data from the provided URL
vaccination_data = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv')

print("Displaying the first 5 records of the vaccination data:")
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
# Show the first 5 records
print(vaccination_data[columns_to_display].head())

# Convert to Spark DataFrame directly
# Define the schema
schema = StructType([
    StructField("continent", StringType(), True),
    StructField("total_cases", LongType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_vaccinations", LongType(), True),
    StructField("population", LongType(), True)
])

# Convert the columns to the appropriate data types
vaccination_data['continent'] = vaccination_data['continent'].astype(str)  # Ensures continent is a string
vaccination_data['total_cases'] = vaccination_data['total_cases'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['total_deaths'] = vaccination_data['total_deaths'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['total_vaccinations'] = vaccination_data['total_vaccinations'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['population'] = vaccination_data['population'].fillna(0).astype('int64')  # Fill NaNs and convert to int

spark_df = spark.createDataFrame(vaccination_data[schema.fieldNames()])  # Use only the specified fields
# Show the Spark DataFrame
spark_df.show()
緊接著要含我們 COVID-19 疫苗接種數據的 Pandas DataFrame 轉換為 Spark DataFrame。
此轉換過程至關重要,因為它攸關我們能否利用 Spark 的分佈式計算能力,使我們能夠處理更大的數據集並以更高效的方式執行操作。


continent 欄位被明確轉換為字串。
數值欄位(總病例、總死亡、總疫苗接種、人口)則用零填充 NaN 值,然後轉換為 int64(這與 Spark 中的 LongType 兼容)。
使用 fillna(0) 可確保在創建 Spark DataFrame 時,NaN 值不會引起類型問題。

可以用 spark_df.printSchema() 來確認Spark DataFrame的schema。


對 Spark DataFrame 進行基本的數探索與局部擷取

print("Schema of the Spark DataFrame:")
spark_df.printSchema()
# Print the structure of the DataFrame (columns and types)

# List the names of the columns you want to display
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
# Display the first 5 records of the specified columns
spark_df.select(columns_to_display).show(5)

print("Displaying the 'continent' and 'total_cases' columns:")
# Show only the 'continent' and 'total_cases' columns
spark_df.select('continent', 'total_cases').show(5)


根據特定條件過濾記錄

print("Filtering records where 'total_cases' is greater than 1,000,000:")
 # Show records with more than 1 million total cases
spark_df.filter(spark_df['total_cases'] > 1000000).show(5) 

各洲地區死亡率

from pyspark.sql import functions as F

spark_df_with_percentage = spark_df.withColumn(
    'death_percentage', 
    (spark_df['total_deaths'] / spark_df['population']) * 100
)
spark_df_with_percentage = spark_df_with_percentage.withColumn(
    'death_percentage',
    F.concat(
        # Format to 2 decimal places
        F.format_number(spark_df_with_percentage['death_percentage'], 2), 
        # Append the percentage symbol 
        F.lit('%')  
    )
)
columns_to_display = ['total_deaths', 'population', 'death_percentage', 'continent', 'total_vaccinations', 'total_cases']
spark_df_with_percentage.select(columns_to_display).show(5)


將創建一個名為 death_percentage 的新列,計算 COVID-19 大流行期間的死亡率。這個計算是基於我們的 Spark DataFrame 中的 total_deaths(死亡人數)和 population(總人口)兩列。這個新指標將提供有價值的見解,幫助了解 COVID-19 在不同地區的影響。



將 death_percentage 橫列的數值格式化為小數點後兩位,使用 F.format_number(),並使用 F.concat() 和 F.lit('%') 來連接百分比符號。

定義一個名為 columns_to_display 的列表,其中含蓋了'total_deaths'、'population'、
'death_percentage'、'continent'、'total_vaccinations' 和 'total_cases'。

最後,通過調用 spark_df_with_percentage.select(columns_to_display).show(5) 來顯示修改後的 DataFrame 的前五條記錄。



分組與彙總-計算每個洲的總死亡人數
print("Calculating the total deaths per continent:")
# Group by continent and sum total death rates
spark_df.groupby(['continent']).agg({"total_deaths": "SUM"}).show()

spark_df.groupby(['continent']) 方法根據洲別列將數據分組。
代表與每個洲相關的所有記錄將被分同一組在一起。

agg({"total_deaths": "SUM"}) 函數用於指定聚合操作。
在這個例子中,我們想要計算每個洲的死亡人數總和。


PySpark 中的UDF (User-Defined Function)的使用

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
# Function definition
def convert_total_deaths(total_deaths):
    return total_deaths * 2
# Here you can define any transformation you want
# Register the UDF with Spark
spark.udf.register("convert_total_deaths", convert_total_deaths, IntegerType())


PySpark 中的UDF (User-Defined Function)允許我們創建可應用於 DataFrame 中各個列的自定義函數。此功能提供了更高的靈活性和自訂性,使我們能夠定義通過內置函數無法實現的特定轉換或計算。


spark.udf.register("convert_total_deaths", convert_total_deaths, IntegerType()) 
這行代表將 UDF 註冊到 Spark,表明該函數返回一個整數。
使我們可以在 Spark SQL 查詢和 DataFrame 操作中使用它。


Spark SQL 使我們能夠直接在 DataFrame 上執行 SQL 查詢。

# Drop the existing temporary view if it exists
spark.sql("DROP VIEW IF EXISTS data_v")

# Create a new temporary view
spark_df.createTempView('data_v')

# Execute the SQL query using the UDF
spark.sql('SELECT continent, total_deaths, convert_total_deaths(total_deaths) as converted_total_deaths FROM data_v').show()

#Displaying All Records
spark.sql('SELECT * FROM data_v').show()

以下是撈取臨時檢視過程中,順帶去使用convert_total_deaths的UDF。
convert_total_deaths() 接受一個參數 total_deaths,並返回其值的兩倍。

以下查詢使用 SQL 命令 SELECT * FROM data_v 從臨時檢視數據中檢索所有記錄。
調用 show() 方法以表格格式顯示結果。

過濾數據集只顯示那些總疫苗接種量超過一百萬的洲。


這裡使用的SQL命令是 SELECT continent FROM data_v WHERE total_vaccinations > 1000000。show() 方法再次被用來顯示結果,具體列出符合過濾條件的洲。


Q1.Which data sources can be utilized with Apache Spark SQL?

MongoDB
(O)Parquet files
Custom file formats
External APIs

Spark SQL supports reading and writing data from Parquet files while preserving the data schema.


Q2.How can you create a Global Temporary view in Spark SQL?

Use the createTempView function with a “Global” prefix
(O)Use the createGlobalTempView function
Use the createGlobalView function
Use the createView function with a “Global” prefix





留言

這個網誌中的熱門文章

何謂淨重(Net Weight)、皮重(Tare Weight)與毛重(Gross Weight)

經得起原始碼資安弱點掃描的程式設計習慣培養(五)_Missing HSTS Header

(2021年度)駕訓學科筆試準備題庫歸納分析_法規是非題