dataeng

The %run magic command executes another notebook inline; the variables and functions it defines become available in the calling notebook's context.
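
A minimal usage sketch (the notebook path is hypothetical; in course notebooks, a setup include run this way is what defines variables like DA.paths):

%run ./Includes/Classroom-Setup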

To clear context and variables, open the Run menu and select Clear state and outputs.

Query a file/directory
To query the data contained in a single file (or a directory of files), use the following pattern:

SELECT * FROM file_format.`/path/to/file`
-- e.g. SELECT * FROM json.`${DA.paths.kafka_events}`

The same query can also be done in Python, though it takes a few more lines:

%python
json_path = f"{DA.paths.kafka_events}/001.json"
df = spark.read.json(json_path)
display(df)

Reference a query with a view

CREATE OR REPLACE VIEW event_view
AS SELECT * FROM json.`${DA.paths.kafka_events}`

Reference a query with a temporary view

CREATE OR REPLACE TEMP VIEW events_temp_view
AS SELECT * FROM json.`${DA.paths.kafka_events}`
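
The Python equivalent, a minimal sketch reusing the same JSON path:

%python
df = spark.read.json(f"{DA.paths.kafka_events}")
df.createOrReplaceTempView("events_temp_view")
display(spark.sql("SELECT * FROM events_temp_view"))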

Common Table Expressions (CTEs) are short-lived: they exist only for the duration of a single query, acting like a named variable within the cell or function.

WITH cte_json
AS (SELECT * FROM json.`${DA.paths.kafka_events}`)
SELECT * FROM cte_json

binaryFile: working with unstructured data and its metadata

SELECT * FROM binaryFile.`${DA.paths.kafka_events}`
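
In Python, the equivalent is the binaryFile reader; each file becomes one row with path, modificationTime, length, and content columns. A sketch reusing the course path variable:

%python
binary_df = (spark.read
    .format("binaryFile")
    .load(f"{DA.paths.kafka_events}")
)
display(binary_df)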

Pivoting

You can use .pivot() to spread the distinct values of one column across multiple new columns. Code example:

%python
transactionsDF = (item_purchasesDF
    .groupBy("order_id",
             "email",
             "transaction_timestamp",
             "total_item_quantity",
             "purchase_revenue_in_usd",
             "unique_items",
             "items",
             "item",
             "name",
             "price")
    .pivot("item_id")
    .sum("item.quantity")
)
display(transactionsDF)

Aside from sum, you can use avg, count, min, max, and first as the pivot aggregation.

Unpivoted table example:

ID  Date            Food Name  Amount Eaten
1   August 1, 2019  Sammich    2
2   August 1, 2019  Pickle     3
3   August 1, 2019  Apple      1
4   August 2, 2019  Sammich    1
5   August 2, 2019  Pickle     1
6   August 2, 2019  Apple      4
7   August 3, 2019  Cake       2
8   August 4, 2019  Sammich    1
9   August 4, 2019  Pickle     2
10  August 4, 2019  Apple      3
Pivoted table:
DAY         Sammich  Pickle  Apple  Cake
2019-08-01  2        3       1      NULL
2019-08-02  1        1       4      NULL
2019-08-03  NULL     NULL    NULL   2
2019-08-04  1        2       3      NULL
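
As a sketch, the pivoted table above can be reproduced with groupBy/pivot; the DataFrame and column names below are assumptions made for the example:

%python
# Toy DataFrame matching the unpivoted table above
food_df = spark.createDataFrame(
    [
        (1, "2019-08-01", "Sammich", 2), (2, "2019-08-01", "Pickle", 3),
        (3, "2019-08-01", "Apple", 1), (4, "2019-08-02", "Sammich", 1),
        (5, "2019-08-02", "Pickle", 1), (6, "2019-08-02", "Apple", 4),
        (7, "2019-08-03", "Cake", 2), (8, "2019-08-04", "Sammich", 1),
        (9, "2019-08-04", "Pickle", 2), (10, "2019-08-04", "Apple", 3),
    ],
    ["id", "day", "food_name", "amount_eaten"],
)

pivoted_df = (food_df
    .groupBy("day")
    .pivot("food_name")    # one output column per distinct food_name
    .sum("amount_eaten")   # fill each cell with the summed amounts
)
display(pivoted_df)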

Create schema from json payload

schema_of_json() derives a schema from a sample JSON string; it is usually paired with from_json() to parse a JSON string column.

You can also let the DataFrameReader infer a schema with inferSchema = true (JSON reads infer a schema by default).
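
A minimal sketch of the pairing, assuming events_stringsDF (used in the examples below) has a JSON string column named value and that the sample payload is representative:

%python
from pyspark.sql.functions import col, from_json, schema_of_json

# Hypothetical sample payload; paste one representative record from your data
sample = '{"event_name": "finalize", "items": [{"item_id": "M_STAN_T", "quantity": 1}]}'

parsed_eventsDF = (events_stringsDF
    .select(from_json(col("value"), schema_of_json(sample)).alias("json"))
    .select("json.*")
)
display(parsed_eventsDF)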

: vs explode

The : operator extracts fields from a JSON string column, which lets you filter before parsing (e.g. value:event_name = 'finalize'), whereas explode() generates a new row for every element of an array column.

%python
display(events_stringsDF
    .where("value:event_name = 'finalize'")
    .orderBy("key")
    .limit(1)
)

%python
from pyspark.sql.functions import explode, size

exploded_eventsDF = (parsed_eventsDF
    .withColumn("item", explode("items"))
)
display(exploded_eventsDF.where(size("items") > 2))

Caveats

Distinct vs Unique

"Distinct" is the total number of different values, regardless of how many times each appears in the dataset: a name that appears in the list multiple times still counts as one distinct value. "Unique", by contrast, is the total number of values that appear only once.
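
A minimal sketch of the difference on toy data:

%python
from pyspark.sql.functions import col, count

names_df = spark.createDataFrame([("ana",), ("ana",), ("bob",)], ["name"])

distinct_count = names_df.select("name").distinct().count()  # 2 -> {ana, bob}

unique_count = (names_df
    .groupBy("name")
    .agg(count("*").alias("n"))
    .where(col("n") == 1)
    .count())  # 1 -> only bob appears exactly once

print(distinct_count, unique_count)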

collect() in the DataFrame API

The DataFrame API also offers the collect() method, but for extremely large DataFrames this is resource-heavy (expensive) and dangerous: it pulls every row back to the driver and can cause out-of-memory (OOM) exceptions.
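
Safer alternatives that bound how much data reaches the driver (df stands in for any DataFrame):

%python
rows = df.take(10)        # pulls at most 10 rows to the driver
first_row = df.first()    # a single row
display(df.limit(10))     # inspect a sample without collecting everything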

Samples

List all DataFrames in the session

from pyspark.sql import DataFrame

def list_dataframes():
    # Return the names of all global variables that currently hold a DataFrame
    return [k for (k, v) in globals().items() if isinstance(v, DataFrame)]

# OR print the names directly
for (k, v) in globals().items():
    if isinstance(v, DataFrame):
        print(k)