
This article will use some images from: Learning Spark, 2nd Edition [Book] (oreilly.com)

Main

General theory

Concern

The biggest issue I have faced with popular (and free) Apache Spark and Databricks learning materials is that they often skip infrastructure theory. As someone who once moved from Ops into DevOps, I feel uncertain about a product if I don’t know how its cogs spin.

Even that book does not really explain concepts such as DAG, RDD, DataFrame, Dataset and more; it only mentions briefly what they do, sometimes in a single sentence.

I disagree with that approach, so the theory I consider necessary for the topic of this article is covered here.

Consider this a recommendation of “what” and “where” you should dig to get a better understanding.

DAG and how it works

  • Directed acyclic graph - Wikipedia
  • My brief: a graph (a math concept) that lets us represent operations and the order in which they depend on each other, as seen in the image below.

    It’s finite and acyclic: an operation can only depend on operations that come before it, so no chain ever loops back on itself.
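To make the idea more tangible, here is a minimal sketch in plain Python (not Spark code). The pipeline and its operation names are hypothetical; the only point is that a DAG always allows an order in which every operation runs after the operations it depends on, while a cycle makes such an order impossible.

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+

# Hypothetical pipeline: every operation maps to the set of operations it depends on.
PIPELINE = {
    "read_orders": set(),
    "read_users": set(),
    "join": {"read_orders", "read_users"},
    "aggregate": {"join"},
    "write": {"aggregate"},
}


def execution_order(dependencies):
    """Return the operations so that each one comes after all of its dependencies."""
    return list(TopologicalSorter(dependencies).static_order())


def is_acyclic(dependencies):
    """A valid execution order exists only if the graph has no cycle."""
    try:
        execution_order(dependencies)
        return True
    except CycleError:
        return False


order = execution_order(PIPELINE)
print(order)  # one valid order; the two reads may come in either sequence
print(is_acyclic({"a": {"b"}, "b": {"a"}}))  # False: a and b depend on each other
```

Spark builds its graph from the transformations you call rather than from a dictionary, but the ordering guarantee is the same.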

Apache Spark Theory

How we connect

Sessions

To execute code, you first need to connect to Spark. This can be done interactively (for example from a shell or a notebook) or programmatically from a script.

HINT

Before Spark 2.0, people used a separate SparkContext object (plus SQLContext for DataFrames) as the entry point. Spark 2.0 introduced SparkSession in pyspark.sql, which bundles all session-building components in one place.

For RDDs (Spark Core) you are still going to use pyspark.SparkContext, which is also reachable through SparkSession.sparkContext.

SparkSession (pyspark.sql.SparkSession) serves as a wrapper for everything connection-related when working with DataFrames.
A session consists of:

  • Builder: SparkSession.builder
    • You create a session with it.
  • SparkConf: SparkSession.conf
    • Represents the key-value parameters to load into the Spark application.
    • Loading of external settings can be disabled, so that only default system settings apply.
  • SparkContext: SparkSession.sparkContext
    • Represents a connection to a Spark cluster.

Dataframe vs Metastore table

A DataFrame exists only within the session that created it (in memory), while a metastore table created with “CREATE TABLE” is persisted on disk and survives the session.

CLI - PySpark

If you work in the pyspark shell on the same host and in the same environment where Spark is set up, you can expect the spark variable, which holds the session object, to be populated automatically, meaning you don’t need to create the session yourself. Otherwise you build it explicitly:

# import pyspark  # (I prefer this more)
from pyspark.sql import SparkSession  # ...but usually you will encounter this more.

# Create a Spark session.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Word Count")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Alternatively, create a Spark session with Spark Connect (available since Spark 3.4).
spark = (
    SparkSession.builder
    .remote("sc://localhost")
    .appName("Word Count")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

GUI - Databricks

Databricks connects you to its Spark infrastructure when you log in, so there is no need to initiate a session with the in-house cluster. You may still need to build a session when you connect to a remote Databricks instance.

How it executes

What happens when you run an application or a notebook.

Driver

The Spark driver is responsible for execution: while running, it converts the application into jobs.

  • Receives requests for computations and data from the user program.
  • Breaks the work down into jobs and submits them to the DAGScheduler.
  • The DAGScheduler organizes each job into stages and constructs a directed acyclic graph (DAG).
  • Resilient Distributed Datasets (RDDs) carry the data flow between stages.
  • Tasks are scheduled on the Spark executors by the TaskScheduler.

Operations issued through the driver are either Transformations or Actions.

Transformations

Return an RDD or a DataFrame.
Transformations create a new DataFrame to work with, which preserves the immutability of the source DataFrame. They are evaluated lazily, meaning they are not executed immediately but recorded as a lineage (a bit like a revision history) in the form of a DAG, which gives a full overview of how the data was or will be transformed. This also helps Spark to optimize its execution plan, e.g. certain transformations may be split into more stages.
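Lazy evaluation and lineage can be modelled in a few lines of plain Python. This is only a sketch under my own assumptions: the LazyDataset class and its methods are hypothetical and are not Spark's API. It shows that a transformation returns a new object and merely records a step, while the action replays the recorded lineage.

```python
class LazyDataset:
    """Toy model of an immutable, lazily evaluated dataset (not Spark's API)."""

    def __init__(self, source, lineage=()):
        self._source = source          # the original data, never modified
        self.lineage = tuple(lineage)  # recorded (name, function) steps

    # Transformations: return a NEW object and only record the step.
    def map(self, func):
        return LazyDataset(self._source, self.lineage + (("map", func),))

    def filter(self, pred):
        return LazyDataset(self._source, self.lineage + (("filter", pred),))

    # Action: replays the recorded lineage and returns plain Python data.
    def collect(self):
        rows = iter(self._source)
        for name, func in self.lineage:
            rows = map(func, rows) if name == "map" else filter(func, rows)
        return list(rows)


calls = []  # records when the mapped function really runs


def double(x):
    calls.append(x)
    return x * 2


source = LazyDataset([1, 2, 3, 4])
doubled = source.map(double)               # nothing is computed here
big = doubled.filter(lambda x: x > 4)      # still nothing
calls_before = len(calls)                  # 0
result = big.collect()                     # the action triggers the whole chain
print(calls_before, result)                # 0 [6, 8]
print([name for name, _ in big.lineage])   # ['map', 'filter']
```

Note that source keeps an empty lineage after all of this, which is the immutability part: every transformation produced a new object instead of changing the old one.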

Hint

No wonder the Apache Spark team chose “lazy” transformations and “eager” actions: the names tell us a lot about their behavior.

Transformations can be classified into Narrow and Wide:

Any transformation where a single output partition can be computed from a single input partition is a narrow transformation. Both filter() and contains() can be classified as such.
There are many narrow transformations supported by Spark: map(func); flatMap(func); filter(func); mapPartitions(func); mapPartitionsWithIndex(func); union(dataset); zip(dataset); zipWithIndex(); zipWithUniqueId().
map returns a new RDD with func applied to every element, filter returns only the elements for which the condition holds, and union combines two RDDs into a new one.
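A rough way to picture “narrow” is to model an RDD as a list of partitions (a list of lists) in plain Python. The helper names below are my own and hypothetical; the point is that each output partition is computed from exactly one input partition, so no data has to move between partitions.

```python
def map_partitions(partitions, func):
    """Apply func to every element; each partition is processed on its own."""
    return [[func(x) for x in part] for part in partitions]


def filter_partitions(partitions, pred):
    """Keep matching elements; a partition may shrink or even become empty."""
    return [[x for x in part if pred(x)] for part in partitions]


def union(left, right):
    """Concatenate the partition lists; elements stay in their partitions."""
    return left + right


rdd = [[1, 2], [3, 4], [5]]
print(map_partitions(rdd, lambda x: x * x))          # [[1, 4], [9, 16], [25]]
print(filter_partitions(rdd, lambda x: x % 2 == 0))  # [[2], [4], []]
print(union(rdd, [[6]]))                             # [[1, 2], [3, 4], [5], [6]]
```

The partition count stays the same for map and filter, and union simply ends up with the partitions of both inputs.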

However, groupBy() or orderBy() instruct Spark to perform wide transformations, where data from other partitions is read in, combined, and written to disk. Since each partition will have its own count of the word that contains the “Spark” word in its row of data, a count(groupBy()) will force a shuffle of data from each of the executor’s partitions across the cluster. In this transformation, orderBy() requires output from other partitions to compute the final aggregation.
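The shuffle can be sketched in plain Python as well. This is a simplified model under my own assumptions, not how Spark implements it: partitions are lists of words, every partition first counts its own words, and then each partial count is routed to the output partition that “owns” the word. The function names and the CRC32-based routing are hypothetical choices for the illustration.

```python
import zlib
from collections import Counter


def count_per_partition(partitions):
    """Narrow step: every partition counts its own words independently."""
    return [Counter(part) for part in partitions]


def shuffle(partials, num_output):
    """Wide step: send each (word, partial count) to the partition owning that word."""
    outputs = [Counter() for _ in range(num_output)]
    for partial in partials:
        for word, n in partial.items():
            # A stable hash decides the target, so the same word always lands together.
            target = zlib.crc32(word.encode("utf-8")) % num_output
            outputs[target][word] += n
    return outputs


partitions = [["spark", "hive", "spark"], ["spark", "flink"], ["hive"]]
partials = count_per_partition(partitions)
shuffled = shuffle(partials, 2)
totals = sum(shuffled, Counter())
print(totals["spark"], totals["hive"], totals["flink"])  # 3 2 1
```

Before the shuffle, “spark” has partial counts in two different partitions; only after all partial counts for a word meet in one place can the final number be computed. That data movement is what makes the transformation wide.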


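Here is how you can list all DataFrames in a session (more precisely, all global variables that hold a DataFrame):

from pyspark.sql import DataFrame

def list_dataframes():
    # Return the names of all global variables that reference a DataFrame.
    return [k for (k, v) in globals().items() if isinstance(v, DataFrame)]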

Zipping
zip, zipWithIndex, zipWithUniqueId
zip - zips this RDD with another one, returning key-value pairs (a PairRDD) made of the elements at the same position. Both RDDs must have the same number of partitions and the same number of elements in each partition.
For instance, if you have two RDDs, the first with (‘A’, ‘B’, ‘C’) and the second with (1, 2, 3), and you zip them, you get a new RDD with the pairs (‘A’, 1), (‘B’, 2), (‘C’, 3). It’s like a hash map with ‘A’, ‘B’, ‘C’ as keys and 1, 2, 3 as their respective values.

val x = sc.parallelize('A' to 'C',2)
val y = sc.parallelize(1 to 3, 2)
val z = x.zip(y)
// (A,1), (B,2), (C,3)

Zipping two RDDs with N elements each is a way to get N new elements, where each value is a function of the elements at the same position in the initial RDDs. Spark has a few transformations for that.
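The three zip variants differ mainly in how the second half of each pair is produced. Below is a plain Python model with partitions as lists of lists; the function names are hypothetical, and the id formula for zip_with_unique_id follows the documented behaviour of zipWithUniqueId (items in the k-th of n partitions get k, n+k, 2n+k, and so on).

```python
def zip_partitions(left, right):
    """Pair elements position by position; partition layouts must match exactly."""
    if [len(p) for p in left] != [len(p) for p in right]:
        raise ValueError("both sides need the same partitions and partition sizes")
    return [list(zip(a, b)) for a, b in zip(left, right)]


def zip_with_index(partitions):
    """Consecutive indices 0..N-1; needs to know the sizes of earlier partitions."""
    result, offset = [], 0
    for part in partitions:
        result.append([(x, offset + i) for i, x in enumerate(part)])
        offset += len(part)
    return result


def zip_with_unique_id(partitions):
    """Unique but possibly gappy ids; each partition can compute them on its own."""
    n = len(partitions)
    return [[(x, i * n + k) for i, x in enumerate(part)]
            for k, part in enumerate(partitions)]


letters = [["A"], ["B", "C"]]
numbers = [[1], [2, 3]]
print(zip_partitions(letters, numbers))  # [[('A', 1)], [('B', 2), ('C', 3)]]
print(zip_with_index(letters))           # [[('A', 0)], [('B', 1), ('C', 2)]]
print(zip_with_unique_id(letters))       # [[('A', 0)], [('B', 1), ('C', 3)]]
```

Notice the gap in the last line: id 2 is never used, which is the price for not having to count the elements of other partitions first.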


Wide
intersection shuffles partitions. For example, you have two RDDs: the first one consists of the integers 1 to 5 and has only one partition, and the second one consists of the integers 3 to 6 and has two partitions. When you apply intersection to them, data is shuffled between nodes (repartitioning) and you get two new partitions (A and B) with the values 4 and 3, 5 respectively.

val x = sc.parallelize(1 to 5)
val y = sc.parallelize(3 to 6, 2)
val z = x.intersection(y)
// 4, 3, 5

distinct (a-la UNIQUE) returns a new dataset that contains the distinct elements of the source dataset. For instance, in the simplest case, if you have an RDD with some values duplicated (say 1, 2, 1, 3, 2) and run the distinct transformation, you get a new RDD with unique values only. Please note that the values might change partitions.

val x = sc.parallelize(Array(1, 2, 1, 3, 2), 2)
val z = x.distinct()
// 1, 3, 2

coalesce decreases the number of partitions in the RDD to the given one. It is useful for running operations more efficiently after filtering down a large dataset. For instance, if you have an RDD with 1 to 6 integers, spread around 4 partitions, you can easily decrease partitions to 2 by running coalesce(2) on it.

val x = sc.parallelize(1 to 6, 4)
// [1], [2, 3], [4], [5, 6]

val z = x.coalesce(2)
// [1, 2, 3], [4, 5, 6]

repartition changes the number of partitions in the RDD to the given one, so you can also use it to decrease the partitions to 2. Note that it does not guarantee equally sized partitions: values are not distributed equally, as you can see in this case, and some partitions may even end up empty.

val x = sc.parallelize(1 to 6, 4)
// [1], [2, 3], [4], [5, 6]

val z = x.repartition(2)
// [1, 2, 4, 5], [3, 6]

Thus, what’s the difference between coalesce and repartition? coalesce can only decrease the number of partitions, and it does so by merging existing partitions without a full shuffle, while repartition can both increase and decrease them, at the cost of a full shuffle.
If you are not sure how many partitions you have and do not want to increase them but possibly decrease, then coalesce is your choice. Otherwise, if you need to change the number of partitions in either direction, select repartition.
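The two examples can be reproduced with a small plain Python model (partitions as lists of lists). It is a sketch under my own assumptions: the helper names are hypothetical, and the round-robin in repartition starts at slot 0 for every input partition to keep the result deterministic, whereas Spark picks a pseudo-random starting slot per partition.

```python
def coalesce(partitions, n):
    """Merge neighbouring partitions; no element is sent through a shuffle."""
    size = len(partitions)
    if n >= size:
        return partitions  # a plain coalesce cannot increase the partition count
    merged = []
    for i in range(n):
        start, end = i * size // n, (i + 1) * size // n
        merged.append([x for part in partitions[start:end] for x in part])
    return merged


def repartition(partitions, n):
    """Full shuffle: each input partition deals its elements round-robin to n outputs."""
    outputs = [[] for _ in range(n)]
    for part in partitions:
        for position, x in enumerate(part):  # assumption: always start at slot 0
            outputs[position % n].append(x)
    return outputs


x = [[1], [2, 3], [4], [5, 6]]
print(coalesce(x, 2))           # [[1, 2, 3], [4, 5, 6]]
print(repartition(x, 2))        # [[1, 2, 4, 5], [3, 6]]
print(len(coalesce(x, 8)))      # 4: asking for more partitions changes nothing
print(repartition(x, 8)[2:])    # [[], [], [], [], [], []]: six of eight are empty
```

The last two lines show both caveats at once: coalesce silently refuses to grow, and repartition grows but may leave partitions unevenly filled or empty.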

Actions

Return non-RDD data.
Examples include: count(); collect(); take(n); top(n); countByValue(); reduce(func); fold(zeroValue, func); aggregate(zeroValue, seqOp, combOp); foreach(func); saveAsTextFile(path); saveAsSequenceFile(path); saveAsObjectFile(path).
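Of these, aggregate(zeroValue, seqOp, combOp) is probably the least obvious, so here is a plain Python model of it. The function is my own sketch, not Spark code: seq_op folds the elements inside each partition starting from the zero value, and comb_op then merges the per-partition results on the driver side.

```python
from functools import reduce


def aggregate(partitions, zero, seq_op, comb_op):
    """Fold every partition with seq_op, then merge the partial results with comb_op."""
    partials = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, partials, zero)


# Compute (sum, count) in one pass, e.g. as the basis for an average.
partitions = [[1, 2], [3, 4], [5]]
total, count = aggregate(
    partitions,
    (0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),   # seq_op: add one element
    lambda a, b: (a[0] + b[0], a[1] + b[1]),   # comb_op: merge two partials
)
print(total, count, total / count)  # 15 5 3.0
```

The result is an ordinary Python tuple rather than a distributed dataset, which is exactly what separates an action from a transformation.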

top(n) returns the n largest values in descending order, while take(n) returns the first n values in the order they are stored, without sorting.
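A small plain Python model of take(n) versus top(n), with partitions as lists of lists. The helper names are hypothetical; the behaviour follows the RDD documentation, where take returns the first n elements as stored and top returns the n largest in descending order.

```python
import heapq


def take(partitions, n):
    """First n elements in storage order, scanning partition after partition."""
    taken = []
    for part in partitions:
        for x in part:
            if len(taken) == n:
                return taken
            taken.append(x)
    return taken


def top(partitions, n):
    """The n largest elements across all partitions, largest first."""
    return heapq.nlargest(n, (x for part in partitions for x in part))


rdd = [[3, 1], [4, 1], [5, 9]]
print(take(rdd, 3))  # [3, 1, 4]
print(top(rdd, 3))   # [9, 5, 4]
```

So take is cheap because it can stop early, while top has to look at every element before it can answer.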

An action is basically what triggers the execution (computation) of transformations.
In the given example, execution only happens when filtered.count() is called. The filter itself is an example of a narrow transformation: everything happens within one partition.

>>> strings = spark.read.text("../README.md")
>>> filtered = strings.filter(strings.value.contains("Spark"))
>>> filtered.count()
20

The table below has some examples of transformations and actions.

Transformations    Actions
orderBy()          show()
groupBy()          take()
filter()           count()
select()           collect()
join()             save()

Jobs

Applications are transformed into Jobs, and each Job is then turned into a DAG (directed acyclic graph).

Stages

A Job has at least one stage, but can be divided into many depending on its complexity.
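As a rule of thumb, a new stage starts wherever a shuffle is needed, i.e. at a wide transformation. The sketch below is a deliberately simplified plain Python model of that rule: the WIDE set and the function name are my own assumptions, and real Spark decides this from the actual dependencies (for example, a join does not always require a shuffle).

```python
# Assumption for this sketch: these operations always require a shuffle.
WIDE = {"groupBy", "orderBy", "join", "distinct", "repartition", "intersection"}


def split_into_stages(operations):
    """Cut a linear chain of operations into stages at every wide operation."""
    stages, current = [], []
    for op in operations:
        if op in WIDE and current:
            stages.append(current)  # the shuffle boundary closes the running stage
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages


job = ["read", "filter", "groupBy", "count", "orderBy", "select"]
print(split_into_stages(job))
# [['read', 'filter'], ['groupBy', 'count'], ['orderBy', 'select']]
print(split_into_stages(["read", "filter", "select"]))
# [['read', 'filter', 'select']]
```

A job made only of narrow operations therefore stays in a single stage, and each wide operation adds one more.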

Tasks

A task is a subset of a Stage and the actual unit of execution. It maps to a single core working on one partition of data, i.e. an executor with 10 cores can run 10 tasks at once, which is its native level of parallelism.
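The arithmetic behind this is simple enough to write down. The helper below is a hypothetical sketch that assumes one core per task (Spark's default for spark.task.cpus) and ignores scheduling overhead and skew: with one task per partition, a stage needs as many “waves” as it takes to push all partitions through the available cores.

```python
import math


def task_waves(num_partitions, executors, cores_per_executor):
    """Return (parallel task slots, number of waves needed to process all partitions)."""
    slots = executors * cores_per_executor
    return slots, math.ceil(num_partitions / slots)


print(task_waves(10, 1, 10))  # (10, 1): ten partitions run fully in parallel
print(task_waves(25, 1, 10))  # (10, 3): 10 + 10 + 5 tasks, the last wave is half idle
print(task_waves(25, 2, 10))  # (20, 2)
```

This is also why the partition count matters: too few partitions leave cores idle, while a count slightly above a multiple of the slots adds a nearly empty extra wave.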