發表文章

目前顯示的是 12月, 2024的文章

Apache Hive筆記(一)_Hive分散式資料倉儲系統介紹_Hive CLI和beeline CLI

圖片
Hive 是Hadoop當中一個極重要的子專案,與HDFS協同合作成為Hadoop上的資料倉儲架構,Hive提供相近於T-SQL的查詢語言稱為Hive QL。 Hive QL提供使用者執行部份與SQL類似的操作,如常見的資料定義(DDL)操作及資料操 作語言(DML)等 目前Hive QL並不完全支援目前SQL提供的所有的函式,如預存程序與觸發程序等。與Hadoop中的MapReduce程式技術結合後,可自動的將Hive QL語言轉换為MapReduce Task,使用者可容易的使用Hive QL對HDFS中的海量資料進行分析處理,不需要再以Java語言自行撰寫MapReduce程式。 目前Hive提供透過JDBC與ODBC及Thrift等方式來與Hive進行連接 https://www.nitendratech.com/hive/apache-hive/ Thrift 是Facebook在2007年交給Apache軟體基金會的開源專案,主要目的是為了解決Facebook在不同系統中大資料量的傅輸通訊,及系統之間所使用的不同軟體語言與異質環境而訂定的跨平台軟體服務,其支援C++、C#、Haskell、Java、Ocami、Per、PHP、Python、Ruby、Smalltalk等多種不同的程式語言間的通訊,並可作為二進位的高性能的通訊中介軟體,支援資料及物件的序列化和多種類型的RPC服務etaStore為Hive的系統目錄架構,負責Hive中介資料的存放,如資料表的格式、屬性等,因Hive儲存建立在HDFS架構上,在Hive下每張資料表實體存放位置皆會對應到HDFS中的檔案目錄。 Hive提供使用者兩種操作模式為Web UI及命令列介面,來進行對Hive執行Hive QL指令, Hive QL對MetaStore的中介資料進行分析後,透過Driver中的编譯器轉换為 MapReduce Task後,將該工作交給Hadoop進行資料處理。 於AWS網站摘要的內文介紹 Apache Hive 是一種容錯的分散式資料倉儲系統,可進行大規模分析。資料倉儲提供資訊的中央存放區,可以輕鬆分析資料以作出明智且資料導向的決策。Hive 讓客戶使用 SQL 讀取、寫入與管理 PB 級資料。 Hive 建構在 Apache Hadoop 之上,這是一個用於有效儲存和處理大型資料集的開...

Spark Streaming使用介紹_DStream API與Structured Streaming

圖片
  https://www.researchgate.net/figure/Spark-Streaming-uses-DStream-to-transform-streaming-data-into-a-series-of-batches_fig7_381900460 https://www.databricks.com/blog/2015/07/30/diving-into-apache-spark-streamings-execution-model.html DStream API(Discretized Streams) 較舊的流處理模型,主要操作RDD(彈性分佈式數據集)來處理數據流。 Dstream也就是離散的stream,也就表示把連續的資料分成一小團一小團,又稱作“microbatching”。每個microbatch 變成一個RDD以便Spark的後續處理。 在每一個batch interval中,每個DStream會有也僅有一個RDD。 可以由StreamingContext透過串流資料產生或其他DStream使用map方法產生(與RDD一樣)。 DStream裡面的RDD就是透過某個時間間隔產生的,而且以產生的時間為索引,因此time屬性對DStream而言非常重要。 在訪問DStream的某個RDD時,實際上是訪問它在某個時間點的RDD。 Structured Streaming 自spark2.0 開始, spark 引進了一套新的流式計算模型「Structured Streaming」 進一步降低了處理資料的延遲時間 提供快速,可擴展,容錯,end-to-end exactly-once stream processing (端到端的完全一次性流處理),實踐了"有且僅有一次(Exectly Once)" 語義, 可以保證資料被精準消費。 是一種基於Spark SQL 引擎構建的可擴展且容錯的stream processing engine 。 可以使用Dataset/DataFrame API 來表示streaming aggregations (流聚合), event-time windows (事件時間視窗), stream-to-batch joins (流到批次連接) 等。 結構化流提供了對複雜事件處理和狀態管...

PySpark_命令與Cloudera manager_SparcContext預設的Spark Job對照設置(PySparkShell Application UI)

圖片
http://cdh6:4040 會跳轉到PySparkShell Application UI  sc.uiWebUrl 是 SparkContext 的一個屬性,用來獲取 Spark 應用程式的 Web UI 的 URL。 這個 Web UI 提供了豐富的信息,包括應用程式的執行狀態、作業進度、環境設定、執行階段信息以及資源使用狀況等。 當你啟動一個 Spark 應用程式時,Spark 會自動啟動一個 Web UI,通常默認訪問端口為 4040(如果4040被佔用,會自動選擇下一個可用端口)。 這個 Web UI 是用來監控和調試 Spark 應用程式的重要工具。 >>> sc.uiWebUrl u'http://cdh6:4040' >>>

PySpark_命令sc(SparkContext)與spark(SparkSession)_SparkSQL跟DataFrameAPI比較

圖片
  進到core spark入口(建立SparkContext) 指令如下: sc 若是要進入到SparkSession 指令如下 spark SparkSession封裝了SparkSession的執行環境,為所有Spark SQL程式進入點。 是在Spark1.6時候引入,允許不同使用者在使用不同config、暫存表情況下,共享同一個cluster環境。 一個Spark Application只可擁有一個SparkContext,但可有多個SparkSession。 Using Python version 2.7.5 (default, Apr  2 2020 13:16:51) SparkSession available as 'spark'. >>> sc <SparkContext master=yarn appName=PySparkShell> >>> dir(sc) ['PACKAGE_EXTENSIONS', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__getnewargs__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_accumulatorServer', '_active_spark_context', '_batchS...

PySpark_命令與RDD使用(Transformation及Action)_深入筆記(二)_count()、sum()、reduce()、sortBy、sortByKey

圖片
>>> rdd = sc.parallelize(range(1,10),1) >>> rdd.collect() [1, 2, 3, 4, 5, 6, 7, 8, 9] 將所有element傳回driver產生list物件,使用len(list)的到list的元素個數 >>> len(rdd.collect()) 9 count()是action,統計元素個數後,回傳到spark driver顯示 直接在各個worker計算每個partition的元素個數,再傳回driver加總所有partition的元素個數 >>> rdd.count() 9 sum()是action,將 rdd的元素加總後,回傳到spark driver顯示 >>> rdd.sum() 45 >>> rdd.reduce(lambda x,y: x+y) 45 背後運行方式兩相鄰持續累加,在此用到lambda單行匿名函數。 [1, 2, 3, 4, 5, 6, 7, 8, 9]  x  y    3    x    y     6 x     y   10     x     y      15 x     y     21   x     y    28    x     y        36   x     y       45 >>> mydata = sc.parallelize(["the cat sat on the mat","the aardvark sat o...

PySpark_命令與RDD使用(Transformation及Action)_深入筆記(一)_Lineage(譜系)與RDD Types_parallelize與collect方法及其他常用轉換方法使用

圖片
Lineage(譜系) 代表一組資訊,紀錄RDD如何產生的。 比方:由textfile毒入後或由某個RDD經過map轉換而產生,也會間接描述RDD之間的相依性。 https://www.researchgate.net/figure/Lineage-graph-for-RDDs-in-Phase-1-of-RDD-Apriori_fig1_334987142 在 Apache Spark 中, Resilient Distributed Dataset(RDD) 是最基本的數據結構,用於在多個節點之間分佈數據和進行並行計算。 RDD操作共分成兩大類 1.Transformation 對於已存在的RDD進行某種操作,當完成操作後再返回一個新的RDD。 所有transformation操作都屬於延遲操作(不會立刻執行)。 2.Action 把action操作後結果,返回給driver program或寫到外部儲存體之中。 所有action操作都是屬於會立即執行的。 RDD Types 基本上分為如下兩種 Regular RDD 可以存儲任何類型的物件(如整數、字符串、自定義物件等)。這種類型的 RDD 用於普通的並行數據處理,不涉及鍵值對操作。 # 創建一個 Regular RDD numbers = sc.parallelize([1, 2, 3, 4, 5]) # 使用 reduce 操作計算總和 sum = numbers.reduce(lambda a, b: a + b) sum  # 輸出將是 15 Pair RDD 是包含鍵值對(key-value pairs)的 RDD,每個元素都是一個二元組(tuple),第一個元素作為鍵(key),第二個元素作為值(value)。這種類型的 RDD 常用於需要進行分組、聚合或按鍵排序等操作的情況。 假設我們有一組銷售數據,需要按產品類別對銷售額進行聚合。 # 創建一個 Pair RDD sales_data = sc.parallelize([("Books", 100), ("Books", 200), ("Electronics", 300)]) # 使用 reduceByKey 操作來聚合同一類別的銷售額 sales_total = sales_dat...