Environments and runtime specifications
This section describes the NexusOne environments in which Spark jobs can execute and the runtime details, such as the Spark software version, supported libraries, and default settings.
Deployment context
Within NexusOne, Spark jobs can execute in multiple environments:
- Jupyter Notebooks: An interactive REPL environment with a pre-initialized SparkSession. Optimal for exploratory data analysis, prototyping, and collaborative development. If you are using the NexusOne portal, you can launch a Jupyter Notebook using the NexusOne Build feature.
- Apache Airflow: Provides production orchestration via Apache Airflow using PySpark decorators.
  - Batch scheduling: Submitted jobs run as scheduled cron tasks, are monitored continuously, and have alerting configured.
  - Execution method: Jobs run as containerized Spark applications in Kubernetes pods with full lifecycle management.
- Command line via Kyuubi: Provides command-line submission for external job execution. This is available for jobs submitted from outside the Kubernetes cluster.
Current version information
- Python version: 3.12
- Spark version: 3.5.6
Supported Spark interfaces
- PySpark via the Python API: Programmatic access to DataFrame and SQL APIs for pipeline development and data engineering workflows.
- Spark SQL: Write standard SQL queries that execute on Spark. Perfect for analysts and data warehouse teams who prefer SQL syntax for querying and transforming data.
Supported library and table format
- Apache Iceberg: 1.8
The Apache Iceberg table format provides snapshot isolation and schema evolution.
Default Spark configuration
NexusOne applies a set of default Spark configurations to every session, including the Iceberg catalog setup, executor and driver memory settings, dynamic allocation, and Hive metastore integration. These defaults are described in the Initialize a Spark session section below.
Spark for data engineers
Data engineering workflows use Spark to build production data pipelines, transform raw data into analytics-ready tables, and ensure data quality. These tasks usually involve complex SQL transformations, multi-source data integration, and automated pipeline scheduling. The following sections describe related sample tasks you can perform using Apache Spark on NexusOne. When testing each task, replace the file location with a valid one.
Initialize a Spark session
You can initialize a Spark session with NexusOne’s default Spark configurations, which include an Iceberg catalog setup, executor/memory settings, and integration with the data lakehouse infrastructure. How to implement the Spark session (a sketch follows the list):
- Use SparkSession to create or retrieve an existing session.
- Default configurations are automatically applied:
  - The Iceberg catalog provides lakehouse table access.
  - Executor memory is set to 8 GB, driver memory to 12 GB.
  - Dynamic allocation manages resource optimization.
  - Hive metastore integration handles metadata management.
- The session is ready to read from and write to Iceberg tables in the lakehouse.
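The following is a minimal sketch of retrieving a session that picks up the platform defaults. The application name is illustrative, and the namespace check assumes the catalog is registered as iceberg, as described later in this section.

```python
from pyspark.sql import SparkSession

# Create or retrieve an existing session. On NexusOne the platform defaults
# (Iceberg catalog, executor/driver memory, dynamic allocation, Hive metastore)
# are applied automatically, so no extra configuration is required here.
spark = (
    SparkSession.builder
    .appName("nexusone-example")  # illustrative name
    .getOrCreate()
)

# Quick check that the session is up and the Iceberg catalog is reachable.
print(spark.version)
spark.sql("SHOW NAMESPACES IN iceberg").show()
```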
Read and write raw data
After initializing a Spark session, you can use Spark’s DataFrame Reader/Writer API to load common data formats such as Parquet, CSV, and JSON with configurable options and write outputs with specified modes and compression. How to read and write raw data (a sketch follows the list):
- Read data: Use DataFrameReader with file format-specific methods.
  - With CSV files, you configure options such as header detection, schema inference, or delimiters.
  - With JSON, Spark handles nested structures automatically.
  - Handle different file locations such as S3, local paths, or HDFS.
- Write data: Use DataFrameWriter to write the data.
  - Specify a write mode, such as overwrite, append, ignore, or error.
  - Choose a compression format for optimization.
  - Choose a file format to save the DataFrame’s contents in. In this case, Parquet.
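A minimal sketch of the read/write pattern. The S3 paths are placeholders; replace them with valid locations.

```python
# Read a CSV file with header detection and schema inference.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("delimiter", ",")
    .csv("s3a://example-bucket/raw/orders.csv")  # placeholder path
)

# Write the result as Parquet with snappy compression, overwriting any
# existing output at the target path.
(
    df.write
    .mode("overwrite")
    .option("compression", "snappy")
    .parquet("s3a://example-bucket/staging/orders/")  # placeholder path
)
```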
Create data frames programmatically
You can create Spark data frames from Python lists, dictionaries, or other data structures with either explicit or inferred schemas using data types such as StructType and StructField. There are several common ways to implement the creation of a data frame programmatically (a sketch of both follows the list):
- Inferred schema: Best for quick prototyping or simple structures.
  - Spark detects the schema from the provided data:
    - You can pass a list of tuples or dictionaries.
    - Spark automatically infers column types.
- Explicit schema: You define the structure by defining the StructType with a StructField for each column. Then you specify exact data types such as StringType or IntegerType. This ensures type safety and prevents inference errors. Also, it’s recommended for production.
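A minimal sketch of both approaches; the column names and sample rows are illustrative.

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Inferred schema: pass a list of tuples plus column names and let Spark
# detect the types. Convenient for prototyping.
people = [("Alice", 34), ("Bob", 29)]
df_inferred = spark.createDataFrame(people, ["name", "age"])
df_inferred.printSchema()

# Explicit schema: define a StructField per column with exact data types.
# Recommended for production because it prevents inference errors.
schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=True),
])
df_explicit = spark.createDataFrame(people, schema=schema)
df_explicit.printSchema()
```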
Apply transformation patterns
You can use Spark’s DataFrame, Window, and Functions API to implement common ETL patterns such as data cleaning, lookups/joins, filtering, grouping, aggregations, and time-series analysis. There are several common ways to apply transformation patterns (a sketch follows the list):
- DataFrame API and Functions API - Cleaning transformations:
  - Use .withColumn() to add or modify columns.
  - Use .filter() to remove invalid rows.
- DataFrame API - Joining patterns:
  - Use .join() with join types such as inner, left, right, or full.
  - Specify join conditions explicitly.
  - Optimize with broadcast joins for small lookup tables.
- DataFrame API and Functions API - Aggregation:
  - Use .groupBy() with .agg() for summarization.
  - Apply aggregate functions: count(), avg(), or max().
- Window API and Functions API - Ranking:
  - Define windows with .partitionBy() and .orderBy().
  - Apply ranking functions such as rank().
  - Calculate running totals or moving averages.
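A minimal sketch combining these patterns. The table names (iceberg.prod.orders, iceberg.prod.customers) and columns are illustrative assumptions.

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Illustrative source tables; replace with real table names.
orders = spark.read.table("iceberg.prod.orders")
customers = spark.read.table("iceberg.prod.customers")

# Cleaning: modify a column and drop invalid rows.
orders_clean = (
    orders
    .withColumn("amount", F.col("amount").cast("double"))
    .filter(F.col("amount") > 0)
)

# Joining: explicit condition, broadcast the small lookup table.
enriched = orders_clean.join(
    F.broadcast(customers),
    on=orders_clean["customer_id"] == customers["id"],
    how="left",
)

# Aggregation: summarize per region.
summary = enriched.groupBy("region").agg(
    F.count("*").alias("order_count"),
    F.avg("amount").alias("avg_amount"),
    F.max("amount").alias("max_amount"),
)

# Ranking: rank orders by amount within each region.
w = Window.partitionBy("region").orderBy(F.col("amount").desc())
ranked = enriched.withColumn("amount_rank", F.rank().over(w))
```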
Write to Iceberg tables
You can use Spark’s DataFrameWriterV2 API to write data to Iceberg tables. The API supports append, overwrite, and overwritePartitions operations. To perform row-level operations like a merge, use a SQL statement. There are several common ways to write to Iceberg tables (a sketch follows the list):
- Use .writeTo() instead of .write():
  - Designed specifically for Iceberg tables.
  - Provides better control over write operations.
  - Enables advanced features like merging.
- Handle partitioning:
  - Partition large tables by common filter columns such as date or region.
  - Use .partitionedBy() when creating tables.
  - Improves query performance dramatically.
- NexusOne ships with the Iceberg catalog enabled by default.
  - Tables are automatically registered in the Hive metastore.
  - You reference tables using fully qualified names: iceberg.schema.table.
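A minimal sketch of the writeTo pattern; the source and target table names and the order_date partition column are illustrative assumptions.

```python
from pyspark.sql import functions as F

# Illustrative source DataFrame; replace with your own data.
df = spark.read.table("iceberg.staging.sales_updates")

# Append rows to an existing Iceberg table using the DataFrameWriterV2 API.
df.writeTo("iceberg.prod.sales").append()

# Replace only the partitions touched by this DataFrame.
df.writeTo("iceberg.prod.sales").overwritePartitions()

# Create (or replace) a partitioned Iceberg table from a DataFrame.
(
    df.writeTo("iceberg.prod.sales_by_day")
    .using("iceberg")
    .partitionedBy(F.days(F.col("order_date")))
    .createOrReplace()
)

# Row-level upserts use SQL MERGE INTO rather than the writer API.
df.createOrReplaceTempView("sales_updates")
spark.sql("""
    MERGE INTO iceberg.prod.sales t
    USING sales_updates s
    ON t.order_id = s.order_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```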
Submit Spark applications
You can execute Spark applications on the NexusOne cluster using either Apache Airflow (the recommended approach) or Jupyter notebooks, which are helpful for ad-hoc analysis, with proper resource allocation and monitoring. There are several common ways to submit Spark applications (a sketch of an Airflow DAG follows the list):
- Use Airflow within Kubernetes:
  - Package your PySpark code in a DAG.
  - Use the PySpark decorator for Spark tasks.
  - Airflow handles scheduling, retries, and monitoring.
  - Spark jobs run as containerized pods in Kubernetes.
- Use Jupyter within Kubernetes:
  - Start a Spark session directly in the notebook.
  - Submit jobs interactively for exploration.
  - Results appear inline in notebook cells.
- Outside Kubernetes:
  - Use the Kyuubi submitter for external job submission.
  - Not recommended for production workflows.
- Required elements:
  - Python script or notebook with PySpark code.
  - Kubernetes namespace and resource limits.
  - OpenLineage tracking for metadata capture.
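A minimal sketch of an Airflow DAG that uses the @task.pyspark decorator from the Airflow Apache Spark provider. The DAG ID, schedule, connection ID, and table names are illustrative assumptions; the exact decorator configuration on NexusOne may differ.

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id="daily_sales_pipeline",   # illustrative DAG ID
    schedule="0 2 * * *",            # cron schedule handled by Airflow
    start_date=datetime(2025, 1, 1),
    catchup=False,
)
def daily_sales_pipeline():

    @task.pyspark(conn_id="spark_default")  # connection ID is an assumption
    def transform_sales(spark, sc):
        # The decorator injects a SparkSession; the job itself runs as a
        # containerized Spark application on the cluster.
        df = spark.table("iceberg.prod.raw_sales")      # illustrative table
        cleaned = df.filter(df.amount > 0)
        cleaned.writeTo("iceberg.prod.sales").overwritePartitions()

    transform_sales()


daily_sales_pipeline()
```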
Optimize Spark performance
You can implement performance optimization techniques such as caching DataFrames, broadcast joins for small tables, partitioning strategies, and appropriate persistence levels to accelerate job execution. There are several common ways to optimize the performance of Spark (a sketch follows the list), such as using:
- Cached DataFrames:
  - Use .cache() or .persist() for reused DataFrames.
  - Stores data in memory for faster subsequent access.
  - Best for iterative algorithms or multiple actions on the same data.
- Broadcast joins:
  - Use F.broadcast() for small lookup tables, less than 10 MB.
  - Sends small tables to all executors, avoiding a shuffle.
  - Dramatically speeds up joins with dimension tables.
- Partitioning strategies:
  - Repartition: Increase or decrease partitions. It triggers a full shuffle.
  - Coalesce: Reduce partitions without a shuffle for efficiency.
  - Balance: Avoid underutilizing or overutilizing partitions.
- Persistence levels:
  - MEMORY_ONLY: Fastest; partitions that don’t fit in memory are recomputed when needed.
  - MEMORY_AND_DISK: Safer for large datasets; spills to disk when memory is full.
  - DISK_ONLY: When there isn’t enough memory.
  - Choose based on data size and reuse pattern.
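A minimal sketch of these techniques; the table names (iceberg.prod.events, iceberg.prod.dim_region), join key, and partition counts are illustrative assumptions.

```python
from pyspark.sql import functions as F
from pyspark import StorageLevel

# Illustrative inputs; replace with real table names.
events = spark.read.table("iceberg.prod.events")
dim_region = spark.read.table("iceberg.prod.dim_region")  # small lookup table

# Cache a DataFrame that will be reused by multiple actions.
events_cached = events.cache()
events_cached.count()  # triggers computation and materializes the cache

# Broadcast join with the small dimension table to avoid a shuffle.
joined = events_cached.join(F.broadcast(dim_region), on="region_id", how="left")

# Repartition (full shuffle) before a wide aggregation; coalesce (no shuffle)
# before writing to reduce the number of output files.
repartitioned = joined.repartition(200, "region_id")
compacted = repartitioned.coalesce(20)

# Choose an explicit persistence level for a large intermediate result.
compacted.persist(StorageLevel.MEMORY_AND_DISK)
```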
Follow ETL best practices
These are guidelines for building efficient, maintainable ETL pipelines (a sketch follows the list). This includes some of the following:
- Incremental loading - preferred:
  - Load only new or changed data since the last run.
  - Use watermarks or timestamp columns.
  - Dramatically reduces processing time and costs.
  - Example: WHERE updated_at > '2025-01-01'.
- Deduplication:
  - Raw data often contains duplicates.
  - Use window functions with row_number() to identify the latest records.
  - Or use dropDuplicates() for simple cases.
  - Always deduplicate before final table writes.
- Iceberg MERGE for upserts:
  - Recommended approach for slowly changing dimensions.
  - Provides ACID guarantees and efficient updates.
  - Handles both inserts and updates in a single operation.
  - Better than the delete and insert pattern.
- Partitioning strategy:
  - Partition by common filter columns such as date, region, or category.
  - Avoid over-partitioning; roughly 1 GB or less per partition is ideal.
  - Use partition pruning in queries for performance.
  - Consider bucketing for high-cardinality columns.
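A minimal sketch of incremental loading with deduplication and an Iceberg MERGE. The table names, key columns, and hard-coded watermark are illustrative assumptions; in practice the watermark would come from pipeline state.

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Incremental load: read only rows changed since the last successful run.
last_run = "2025-01-01"  # illustrative; normally read from pipeline state
incremental = spark.sql(
    f"SELECT * FROM iceberg.prod.raw_orders WHERE updated_at > '{last_run}'"
)

# Deduplicate: keep only the latest record per order_id before writing.
w = Window.partitionBy("order_id").orderBy(F.col("updated_at").desc())
latest = (
    incremental
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn")
)

# Upsert into the target Iceberg table with MERGE for ACID guarantees.
latest.createOrReplaceTempView("orders_updates")
spark.sql("""
    MERGE INTO iceberg.prod.orders t
    USING orders_updates s
    ON t.order_id = s.order_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```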
Monitor Spark jobs
You can monitor Spark application execution through the Spark History Server UI. It provides visibility into job metrics, stage performance, executor resource utilization, and SQL query execution plans for debugging and optimization.
Accessing the Spark History Server
The Spark History Server provides a web-based interface to monitor completed and running Spark applications. You can access the server on the NexusOne cluster using your deployment’s History Server URL, replacing client in the URL with your client’s name.
Steps to access the Spark History Server:
- Navigate to the History Server URL in your browser.
- Authenticate using your NexusOne platform credentials.
- View the app list showing all Spark jobs.
- Click on an Application ID to drill into specific job details.
- Use the search/filter bar to find applications by name or date.
The application list displays the following for each job:
- Application ID.
- Application name, as defined in your Spark configuration.
- Spark user who submitted the job.
- Start time, completed time, and duration.
- Status, such as COMPLETED, RUNNING, or FAILED.

History Server log directory
Job and status metrics
The History Server breaks down Spark applications into Jobs and Stages, providing detailed performance metrics for troubleshooting and optimization.
- Spark jobs tab: This tab displays the following:
  - A list of all jobs within the selected app.
  - Job duration, number of stages, and status.
  - The SQL query or action that triggered each job. For example, df.write() or df.count().
- Spark job stages tab: Opening a job reveals its stages. Each stage groups tasks that can run in parallel. The tab displays the following:
  - Stages: Each job consists of multiple stages, also referred to as groups of tasks.
  - Stage metrics:
    - Duration: Total time to complete the stage.
    - Status: Succeeded, Failed, Running, Pending.
    - Tasks: Total number of tasks and their success/failure counts.
    - Input/Output: Data read from the source and written to the destination.
    - Shuffle Read/Write: Data exchanged between executors.
- Task details: Opening a stage exposes task-level metrics, such as:
  - Executor ID
  - Launch time
  - Duration
  - Status
  - Locality level
  Watch for the following warning signs:
  - Tasks with unusually long durations, which indicate data skew.
  - Failed tasks with retry attempts.
  - Tasks with excessive shuffle or spill to disk.
Executor metrics
Executors are the worker processes that run Spark tasks.
- Executors tab: This displays every executor instance and its resource utilization:
  - Executor ID: Unique identifier for each executor instance.
  - Address: Host and port where the executor is running.
  - Status: Active, Dead, or Lost.
  - RDD blocks: Number of cached data partitions.
  - Storage memory: Memory used for caching DataFrames/RDDs.
  - Disk used: Spill-over storage when memory is full.
  - Cores: Number of CPU cores allocated.
  - Active tasks: Currently running tasks on this executor.
  - Failed tasks: Count of task failures on this executor.
  - Complete tasks: Successfully finished tasks.
  - Total tasks: Cumulative task count.
- Key performance indicators: Within the Executors tab, these are some of the KPIs:
  - CPU utilization: High CPU utilization indicates compute-intensive operations.
  - Memory usage: Monitor used vs. available storage and execution memory.
  - GC time: Garbage collection overhead. It should be < 10% of task time.
  - Shuffle metrics:
    - Shuffle read: Data pulled from other executors.
    - Shuffle write: Data sent to other executors.
SQL query execution details
For Spark SQL queries and DataFrame operations, the History Server provides query execution plans and optimization details.
- SQL/DataFrame tab: This tab displays the following:
  - All SQL queries executed in the app.
  - Query text and description.
  - Submission time and duration.
  - Number of jobs triggered by the query.
- Execution plan details: These show how Spark runs a SQL query or DataFrame operation:
  - Logical plan: High-level query structure before optimization.
  - Physical plan: Actual execution strategy chosen by Spark.
  - Code generation: Shows generated Java code for advanced debugging.
  - Metrics per operator:
    - Number of rows processed.
    - Data size scanned.
    - Time spent in each operation.
    - Partition pruning effectiveness.
- Common plan nodes to monitor: These operators reveal most of the performance behavior:
  - Scan: Reads data from the source, then checks partition filters.
  - Filter: Removes unnecessary rows. This should happen early.
  - Exchange: Shuffle operations. It’s expensive, so minimize it when possible.
  - HashAggregate: Groups and aggregates data efficiently.
  - BroadcastHashJoin: Efficiently joins a small table with a broadcast.
  - SortMergeJoin: Joins large tables by sorting and shuffling. It’s more expensive.
Accessing driver and executor logs
Logs provide detailed diagnostic information for debugging failures, errors, and unexpected behavior. Steps to access the driver and executor logs:
- Navigate to the Executors tab.
- Click the “stderr” or “stdout” links for each executor.
- Driver logs appear under the “driver” executor entry.
- Download logs for offline analysis.
Log levels:
- INFO: General execution flow and stage completion.
- WARN: Potential issues. For example, slow tasks or retries.
- ERROR: Failures requiring attention.
- DEBUG: Detailed internal operations.
Common errors and suggested fixes:
- OutOfMemoryError: Increase executor memory or reduce data per partition.
- FileNotFoundException: Check S3 paths and access permissions.
- org.apache.spark.shuffle.FetchFailedException: Executor lost during shuffle.
- java.io.IOException: No space left on device: Increase the disk space.
Spark for data warehouse
Data warehouse workflows use Spark for querying data, exploring datasets, and performing lightweight transformations. These tasks focus on SQL-based analysis, interactive exploration, and generating insights from lakehouse data. The following sections describe data warehouse tasks you can perform using Apache Spark on NexusOne.
Query data using Spark SQL
You can execute SQL queries against Iceberg tables in the lakehouse using spark.sql(), with support for creating temporary views for multi-step analysis.
There are several common ways to query data using Spark SQL, such as:
- Direct SQL queries:
  - Use spark.sql("SELECT ...") to run any SQL query.
  - Query Iceberg tables using fully qualified names.
  - Results return as Spark DataFrames for further processing.
- Create temporary views:
  - Use .createOrReplaceTempView() to name DataFrames.
  - Reference views in subsequent SQL queries.
  - Views exist only within the current Spark session.
- Common SQL operations:
  - SELECT, WHERE, JOIN, GROUP BY, or ORDER BY.
  - Window functions, CTEs, subqueries.
  - Aggregate functions: SUM, AVG, or COUNT.
The following example queries an employees table and then queries a temporary view of customers.
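A minimal sketch of that example; the table names (iceberg.hr.employees, iceberg.crm.customers) and columns are illustrative assumptions.

```python
# Direct SQL query against an Iceberg table using its fully qualified name.
employees = spark.sql("""
    SELECT department, COUNT(*) AS headcount, AVG(salary) AS avg_salary
    FROM iceberg.hr.employees
    GROUP BY department
    ORDER BY headcount DESC
""")
employees.show()

# Register a DataFrame as a temporary view and query it in a second step.
customers = spark.read.table("iceberg.crm.customers")
customers.createOrReplaceTempView("customers_view")
spark.sql("""
    SELECT region, COUNT(*) AS customer_count
    FROM customers_view
    GROUP BY region
""").show()
```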
Query Iceberg tables in the lakehouse
You can access tables through the Iceberg catalog using Spark SQL with support for selecting, filtering, ordering, and leveraging Iceberg’s advanced features like time travel. There are several common ways to query Iceberg tables in the lakehouse, such as:
- Table access pattern:
  - Use fully qualified names: catalog.schema.table.
  - The default catalog is iceberg in NexusOne. For example, iceberg.prod.sales.
- Basic query operations:
  - SELECT specific columns or all columns with *.
  - Filter with WHERE clauses.
  - Sort with ORDER BY.
  - Limit results with LIMIT.
- Leverage Iceberg features:
  - Time travel: Query historical snapshots.
  - Partition pruning: Filter on partition columns for speed.
  - Schema evolution: Tables adapt to schema changes.
The following example queries an orders table, checks sales data as of a past date, and efficiently queries events by partition.
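A minimal sketch of those queries. The table names, snapshot timestamp, and partition column (event_date) are illustrative assumptions, and the time-travel syntax assumes Spark SQL’s TIMESTAMP AS OF support with Iceberg.

```python
# Basic query with projection, filtering, ordering, and a row limit.
spark.sql("""
    SELECT order_id, customer_id, amount
    FROM iceberg.prod.orders
    WHERE amount > 100
    ORDER BY amount DESC
    LIMIT 20
""").show()

# Time travel: read the sales table as it existed at a past point in time.
spark.sql("""
    SELECT SUM(amount) AS total_sales
    FROM iceberg.prod.sales TIMESTAMP AS OF '2025-01-01 00:00:00'
""").show()

# Partition pruning: filtering on the partition column limits the data scanned.
spark.sql("""
    SELECT event_type, COUNT(*) AS events
    FROM iceberg.prod.events
    WHERE event_date = DATE '2025-06-01'
    GROUP BY event_type
""").show()
```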
Explore data in Jupyter Notebooks
Jupyter Notebooks provide methods for sampling datasets, converting Spark DataFrames to Pandas for visualization, and previewing data in notebook environments. There are several common ways to explore data in Jupyter Notebooks (a sketch follows the list), such as:
- Sampling:
  - Use .sample(fraction) for random sampling.
  - Use .limit(n) for the first N rows.
  - Balance between representative data and performance.
- Conversion to Pandas:
  - Use .toPandas() to convert a Spark DataFrame.
  - Warning: Only convert small datasets, < 100K rows.
  - Pandas enables rich visualizations with matplotlib/seaborn.
- Preview methods:
  - .show(n): Displays a formatted table in the console.
  - .display(): Rich display in notebooks such as those in Jupyter or Databricks.
  - .take(n): Returns the first N rows as a list.
- Descriptive statistics:
  - .describe(): Summary statistics for numeric columns.
  - .printSchema(): View column types.
  - .count(): Get total row count.
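A minimal sketch of a notebook exploration workflow; the table name is illustrative, and the final plot assumes matplotlib is available in the notebook environment.

```python
# Load a table and take a quick look at its structure and size.
df = spark.read.table("iceberg.prod.orders")  # illustrative table
df.printSchema()
print(df.count())

# Preview a few rows and compute summary statistics.
df.show(5)
df.describe("amount").show()

# Sample ~1% of rows, cap the sample size, then convert the small result
# to Pandas for plotting.
sample_pdf = df.sample(fraction=0.01, seed=42).limit(10_000).toPandas()
sample_pdf["amount"].hist(bins=50)
```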
Perform lightweight transformations
You can apply simple filtering, grouping, and cleaning operations using the DataFrame API or SQL for exploratory analysis and quick reporting. There are several common ways to perform lightweight transformations (a sketch follows the list), such as:
- Filtering data:
  - Use .filter() or .where() with conditions.
  - Chain multiple conditions using & for AND or | for OR.
  - SQL: Use a WHERE clause with multiple predicates.
- Grouping and aggregation:
  - Use .groupBy().agg() in the DataFrame API.
  - SQL: GROUP BY with aggregate functions.
  - Common aggregates: COUNT, SUM, AVG, MIN, or MAX.
- Simple cleaning:
  - Remove nulls with .filter(col.isNotNull()).
  - Rename columns with .withColumnRenamed().
  - Add computed columns with .withColumn().
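A minimal sketch of these operations; the table name, column names, and the tax rate are illustrative assumptions.

```python
from pyspark.sql import functions as F

df = spark.read.table("iceberg.prod.orders")  # illustrative table

# Filtering with chained conditions (& for AND, | for OR).
recent_large = df.filter(
    (F.col("order_date") >= "2025-01-01") & (F.col("amount") > 500)
)

# Grouping and aggregation.
report = recent_large.groupBy("region").agg(
    F.count("*").alias("orders"),
    F.sum("amount").alias("revenue"),
)

# Simple cleaning: drop null customers, rename a column, add a computed column.
cleaned = (
    df.filter(F.col("customer_id").isNotNull())
    .withColumnRenamed("amount", "order_amount")
    .withColumn("amount_with_tax", F.col("order_amount") * 1.08)  # illustrative rate
)
```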
Optimize queries for performance
You can write efficient queries by selecting only needed columns, filtering on partitions, and avoiding expensive operations like wide shuffles. There are several common ways to optimize queries for performance (a sketch follows the list), such as:
- Using projection, a way to select specific columns:
  - Reduces data transfer and memory usage.
  - Especially important with wide tables with many columns.
- Leveraging partition filtering:
  - Filter on partition columns, usually date-based.
  - Dramatically reduces the data scanned.
  - Put partition filters in the WHERE clause.
- Avoiding wide shuffles:
  - Minimize joins on high-cardinality columns.
  - Use broadcast joins for small tables.
  - Be careful with GROUP BY on many distinct values.
- Predicate pushdown and selective retrieval:
  - Filter early in the query to push down predicates.
  - Use LIMIT when exploring data.
  - Avoid SELECT * in production queries.
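A minimal sketch contrasting an inefficient query with an optimized one. The table names, the assumption that order_date is the partition column, and the small dim_customer lookup table are illustrative.

```python
from pyspark.sql import functions as F

# Less efficient: scans every column and every partition.
# spark.sql("SELECT * FROM iceberg.prod.orders").show()

# More efficient: projection, a partition filter, and a broadcast join.
orders = (
    spark.read.table("iceberg.prod.orders")
    .select("order_id", "customer_id", "amount", "order_date")  # projection
    .filter(F.col("order_date") >= "2025-06-01")                # partition pruning
)

dim_customer = spark.read.table("iceberg.prod.dim_customer")    # small lookup table
result = orders.join(F.broadcast(dim_customer), on="customer_id", how="left")

# Use LIMIT when exploring, rather than pulling the full result.
result.limit(100).show()
```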