Spark語法_SparkContext與SparkSession的初始化及RDD操作

下載PySpark與findspark的套件
pip install pyspark
pip install findspark

引入套件並初始化Spark Context跟Spark Session

import findspark
findspark.init()
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

if 'spark' in locals() and isinstance(spark, SparkSession):
    print("SparkSession is active and ready to use.")
else:
    print("SparkSession is not active. Please create a SparkSession.")




透過調用 sc.parallelize() 來創建一個 RDD
創建了一個包含從 1 到 30 的整數的 RDD。
data = range(1,30)
# print first element of iterator
print(data[0])
len(data)
xrangeRDD = sc.parallelize(data, 4)

# this will let us know that we created an RDD
xrangeRDD

Transformations(轉換)
Transformations是一種對 RDD 的操作,會產生一個新的 RDD。
轉換後的 RDD 會快速生成,因為新的 RDD 是懶惰評估的,這意味著在生成新 RDD 時不會進行計算。RDD 將包含一系列轉換或計算指令,這些指令僅在呼叫行動時才會執行。


以下轉換中,將 RDD 中的每個元素減少 1。
這裡透過 lambda 函數的使用。(還會過濾 RDD,使其僅包含小於 10 的元素。)

subRDD = xrangeRDD.map(lambda x: x-1)
filteredRDD = subRDD.filter(lambda x : x<10)

print(filteredRDD.collect())
filteredRDD.count()



創建一個 RDD 並將其緩存
速度提高了 10 倍!

import time 

test = sc.parallelize(range(1,50000),4)
test.cache()

t1 = time.time()
# first count will trigger evaluation of count *and* cache
count1 = test.count()
dt1 = time.time() - t1
print("dt1: ", dt1)


t2 = time.time()
# second count operates on cached data only
count2 = test.count()
dt2 = time.time() - t2
print("dt2: ", dt2)

#test.count()


在 Spark 中創建一個結構化數據集,就可使用強大的 SQL 工具來查詢和聯接您的DataFrames。

# Download the data first into a local `people.json` file
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/people.json >> people.json

# Read the dataset into a spark dataframe using the `read.json()` function
df = spark.read.json("people.json").cache()

# Print the dataframe as well as the data schema
df.show()
df.printSchema()

# Register the DataFrame as a SQL temporary view
df.createTempView("people")

# Select and show basic data columns

df.select("name").show()
df.select(df["name"]).show()
spark.sql("SELECT name FROM people").show()

spark sql常用作業

# Perform basic filtering

df.filter(df["age"] > 21).show()
spark.sql("SELECT age, name FROM people WHERE age > 21").show()

# Perfom basic aggregation of data

df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()


建立一個包含1到50的整數的RDD。
應用轉換將每個數字乘以2,結果是一個包含前50個偶數的RDD。

# starter code
# numbers = range(1, 50)
# numbers_RDD = ...
# even_numbers_RDD = numbers_RDD.map(lambda x: ..)
numbers = range(1, 50) 
numbers_RDD = sc.parallelize(numbers) 
even_numbers_RDD = numbers_RDD.map(lambda x: x * 2)
print( even_numbers_RDD.collect()) 


將 people2.json 文件讀入筆記本,將其加載到數據框中並應用 SQL 操作。
以確定我們 people2 文件中的平均年齡。

# starter code
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/people2.json >> people2.json
df = spark.read.json("people2.json").cache()
df.createTempView("people2")
result = spark.sql("SELECT AVG(age) from people2")
result.show()


關閉這個筆記本創建的 SparkSession








留言

這個網誌中的熱門文章

何謂淨重(Net Weight)、皮重(Tare Weight)與毛重(Gross Weight)

經得起原始碼資安弱點掃描的程式設計習慣培養(五)_Missing HSTS Header

(2021年度)駕訓學科筆試準備題庫歸納分析_法規是非題