The Spark best practices page describes several ways to use Spark efficiently. These guidelines for building efficient, maintainable ETL pipelines include the following:
- Incremental loading (preferred):
  - Load only new or changed data since the last run.
  - Use watermarks or timestamp columns.
  - Dramatically reduces processing time and costs.
  - Example: WHERE updated_at > '2025-01-01'.
- Deduplication:
  - Raw data often contains duplicates.
  - Use window functions with row_number() to identify the latest records.
  - Or use dropDuplicates() for simple cases.
  - Always deduplicate before final table writes.
- Iceberg MERGE for upserts:
  - Recommended approach for slowly changing dimensions.
  - Provides ACID guarantees and efficient updates.
  - Handles both inserts and updates in a single operation.
  - Better than the delete-and-insert pattern.
- Partitioning strategy:
  - Partition by common filter columns such as date, region, or category.
  - Avoid over-partitioning; partitions smaller than about 1 GB are a sign that the scheme is too granular.
  - Use partition pruning in queries for performance.
  - Consider bucketing for high-cardinality columns.
The following example shows how to apply these best practices: incremental loading, deduplicating records, merging upserts into Iceberg, and partitioning output when writing DataFrames.
# Incremental loading pattern
last_run = "2025-01-01"
df_incremental = spark.read.table("source.orders") \
    .filter(f"order_date > '{last_run}'")
# Deduplication using a window function
from pyspark.sql import functions as F
from pyspark.sql.window import Window
window = Window.partitionBy("customer_id").orderBy(F.desc("updated_at"))
df_deduped = df_incremental.withColumn("rank", F.row_number().over(window)) \
    .filter("rank = 1") \
    .drop("rank")
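# Simpler alternative for straightforward cases: dropDuplicates() keeps one
# arbitrary row per key, so this sketch fits when you do not need to pick
# the latest record by updated_at (the key columns here are illustrative)
df_deduped_simple = df_incremental.dropDuplicates(["customer_id", "updated_at"])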
# Iceberg MERGE (upsert) - recommended
spark.sql(
"""
MERGE INTO iceberg.prod.customers AS target
USING staging.customer_updates AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
)
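# If the updates arrive as a DataFrame rather than a catalog table, a temporary
# view can serve as the MERGE source; a sketch reusing df_deduped from above,
# in which case the USING clause would reference customer_updates_vw instead
df_deduped.createOrReplaceTempView("customer_updates_vw")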
# Proper partitioning with the DataFrameWriterV2 API
# (df here is a placeholder for any DataFrame of event records)
df.writeTo("iceberg.prod.events") \
    .partitionedBy("event_date", "event_type") \
    .create()
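The last two partitioning bullets can be illustrated with short sketches against the illustrative iceberg.prod.events table: a read that benefits from partition pruning because it filters on a partition column, and a bucketed write for a high-cardinality column. The bucketed table name, the bucket count, and the customer_id column are assumptions, and the bucket transform (F.bucket) requires Spark 3.1 or later.
# Partition pruning: filtering on the partition column lets Spark and Iceberg
# skip partitions that cannot match the predicate
df_recent = spark.read.table("iceberg.prod.events") \
    .filter("event_date >= '2025-01-01'")
# Bucketing a high-cardinality column with an Iceberg bucket transform
# (df is the same placeholder DataFrame as above; 16 buckets is illustrative)
df.writeTo("iceberg.prod.events_bucketed") \
    .partitionedBy(F.bucket(16, "customer_id")) \
    .create()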
Be aware of Ranger policies that might restrict access to certain data sources or destinations. If you encounter permission errors, verify your access rights with the Platform team.
Additional resources