New Activity
Play Fill in the Blanks Game
Scenario 10: Aggregating Elements in an Array
Problem: Compute the average of numeric values stored in an array column.

Solution:

from pyspark.sql.functions import explode, avg

df = df.withColumn("exploded", explode(df["array_column"]))
aggregated_df = df.groupBy("id").agg(avg("exploded").alias("average_value"))






____________________

____________________"]))
____________________"))



Explanation:

explode() is used to transform each element of an array into a separate row, replicating all other column values. The exploded values are then grouped and averaged. This technique is useful for performing aggregations on data stored in arrays.
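
For reference, here is a minimal, self-contained sketch of the same pattern on a toy DataFrame. The column names id and scores and the SparkSession setup are illustrative assumptions, not part of the original scenario.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, avg

spark = SparkSession.builder.getOrCreate()

# Toy data: one array of numeric scores per id (illustrative only)
sample_df = spark.createDataFrame(
    [(1, [10.0, 20.0, 30.0]), (2, [5.0, 15.0])],
    ["id", "scores"],
)

# Explode the array so each element becomes its own row, then average per id
exploded = sample_df.withColumn("score", explode("scores"))
averages = exploded.groupBy("id").agg(avg("score").alias("average_value"))
averages.show()  # id 1 -> 20.0, id 2 -> 10.0
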
Scenario 19: Performing an Efficient Join
Join Operations and Optimizations

Problem: You need to join two large DataFrames on a common key but want to avoid performance issues like data skew or excessive shuffling.

Solution:

from pyspark.sql.functions import broadcast

# Assuming df_small is the smaller DataFrame and df_large is the larger one
joined_df = df_large.join(broadcast(df_small), df_large["key"] == df_small["key"])









____________________

# Assuming df_small is the smaller DataFrame and df_large is the larger one
____________________"])


Explanation:

The broadcast() function is used in joins where one DataFrame is significantly smaller than the other. It broadcasts the smaller DataFrame to all nodes in the cluster, reducing the need for shuffling the larger DataFrame across the network and speeding up the join process.
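
As a runnable illustration of the same idea, the sketch below builds two tiny DataFrames and broadcasts the smaller one; the data and the single shared key column are assumptions made only for this example.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Illustrative stand-ins for a large fact table and a small dimension table
df_large = spark.createDataFrame([(1, 100), (2, 200), (3, 300)], ["key", "amount"])
df_small = spark.createDataFrame([(1, "A"), (2, "B")], ["key", "label"])

# Broadcasting df_small avoids shuffling df_large across the cluster
joined_df = df_large.join(broadcast(df_small), "key")
joined_df.explain()  # the physical plan should show a broadcast hash join
joined_df.show()
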
Scenario 13: Aggregating Data Using Group By in SQL

Problem: You need to calculate the average sales by department from a sales DataFrame using SQL.

Solution:

df.createOrReplaceTempView("sales")
avg_sales_df = spark.sql("SELECT department, AVG(sales) AS average_sales FROM sales GROUP BY department")





____________________")
____________________")



Explanation:

SQL's GROUP BY clause is used here to aggregate sales data by department, calculating the average sales per department. This approach leverages SQL's powerful aggregation capabilities, making the query easy to understand and maintain.
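
A small end-to-end sketch of the same query, with made-up departments and sales figures purely for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative sales data
df = spark.createDataFrame(
    [("Toys", 100.0), ("Toys", 300.0), ("Books", 50.0)],
    ["department", "sales"],
)

df.createOrReplaceTempView("sales")
avg_sales_df = spark.sql(
    "SELECT department, AVG(sales) AS average_sales FROM sales GROUP BY department"
)
avg_sales_df.show()  # Toys -> 200.0, Books -> 50.0
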
Scenario 12: Joining DataFrames Using SQL
Using SQL Queries in PySpark

Problem: You have two DataFrames and need to perform an inner join on them using SQL.

Solution:

df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")
joined_df = spark.sql("SELECT * FROM table1 INNER JOIN table2 ON table1.id = table2.id")




____________________")
____________________")
____________________")



Explanation:

Temp views allow you to run SQL queries on DataFrame objects. After creating views for each DataFrame, you can perform joins and other SQL operations just as you would in a database query environment, leveraging SQL's expressive and familiar syntax.
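
One detail worth noting: SELECT * on the joined views returns two id columns, one from each table. Projecting explicit columns avoids that; the sketch below is a self-contained illustration where the name and order_total columns are invented for the example.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative data; both views share an id column
spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"]).createOrReplaceTempView("table1")
spark.createDataFrame([(1, 9.99), (2, 19.99)], ["id", "order_total"]).createOrReplaceTempView("table2")

# Projecting explicit columns keeps a single id in the result
joined_df = spark.sql("""
SELECT t1.id, t1.name, t2.order_total
FROM table1 t1
INNER JOIN table2 t2 ON t1.id = t2.id
""")
joined_df.show()
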

Scenario 21: Optimizing Joins on Large Datasets
Problem: You need to join two large DataFrames and ensure the operation is as efficient as possible.

Solution:

# Ensure both DataFrames are partitioned on the join key
df1 = df1.repartition("join_key")
df2 = df2.repartition("join_key")

# Perform the join
optimized_join_df = df1.join(df2, "join_key")






# Ensure both DataFrames are partitioned on the join key
____________________")
____________________")

# Perform the join
____________________")



Explanation:

Repartitioning both DataFrames on the join key before performing the join helps in colocating rows with the same key on the same node, reducing the data that needs to be shuffled across the cluster when the join is performed. This method significantly improves join efficiency and scalability.
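
As a rough variation on the solution above (reusing its df1, df2, and join_key; the partition count of 200 is an illustrative assumption), you can also pass an explicit partition count and inspect the plan to confirm where the shuffle happens:

# Repartition both sides on the join key, optionally with an explicit partition count
df1 = df1.repartition(200, "join_key")
df2 = df2.repartition(200, "join_key")

optimized_join_df = df1.join(df2, "join_key")
optimized_join_df.explain()  # inspect the physical plan for the exchange and join strategy
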
Scenario 15: Filtering Out Null Values
Handling Nulls and Dirty Data


Problem: You need to exclude rows where any key column contains a null value to maintain data integrity for analysis.

Solution:

clean_df = df.filter(df["key_column"].isNotNull())




____________________())


Explanation:

The .isNotNull() method is used to check for non-null values in a DataFrame column. This function is part of a filter that removes rows containing nulls in critical columns, ensuring the robustness of subsequent data processing.
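
Since the problem mentions several key columns, here is a sketch of one way to extend the filter to multiple columns; the list of column names (other_key_column in particular) is a hypothetical placeholder.

# Keep only rows where every listed key column is non-null
key_columns = ["key_column", "other_key_column"]
clean_df = df
for c in key_columns:
    clean_df = clean_df.filter(clean_df[c].isNotNull())

# Equivalent shortcut: dropna restricted to the key columns
clean_df = df.dropna(subset=key_columns)
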
Scenario 14: Filtering and Sorting Data in SQL

Problem: Filter the data to include only entries from the last year and then sort these entries by date in descending order.

Solution:

df.createOrReplaceTempView("events")
filtered_sorted_df = spark.sql("""
SELECT * FROM events
WHERE event_date >= date_sub(current_date(), 365)
ORDER BY event_date DESC
""")




____________________")
____________________("""
____________________
____________________)
____________________
""")


Explanation:

The SQL query filters events from the last 365 days using date_sub(current_date(), 365) and sorts them in descending order by event_date. This showcases how to integrate date calculations and sorting in SQL, useful for time-series analyses.
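
A self-contained sketch of the same query, with invented event rows; whether a given row survives the filter naturally depends on the date the sketch is run.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative events; event_date is parsed from strings into DATE values
spark.createDataFrame(
    [("login", "2024-01-15"), ("purchase", "2020-06-01")],
    ["event_type", "event_date_str"],
).selectExpr("event_type", "to_date(event_date_str) AS event_date").createOrReplaceTempView("events")

filtered_sorted_df = spark.sql("""
SELECT * FROM events
WHERE event_date >= date_sub(current_date(), 365)
ORDER BY event_date DESC
""")
filtered_sorted_df.show()
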
Scenario 22: Using Semi-Joins to Filter Data
Problem: You want to filter rows in one DataFrame based on the presence of keys in another DataFrame without needing the columns from the second DataFrame.

Solution:

# Using semi join to filter df1 based on presence of keys in df2
semi_joined_df = df1.join(df2, df1["key"] == df2["key"], "left_semi")






# Using semi join to filter df1 based on presence of keys in df2
____________________")


Explanation:

A left_semi join includes all rows from the first DataFrame where there is a match in the second DataFrame, effectively filtering the first DataFrame. This is useful when you only need to check the existence of a key and do not require data from the second DataFrame.
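
A tiny worked example of the semi join, with data invented for illustration: only df1 rows whose key also appears in df2 survive, and no df2 columns are added to the result.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["key", "value"])
df2 = spark.createDataFrame([(1, "x"), (3, "y")], ["key", "other"])

# Keeps df1 rows with keys 1 and 3; the output columns are exactly those of df1
semi_joined_df = df1.join(df2, df1["key"] == df2["key"], "left_semi")
semi_joined_df.show()
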
Scenario 11: Creating Temporary Views
Using SQL Queries in PySpark

Problem: You need to perform several complex SQL operations on a DataFrame, and prefer to use SQL syntax.

Solution:

df.createOrReplaceTempView("temp_view")
result_df = spark.sql("SELECT * FROM temp_view WHERE age > 30")




____________________")
____________________")


Explanation:

By creating a temporary view using createOrReplaceTempView("temp_view"), you can execute SQL queries directly on the data stored in the DataFrame. This is particularly useful for users familiar with SQL, allowing complex queries without extensive DataFrame manipulation code.
Scenario 18: Handling Missing or Incomplete Data
Problem: You discover that some entries in your DataFrame are missing values in multiple columns which are required for a specific analysis.

Solution:

# Define columns that must not be null for your analysis
required_columns = ["col1", "col2", "col3"]

# Filter out rows where any of the required columns are null
complete_data_df = df.dropna(subset=required_columns)






# Define columns that must not be null for your analysis
____________________"]

# Filter out rows where any of the required columns are null
____________________)


Explanation:

dropna() is used to remove rows with null values in specified columns. By setting subset=required_columns, it ensures that only rows with complete data in the required fields are retained. This is essential for analyses that need complete records to produce valid results.
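
dropna() also accepts how and thresh parameters, which can be useful when a row only needs some of the required fields. A brief sketch, reusing df and the required_columns list from the solution above:

# Drop a row only if ALL of the required columns are null
mostly_complete_df = df.dropna(how="all", subset=required_columns)

# Drop a row unless at least two of the required columns are non-null
at_least_two_df = df.dropna(thresh=2, subset=required_columns)
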
Scenario 16: Replacing Nulls with Default Values
Problem: In a customer data DataFrame, replace nulls in the 'age' column with the average age and nulls in the 'country' column with 'Unknown'.

Solution:

from pyspark.sql.functions import coalesce, lit, mean

avg_age = df.select(mean(df["age"])).collect()[0][0]
df = df.withColumn("age", coalesce(df["age"], lit(avg_age)))
df = df.withColumn("country", coalesce(df["country"], lit("Unknown")))






____________________

____________________]
____________________)))
____________________")))



Explanation:

coalesce() returns the first non-null value among its arguments. Here, it is used along with lit(), which creates a literal column. For 'age', it replaces nulls with the average age calculated from the dataset. For 'country', it substitutes nulls with the string "Unknown".
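
An equivalent, more compact alternative is DataFrame.fillna(), which accepts a dictionary of per-column defaults. This sketch assumes avg_age has already been computed as in the solution above.

# Same effect as the coalesce-based version: per-column default values
# (the replacement is cast to the column's type, so a float average on an
# integer age column would be truncated)
df = df.fillna({"age": avg_age, "country": "Unknown"})
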
Scenario 17: Cleaning Dirty Data (Invalid Entries)
Problem: Entries in the 'email' column of a customer data DataFrame contain some invalid emails that you want to filter out.

Solution:

from pyspark.sql.functions import col, regexp_extract

# Define a regular expression for valid emails
email_regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'

# Filter data to include only rows with valid emails
valid_emails_df = df.filter(regexp_extract(col("email"), email_regex, 0) != "")







____________________

# Define a regular expression for valid emails
____________________

# Filter data to include only rows with valid emails
____________________, ____________________) != "")


Explanation:

Using regexp_extract() with a regular expression for email validation allows you to filter rows based on whether the 'email' column contains a valid email. This method is effective for ensuring data quality by removing rows with invalid email formats.
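
The same filter can also be written with Column.rlike(), which returns a boolean match directly instead of extracting the matched text. A sketch, reusing df and email_regex from the solution above:

from pyspark.sql.functions import col

# rlike() keeps rows whose email matches the pattern anywhere in the string
valid_emails_df = df.filter(col("email").rlike(email_regex))
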
Scenario 1: Selecting Specific Columns
Problem: You want to reduce the dataset to only include a few relevant columns for analysis.

Solution:

selected_df = df.select("column1", "column2", "column3")



____________________")


Explanation:

The select() function is used to specify a subset of columns from the DataFrame. This is useful for focusing on relevant data and can improve performance by reducing the amount of data processed in subsequent transformations.
Scenario 2: Filtering Data Based on Conditions
Problem: You need to filter the dataset to include only records where the age is greater than 30.

Solution:

filtered_df = df.filter(df["age"] > 30)




____________________)


Explanation:

filter() applies a condition to each row in the DataFrame and retains rows that meet the criteria. This is essential for narrowing down data to relevant records before performing more intensive operations.
Scenario 20: Handling Data Skew in Joins
Problem: You are experiencing slow join operations due to data skew, where certain keys dominate the dataset.

Solution:

from pyspark.sql.functions import rand, concat, lit, expr

# Add a random suffix to the key in both DataFrames
df1_skewed = df1.withColumn("key", concat(df1["key"], lit("_"), (rand() * 10).cast("int")))
df2_skewed = df2.withColumn("key", concat(df2["key"], lit("_"), (rand() * 10).cast("int")))

# Perform the join
skewed_join_df = df1_skewed.join(df2_skewed, df1_skewed["key"] == df2_skewed["key"])

# Remove the suffix to recover the original key
final_df = skewed_join_df.withColumn("key", expr("substring(key, 1, length(key) - 2)"))









____________________

# Add a random suffix to the key in both DataFrames
____________________")))
____________________() * ____________________")))

# Perform the join
____________________"])

# Remove the suffix to recover the original key
____________________)"))


Explanation:

By adding a random suffix to the join key, you distribute the data more evenly across the cluster, mitigating the effect of skewed keys. This reduces the workload on any single node and balances the processing load, enhancing performance.
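
A related salting variant is sketched below, under the assumption that df2 is the smaller side of the join from the solution above: df1 is salted with a random suffix, while df2 is replicated once per salt value, so that every salted key on the left still has a matching salted key on the right. The NUM_SALTS constant is an illustrative choice.

from pyspark.sql.functions import rand, concat, lit, col, explode, array

NUM_SALTS = 10  # illustrative salt count

# Salt the large, skewed side with a random suffix 0..NUM_SALTS-1
df1_salted = df1.withColumn(
    "salted_key",
    concat(col("key"), lit("_"), (rand() * NUM_SALTS).cast("int").cast("string")),
)

# Replicate the smaller side once per salt value so every salted key can find a match
salt_values = array(*[lit(i) for i in range(NUM_SALTS)])
df2_salted = (
    df2.withColumn("salt", explode(salt_values))
       .withColumn("salted_key", concat(col("key"), lit("_"), col("salt").cast("string")))
)

# Join on the salted key; the original "key" columns from both sides remain available
salted_join_df = df1_salted.join(df2_salted, "salted_key")
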
Scenario 3: Grouping and Aggregating Data
Problem: Calculate the average sales by department in a retail dataset.

Solution:

from pyspark.sql import functions as F
aggregated_df = df.groupBy("department").agg(F.avg("sales").alias("average_sales"))




____________________
____________________"))


Explanation:

groupBy() followed by agg() allows for complex aggregations, such as computing averages, sums, and counts, grouped by specific fields. This is pivotal in generating insights and summaries from large datasets.
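
agg() can compute several aggregates in one pass. A short sketch, reusing df with its department and sales columns from the scenario above:

from pyspark.sql import functions as F

# Average, total, and row count per department in a single aggregation
summary_df = df.groupBy("department").agg(
    F.avg("sales").alias("average_sales"),
    F.sum("sales").alias("total_sales"),
    F.count("*").alias("row_count"),
)
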
Scenario 4: Adding a Computed Column
Problem: Add a new column that shows the total cost, calculated as the product of quantity and price per unit.

Solution:

df = df.withColumn("total_cost", df["quantity"] * df["price_per_unit"])





____________________"])



Explanation:

withColumn() is used to add a new column to the DataFrame, which is a result of an expression or calculation involving existing columns. This method is commonly used to enrich data with additional metrics or to prepare data for further analysis.
Scenario 5: Removing Duplicates
Problem: Remove duplicate records in the dataset based on specific columns.

Solution:

unique_df = df.dropDuplicates(["column1", "column2"])




____________________"])



Explanation:

dropDuplicates() helps in removing duplicate rows from a DataFrame based on all or a subset of columns. This is especially useful when processing datasets where records have been entered more than once.
Scenario 6: Renaming Columns
Problem: Rename a column from "old_name" to "new_name" for clarity or standardization.

Solution:

df = df.withColumnRenamed("old_name", "new_name")




____________________")


Explanation:

withColumnRenamed() changes the name of a DataFrame column, which is particularly useful in standardizing column names across different data sources or making them more descriptive.
Scenario 7: Working with Array Type
Problem: You need to keep only records where the array column contains a specific value.

Solution:

from pyspark.sql.functions import array_contains

filtered_df = df.filter(array_contains(df["array_column"], "value_to_check"))




____________________

____________________"))



Explanation:

array_contains() checks whether the specified value exists within an array column. This function is particularly useful when you want to filter rows based on the content of an array.
Scenario 8: Extracting Data from Structs
Problem: You have a DataFrame with a struct column info that contains fields age and name. You need to access these fields to create new columns.

Solution:

df = df.withColumn("age", df["info.age"]).withColumn("name", df["info.name"])




____________________"])


Explanation:

Accessing fields in a struct is done using dot notation (struct_name.field_name). This allows you to flatten structures for easier analysis and manipulation.
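
If the goal is to flatten every field of the struct at once, a select with the struct wildcard is a compact alternative. A sketch, assuming the same df and info struct as above:

# Keeps all existing columns and pulls age, name (and any other info fields) out as top-level columns
flattened_df = df.select("*", "info.*")
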
Scenario 9: Using Maps
Problem: You want to retrieve a value from a map column based on a specific key.

Solution:

from pyspark.sql.functions import col

df = df.withColumn("specific_value", col("map_column")["key"])





____________________

____________________"])


Explanation:

In PySpark, accessing a map's value by its key is straightforward using bracket notation (map_column["key"]). This is efficient for retrieving related data grouped as key-value pairs within a single DataFrame column.
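
An equivalent form uses Column.getItem(), which can be clearer when the key is held in a variable. A brief sketch, reusing df and map_column from the scenario; lookup_key is a hypothetical placeholder.

from pyspark.sql.functions import col

lookup_key = "key"  # illustrative key name
df = df.withColumn("specific_value", col("map_column").getItem(lookup_key))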