發表文章

目前顯示的是有「PySpark」標籤的文章

Spark 中的常見窄/寬轉換和Rule-Based/Cost-Based優化技術_窄轉換(narrow transformations)和寬轉換(wide transformations)

圖片
  https://medium.com/@dvcanton/wide-and-narrow-dependencies-in-apache-spark-21acf2faf031 窄轉換(narrow transformations) 將各種類型的數據放在單獨的容器中。可以對每個數據容器執行操作,也可以在容器之間獨立行動數據,而無需交互或傳輸。窄轉換的範例包括修改單個數據片段、選擇特定項或組合兩個數據容器。 在窄轉換中,無需執行數據隨機排序操作即可傳輸數據。 當 父 RDD 的每個分區最多被子 RDD 的一個分區使用時 ,我們的依賴關係很窄。 一對一映射:子 RDD 中的每個分區最多依賴於父 RDD 中的一個分區。換言之, 子分區僅處理來自單個相應父分區的數據。 更快的執行速度 :窄依賴項支援流水線等優化。在流水線中, 一個轉換的輸出可以用作下一個轉換的輸入 ,而無需等待處理整個父 RDD。這提高了效率。 減少隨機排序:由於每個子分區都有一個特定的父分區可供訪問,因此無需通過網路對數據進行隨機排序。 例如: map 、 filter 和 union 轉換 備註:隨機排序是指數據在Spark集群中的不同 worker節點之間移動。 窄轉換常見例子 Map: Applying a function to each element in the data set. from pyspark import SparkContext sc = SparkContext( "local" , "MapExample" ) data = [ 1 , 2 , 3 , 4 , 5 ] rdd = sc.parallelize(data) mapped_rdd = rdd.map( lambda x: x * 2 ) mapped_rdd.collect() # Output: [2, 4, 6, 8, 10] Filter: Selecting elements based on a specified condition. from pyspark import SparkContext sc = SparkContext( "local" , "FilterExample" ) dat...

在 PySpark 中為 CSV 檔案進行user-defined schema的步驟

圖片
  引入所需的庫。 from pyspark.sql.types import StructType, IntegerType, FloatType, StringType, StructField 定義schema架構 比方我們有一個employee.csv檔案,內容如下: emp_id,emp_name,dept,salary,phone A101,jhon,computer science, 1000 ,+ 1 ( 701 ) 846 958 A102,Peter,Electronics, 2000 , A103,Micheal,IT, 2500 , 在Spark中使用 『StructType』 類,為每個欄位創建一個 『StructField』,並提及欄位名稱、數據類型和其他屬性。 “False”表示該列不允許使用 null 值。 from pyspark.sql.types import StructType, IntegerType, FloatType, StringType, StructField schema = StructType([ StructField( "Emp_Id" , StringType(), False ), StructField( "Emp_Name" , StringType(), False ), StructField( "Department" , StringType(), False ), StructField( "Salary" , IntegerType(), False ), StructField( "Phone" , IntegerType(), True ), ]) Step3.使用使用者定義的架構讀取輸入檔。 #create a dataframe on top a csv file df = (spark.read .format( "csv" ) .schema(schema) .option( "header" , "true" ) .load( ...

Spark語法_SparkContext與SparkSession的初始化及RDD操作

圖片
  https://link.springer.com/chapter/10.1007/978-1-4842-9380-5_1 下載PySpark與findspark的套件 pip install pyspark pip install findspark 引入套件並初始化Spark Context跟Spark Session import findspark findspark.init() # PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession # Creating a spark context class sc = SparkContext() # Creating a spark session spark = SparkSession \ .builder \ .appName( "Python Spark DataFrames basic example" ) \ .config( "spark.some.config.option" , "some-value" ) \ .getOrCreate() if 'spark' in locals () and isinstance (spark, SparkSession): print ( "SparkSession is active and ready to use." ) else : print ( "SparkSession is not active. Please create a SparkSession." ) 透過調用 sc.parallelize() 來創建一個 RDD 創建了一個包含從 1 到 30 的整數的 RDD。 data = range ( 1 , 30 ) # print...

實作PySpark與Pandas程式的數據分析_Spark DataFrame與SparkSQL常見操作

圖片
  findspark :用於定位 Spark 安裝。 pandas :被導入以進行數據處理。 SparkSession對於使用PySpark至關重要。它允許創建DataFrame、加載數據和進行各種操作。 初始化具有指定應用程式名稱的 Spark Session。 SparkSession.builder.appName("COVID-19 Data Analysis").getOrCreate() 第一階段.檢測Spark Session是否成功啟動 import findspark # This helps us find and use Apache Spark findspark.init() # Initialize findspark to locate Spark from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType import pandas as pd # Initialize a Spark Session spark = SparkSession \ .builder \ .appName( "COVID-19 Data Analysis" ) \ .config( "spark.sql.execution.arrow.pyspark.enabled" , "true" ) \ .getOrCreate() # Check if the Spark Session is active if 'spark' in locals () and isinstance (spark, SparkSession): print ( "SparkSession is active and ready to use." ) else : print ( "SparkSession is not active. Plea...

Apache Spark筆記03_Dataframes跟SparkSQL介紹及常見操作

圖片
  https://rharshad.com/spark-sql-dataframes-datasets/#dataframes 捨麼是SparkSQL? Spark SQL 是一個用於結構化數據處理的 Spark 模組。 用於在 Spark DataFrames 上運行 SQL 查詢,並提供 Java、 Scala、Python 和 R 的 API。  可以使用 SQL 查詢和透過DataFrame API 與 Spark SQL 交互。 有別於Spark RDD API,Spark SQL 包括基於成本的優化器(cost-based optimizer)、欄位式儲存(columnar storage)和代碼生成(code generation)。為 Spark 提供有關數據結構以及正在進行計算的優化。 Spark SQL 支持臨時和全局臨時表格視圖。 (臨時視圖具有局部作用域。 局部作用域意味著視圖僅在當前 Spark 會話中的當前節點內存在。然而,全局臨時視圖存在於一般的 Spark 應用程序中。全局臨時視圖可以在不同的 Spark 會話間共享。) SparkSQL支持的資料來源格式: Parquet:是許多數據處理系統支持的列式格式。 JSON 數據:可以通過推斷模式加載和寫入 JSON 數據。 Hive 中的表數據:支持讀取和寫入存儲在 Hive 中的數據。 Spark-SQL-Optimization https://medium.com/bryanyang0528/%E7%B0%A1%E5%96%AE%E4%BB%8B%E7%B4%B9-sparksql-77dd47c80bc1 https://www.databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html 使用SparkSQL的好處? Spark SQL 支援 Java、Scala、Python 和 R這類程式語言的API。 Spark SQL 使用相同的執行引擎來計算結果,與用於計算的 API 或語言無關。開發人員可以使用提供最自然方式來表示給定轉換的 API。 什麼是 DataFrame? DataFrame 是組織到命名欄位(named...