發表文章

目前顯示的是 1月, 2025的文章

Apache Spark筆記04_Apache Spark 集群模式(Cluster Modes)

圖片
https://events.prace-ri.eu/event/896/sessions/2721/attachments/997/1666/Spark_Cluster.pdf Spark Cluster Manager 與集群通信以獲取運行應用程序所需的資源 作為應用程序外部的服務運行,並抽象化集群管理器類型。 當應用程序運行時,Spark Context 創建任務並通知集群管理器所需的資源。 Cluster Manager種類(Spark 內建支持幾種集群管理器): Spark Standalone 包含了一個獨立的集群管理器。它最適合設置一個簡單的集群。 Spark Standalone 隨 Spark 安裝一起打包,因此不需要額外的依賴來配置和部署。 Spark Standalone 專門設計用來運行 Spark,通常是快速搭建並運行應用程序的最快方式。 有兩個主要組件:Workers 和 Master。 工作節點上運行的是 Workers。它們啟動一個或多個保留核心的執行器進程。 必須有一個可在任何集群節點上運行的 Master 可用。 它將工作節點連接到集群並通過心跳輪詢跟踪它們。如果 Master 與 Worker 在一起,不要為 Worker 預留節點的所有核心和內存。 Apache Hadoop YARN,或稱作另一資源協商器,是來自 Hadoop 項目的集群管理器。 在大數據生態系統中很受歡迎,支持除 Spark 之外的許多其他框架。 YARN 集群有自己的依賴性、設置和配置要求。 因此部署它們比Spark Standalone 更複雜。 Apache Mesos 是一個通用的集群管理器,Spark 運行在其上可以獲得額外的好處。 可以在 Spark 和其他大數據框架之間提供動態分割,以及在多個 Spark 實例之間提供可擴展的分割。 然而,在 Apache Mesos 上運行 Spark 可能需要根據客製化配置要求,進行一些額外的設置。 Kubernetes 是一個用於運行容器化應用程序的開源系統。 Kubernetes 集群運行容器化應用程序。這使得 Spark 應用程序更具可移植性,並有助於自動化部署,簡化依賴性管理,並根據需要擴展集群。 本地模式在機器上本地作為單一進程運行 Spark 應用程序。 該進程調用‘spark-submit’...

Spark 中的常見窄/寬轉換和Rule-Based/Cost-Based優化技術_窄轉換(narrow transformations)和寬轉換(wide transformations)

圖片
  https://medium.com/@dvcanton/wide-and-narrow-dependencies-in-apache-spark-21acf2faf031 窄轉換(narrow transformations) 將各種類型的數據放在單獨的容器中。可以對每個數據容器執行操作,也可以在容器之間獨立行動數據,而無需交互或傳輸。窄轉換的範例包括修改單個數據片段、選擇特定項或組合兩個數據容器。 在窄轉換中,無需執行數據隨機排序操作即可傳輸數據。 當 父 RDD 的每個分區最多被子 RDD 的一個分區使用時 ,我們的依賴關係很窄。 一對一映射:子 RDD 中的每個分區最多依賴於父 RDD 中的一個分區。換言之, 子分區僅處理來自單個相應父分區的數據。 更快的執行速度 :窄依賴項支援流水線等優化。在流水線中, 一個轉換的輸出可以用作下一個轉換的輸入 ,而無需等待處理整個父 RDD。這提高了效率。 減少隨機排序:由於每個子分區都有一個特定的父分區可供訪問,因此無需通過網路對數據進行隨機排序。 例如: map 、 filter 和 union 轉換 備註:隨機排序是指數據在Spark集群中的不同 worker節點之間移動。 窄轉換常見例子 Map: Applying a function to each element in the data set. from pyspark import SparkContext sc = SparkContext( "local" , "MapExample" ) data = [ 1 , 2 , 3 , 4 , 5 ] rdd = sc.parallelize(data) mapped_rdd = rdd.map( lambda x: x * 2 ) mapped_rdd.collect() # Output: [2, 4, 6, 8, 10] Filter: Selecting elements based on a specified condition. from pyspark import SparkContext sc = SparkContext( "local" , "FilterExample" ) dat...

在 PySpark 中為 CSV 檔案進行user-defined schema的步驟

圖片
  引入所需的庫。 from pyspark.sql.types import StructType, IntegerType, FloatType, StringType, StructField 定義schema架構 比方我們有一個employee.csv檔案,內容如下: emp_id,emp_name,dept,salary,phone A101,jhon,computer science, 1000 ,+ 1 ( 701 ) 846 958 A102,Peter,Electronics, 2000 , A103,Micheal,IT, 2500 , 在Spark中使用 『StructType』 類,為每個欄位創建一個 『StructField』,並提及欄位名稱、數據類型和其他屬性。 “False”表示該列不允許使用 null 值。 from pyspark.sql.types import StructType, IntegerType, FloatType, StringType, StructField schema = StructType([ StructField( "Emp_Id" , StringType(), False ), StructField( "Emp_Name" , StringType(), False ), StructField( "Department" , StringType(), False ), StructField( "Salary" , IntegerType(), False ), StructField( "Phone" , IntegerType(), True ), ]) Step3.使用使用者定義的架構讀取輸入檔。 #create a dataframe on top a csv file df = (spark.read .format( "csv" ) .schema(schema) .option( "header" , "true" ) .load( ...

SparkSQL中的兩種Optimizer_Catalyst與Tungsten記憶體優化

圖片
https://www.learntospark.com/2020/02/spark-sql-catalyst-optimizer.html  Spark SQL 支援 基於規則 和 基於成本 的查詢優化。(以下先名詞解說這兩個定義差異) Rule-Based Optimization 代表著 How to run the SQL query (比方:一個表是否有使用到Index、檢查一個查詢是否僅包含所需的欄位。) Cost-Based Optimization 代表著 當一個query執行時所需耗費多少時間跟memory。 (例如:基於查詢消耗的時間和記憶體來衡量和計算成本。) https://www.slideshare.net/slideshow/deep-dive-into-catalyst-apache-spark-20s-optimizer-63071120/63071120#14 Catalyst 也稱為 Catalyst Optimizer,是 Spark SQL 內置的基於規則的查詢優化器。 基於 Scala 函數式編程構造。 Catalyst 優化器選擇會以最低耗時和記憶體消耗較少的查詢路徑評估。 開發人員可以通過添加數據源特定的規則和支持新數據類型來擴展優化器。 Catalyst Optimizer 背後使用一種樹狀數據結構並提供數據樹規則集。 在基於規則的優化期間,SQL 優化器遵循預定義的規則,確定如何運行 SQL 查詢。 Catalyst 執行以下四個高階任務或階段: 分析、邏輯優化、物理規劃和代碼生成。(Analysis, logistical optimization, physical planning, and code generation) Catalyst 將 SQL 查詢轉換為抽象語法樹,以便進一步優化 Apache Spark Catalyst SQL 優化器中的示例樹 https://bigdataschool.ru/blog/spark-sql-catalyst-ast-query-plans.html https://blog.csdn.net/Full_Stack_delp/article/details/72877892 Tungsten  優化了基礎硬件的性能,著重於 CPU 性能而非 I...

Spark中DataSet與DataFrame之間比較_DataSet創建的三種方式

圖片
  DataSets 數據集是不可變的。與 RDD 一樣,它們不能被刪除或丟失。 數據集具有一個編碼器,可將類型指定的 JVM 物件轉換為表格表示形式。 數據集擴展了 DataFrame API。從概念上講,泛型無類型化 「Row」 的數據集是一個 JVM 物件,被視為 DataFrame 的一個column。 數據集是強類型的(type-safe),因此 API 目前僅在 Scala 和Java,它們是靜態類型的語言。動態類型化語言,例如 Python和 R 不支援資料集 API。 與使用 DataFrames 和 RDD 相比 DataSet數據集具有一些獨特的優勢和好處: Datasets 是靜態類型的,因此 Datasets 提供編譯時類型安全性。編譯時類型安全意味著 Spark 可以檢測生產中的語法和語義錯誤,從而節省大量的開發人員和運營成本及時間。 數據集的計算速度比 RDD 快,尤其是對於聚合查詢。數據集提供由 Catalyst 和 Tungsten 支援的其他查詢優化。 數據集可以改進記憶體使用和緩存。Spark 了解數據集的數據結構並優化記憶體中的佈局。 數據集 API 還提供了方便的聚合操作函數。包括 Sum、Average、Join 和 Group-by。 在Spark中創建數據集的三種方法 用 Scala 撰寫的,它使用 toDS 函數從一個序列來間接創建一個數據集。 從文字檔創建數據集,應用原始 “String”數據類型添加到顯式架構聲明中。 使用 JSON 檔案創建數據集 Which function is applied to create a data set from a sequence? Create() DSRdd() (O)toDS() seqDS() You can utilize the ‘toDS()’ function to create a data set from a sequence in Apache Spark.

Apache Spark_Cheat Sheet

Package/Method Description Code Example appName() A name for your job to display on the cluster web UI. 1 2 from pyspark . sql import SparkSession spark = SparkSession . builder . appName ( "MyApp" ). getOrCreate () Copied! cache() An Apache Spark transformation often used on a DataFrame, data set, or RDD when you want to perform multiple actions. cache() caches the specified DataFrame, data set, or RDD in the memory of your cluster's workers. Since cache() is a transformation, the caching operation takes place only when a Spark action (for example, count(), show(), take(), or write()) is also used on the same DataFrame, data set, or RDD in a single action. 1 2 df = spark . read . csv ( "customer.csv" ) df . cache () Copied! count() Returns the number of elements with the specified value. 1 2 count = df . count () print ( count ) Copied! createTempView() Creates a temporary view that can later be used to query the data. The only required parameter is the na...