Apache Spark is a distributed analytics engine and framework for large-scale data processing. The engine executes high-performance data transformations and analytics by processing data in memory and falling back to disk when required. The framework provides unified APIs for SQL, streaming, machine learning, and graph processing. NexusOne deploys Spark as the primary execution engine for data platform workloads, spanning ingestion pipelines, complex ETL transformations, interactive analytics, and ad-hoc reporting.

Environments and runtime specifications

This section describes the NexusOne environments in which Spark jobs can execute and the runtime specifications, such as the Spark version, supported libraries, and default settings.

Deployment context

Within NexusOne, Spark jobs can execute in multiple environments:
  • Jupyter Notebooks: An interactive REPL environment with a pre-initialized SparkSession. Optimal for exploratory data analysis, prototyping, and collaborative development. If you are using the NexusOne portal, you can launch a Jupyter Notebook using the NexusOne Build feature.
  • Apache Airflow: Provides production orchestration using PySpark decorators.
    • Batch scheduling: Jobs run as scheduled cron tasks, are monitored continuously, and have alerting configured.
    • Execution method: Jobs run as containerized Spark applications in Kubernetes pods with full lifecycle management.
    If you are using the NexusOne portal, you can launch Apache Airflow using the NexusOne Monitor feature.
  • Command line via Kyuubi: Provides command-line submission for jobs submitted from outside the Kubernetes cluster.
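For example, a client outside the Kubernetes cluster can submit SQL to Spark through Kyuubi’s HiveServer2-compatible endpoint. The following is a minimal sketch using the PyHive client; the host, port, and credentials are placeholders to confirm with the Platform team, and any HiveServer2-compatible client, such as Beeline on the command line, works similarly.
# Minimal sketch: run a query against Spark through Kyuubi's HiveServer2-compatible endpoint.
# Host, port, and username are placeholders; confirm the values for your deployment.
from pyhive import hive

conn = hive.connect(
    host="kyuubi.example.internal",  # placeholder Kyuubi endpoint
    port=10009,                      # Kyuubi's default Thrift binary port
    username="your_user"             # placeholder platform credentials
)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM iceberg.prod.orders")
print(cursor.fetchall())
cursor.close()
conn.close()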

Current version information

  • Python version: 3.12
  • Spark version: 3.5.6

Supported Spark interfaces

  • PySpark via the Python API: Programmatic access to DataFrame and SQL APIs for pipeline development and data engineering workflows.
  • Spark SQL: Write standard SQL queries that execute on Spark. Perfect for analysts and data warehouse teams who prefer SQL syntax for querying and transforming data.

Supported library and table format

Apache Iceberg 1.8: The Apache Iceberg table format provides snapshot isolation and schema evolution.

Default Spark configuration

spark.dynamicAllocation.enabled=true
spark.executor.memory=8g
spark.driver.memory=12g
spark.sql.catalog.iceberg.type=hive
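These defaults are applied automatically when a session is created. If a specific job needs different values, you can override them when building the session. The following is a minimal sketch; the property name comes from the defaults above, while the 16g value is only illustrative.
from pyspark.sql import SparkSession

# Override a default for this job; note that static settings such as executor memory
# only take effect when the session is first created, not on an already running session.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "16g")  # illustrative value
    .getOrCreate()
)

# Confirm the effective setting
print(spark.conf.get("spark.executor.memory"))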

Spark for data engineers

Data engineering workflows use Spark to build production data pipelines, transform raw data into analytics-ready tables, and ensure data quality. These tasks usually involve complex SQL transformations, multi-source data integration, and automated pipeline scheduling. The following sections describe related sample tasks you can perform using Apache Spark on NexusOne.
When testing each task, replace the file location with a valid one.

Initialize a Spark session

You can initialize a Spark session with NexusOne’s default Spark configurations, which include an Iceberg catalog setup, executor and driver memory settings, and integration with the data lakehouse infrastructure. To initialize a Spark session:
  • Use SparkSession to create or retrieve an existing session.
  • Default configurations are automatically applied:
    • The Iceberg catalog provides lakehouse table access.
    • Executor memory set to 8 GB, driver memory to 12 GB.
    • Dynamic allocation manages resource optimization.
    • Hive metastore integration handles metadata management.
  • Session is ready to read from and write to Iceberg tables in the lakehouse.
The following example creates a Spark session with the default NexusOne Spark configuration and verifies that the configuration is applied.
from pyspark.sql import SparkSession

# Create or retrieve existing Spark session with NX1 defaults
spark = SparkSession.builder.getOrCreate()

# Verify session configuration
print(f"Spark version: {spark.version}")
print(f"Catalog type: {spark.conf.get('spark.sql.catalog.iceberg.type')}")

Read and write raw data

After initializing a Spark session, you can use Spark’s DataFrame Reader/Writer API to load common data formats such as Parquet, CSV, and JSON with configurable options and write outputs with specified modes and compression. How to read and write raw data:
  • Read data: Use DataFrameReader with file format-specific methods.
    • With CSV files, you configure options such as header detection, schema inference, or delimiters.
    • With JSON, Spark handles nested structures automatically.
    • Handle different file locations such as S3, local paths, or HDFS.
  • Write data: Use DataFrameWriter to write the data.
    • Specify write mode, such as overwrite, append, ignore, or error.
    • Choose a compression format for optimization.
    • Choose the file format to save the DataFrame’s contents in; in this example, Parquet.
The following example reads data from CSV and JSON files into Spark DataFrames, then writes a DataFrame to Parquet with a specified write mode and compression.
# Reading CSV with options
# Assume "spark" is the Spark session created earlier
df = spark.read.options(
  header="true",
  inferSchema="true",
  delimiter=","
).csv("s3a://bucket/data/customers.csv")

# Reading JSON
df_json = spark.read.json("s3a://bucket/data/events.json")

# Writing Parquet with compression
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet("s3a://bucket/processed/customers.parquet")
Be aware of Ranger policies that might restrict access to certain data sources or destinations. If you encounter permission errors, then verify your access rights with the Platform team.

Create data frames programmatically

You can create Spark DataFrames from Python lists, dictionaries, or other data structures with either explicit or inferred schemas using data types such as StructType and StructField. There are several common ways to create a DataFrame programmatically:
  • Inferred schema: Best for quick prototyping or simple structures.
    • Spark detects the schema from the provided data.
    • You can pass a list of tuples or dictionaries.
    • Spark automatically infers column types.
  • Explicit schema: Recommended for production. You define the structure with a StructType that contains a StructField for each column and specify exact data types such as StringType or IntegerType, which ensures type safety and prevents inference errors.
The following example creates Spark DataFrames programmatically using either inferred schemas, from tuples or dictionaries, or an explicit schema defined with StructType and StructField.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Method 1a: Inferred schema - List of tuples
data = [
    ("Alice", 30, "Engineering"),
    ("Bob", 25, "Marketing"),
    ("Charlie", 35, "Sales")
]
df = spark.createDataFrame(data, ["name", "age", "department"])

# Method 1b: Inferred schema - Dictionary
data_dict = [
    {"name": "Alice", "age": 30, "department": "Engineering"},
    {"name": "Bob", "age": 25, "department": "Marketing"}
]
df = spark.createDataFrame(data_dict)

# Method 2: Explicit schema (recommended)
schema = StructType([
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), True),
    StructField("department", StringType(), True)
])
df = spark.createDataFrame(data, schema)

Apply transformation patterns

You can use Spark’s DataFrame, Window, and Functions API to implement common ETL patterns such as data cleaning, lookups/joins, filtering, grouping, aggregations, and time-series analysis. There are several common ways to apply transformation patterns:
  • DataFrame API and Functions API - Cleaning transformations:
    • Use .withColumn() to add/modify columns.
    • Use .filter() to remove invalid rows.
  • DataFrame API - Joining patterns:
    • Use .join() with join types such as inner, left, right, or full.
    • Specify join conditions explicitly.
    • Optimize with broadcast joins for small lookup tables.
  • DataFrame API and Functions API - Aggregation:
    • Use .groupBy() with .agg() for summarization.
    • Apply aggregate functions: count(), avg(), or max().
  • Window API and Functions API - Ranking:
    • Define windows with .partitionBy() and .orderBy().
    • Apply ranking functions such as rank().
    • Calculate running totals or moving averages.
The following example performs an ETL operation in Spark: it cleans data with filters and computed columns, joins it for enrichment, aggregates it with groupBy and aggregate functions, and applies a window to rank rows within partitions using a specified ordering.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# DataFrame API and Functions API: Cleaning
# Remove nulls and add computed column
df_clean = df \
    .filter(F.col("age").isNotNull()) \
    .withColumn("age_group", 
                F.when(F.col("age") < 30, "Young")
                 .when(F.col("age") < 50, "Middle")
                 .otherwise("Senior"))

# DataFrame API: Joining
# Left join orders with customer data to enrich the data
df_enriched = orders.join(customers, "customer_id", "left")

# DataFrame API and Functions API: Aggregation
# Group by department and calculate the statistics
df_summary = df.groupBy("department").agg(
    F.count("*").alias("employee_count"),
    F.avg("age").alias("avg_age"),
    F.max("age").alias("max_age")
)

# Window API and Functions API: Ranking
# Rank employees by salary within each department
window = Window.partitionBy("department").orderBy(F.desc("salary"))
df_ranked = df.withColumn("rank", F.row_number().over(window))
Be aware of Ranger policies that might restrict access to tables you’re joining. Verify permissions if you encounter access errors.

Write to Iceberg tables

You can use Spark’s DataFrameWriterV2 API to write data to Iceberg tables. The API supports append, overwrite, and overwritePartitions operations. To perform row-level operations like a merge, use a SQL statement. There are several common ways to write to Iceberg tables:
  • Use .writeTo() instead of .write():
    • Designed specifically for Iceberg tables.
    • Provides better control over write operations.
    • Enables advanced features like merging.
  • Handle partitioning:
    • Partition large tables by common filter columns such as date or region.
    • Use .partitionedBy() when creating tables.
    • Improves query performance dramatically.
When writing to Iceberg tables, NexusOne automatically uses the default catalog configuration. It’s also worth noting the following:
  • NexusOne ships with the Iceberg catalog enabled by default.
  • Tables are automatically registered in the Hive metastore.
  • You reference tables using fully qualified names: iceberg.schema.table.
The following example uses the DataFrameWriterV2 API to append new data, overwrite tables or partitions, and create partitioned tables, and it uses a separate SQL MERGE statement to perform update-or-insert logic on an Iceberg table.
# Append new data
df.writeTo("iceberg.prod.sales") \
    .append()

# Overwrite entire table
df.writeTo("iceberg.staging.customers") \
    .overwrite()

# Overwrite specific partitions only
df.writeTo("iceberg.prod.events") \
    .overwritePartitions()

# Create partitioned table
df.writeTo("iceberg.prod.orders") \
    .partitionBy("order_date") \
    .create()

# MERGE operation using SQL
spark.sql(
  """
  MERGE INTO iceberg.prod.customers AS target
  USING staging.new_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
  """
)
Be aware of Ranger policies that might restrict write access to certain schemas or tables. Contact the Platform team if you encounter permission errors.

Submit Spark applications

You can execute Spark applications on the NexusOne cluster using either Apache Airflow, the recommended approach, or Jupyter Notebooks, which are helpful for ad-hoc analysis with proper resource allocation and monitoring. There are several common ways to submit Spark applications:
  • Use Airflow within Kubernetes:
    • Package your PySpark code in a DAG.
    • Use the PySpark decorator for Spark tasks.
    • Airflow handles scheduling, retries, and monitoring.
    • Spark jobs run as containerized pods in Kubernetes.
  • Use Jupyter within Kubernetes:
    • Start Spark session directly in the notebook.
    • Submit jobs interactively for exploration.
    • Results appear inline in notebook cells.
  • Outside Kubernetes:
    • Use the Kyuubi submitter for external job submission.
    • Not recommended for production workflows.
  • Required elements:
    • Python script or notebook with PySpark code.
    • Kubernetes namespace and resource limits.
    • OpenLineage tracking for metadata capture.
The following example describes how to define an Airflow DAG that runs daily at 2 AM and uses the PySpark decorator to execute a Spark task that reads raw data from S3, filters active records, and appends them to an Iceberg table.
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule_interval="0 2 * * *",  # Daily at 2 AM
    start_date=datetime(2025, 1, 1),
    catchup=False
)
def spark_etl_pipeline():
    
    @task.pyspark(
        conn_id="spark",
        config_kwargs={
            "spark.driver.memory": "8g",
            "spark.executor.memory": "8g"
        }
    )
    def process_data(spark, sc, **context):
        # Your Spark transformation logic
        df = spark.read.parquet("s3a://bucket/raw/data")
        df_transformed = df.filter("status = 'active'")
        df_transformed.writeTo("iceberg.prod.active_users").append()
        
    process_data()

dag = spark_etl_pipeline()

Optimize Spark performance

You can implement performance optimization techniques such as caching DataFrames, broadcast joins for small tables, partitioning strategies, and appropriate persistence levels to accelerate job execution. There are several common ways to optimize the performance of Spark, such as using:
  • Cached DataFrames:
    • Use .cache() or .persist() for reused DataFrames.
    • Stores data in memory for faster subsequent access.
    • Best for iterative algorithms or multiple actions on the same data.
  • Broadcast joins:
    • Use F.broadcast() for small lookup tables less than 10 MB.
    • Sends small tables to all executors, avoiding shuffle.
    • Dramatically speeds up joins with dimension tables.
  • Partitioning strategies:
    • Repartition: Increase or decrease partitions. It triggers a full shuffle.
    • Coalesce: Reduce partitions without shuffle for efficiency.
    • Balance: Avoid too few partitions, which underutilize executors, and too many small partitions, which add scheduling overhead.
  • Persistence levels:
    • MEMORY_ONLY: Fastest, but might spill to disk.
    • MEMORY_AND_DISK: Safer for large datasets.
    • DISK_ONLY: When there isn’t enough memory.
    • Choose based on data size and reuse pattern.
The following example describes how to optimize Spark jobs by caching frequently accessed DataFrames, using broadcast joins for small tables, repartitioning large DataFrames for parallelism, and coalescing before writing to reduce output files.
from pyspark.sql import functions as F
from pyspark import StorageLevel

# Cache frequently accessed DataFrame
df_users = spark.read.table("iceberg.prod.users")
df_users.cache()  # or .persist(StorageLevel.MEMORY_AND_DISK)

# Broadcast join for small lookup table
df_countries = spark.read.table("iceberg.dim.countries")  # Small table
df_enriched = df_users.join(
    F.broadcast(df_countries),
    "country_code"
)

# Repartition for better parallelism
df_large = spark.read.table("iceberg.prod.transactions")
df_repartitioned = df_large.repartition(200, "transaction_date")

# Coalesce to reduce output files (df_final is the result DataFrame of earlier transformations)
df_final.coalesce(10).write.parquet("s3a://bucket/output/")

# Unpersist when done to free memory
df_users.unpersist()

Follow ETL best practices

These are guidelines for building efficient, maintainable ETL pipelines, including the following:
  • Incremental loading - preferred:
    • Load only new or changed data since the last run.
    • Use watermarks or timestamp columns.
    • Dramatically reduces processing time and costs.
    • Example: WHERE updated_at > '2025-01-01'.
  • Deduplication:
    • Raw data often contains duplicates.
    • Use window functions with row_number() to identify the latest records.
    • Or use dropDuplicates() for simple cases.
    • Always deduplicate before final table writes.
  • Iceberg MERGE for upserts:
    • Recommended approach for slowly changing dimensions.
    • Provides ACID guarantees and efficient updates.
    • Handles both inserts and updates in a single operation.
    • Better than the delete and insert pattern.
  • Partitioning Strategy:
    • Partition by common filter columns such as date, region, or category.
    • Avoid over-partitioning; under 1 GB per partition is ideal.
    • Use partition pruning in queries for performance.
    • Consider bucketing for high-cardinality columns.
The following example describes how to apply best practices such as incremental loading, deduplicating records, merging upserts into Iceberg, and partitioning output when writing DataFrames.
# Incremental loading pattern
last_run = "2025-01-01"
df_incremental = spark.read.table("source.orders") \
    .filter(f"order_date > '{last_run}'")

# Deduplication using window function
from pyspark.sql import functions as F
from pyspark.sql.window import Window
window = Window.partitionBy("customer_id").orderBy(F.desc("updated_at"))
df_deduped = df.withColumn("rank", F.row_number().over(window)) \
    .filter("rank = 1") \
    .drop("rank")

# Iceberg MERGE (upsert) - recommended
spark.sql(
  """
  MERGE INTO iceberg.prod.customers AS target
  USING staging.customer_updates AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
  """
)

# Proper partitioning
df.writeTo("iceberg.prod.events") \
    .partitionBy("event_date", "event_type") \
    .create()
Be aware of Ranger policies that might restrict access to certain data sources or destinations. If you encounter permission errors, then verify your access rights with the Platform team.

Monitor Spark jobs

You can monitor Spark application execution through the Spark History Server UI. It provides visibility into job metrics, stage performance, executor resource utilization, and SQL query execution plans for debugging and optimization.

Accessing the Spark History Server

The Spark History Server provides a web-based interface to monitor completed and running Spark applications. You can access the server on the NexusOne cluster using the following URL:
https://spark-history.<client>.nx1cloud.com/
Replace <client> with your client’s name. To access the Spark History Server:
  1. Navigate to the History Server URL in your browser.
  2. Authenticate using your NexusOne platform credentials.
  3. View the app list showing all Spark jobs.
  4. Click on an Application ID to drill into specific job details.
  5. Use the search/filter bar to find applications by name or date.
Each app in the list displays the following:
  • Application ID.
  • Application name, as defined in your Spark configuration.
  • Spark user who submitted the job.
  • Start time, completed time, and duration.
  • Status such as COMPLETED, RUNNING, or FAILED.
[Image: History Server log directory]

Job and status metrics

The History Server breaks down Spark applications into Jobs and Stages, providing detailed performance metrics for troubleshooting and optimization.
  • Spark jobs tab: This tab displays the following:
    • A list of all jobs within the selected app.
    • Job duration, number of stages, and status.
    • SQL query or action that triggered each job. For example, df.write() or df.count().
    [Image: Spark jobs tab]
  • Spark job stages tab: Opening a job reveals its stages. Each stage groups tasks that can run in parallel. The tab displays the following:
    • Stages: Each job consists of multiple stages, also referred to as groups of tasks.
    • Stage metrics:
      • Duration: Total time to complete the stage.
      • Status: Succeeded, Failed, Running, Pending.
      • Tasks: Total number of tasks and their success/failure counts.
      • Input/Output: Data read from source and written to destination.
      • Shuffle Read/Write: Data exchanged between executors.
    [Image: Spark job stages tab]
  • Task details: Opening a stage exposes the task-level metrics, such as:
    • Executor ID
    • Launch time
    • Duration
    • Status
    • Locality level
    These metrics reveal bottlenecks such as:
    • Tasks with unusually long durations, which can indicate data skew.
    • Failed tasks with retry attempts.
    • Tasks with excessive shuffle or spill to disk.

Executor metrics

Executors are the worker processes that run Spark tasks.
  • Executors tab: This displays every executor instance and its resource utilization:
    • Executor ID: Unique identifier for each executor instance.
    • Address: Host and port where the executor is running.
    • Status: Active, Dead, or Lost.
    • RDD blocks: Number of cached data partitions.
    • Storage memory: Memory used for caching DataFrames/RDDs.
    • Disk used: Spill-over storage when memory is full.
    • Cores: Number of CPU cores allocated.
    • Active tasks: Currently running tasks on this executor.
    • Failed tasks: Count of task failures on this executor.
    • Complete tasks: Successfully finished tasks.
    • Total tasks: Cumulative task count.
    [Image: Spark executors tab]
  • Key performance indicators: Within the executor tab, these are some of the KPIs:
    • CPU utilization: High CPU utilization indicates compute-intensive operations.
    • Memory usage: Monitor used vs available storage and execution memory.
    • GC time: Garbage collection overhead. It should be < 10% of task time.
    • Shuffle metrics:
      • Shuffle read: Data pulled from other executors.
      • Shuffle write: Data sent to other executors.
      High shuffle indicates expensive joins or aggregations.

SQL query execution details

For Spark SQL queries and DataFrame operations, the History Server provides query execution plans and optimization details.
  • SQL/DataFrame tab: This tab displays the following:
    • All SQL queries executed in the app.
    • Query text and description.
    • Submission time and duration.
    • Number of jobs triggered by the query.
    [Image: SQL/DataFrame tab]
  • Execution plan details: These show how Spark runs a SQL query or DataFrame operation:
    • Logical plan: High-level query structure before optimization.
    • Physical plan: Actual execution strategy chosen by Spark.
    • Code generation: Shows generated Java code for advanced debugging.
    • Metrics per operator:
      • Number of rows processed.
      • Data size scanned.
      • Time spent in each operation.
      • Partition pruning effectiveness.
  • Common plan nodes to monitor: These operators reveal most of the performance behavior and also appear in the output of the sketch after this list:
    • Scan: Reads data from the source, then checks partition filters.
    • Filter: Removes unnecessary rows. This should happen early.
    • Exchange: Shuffle operations. It’s expensive, so minimize it when possible.
    • HashAggregate: Groups and aggregates data efficiently.
    • BroadcastHashJoin: Efficiently joins a small table with broadcast.
    • SortMergeJoin: Joins large tables by sorting and shuffling. It’s more expensive.
    [Image: SQL query execution details]
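You can also inspect the same plans programmatically by calling explain() on a DataFrame, which prints the logical and physical plans without opening the History Server. The following is a minimal sketch; the table and column names are only illustrative.
# Print the query plans for a DataFrame operation.
# Look for the operators listed above (Scan, Filter, Exchange, joins) in the output.
df = spark.read.table("iceberg.prod.orders")  # illustrative table name
df_summary = df.filter("order_date = '2025-01-24'").groupBy("region").count()
df_summary.explain(mode="formatted")  # "simple", "extended", and "cost" also work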

Accessing driver and executor logs

Logs provide detailed diagnostic information for debugging failures, errors, and unexpected behavior. Steps to access the driver and executor logs:
  1. Navigate to the Executors tab.
  2. Click “stderr” or “stdout” links for each executor.
  3. Driver logs appear under the “driver” executor entry.
  4. Download logs for offline analysis.
The log levels (see the sketch after this list for adjusting verbosity):
  • INFO: General execution flow and stage completion.
  • WARN: Potential issues. For example, slow tasks or retries.
  • ERROR: Failures requiring attention.
  • DEBUG: Detailed internal operations.
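To control how much of this detail reaches the driver output while debugging, you can change the log level at runtime, as in the following minimal sketch.
# Adjust the driver's log verbosity at runtime.
# Valid levels include ALL, TRACE, DEBUG, INFO, WARN, ERROR, FATAL, and OFF.
spark.sparkContext.setLogLevel("WARN")   # hide routine INFO messages
spark.sparkContext.setLogLevel("DEBUG")  # temporarily surface detailed internals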
Common error patterns and how to fix them:
  • OutOfMemoryError: Increase executor memory or reduce data per partition.
  • FileNotFoundException: Check S3 paths and access permissions.
  • org.apache.spark.shuffle.FetchFailedException: Executor lost during shuffle; often resolved by increasing executor memory or reducing shuffle volume.
  • java.io.IOException: No space left on device: Increase the disk space.

Spark for data warehouse

Data warehouse workflows use Spark for querying data, exploring datasets, and performing lightweight transformations. These tasks focus on SQL-based analysis, interactive exploration, and generating insights from lakehouse data. The following sections describe data warehouse tasks you can perform using Apache Spark on NexusOne.

Query data using Spark SQL

You can execute SQL queries against Iceberg tables in the lakehouse using spark.sql(), with support for creating temporary views for multi-step analysis. There are several common ways to query data using Spark SQL, such as:
  • Direct SQL queries:
    • Use spark.sql("SELECT ...") to run any SQL query.
    • Query Iceberg tables using fully qualified names.
    • Results return as Spark DataFrames for further processing.
  • Create temporary views:
    • Use .createOrReplaceTempView() to name DataFrames.
    • Reference views in subsequent SQL queries.
    • Views exist only within the current Spark session.
  • Common SQL operations:
    • SELECT, WHERE, JOIN, GROUP BY, or ORDER BY.
    • Window functions, CTEs, subqueries.
    • Aggregate functions: SUM, AVG, or COUNT.
The following example runs SQL queries on Iceberg tables in Spark: it aggregates data from an employees table and then queries a temporary view of customers.
# Direct SQL query on Iceberg table
df_result = spark.sql(
  """
  SELECT 
      department,
      COUNT(*) as employee_count,
      AVG(salary) as avg_salary
  FROM iceberg.prod.employees
  WHERE hire_date >= '2024-01-01'
  GROUP BY department
  ORDER BY avg_salary DESC
  """
)

df_result.show()

# Create temporary view for reuse
df_customers = spark.read.table("iceberg.prod.customers")
df_customers.createOrReplaceTempView("customers_view")

# Query the temporary view
spark.sql(
  """
  SELECT country, COUNT(*) as customer_count
  FROM customers_view
  WHERE status = 'active'
  GROUP BY country
  """
).show()

Query Iceberg tables in the lakehouse

You can access tables through the Iceberg catalog using Spark SQL with support for selecting, filtering, ordering, and leveraging Iceberg’s advanced features like time travel. There are several common ways to query Iceberg tables in the lakehouse, such as:
  • Table access pattern:
    • Use fully qualified names: catalog.schema.table.
    • The default catalog is iceberg in NexusOne.
      • For example, iceberg.prod.sales.
  • Basic query operations:
    • SELECT specific columns or all columns *.
    • Filter with WHERE clauses.
    • Sort with ORDER BY.
    • Limit results with LIMIT.
  • Leverage Iceberg features:
    • Time travel: Query historical snapshots.
    • Partition pruning: Filter on partition columns for speed.
    • Schema evolution: Tables adapt to schema changes.
The following example runs SQL queries on Iceberg tables in Spark: getting top orders, checking sales data as of a past date, and efficiently querying events by partition.
-- Basic query with filtering and ordering
SELECT 
    customer_id,
    order_date,
    total_amount
FROM iceberg.prod.orders
WHERE order_date >= '2025-01-01'
    AND status = 'completed'
ORDER BY total_amount DESC
LIMIT 100

-- Time travel: Query data as it existed yesterday
SELECT * 
FROM iceberg.prod.sales
FOR SYSTEM_TIME AS OF '2025-01-23'
WHERE region = 'US'

-- Partition pruning for performance
SELECT *
FROM iceberg.prod.events
WHERE event_date = '2025-01-24'  -- Partition column
    AND event_type = 'click'

Explore data in Jupyter Notebooks

In Jupyter Notebooks, you can sample datasets, convert Spark DataFrames to Pandas for visualization, and preview data interactively. There are several common ways to explore data in Jupyter Notebooks, such as:
  • Sampling:
    • Use .sample(fraction) for random sampling.
    • Use .limit(n) for the first N rows.
    • Balance between representative data and performance.
  • Conversion to Pandas:
    • Use .toPandas() to convert Spark DataFrame.
    • Warning: Only convert small datasets < 100K rows.
    • Pandas enables rich visualizations with matplotlib/seaborn.
  • Preview methods:
    • .show(n): Displays a formatted table in the console.
    • .display(): Rich display in notebooks such as those in Jupyter or Databricks.
    • .take(n): Returns the first N rows as a list.
  • Descriptive statistics:
    • .describe(): Summary statistics for numeric columns.
    • .printSchema(): View column types.
    • .count(): Get total row count.
The following example describes how to load an Iceberg table in Spark, take a random sample for exploration, preview rows, convert a small subset to Pandas for visualization, and inspect summary statistics and schema.
# Load table and sample data
df = spark.read.table("iceberg.prod.transactions")

# Random 1% sample for exploration
df_sample = df.sample(fraction=0.01, seed=42)

# Preview first 20 rows
df_sample.show(20, truncate=False)

# Convert small dataset to Pandas for visualization
df_small = df.limit(1000).toPandas()

# Visualize with matplotlib
import matplotlib.pyplot as plt
df_small['amount'].hist(bins=50)
plt.title('Transaction Amount Distribution')
plt.show()

# Get summary statistics
df.describe(['amount', 'quantity']).show()

# View schema
df.printSchema()

Perform lightweight transformations

You can apply simple filtering, grouping, and cleaning operations using the DataFrame API or SQL for exploratory analysis and quick reporting. There are several common ways to perform lightweight transformations, such as:
  • Filtering data:
    • Use .filter() or .where() with conditions.
    • Chain multiple conditions using & for AND or | for OR.
    • SQL: Use WHERE clause with multiple predicates.
  • Grouping and aggregation:
    • Use .groupBy().agg() in DataFrame API.
    • SQL: GROUP BY with aggregate functions.
    • Common aggregates: COUNT, SUM, AVG, MIN, or MAX.
  • Simple cleaning:
    • Remove nulls with .filter(col.isNotNull()).
    • Rename columns with .withColumnRenamed().
    • Add computed columns with .withColumn().
The following example analyzes Iceberg sales data by filtering high-value completed orders, aggregating metrics by region and product category, and ordering results by total revenue, using both the DataFrame API and SQL.
from pyspark.sql import functions as F

# DataFrame API: Filter and group
df = spark.read.table("iceberg.prod.sales")

df_summary = df \
    .filter(F.col("status") == "completed") \
    .filter(F.col("amount") > 100) \
    .groupBy("region", "product_category") \
    .agg(
        F.count("*").alias("order_count"),
        F.sum("amount").alias("total_revenue"),
        F.avg("amount").alias("avg_order_value")
    ) \
    .orderBy(F.desc("total_revenue"))

df_summary.show()

# SQL equivalent
spark.sql(
  """
  SELECT 
      region,
      product_category,
      COUNT(*) as order_count,
      SUM(amount) as total_revenue,
      AVG(amount) as avg_order_value
  FROM iceberg.prod.sales
  WHERE status = 'completed'
      AND amount > 100
  GROUP BY region, product_category
  ORDER BY total_revenue DESC
  """
).show()

Optimize queries for performance

You can write efficient queries by selecting only the needed columns, filtering on partition columns, and avoiding expensive operations like wide shuffles. There are several common ways to optimize queries for performance, such as:
  • Using projection, a way to select specific columns:
    • Reduces data transfer and memory usage.
    • Especially important with wide tables with many columns.
  • Leveraging partition filtering:
    • Filter on partition columns, usually date-based.
    • Dramatically reduces the data scanned.
    • Put partition filters in WHERE clause.
  • Avoiding wide shuffles:
    • Minimize joins on high-cardinality columns.
    • Use broadcast joins for small tables.
    • Be careful with GROUP BY on many distinct values.
  • Predicate pushdown and selective retrieval:
    • Filter early in the query to push down predicates.
    • Use LIMIT when exploring data.
    • Avoid SELECT * in production queries.
The following example shows Spark optimizations on Iceberg tables, using column selection, partition filters, broadcast joins, low-cardinality aggregations, and limited scans.
from pyspark.sql import functions as F

# BAD: Reads all columns and all data
df_bad = spark.sql("SELECT * FROM iceberg.prod.events")

# GOOD: Projection + partition filter
df_good = spark.sql(
  """
  SELECT 
      user_id,
      event_type,
      timestamp
  FROM iceberg.prod.events
  WHERE event_date = '2025-01-24'  -- Partition filter
      AND event_type = 'purchase'
  """
)

# DataFrame API equivalent
df_optimized = spark.read.table("iceberg.prod.events") \
    .select("user_id", "event_type", "timestamp") \
    .filter(F.col("event_date") == "2025-01-24") \
    .filter(F.col("event_type") == "purchase")

# Use LIMIT for exploration
spark.sql(
  """
  SELECT col1, col2, col3
  FROM iceberg.prod.large_table
  WHERE date = '2025-01-24'
  LIMIT 100
  """
).show()

# BAD: Join on high-cardinality column causing shuffle
df_bad_join = large_table.join(
    another_large_table,
    "transaction_id"  # High cardinality = expensive shuffle
)

# GOOD: Broadcast join for small dimension table
df_good_join = transactions.join(
    F.broadcast(categories),  # Small lookup table
    "category_id"
)

# BAD: GROUP BY on too many distinct values
df_bad_agg = spark.sql(
  """
  SELECT transaction_id, SUM(amount)
  FROM sales
  GROUP BY transaction_id  -- Millions of unique IDs
  """
)

# GOOD: GROUP BY on reasonable cardinality
df_good_agg = spark.sql(
  """
  SELECT 
      region,
      product_category,
      SUM(amount) as total_sales
  FROM sales
  WHERE sale_date >= '2025-01-01'  -- Filter early
  GROUP BY region, product_category  -- Low cardinality
  """
)

# Performance comparison examples
# Without optimization: scans 500GB, takes 15 minutes
spark.sql(
  """
  SELECT *
  FROM iceberg.prod.transactions
  WHERE amount > 1000
  """
)

# With optimization: scans 5GB, takes 30 seconds
spark.sql(
  """
  SELECT 
      transaction_id,
      customer_id,
      amount
  FROM iceberg.prod.transactions
  WHERE transaction_date = '2025-01-24'  -- Partition filter first
      AND amount > 1000
  """
)

Additional resources

For more details, refer to the official Apache Spark and Apache Iceberg documentation.