Apache Spark筆記03_Dataframes跟SparkSQL介紹及常見操作

 
https://rharshad.com/spark-sql-dataframes-datasets/#dataframes


捨麼是SparkSQL?
  • Spark SQL 是一個用於結構化數據處理的 Spark 模組。
  • 用於在 Spark DataFrames 上運行 SQL 查詢,並提供 Java、 Scala、Python 和 R 的 API。 
  • 可以使用 SQL 查詢和透過DataFrame API 與 Spark SQL 交互。
  • 有別於Spark RDD API,Spark SQL 包括基於成本的優化器(cost-based optimizer)、欄位式儲存(columnar storage)和代碼生成(code generation)。為 Spark 提供有關數據結構以及正在進行計算的優化。
  • Spark SQL 支持臨時和全局臨時表格視圖。
    (臨時視圖具有局部作用域。 局部作用域意味著視圖僅在當前 Spark 會話中的當前節點內存在。然而,全局臨時視圖存在於一般的 Spark 應用程序中。全局臨時視圖可以在不同的 Spark 會話間共享。)
  • SparkSQL支持的資料來源格式:
    Parquet:是許多數據處理系統支持的列式格式。
    JSON 數據:可以通過推斷模式加載和寫入 JSON 數據。
    Hive 中的表數據:支持讀取和寫入存儲在 Hive 中的數據。


Spark-SQL-Optimization
https://medium.com/bryanyang0528/%E7%B0%A1%E5%96%AE%E4%BB%8B%E7%B4%B9-sparksql-77dd47c80bc1

使用SparkSQL的好處?
  • Spark SQL 支援 Java、Scala、Python 和 R這類程式語言的API。
  • Spark SQL 使用相同的執行引擎來計算結果,與用於計算的 API 或語言無關。開發人員可以使用提供最自然方式來表示給定轉換的 API。



什麼是 DataFrame?
  • DataFrame 是組織到命名欄位(named columns)中的數據集合。
  • DataFrames 在概念上等同於關係資料庫中的表。
  • DataFrames 構建在 Spark SQL 之上RDD API,DataFrames 使用 RDD 來執行關係查詢。
  • 類似於 R/Python 中的DataFrame,但具有更豐富的優化。


    https://juejin.cn/post/7109093694090772510


和RDD之間差異
  • RDD 的計算引擎是Spark Core,至於DataFrame 背後的最佳化引擎是Spark SQL。
  • RDD 是不含Schema 的分散式資料集。因此無從知道每個元素的內部欄位資訊。DataFrame 則是涵蓋Data Schema的結構化分散式資料集。 DataFrame 卻提供了詳細的結構信息,使得Spark SQL 可以清楚知道該資料集中包含哪些列,每列的名稱和類型各是什麼。






使用 DataFrame 的好處?

  • DataFrame 具有高度可擴充性 (從單台筆記型電腦上的幾 KB 開始擴展到 PB 級。)
  • DataFrames 支援多種數據格式和存儲系統。
  • DataFrames 通過 Catalyst 優化器提供優化和代碼生成。
  • DataFrames 對開發人員友好,通過 Spark 提供與大多數大數據工具的整合,以及用於 Python、Java、Scala 和 R 的 API。



  • Dataframe是二維的。欄位可以是不同的資料型別。
  • Dataframe接受多種資料輸入,包括序列和其他Dataframe。
  • 可以傳遞索引(列標籤/row label)和欄位(欄位標籤column labels)。
  • 索引可以是數字、日期或strings , tuples。
Dataframe操作實操

起手式基本上還是做相關package的下載與引入
在以下測試會繼續用到pyspark , findspark , pandas等套件
# Installing required packages
!pip install pyspark
!pip install findspark
!pip install pandas

引入套件

import findspark
findspark.init()
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession



建立並初始化 spark session,以載入和操作dataframe

import findspark
findspark.init()
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession


# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

spark





將資料檔案載入Dataframe
將 CSV 檔案讀入 Pandas 的Dataframe,然後再將其讀入 Spark Dataframe。

# Read the file using `read_csv` function in pandas
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')

# Preview a few records
mtcars.head()

# We use the `createDataFrame` function to load the data into a spark dataframe
sdf = spark.createDataFrame(mtcars) 

# Let us look at the schema of the loaded spark dataframe
sdf.printSchema()

Pandas 是用於資料處理和分析的函式庫。
Pandas 提供了用於建立和操作資料序列和資料框物件的資料結構和操作。
資料可以從各種資料源匯入,例如 Numpy 陣列、Python 字典和 CSV 檔案。Pandas 允許您操作、組織和顯示資料。





檢視Dataframe的資料架構
要建立 Spark 資料框,需載入一個名為 mtcars 的外部dataframe。
這個資料框包含 32 個觀察值和 11 個變數
  1. mpg:每加侖英里數,用於衡量汽車的燃油效率。
  2. cyl:汽缸數,表示該汽車引擎的汽缸數量。
  3. disp:排氣量,以立方英寸(cu.in.)為單位,顯示引擎的總體積。
  4. hp:馬力,指汽車引擎的動力輸出。
  5. drat:後軸比,是後軸的齒輪比,影響汽車的加速性和燃油經濟性。
  6. wt:重量,以千磅(lb/1000)為單位,表示汽車的總重量。
  7. qsec:1/4英里時間,測量汽車加速到一定速度所需的時間(秒)。
  8. vs:V/S,可能表示引擎的配置或其他特定特性。
  9. am:傳動方式,0 代表自動擋,1 代表手動擋。
  10. gear:前進擋數,顯示汽車有多少個前進擋位。
  11. carb:化油器數量,表示汽車引擎中化油器的數目。



執行Dataframe基本資料處理

Dataframe常見基礎操作測試程式

sdf.show(5)

sdf.select('mpg').show(5)

sdf.filter(sdf['mpg'] < 18).show(5)

sdf.withColumn('wtTon', sdf['wt'] * 0.45).show(5)

sdf_new = sdf.withColumnRenamed("vs", "versus")

sdf.where(sdf['mpg'] < 18).show(3) 



# define sample DataFrame 1 
data = [("A101", "John"), ("A102", "Peter"), ("A103", "Charlie")] 
columns = ["emp_id", "emp_name"] 
dataframe_1 = spark.createDataFrame(data, columns) 


# define sample DataFrame 2 
data = [("A101", 1000), ("A102", 2000), ("A103", 3000)]
columns = ["emp_id", "salary"]
dataframe_2 = spark.createDataFrame(data, columns)

# create a new DataFrame, "combined_df" by performing an inner join
combined_df = dataframe_1.join(dataframe_2, on="emp_id", how="inner")



# define sample DataFrame 1
data = [("A101", 1000), ("A102", 2000), ("A103",None)]
columns = ["emp_id", "salary"]
dataframe_1 = spark.createDataFrame(data, columns)


# fill missing salary value with a specified value 
filled_df = dataframe_1.fillna({"salary": 3000}) 
filled_df.head(3)

使用 show() 方法來實現預覽前5條記錄。這邊用法與 Pandas 中類似。
一樣透過head() 函數進行。


select('特定欄位名')
使用 select() 函數來選擇特定只擷取某欄位資料列,這邊擷取mpg欄位的前五筆資料。


filter(條件) 過濾篩查
使用 filter() 函數來實現,篩選出僅保留 mpg > 18 的列。


Spark 還提供了許多可以直接應用於欄位的函數,用於數據處理和聚合。

withColumn('新欄位名稱' , 欄位值定義或公式計算) 用來創建新欄位
下面的例子顯示了使用基本算術函數將重量值從磅轉換為公噸
這邊創建了一個名為 wtTon 的新欄位,該欄位的重量來自 wt 欄位,並轉換為公噸。



將現有的欄位名 "vs" 重命名為 "versus",並將新的結果Dataframe指派給變數 "sdf_new"。
函數 "withColumnRenamed()" 重新命名現有的欄位名稱。

上述函數的執行不會修改原始的 DataFrame "sdf"
相反,一個名為 "sdf_new" 的新 DataFrame 被創建並且包含重命名的欄位。

使用函數 "where()" 根據給定的條件過濾 Dataframe 欄位。
根據特定條件合併Dataframe,函數 "join()" 根據特定條件結合Dataframe。
以下是透過inner join方式來將dataframe_1跟dataframe_2取交集。

填補缺失值("fillna()" 或 "fill()" 函數用指定的值填補缺失值。)
注意到dataframe的 "dataframe_1" 的第三條記錄中。
"salary" 列包含空值("na")。可以使用 "fillna()" 函數填充該值。





彙總Dataframe中的資料
分組與聚合(Grouping & Aggregation)

sdf.groupby(['cyl'])\
.agg({"wt": "AVG"})\
.show(5)



car_counts = sdf.groupby(['cyl'])\
.agg({"wt": "count"})\
.sort("count(wt)", ascending=False)\
.show(5)


Spark DataFrames 支援多種常用函數,以在分組後彙總數據。
以下我們計算按照汽缸數的平均汽車重量。



我們還可以對聚合的輸出進行排序,以獲得前五名最常見的汽車。



驗收練習

顯示所有至少有 5 個氣缸的車輛的前 5 筆。
sdf.filter(sdf["cyl"] >= 5).show(5)




使用上面顯示的函數和表格,打印出我們資料庫中汽車的平均重量(以公噸為單位)。
已知欄位 wt 的單位是 千磅,即每個單位代表 1000 磅。

# 計算平均重量並轉換為公噸
average_weight_metric_tons = sdf.selectExpr("avg(wt) * 1000 / 2204.62 as average_weight_metric_tons")
# 顯示結果
average_weight_metric_tons.show()
使用 selectExpr 方法,執行 SQL 表達式。
在此計算了 wt 欄位的平均值,並將其乘以 1000 從千磅轉換成磅,再除以 2204.62 從磅轉換成公噸。最終結果以 "average_weight_metric_tons" 的別名顯示,正確計算並顯示了資料庫中所有車輛的平均重量。



附加新的欄位
sdf.withColumn('kmpl', sdf['mpg'] * 0.425).sort('mpg', ascending=False).show()
使用 0.425 的轉換因子創建一個新的里程數轉換:
一個新的公里每升(kmpl)行駛里程列,而不是每加侖英里(mpg)。
另外,按每公升公里數(kmpl)遞減的順序排列輸出。


SparkSQL臨時視圖(temp view)

createTempView函數
如何使用 PySpark 在 Python 中創建臨時視圖?
首先從 JSON 文件創建 DataFrame,然後創建一個名為“people”的臨時視圖。 
然後,可以使用此視圖運行 SQL 查詢。 


創建一個全域臨時視圖。 
createGlobalTempView函數
注意語法的微小變化,包括函數名稱前的“Global”前綴 以及視圖名稱前的“global temp”前綴。 



SparkSQL操作實操

起手式基本上還是做相關package的下載與引入
# Installing required packages
!pip install pyspark
!pip install findspark
!pip install pyarrow==0.14.1 
!pip install pandas
!pip install numpy==1.19.5


SparkSQL建立Table View並做相關SQL存取操作
視圖是一個臨時表,用於運行 SQL 查詢。
臨時視圖在當前 Spark 會話中提供本地範圍。
在這個例子中,我們使用 createTempView() 函數創建一個臨時視圖。

import findspark
findspark.init()

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Read the file using `read_csv` function in pandas
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')

# Preview a few records
mtcars.head()

mtcars.rename( columns={'Unnamed: 0':'name'}, inplace=True )
mtcars.head()
#Loading data into a Spark DataFrame
sdf = spark.createDataFrame(mtcars) 
sdf.printSchema()

#Rename the existing column name "vs" to "versus" and assign the new result DataFrame to a variable, "sdf_new"

sdf_new = sdf.withColumnRenamed("vs", "versus")
#View the new dataframe
sdf_new.head(5)

#Create a Table View
sdf.createTempView("cars")

# Showing the whole table
spark.sql("SELECT * FROM cars").show()

# Showing a specific column
spark.sql("SELECT mpg FROM cars").show(5)

# Basic filtering query to determine cars that have a high mileage and low cylinder count
spark.sql("SELECT * FROM cars where mpg>20 AND cyl < 6").show(5)

# Use where method to get list of cars that have miles per gallon is less than 18
sdf.where(sdf['mpg'] < 18).show(3) 

# Aggregating data and grouping by cylinders
spark.sql("SELECT count(*), cyl from cars GROUP BY cyl").show()

一旦我們有了表格視圖,可以運行類似於查詢 SQL 表的查詢。 
執行的操作與 DataFrames 中的操作類似。





建立一個 Pandas UDF 來套用到column層級操作

  • 為了讓數據科學家能夠利用大數據的價值,Spark 在 0.7 版本中添加了 Python API,支持用戶自定義函數(UDF)。
  • 這些用戶自定義函數一次只對一row操作,因此會遭受高序列化和調用開銷。
  • 因此,許多數據管道在 Java 和 Scala 中定義 UDF,然後從 Python 中調用它們。
  • 根據Apache Arrow 的 Pandas UDF 帶來了「兩全其美」的能力,完全用 Python 定義低開銷、高性能的 UDF。
  • 在如下示範中,將構建一個標量 Pandas UDF,將 wT 列從英制單位(1000 磅)轉換為公制單位(公噸)。
  • 此外,可以通過使用 @pandas_udf() decorator註冊常規 Python 函數,隨時在 SQL 中註冊和調用 UDF。我們然後可以將這個 UDF 應用到我們的 wt 列。
Pandas UDF註冊實操

# import the Pandas UDF function 
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    # The formula for converting from imperial to metric tons
    return s * 0.45

spark.udf.register("convert_weight", convert_wt)

spark.sql("SELECT *, wt AS weight_imperial, convert_weight(wt) as weight_metric FROM cars").show()
現在可以將 convert_weight 用戶自定義函數應用於汽車表格視圖中的 wt 欄位。
可以通過以下顯示的 SQL 查詢簡單地完成。


根據特定條件合併Dataframe操作

# define sample DataFrame 1 

data = [("A101", "John"), ("A102", "Peter"), ("A103", "Charlie")] 

columns = ["emp_id", "emp_name"]

dataframe_1 = spark.createDataFrame(data, columns)

# define sample DataFrame 2

data = [("A101", 3250), ("A102", 6735), ("A103", 8650)] 

columns = ["emp_id", "salary"] 

dataframe_2 = spark.createDataFrame(data, columns) 

# create a new DataFrame, "combined_df" by performing an inner join 

combined_df = dataframe_1.join(dataframe_2, on="emp_id", how="inner") 

# Show the data in combined_df as a list of Row.

combined_df.collect()


填補缺失值

# define sample DataFrame 1 with some missing values

data = [("A101", 1000), ("A102", 2000), ("A103",None)]

columns = ["emp_id", "salary"]

dataframe_1 = spark.createDataFrame(data, columns)
dataframe_1.head(3)


# fill missing salary value with a specified value

filled_df = dataframe_1.fillna({"salary": 3000})
filled_df.head(3)







留言

這個網誌中的熱門文章

何謂淨重(Net Weight)、皮重(Tare Weight)與毛重(Gross Weight)

經得起原始碼資安弱點掃描的程式設計習慣培養(五)_Missing HSTS Header

(2021年度)駕訓學科筆試準備題庫歸納分析_法規是非題