Spark Syntax: Initializing SparkContext and SparkSession, and RDD Operations
Install the PySpark and findspark packages
pip install pyspark
pip install findspark
import findspark
findspark.init()

# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the Spark context.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Creating a Spark context class
sc = SparkContext()

# Creating a Spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

if 'spark' in locals() and isinstance(spark, SparkSession):
    print("SparkSession is active and ready to use.")
else:
    print("SparkSession is not active. Please create a SparkSession.")
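As a quick sanity check (my addition, not part of the original lab), you can print the Spark version and stop the session once you are done; version and stop() are standard attributes/methods on the context and session objects.

# Optional: confirm the context and session are up (assumes sc and spark from above)
print(sc.version)      # version of the underlying SparkContext
print(spark.version)   # version reported by the SparkSession

# When the lab is finished, release the resources held by the session
# spark.stop()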
Create an RDD by calling sc.parallelize().
This creates an RDD containing the integers from 1 to 29 (range(1, 30) excludes the end value).
data = range(1, 30)

# print the first element of the iterator
print(data[0])
len(data)

xrangeRDD = sc.parallelize(data, 4)

# this will let us know that we created an RDD
xrangeRDD
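The second argument to sc.parallelize() is the number of partitions the data is split into. A small sketch (my addition) to inspect the RDD created above:

# Inspect the RDD (assumes xrangeRDD from the cell above)
print(xrangeRDD.getNumPartitions())  # 4, the number of partitions requested
print(xrangeRDD.take(5))             # first five elements: [1, 2, 3, 4, 5]
print(xrangeRDD.count())             # 29 elements in total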
A transformation is an operation on an RDD that produces a new RDD.
The transformed RDD is returned quickly because RDDs are lazily evaluated: no computation is performed when the new RDD is created. The RDD only records a series of transformation instructions, and those instructions are executed only when an action is called.
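A minimal sketch (my addition) that makes this laziness visible: defining a transformation returns almost immediately, while the action is what actually triggers the computation.

import time

# Defining a transformation returns right away; nothing has been computed yet
t0 = time.time()
lazyRDD = xrangeRDD.map(lambda x: x * 100)
print("map() returned after", time.time() - t0, "seconds")

# The action forces Spark to actually execute the recorded instructions
t0 = time.time()
print(lazyRDD.collect()[:5])
print("collect() returned after", time.time() - t0, "seconds")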
The transformation below subtracts 1 from each element of the RDD, using a lambda function. (It then also filters the RDD so that it only contains elements smaller than 10.)
subRDD = xrangeRDD.map(lambda x: x - 1)
filteredRDD = subRDD.filter(lambda x: x < 10)

print(filteredRDD.collect())
filteredRDD.count()
Create an RDD and cache it.
The second count is roughly 10x faster, because it runs on the cached data.
import time

test = sc.parallelize(range(1, 50000), 4)
test.cache()

t1 = time.time()
# first count will trigger evaluation of count *and* cache
count1 = test.count()
dt1 = time.time() - t1
print("dt1: ", dt1)

t2 = time.time()
# second count operates on cached data only
count2 = test.count()
dt2 = time.time() - t2
print("dt2: ", dt2)

# test.count()
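A short follow-up sketch (my addition): the cache can be inspected and released explicitly once the data is no longer needed; is_cached and unpersist() are standard RDD members in PySpark.

print(test.is_cached)   # True, because test.cache() was called above
test.unpersist()        # release the cached partitions
print(test.is_cached)   # False after the cache is released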
# Download the data first into a local `people.json` file
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/people.json >> people.json

# Read the dataset into a Spark DataFrame using the `read.json()` function
df = spark.read.json("people.json").cache()

# Print the DataFrame as well as the data schema
df.show()
df.printSchema()

# Register the DataFrame as a SQL temporary view
df.createTempView("people")

# Select and show basic data columns
df.select("name").show()
df.select(df["name"]).show()
spark.sql("SELECT name FROM people").show()
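One practical note (my addition): createTempView raises an error if the view already exists, for example when the cell is re-run; createOrReplaceTempView avoids that.

# Safe to re-run: replaces the temporary view if it already exists
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people").show()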
Common Spark SQL operations
# Perform basic filtering
df.filter(df["age"] > 21).show()
spark.sql("SELECT age, name FROM people WHERE age > 21").show()

# Perform basic aggregation of data
df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()
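Sorting is another common operation; a brief sketch (my addition) showing both the DataFrame API and the equivalent SQL over the same people view:

# Order the rows by age, oldest first
df.orderBy(df["age"].desc()).show()
spark.sql("SELECT name, age FROM people ORDER BY age DESC").show()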
Apply a transformation that multiplies each number by 2, so the result is an RDD containing the first 50 even numbers. (Note that range(1, 50) only yields 1 through 49; range(1, 51) is needed to cover 1 through 50.)
# starter code
# numbers = range(1, 50)
# numbers_RDD = ...
# even_numbers_RDD = numbers_RDD.map(lambda x: ..)

numbers = range(1, 51)  # the end of range() is exclusive, so 51 gives the integers 1 to 50
numbers_RDD = sc.parallelize(numbers)
even_numbers_RDD = numbers_RDD.map(lambda x: x * 2)
print(even_numbers_RDD.collect())
Use Spark SQL to determine the average age in the people2 file.
# starter code
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/people2.json >> people2.json

df = spark.read.json("people2.json").cache()
df.createTempView("people2")
result = spark.sql("SELECT AVG(age) from people2")
result.show()
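If the average is needed as a plain Python value (my addition), the one-row result can be collected; the same aggregation can also be expressed with the DataFrame API.

from pyspark.sql.functions import avg

# Pull the scalar out of the single-row result DataFrame
avg_age = result.collect()[0][0]
print(avg_age)

# Equivalent aggregation through the DataFrame API
df.agg(avg("age")).show()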