發表文章

目前顯示的是 12月, 2024的文章

Apache Hive筆記(一)_Hive分散式資料倉儲系統介紹_Hive CLI和beeline CLI

圖片
Hive 是Hadoop當中一個極重要的子專案,與HDFS協同合作成為Hadoop上的資料倉儲架構,Hive提供相近於T-SQL的查詢語言稱為Hive QL。 Hive QL提供使用者執行部份與SQL類似的操作,如常見的資料定義(DDL)操作及資料操 作語言(DML)等 目前Hive QL並不完全支援目前SQL提供的所有的函式,如預存程序與觸發程序等。與Hadoop中的MapReduce程式技術結合後,可自動的將Hive QL語言轉换為MapReduce Task,使用者可容易的使用Hive QL對HDFS中的海量資料進行分析處理,不需要再以Java語言自行撰寫MapReduce程式。 目前Hive提供透過JDBC與ODBC及Thrift等方式來與Hive進行連接 https://www.nitendratech.com/hive/apache-hive/ Thrift 是Facebook在2007年交給Apache軟體基金會的開源專案,主要目的是為了解決Facebook在不同系統中大資料量的傅輸通訊,及系統之間所使用的不同軟體語言與異質環境而訂定的跨平台軟體服務,其支援C++、C#、Haskell、Java、Ocami、Per、PHP、Python、Ruby、Smalltalk等多種不同的程式語言間的通訊,並可作為二進位的高性能的通訊中介軟體,支援資料及物件的序列化和多種類型的RPC服務etaStore為Hive的系統目錄架構,負責Hive中介資料的存放,如資料表的格式、屬性等,因Hive儲存建立在HDFS架構上,在Hive下每張資料表實體存放位置皆會對應到HDFS中的檔案目錄。 Hive提供使用者兩種操作模式為Web UI及命令列介面,來進行對Hive執行Hive QL指令, Hive QL對MetaStore的中介資料進行分析後,透過Driver中的编譯器轉换為 MapReduce Task後,將該工作交給Hadoop進行資料處理。 於AWS網站摘要的內文介紹 Apache Hive 是一種容錯的分散式資料倉儲系統,可進行大規模分析。資料倉儲提供資訊的中央存放區,可以輕鬆分析資料以作出明智且資料導向的決策。Hive 讓客戶使用 SQL 讀取、寫入與管理 PB 級資料。 Hive 建構在 Apache Hadoop 之上,這是一個用於有效儲存和處理大型資料集的開...

Spark Streaming使用介紹_DStream API與Structured Streaming

圖片
  https://www.researchgate.net/figure/Spark-Streaming-uses-DStream-to-transform-streaming-data-into-a-series-of-batches_fig7_381900460 https://www.databricks.com/blog/2015/07/30/diving-into-apache-spark-streamings-execution-model.html DStream API(Discretized Streams) 較舊的流處理模型,主要操作RDD(彈性分佈式數據集)來處理數據流。 Dstream也就是離散的stream,也就表示把連續的資料分成一小團一小團,又稱作“microbatching”。每個microbatch 變成一個RDD以便Spark的後續處理。 在每一個batch interval中,每個DStream會有也僅有一個RDD。 可以由StreamingContext透過串流資料產生或其他DStream使用map方法產生(與RDD一樣)。 DStream裡面的RDD就是透過某個時間間隔產生的,而且以產生的時間為索引,因此time屬性對DStream而言非常重要。 在訪問DStream的某個RDD時,實際上是訪問它在某個時間點的RDD。 Structured Streaming 自spark2.0 開始, spark 引進了一套新的流式計算模型「Structured Streaming」 進一步降低了處理資料的延遲時間 提供快速,可擴展,容錯,end-to-end exactly-once stream processing (端到端的完全一次性流處理),實踐了"有且僅有一次(Exectly Once)" 語義, 可以保證資料被精準消費。 是一種基於Spark SQL 引擎構建的可擴展且容錯的stream processing engine 。 可以使用Dataset/DataFrame API 來表示streaming aggregations (流聚合), event-time windows (事件時間視窗), stream-to-batch joins (流到批次連接) 等。 結構化流提供了對複雜事件處理和狀態管...

PySpark_命令與Cloudera manager_SparcContext預設的Spark Job對照設置(PySparkShell Application UI)

圖片
http://cdh6:4040 會跳轉到PySparkShell Application UI  sc.uiWebUrl 是 SparkContext 的一個屬性,用來獲取 Spark 應用程式的 Web UI 的 URL。 這個 Web UI 提供了豐富的信息,包括應用程式的執行狀態、作業進度、環境設定、執行階段信息以及資源使用狀況等。 當你啟動一個 Spark 應用程式時,Spark 會自動啟動一個 Web UI,通常默認訪問端口為 4040(如果4040被佔用,會自動選擇下一個可用端口)。 這個 Web UI 是用來監控和調試 Spark 應用程式的重要工具。 >>> sc.uiWebUrl u'http://cdh6:4040' >>>

PySpark_命令sc(SparkContext)與spark(SparkSession)_SparkSQL跟DataFrameAPI比較

圖片
  進到core spark入口(建立SparkContext) 指令如下: sc 若是要進入到SparkSession 指令如下 spark SparkSession封裝了SparkSession的執行環境,為所有Spark SQL程式進入點。 是在Spark1.6時候引入,允許不同使用者在使用不同config、暫存表情況下,共享同一個cluster環境。 一個Spark Application只可擁有一個SparkContext,但可有多個SparkSession。 Using Python version 2.7.5 (default, Apr  2 2020 13:16:51) SparkSession available as 'spark'. >>> sc <SparkContext master=yarn appName=PySparkShell> >>> dir(sc) ['PACKAGE_EXTENSIONS', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__getnewargs__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_accumulatorServer', '_active_spark_context', '_batchS...

PySpark_命令與RDD使用(Transformation及Action)_深入筆記(二)_count()、sum()、reduce()、sortBy、sortByKey

圖片
>>> rdd = sc.parallelize(range(1,10),1) >>> rdd.collect() [1, 2, 3, 4, 5, 6, 7, 8, 9] 將所有element傳回driver產生list物件,使用len(list)的到list的元素個數 >>> len(rdd.collect()) 9 count()是action,統計元素個數後,回傳到spark driver顯示 直接在各個worker計算每個partition的元素個數,再傳回driver加總所有partition的元素個數 >>> rdd.count() 9 sum()是action,將 rdd的元素加總後,回傳到spark driver顯示 >>> rdd.sum() 45 >>> rdd.reduce(lambda x,y: x+y) 45 背後運行方式兩相鄰持續累加,在此用到lambda單行匿名函數。 [1, 2, 3, 4, 5, 6, 7, 8, 9]  x  y    3    x    y     6 x     y   10     x     y      15 x     y     21   x     y    28    x     y        36   x     y       45 >>> mydata = sc.parallelize(["the cat sat on the mat","the aardvark sat o...

PySpark_命令與RDD使用(Transformation及Action)_深入筆記(一)_Lineage(譜系)與RDD Types_parallelize與collect方法及其他常用轉換方法使用

圖片
Lineage(譜系) 代表一組資訊,紀錄RDD如何產生的。 比方:由textfile毒入後或由某個RDD經過map轉換而產生,也會間接描述RDD之間的相依性。 https://www.researchgate.net/figure/Lineage-graph-for-RDDs-in-Phase-1-of-RDD-Apriori_fig1_334987142 在 Apache Spark 中, Resilient Distributed Dataset(RDD) 是最基本的數據結構,用於在多個節點之間分佈數據和進行並行計算。 RDD操作共分成兩大類 1.Transformation 對於已存在的RDD進行某種操作,當完成操作後再返回一個新的RDD。 所有transformation操作都屬於延遲操作(不會立刻執行)。 例如: map 轉換通過函數傳遞數據集的每個元素,產生新的 RDD。 filter 支持根據函數篩選數據集的元素。 distinct 非重複查找數據集中不同元素。 flatmap 轉換(類似於 map 轉換)可以將每個 input 項映射到零個或更大的 output 項。 2.Action 把action操作後結果,返回給driver program或寫到外部儲存體之中。 所有action操作都是屬於會立即執行的。 例如: reduce 操作,它聚合了 RDD 的所有元素並返回result 添加到驅動程式中。 reduce,它使用函數 func 聚合 dataset 元素。 take,它返回一個包含前 n 個元素的陣列。 collect,它將數據集的所有元素作為陣列返回。 takeOrdered,它返回按升序排序或由可選的function 參數。 轉換和操作是如何發生的呢? Spark 使用一種名為有向無環圖directed acyclic graph(稱為 DAG) 和一個關聯的 DAG 計劃程式RDD 操作。 將 DAG 視為由邊和頂點組成的圖形結構。 術語"acyclic"「非迴圈」意味著新邊僅源自現有頂點。 通常頂點和邊會是連續的,頂點表示 RDD,邊表示轉換或操作。 DAG Scheduler 應用圖形結構來運行使用 RDD 執行的任務轉型過程。 為什麼Spark使用DAG? 因為DAG 有助於實現容錯能力。當節點宕機時,...