Apache Spark Cheat Sheet



Package/Method | Description | Code Example

appName(): A name for your job to display on the cluster web UI.
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("MyApp").getOrCreate()

cache(): A Spark transformation often used on a DataFrame, Dataset, or RDD when you want to perform multiple actions. cache() stores the specified DataFrame, Dataset, or RDD in the memory of your cluster's workers. Because cache() is a transformation, the caching only takes place when a Spark action (for example, count(), show(), take(), or write()) is subsequently called on the same DataFrame, Dataset, or RDD.
    df = spark.read.csv("customer.csv")
    df.cache()

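Because the caching only happens when an action runs, an action is typically called after cache(); a minimal sketch reusing df from the example above:

    df.count()   # first action materializes the cached data
    df.show()    # later actions reuse the in-memory copy
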
count(): Returns the number of rows in a DataFrame (or the number of elements in an RDD).
    count = df.count()
    print(count)

createTempView(): Creates a temporary view that can later be used to query the data. The only required parameter is the name of the view.
    df.createOrReplaceTempView("cust_tbl")

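Once the view is registered, it can be queried through spark.sql(), for example:

    spark.sql("SELECT * FROM cust_tbl WHERE age > 30").show()
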
filter(): Returns a new DataFrame containing only the rows that satisfy the given condition.
    filtered_df = df.filter(df['age'] > 30)

getOrCreate(): Returns the existing SparkSession (or SparkContext) if one is already running; otherwise it creates a new one and registers it as a singleton.
    spark = SparkSession.builder.getOrCreate()

import: Used to make code from one module accessible in another. Using imports effectively lets you reuse code and keep your projects manageable.
    from pyspark.sql import SparkSession

len(): Returns the number of items in an object. When the object is a string, len() returns the number of characters in the string.
    row_count = len(df.collect())
    print(row_count)

map(): Applies the given function to each item of an iterable (for a Python list or tuple it returns a map iterator; for a Spark RDD it returns a new RDD with the results).
    rdd = df.rdd.map(lambda row: (row['name'], row['age']))

pip: Python's package installer. When installing a package, pip searches for it in the Python Package Index (PyPI), resolves any dependencies, and installs everything into your current Python environment. pip list shows the packages that are already installed.
    pip list

pip install: The pip install <package> command looks for the latest version of the package and installs it.
    pip install pyspark

print(): Prints the specified message to the screen or other standard output device. The message can be a string or any other object; the object is converted into a string before being written.
    print("Hello, PySpark!")

printSchema(): Prints the schema of the DataFrame or Dataset in tree format, along with each column name and data type. If the DataFrame or Dataset has a nested structure, the schema is displayed as a nested tree.
    df.printSchema()

sc.parallelize(): Distributes a local Python collection to form an RDD (a parallelized collection). If the input represents a range of numbers, passing a range object is recommended for performance.
    rdd = sc.parallelize([1, 2, 3, 4, 5])

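For a large numeric range, passing a range object avoids building the full list in the driver first; a small sketch:

    rdd = sc.parallelize(range(1, 1001))
    print(rdd.count())  # 1000
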
select(): Used to select one or more columns from a DataFrame, including nested columns, columns by index, all columns from a list, or columns matched by regular expression. select() is a transformation and returns a new DataFrame containing only the selected columns.
    selected_df = df.select('name', 'age')

show(): Displays the contents of a DataFrame in a table row-and-column format. By default, it shows only twenty rows, and the column values are truncated at twenty characters.
    df.show()

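Both defaults can be overridden, for example to display only five rows without truncating long values:

    df.show(5, truncate=False)
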
spark.read.json(): Spark SQL can automatically infer the schema of a JSON data set and load it as a DataFrame. read.json() loads data from a file or directory of JSON files in which each line is a separate, self-contained JSON object (JSON Lines format), rather than a single multi-line JSON document.
    json_df = spark.read.json("customer.json")

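If the input is instead a regular multi-line JSON document, Spark's multiLine read option can be set; a small sketch (the file name customer_multiline.json is only illustrative):

    json_df = spark.read.option("multiLine", True).json("customer_multiline.json")
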
spark.sql(): To issue any SQL query, use the sql() method on the SparkSession instance. All spark.sql queries executed in this manner return a DataFrame, on which you may perform further Spark operations if required.
    result = spark.sql("SELECT name, age FROM cust_tbl WHERE age > 30")
    result.show()

time() / current_timestamp(): Python's time.time() returns the current time as the number of seconds since the Unix epoch. In Spark SQL, current_timestamp() returns the current timestamp as a column value, as in the example below.
    from pyspark.sql.functions import current_timestamp
    current_time = df.select(current_timestamp().alias("current_time"))
    current_time.show()

DataFrames and Spark SQL

Package/Method | Description | Code Example

appName(): A name for your job to display on the cluster web UI.
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("MyApp").getOrCreate()

createDataFrame(): Used to load data into a Spark DataFrame.
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("MyApp").getOrCreate()
    data = [("John", 30), ("Peter", 25), ("Bob", 35)]
    columns = ["name", "age"]
    # Creating a DataFrame
    df = spark.createDataFrame(data, columns)

createTempView(): Creates a temporary view that can later be used to query the data. The only required parameter is the name of the view.
    df.createOrReplaceTempView("cust_tbl")

fillna(): Used to replace NULL/None values in all or selected DataFrame columns with zero (0), an empty string, a space, or any other constant literal value.
    # Replace NULL/None values with zero
    filled_df = df.fillna(0)
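A different replacement value can also be supplied per column by passing a dictionary; a small sketch using the name and age columns from the earlier examples (the replacement values are only illustrative):

    filled_df = df.fillna({"age": 0, "name": "unknown"})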

filter(): Returns a new DataFrame containing only the rows that satisfy the given condition.
    filtered_df = df.filter(df['age'] > 30)

getOrCreate(): Returns the existing SparkSession (or SparkContext) if one is already running; otherwise it creates a new one and registers it as a singleton.
    spark = SparkSession.builder.getOrCreate()

groupBy(): Used to collect identical values into groups on a DataFrame and perform aggregations such as count, sum, avg, min, and max on the grouped data.
    # Grouping data and performing an aggregation
    grouped_df = df.groupBy("age").agg({"age": "count"})

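Several aggregations can also be combined in a single agg() call; a small sketch, grouping by name purely for illustration:

    from pyspark.sql.functions import count, avg, max as max_
    grouped_df = df.groupBy("name").agg(
        count("*").alias("n"),
        avg("age").alias("avg_age"),
        max_("age").alias("max_age"),
    )
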
head(): Returns the first n rows of the object, based on position.
    # Returning the first 5 rows
    first_5_rows = df.head(5)

import: Used to make code from one module accessible in another. Using imports effectively lets you reuse code and keep your projects manageable.
    from pyspark.sql import SparkSession

pd.read_csv(): Reads data from a CSV file into a pandas DataFrame.
    import pandas as pd
    # Reading data from a CSV file into a DataFrame
    df_from_csv = pd.read_csv("data.csv")

pip: Python's package installer. When installing a package, pip searches for it in the Python Package Index (PyPI), resolves any dependencies, and installs everything into your current Python environment. pip list shows the packages that are already installed.
    pip list

pip install: The pip install <package> command looks for the latest version of the package and installs it.
    pip install pyspark

printSchema(): Prints the schema of the DataFrame or Dataset in tree format, along with each column name and data type. If the DataFrame or Dataset has a nested structure, the schema is displayed as a nested tree.
    df.printSchema()

rename(): A pandas method used to change the row indexes and the column labels.
    import pandas as pd
    # Create a sample DataFrame
    data = {'A': [1, 2, 3], 'B': [4, 5, 6]}
    df = pd.DataFrame(data)
    # Rename columns
    df = df.rename(columns={'A': 'X', 'B': 'Y'})
    # The columns 'A' and 'B' are now renamed to 'X' and 'Y'
    print(df)

select(): Used to select one or more columns from a DataFrame, including nested columns, columns by index, all columns from a list, or columns matched by regular expression. select() is a transformation and returns a new DataFrame containing only the selected columns.
    selected_df = df.select('name', 'age')

show(): Displays the contents of a DataFrame in a table row-and-column format. By default, it shows only twenty rows, and the column values are truncated at twenty characters.
    df.show()

sort(): Used to sort a DataFrame in ascending or descending order based on one or more columns.
    # Sorting DataFrame by a column in ascending order
    sorted_df = df.sort("age")
    # Sorting DataFrame by multiple columns (age descending, name ascending)
    sorted_df_desc = df.sort(["age", "name"], ascending=[False, True])

SparkContext(): The entry point to Spark core functionality, defined in the org.apache.spark package since version 1.x; it is used to programmatically create Spark RDDs, accumulators, and broadcast variables on the cluster.
    from pyspark import SparkContext
    # Creating a SparkContext
    sc = SparkContext("local", "MyApp")

SparkSession: The entry point to Spark SQL; creating a SparkSession instance is typically the first statement in a program that works with RDDs, DataFrames, and Datasets.
    from pyspark.sql import SparkSession
    # Creating a SparkSession
    spark = SparkSession.builder.appName("MyApp").getOrCreate()

spark.read.json(): Spark SQL can automatically infer the schema of a JSON data set and load it as a DataFrame. read.json() loads data from a file or directory of JSON files in which each line is a separate, self-contained JSON object (JSON Lines format), rather than a single multi-line JSON document.
    json_df = spark.read.json("customer.json")

spark.sql(): To issue any SQL query, use the sql() method on the SparkSession instance. All spark.sql queries executed in this manner return a DataFrame, on which you may perform further Spark operations if required.
    result = spark.sql("SELECT name, age FROM cust_tbl WHERE age > 30")
    result.show()

spark.udf.register(): Registers a user-defined function (UDF) with Spark, making it accessible for use in Spark SQL queries. This allows you to apply custom logic or operations to DataFrame columns using SQL expressions.
    # Registering a UDF (user-defined function)
    from pyspark.sql.types import StringType

    def my_udf(value):
        return value.upper()

    spark.udf.register("my_udf", my_udf, StringType())

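Once registered, the UDF can be referenced by name in a SQL query against a temporary view (cust_tbl from the createTempView() example above):

    spark.sql("SELECT my_udf(name) AS name_upper FROM cust_tbl").show()
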
where(): Used to filter rows from a DataFrame based on the given condition. filter() and where() serve the same purpose.
    # Filtering rows based on a condition
    filtered_df = df.where(df['age'] > 30)

withColumn(): A DataFrame transformation used to change a column's value, convert the data type of an existing column, create a new column, and more.
    # Adding a new column and performing a transformation
    from pyspark.sql.functions import col
    new_df = df.withColumn("age_squared", col("age") ** 2)

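To convert the data type of an existing column, withColumn() is typically combined with cast(); a small sketch that casts age to double:

    from pyspark.sql.functions import col
    df_casted = df.withColumn("age", col("age").cast("double"))
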
withColumnRenamed(): Returns a new DataFrame with an existing column renamed.
    # Renaming an existing column
    renamed_df = df.withColumnRenamed("age", "years_old")



