The Spark best practices page describes several efficient ways to use Spark. These guidelines help you build efficient, maintainable ETL pipelines and include the following:
  • Incremental loading - preferred:
    • Load only new or changed data since the last run.
    • Use watermarks or timestamp columns.
    • Dramatically reduces processing time and costs.
    • Example: WHERE updated_at > '2025-01-01'.
  • Deduplication:
    • Raw data often contains duplicates.
    • Use window functions with row_number() to identify the latest records.
    • Or use dropDuplicates() for simple cases.
    • Always deduplicate before final table writes.
  • Iceberg MERGE for upserts:
    • Recommended approach for slowly changing dimensions.
    • Provides ACID guarantees and efficient updates.
    • Handles both inserts and updates in a single operation.
    • Better than the delete-and-insert pattern.
  • Partitioning strategy:
    • Partition by common filter columns such as date, region, or category.
    • Avoid over-partitioning; less than 1 GB per partition is ideal.
    • Use partition pruning in queries for performance.
    • Consider bucketing for high-cardinality columns.
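The partition-size guideline above can be checked with simple arithmetic before you choose partition columns. The following is a minimal sketch, not part of the NexusOne tooling: the function name, the 300 GiB table size, and the distinct-value counts are hypothetical, and the estimate assumes data is spread evenly across every combination of partition values.

```python
from typing import Sequence

GIB = 1024 ** 3  # bytes in one gibibyte


def average_partition_bytes(table_bytes: int, distinct_values: Sequence[int]) -> float:
    """Estimate the average partition size for a set of partition columns.

    distinct_values holds the number of distinct values per partition column.
    Assumes every combination of values occurs and data is evenly spread,
    so the partition count used here is an upper bound.
    """
    partitions = 1
    for count in distinct_values:
        partitions *= count
    return table_bytes / partitions


table_bytes = 300 * GIB  # hypothetical table size

# Partition by date only: 365 distinct days
by_date = average_partition_bytes(table_bytes, [365])

# Partition by date and a category column with 50 distinct values
by_date_and_type = average_partition_bytes(table_bytes, [365, 50])

print(f"date only:       {by_date / GIB:.2f} GiB per partition")
print(f"date + category: {by_date_and_type / GIB * 1024:.1f} MiB per partition")
```

With these assumed numbers, partitioning by date alone gives partitions of roughly 0.8 GiB, while adding the second column multiplies the partition count by 50 and shrinks each partition to about 17 MiB. Many small partitions like this are a typical sign of over-partitioning.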
The following example shows how to apply these practices: loading data incrementally, deduplicating records, merging upserts into Iceberg, and partitioning output when writing DataFrames.
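from pyspark.sql import functions as F
from pyspark.sql.window import Window

# `spark` is assumed to be an existing SparkSession, and `df` stands for any
# DataFrame you have already loaded that has the columns used below.

# Incremental loading pattern: read only rows newer than the last run
last_run = "2025-01-01"
df_incremental = spark.read.table("source.orders") \
    .filter(f"order_date > '{last_run}'")

# Deduplication using a window function: keep the latest row per customer
window = Window.partitionBy("customer_id").orderBy(F.desc("updated_at"))
df_deduped = df.withColumn("rank", F.row_number().over(window)) \
    .filter("rank = 1") \
    .drop("rank")

# Iceberg MERGE (upsert) - recommended
spark.sql(
  """
  MERGE INTO iceberg.prod.customers AS target
  USING staging.customer_updates AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
  """
)

# Proper partitioning: writeTo() returns a DataFrameWriterV2, which takes
# column expressions through partitionedBy() rather than partitionBy()
df.writeTo("iceberg.prod.events") \
    .partitionedBy(F.col("event_date"), F.col("event_type")) \
    .create()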
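The incremental loading pattern above hard-codes `last_run`. In a scheduled pipeline the watermark usually has to survive between runs. The following is a minimal sketch of one way to do that with a small JSON state file. The helper names, the file name, and the `1970-01-01` default are assumptions for illustration, and a real pipeline would keep the state in durable storage rather than a temporary directory.

```python
import json
import tempfile
from pathlib import Path

DEFAULT_WATERMARK = "1970-01-01"  # assumption: load everything on the first run


def read_watermark(path: Path) -> str:
    """Return the stored watermark, or the default if no run has completed yet."""
    if not path.exists():
        return DEFAULT_WATERMARK
    return json.loads(path.read_text())["last_run"]


def write_watermark(path: Path, value: str) -> None:
    """Persist the new watermark; call this only after the load has succeeded."""
    path.write_text(json.dumps({"last_run": value}))


def incremental_predicate(column: str, watermark: str) -> str:
    """Build the filter expression to pass to DataFrame.filter()."""
    return f"{column} > '{watermark}'"


# Hypothetical state location; a temporary directory keeps this demo self-contained.
state_file = Path(tempfile.mkdtemp()) / "orders_watermark.json"

# First run: no state file yet, so the default watermark is used.
first_run = read_watermark(state_file)
predicate = incremental_predicate("order_date", first_run)
print(predicate)

# In the pipeline: spark.read.table("source.orders").filter(predicate)
# After the write succeeds, advance the watermark to the newest value loaded.
write_watermark(state_file, "2025-01-01")

# Next run: the stored watermark replaces the default.
next_run = read_watermark(state_file)
print(incremental_predicate("order_date", next_run))
```

Writing the watermark only after a successful load means a failed run is retried from the same point instead of silently skipping data.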
Be aware that Ranger policies might restrict access to certain data sources or destinations. If you encounter permission errors, verify your access rights with the Platform team.

Additional resources

  • To get an overview of Spark, refer to the Spark in NexusOne page.
  • To learn practical ways to use Spark in the NexusOne environment, refer to the Spark hands-on examples page.
  • For more details about Spark, refer to the official Spark and Iceberg documentation.