Spark Integration

This section covers Apache Spark-specific cursors and execution management.

Spark Cursors

class pyathena.spark.cursor.SparkCursor(session_id: str | None = None, description: str | None = None, engine_configuration: Dict[str, Any] | None = None, notebook_version: str | None = None, session_idle_timeout_minutes: int | None = None, **kwargs)[source]

Cursor for executing PySpark code on Amazon Athena for Apache Spark.

This cursor allows you to execute PySpark code directly on Athena’s managed Spark environment. It’s designed for big data processing, ETL operations, and machine learning workloads that require Spark’s distributed computing capabilities.

The cursor manages Spark sessions automatically and provides an interface similar to other PyAthena cursors but optimized for Spark calculations rather than SQL queries.

session_id

The Athena Spark session ID.

description

Optional description for the Spark session.

engine_configuration

Spark engine configuration settings.

calculation_id

ID of the current calculation being executed.

Example

>>> from pyathena.spark.cursor import SparkCursor
>>> cursor = connection.cursor(SparkCursor)
>>>
>>> # Execute PySpark code
>>> spark_code = '''
... df = spark.read.table("my_database.my_table")
... result = df.groupBy("category").count()
... result.show()
... '''
>>> cursor.execute(spark_code)
>>> output = cursor.get_std_out()
>>> print(output)

>>> # Configure Spark session
>>> cursor = connection.cursor(
...     SparkCursor,
...     engine_configuration={
...         'CoordinatorDpuSize': 1,
...         'MaxConcurrentDpus': 20,
...         'DefaultExecutorDpuSize': 1
...     }
... )
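
In the engine configuration, CoordinatorDpuSize and DefaultExecutorDpuSize set the DPUs allocated to the coordinator node and to each executor respectively, while MaxConcurrentDpus caps the total DPUs the session can consume at once; larger values increase both parallelism and cost.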

Note

Requires an Athena workgroup configured for Spark calculations. Spark sessions have associated costs and idle timeout settings.
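
For reference, a minimal connection setup might look like the following; the workgroup name and region are placeholders, and the workgroup must already be configured with the Spark engine:

>>> from pyathena import connect
>>> from pyathena.spark.cursor import SparkCursor
>>>
>>> # Hypothetical workgroup and region; adjust for your account
>>> connection = connect(
...     work_group="my-spark-workgroup",
...     region_name="us-east-1",
...     cursor_class=SparkCursor,
... )
>>> cursor = connection.cursor()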

__init__(session_id: str | None = None, description: str | None = None, engine_configuration: Dict[str, Any] | None = None, notebook_version: str | None = None, session_idle_timeout_minutes: int | None = None, **kwargs) None[source]
property calculation_execution: AthenaCalculationExecution | None
get_std_out() str | None[source]

Get the standard output from the Spark calculation execution.

Retrieves and returns the contents of the standard output generated during the Spark calculation execution, if available.

Returns:

The standard output as a string, or None if no output is available or the calculation has not been executed.
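
A short usage sketch, assuming a cursor created as in the example above:

>>> cursor.execute('print(spark.version)')
>>> output = cursor.get_std_out()
>>> if output is not None:
...     print(output)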

get_std_error() str | None[source]

Get the standard error from the Spark calculation execution.

Retrieves and returns the contents of the standard error generated during the Spark calculation execution, if available. This is useful for debugging failed or problematic Spark operations.

Returns:

The standard error as a string, or None if no error output is available or the calculation has not been executed.
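
For example, a failing calculation can be diagnosed as follows; whether execute() raises on failure is an implementation detail, so this sketch defensively catches PyAthena's OperationalError:

>>> from pyathena.error import OperationalError
>>>
>>> try:
...     cursor.execute("raise RuntimeError('intentional failure')")
... except OperationalError:
...     errors = cursor.get_std_error()
...     if errors is not None:
...         print(errors)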

execute(operation: str, parameters: Dict[str, Any] | List[str] | None = None, session_id: str | None = None, description: str | None = None, client_request_token: str | None = None, work_group: str | None = None, **kwargs) SparkCursor[source]
cancel() None[source]
LIST_DATABASES_MAX_RESULTS = 50
LIST_QUERY_EXECUTIONS_MAX_RESULTS = 50
LIST_TABLE_METADATA_MAX_RESULTS = 50
property calculation_id: str | None
close() None
property completion_date_time: datetime | None
property connection: Connection[Any]
property description: str | None
property dpu_execution_in_millis: int | None
executemany(operation: str, seq_of_parameters: List[Dict[str, Any] | List[str] | None], **kwargs) None
static get_default_converter(unload: bool = False) DefaultTypeConverter | Any

Get the default type converter for this cursor class.

Parameters:

unload – Whether the converter is for UNLOAD operations. Some cursor types may return different converters for UNLOAD operations.

Returns:

The default type converter instance for this cursor type.

static get_default_engine_configuration() Dict[str, Any]
get_table_metadata(table_name: str, catalog_name: str | None = None, schema_name: str | None = None, logging_: bool = True) AthenaTableMetadata
list_databases(catalog_name: str | None, max_results: int | None = None) List[AthenaDatabase]
list_table_metadata(catalog_name: str | None = None, schema_name: str | None = None, expression: str | None = None, max_results: int | None = None) List[AthenaTableMetadata]
property progress: str | None
property result_s3_uri: str | None
property result_type: str | None
property session_id: str
setinputsizes(sizes)

Does nothing by default

setoutputsize(size, column=None)

Does nothing by default

property state: str | None
property state_change_reason: str | None
property std_error_s3_uri: str | None
property std_out_s3_uri: str | None
property submission_date_time: datetime | None
property working_directory: str | None
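
After execute() completes, the properties above expose the calculation's metadata; a brief sketch:

>>> cursor.execute('print(spark.range(10).count())')
>>> print(cursor.state)                    # e.g. 'COMPLETED'
>>> print(cursor.dpu_execution_in_millis)  # DPU time consumed, in milliseconds
>>> print(cursor.std_out_s3_uri)           # S3 location of the stdout stream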
class pyathena.spark.async_cursor.AsyncSparkCursor(session_id: str | None = None, description: str | None = None, engine_configuration: Dict[str, Any] | None = None, notebook_version: str | None = None, session_idle_timeout_minutes: int | None = None, max_workers: int = 20, **kwargs)[source]

Asynchronous cursor for executing PySpark code on Amazon Athena for Apache Spark.

This cursor provides asynchronous execution of PySpark code on Athena’s managed Spark environment. It’s designed for non-blocking big data processing, ETL operations, and machine learning workloads that require Spark’s distributed computing capabilities without blocking the main thread.

Features:
  • Asynchronous PySpark code execution with concurrent futures

  • Non-blocking query submission and result polling

  • Managed Spark sessions with configurable resources

  • Access to standard output and error streams asynchronously

  • Automatic session lifecycle management

  • Thread pool executor for concurrent operations

max_workers

Maximum number of worker threads for async operations.

session_id

The Athena Spark session ID.

engine_configuration

Spark engine configuration settings.

Example

>>> from pyathena.spark.async_cursor import AsyncSparkCursor
>>>
>>> cursor = connection.cursor(
...     AsyncSparkCursor,
...     engine_configuration={
...         'CoordinatorDpuSize': 1,
...         'MaxConcurrentDpus': 20
...     }
... )
>>>
>>> # Execute PySpark code asynchronously
>>> spark_code = '''
... df = spark.read.table("my_database.my_table")
... result = df.groupBy("category").count()
... result.show()
... '''
>>> calculation_id, future = cursor.execute(spark_code)
>>>
>>> # Get the result when ready; execute() returns a
>>> # concurrent.futures.Future, not an awaitable
>>> calc_execution = future.result()
>>> stdout_future = cursor.get_std_out(calc_execution)
>>> if stdout_future:
...     output = stdout_future.result()
...     print(output)

Note

Requires an Athena workgroup configured for Spark calculations. Spark sessions have associated costs and idle timeout settings. The cursor manages a thread pool for asynchronous operations.
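
As a sketch of non-blocking usage with the cursor configured above: execute() returns a concurrent.futures.Future, so .result() blocks only when the value is actually needed:

>>> calculation_id, future = cursor.execute("print(spark.range(100).count())")
>>>
>>> # ... do other work while the calculation runs ...
>>> calc_execution = future.result()
>>> if calc_execution.state == 'COMPLETED':
...     stdout_future = cursor.get_std_out(calc_execution)
...     if stdout_future is not None:
...         print(stdout_future.result())
... else:
...     stderr_future = cursor.get_std_error(calc_execution)
...     if stderr_future is not None:
...         print(stderr_future.result())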

__init__(session_id: str | None = None, description: str | None = None, engine_configuration: Dict[str, Any] | None = None, notebook_version: str | None = None, session_idle_timeout_minutes: int | None = None, max_workers: int = 20, **kwargs)[source]
close(wait: bool = False) None[source]
calculation_execution(query_id: str) Future[AthenaCalculationExecution][source]
get_std_out(calculation_execution: AthenaCalculationExecution) Future[str] | None[source]
get_std_error(calculation_execution: AthenaCalculationExecution) Future[str] | None[source]
poll(query_id: str) Future[AthenaCalculationExecution][source]
execute(operation: str, parameters: Dict[str, Any] | List[str] | None = None, session_id: str | None = None, description: str | None = None, client_request_token: str | None = None, work_group: str | None = None, **kwargs) Tuple[str, Future[AthenaQueryExecution | AthenaCalculationExecution]][source]
cancel(query_id: str) Future[None][source]
LIST_DATABASES_MAX_RESULTS = 50
LIST_QUERY_EXECUTIONS_MAX_RESULTS = 50
LIST_TABLE_METADATA_MAX_RESULTS = 50
property calculation_id: str | None
property connection: Connection[Any]
executemany(operation: str, seq_of_parameters: List[Dict[str, Any] | List[str] | None], **kwargs) None
static get_default_converter(unload: bool = False) DefaultTypeConverter | Any

Get the default type converter for this cursor class.

Parameters:

unload – Whether the converter is for UNLOAD operations. Some cursor types may return different converters for UNLOAD operations.

Returns:

The default type converter instance for this cursor type.

static get_default_engine_configuration() Dict[str, Any]
get_table_metadata(table_name: str, catalog_name: str | None = None, schema_name: str | None = None, logging_: bool = True) AthenaTableMetadata
list_databases(catalog_name: str | None, max_results: int | None = None) List[AthenaDatabase]
list_table_metadata(catalog_name: str | None = None, schema_name: str | None = None, expression: str | None = None, max_results: int | None = None) List[AthenaTableMetadata]
property session_id: str
setinputsizes(sizes)

Does nothing by default

setoutputsize(size, column=None)

Does nothing by default
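
cancel() and calculation_execution() also return futures, which allows a long-running calculation to be abandoned and its status inspected without blocking the main thread; a hedged sketch:

>>> calculation_id, future = cursor.execute('import time; time.sleep(600)')
>>>
>>> # Abandon the calculation; block only on the cancellation request itself
>>> cursor.cancel(calculation_id).result()
>>>
>>> # Fetch the calculation's status record
>>> status = cursor.calculation_execution(calculation_id).result()
>>> print(status.state)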

Spark Base Classes

class pyathena.spark.common.SparkBaseCursor(session_id: str | None = None, description: str | None = None, engine_configuration: Dict[str, Any] | None = None, notebook_version: str | None = None, session_idle_timeout_minutes: int | None = None, **kwargs)[source]

Abstract base class for Spark-enabled cursor implementations.

This class provides the foundational functionality for executing PySpark code on Amazon Athena for Apache Spark. It manages Spark sessions, handles calculation execution lifecycle, and provides utilities for reading results from S3.

Features:
  • Automatic Spark session management and lifecycle

  • Configurable engine resources (DPU allocation)

  • Session idle timeout and automatic cleanup

  • Standard output and error stream access via S3

  • Calculation execution status monitoring

  • Session validation and error handling

session_id

The Athena Spark session identifier.

calculation_id

ID of the current calculation being executed.

engine_configuration

DPU and resource configuration for Spark.

Note

This is an abstract base class used by concrete Spark cursor implementations like SparkCursor and AsyncSparkCursor. It should not be instantiated directly.
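
Because the class is abstract, it is normally used through SparkCursor or AsyncSparkCursor, but the static get_default_engine_configuration() can be called directly to inspect the default DPU settings rather than hard-coding them:

>>> from pyathena.spark.common import SparkBaseCursor
>>>
>>> defaults = SparkBaseCursor.get_default_engine_configuration()
>>> print(defaults)  # DPU-related keys such as 'CoordinatorDpuSize'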

__init__(session_id: str | None = None, description: str | None = None, engine_configuration: Dict[str, Any] | None = None, notebook_version: str | None = None, session_idle_timeout_minutes: int | None = None, **kwargs) None[source]
property session_id: str
property calculation_id: str | None
static get_default_engine_configuration() Dict[str, Any][source]
close() None[source]
executemany(operation: str, seq_of_parameters: List[Dict[str, Any] | List[str] | None], **kwargs) None[source]
LIST_DATABASES_MAX_RESULTS = 50
LIST_QUERY_EXECUTIONS_MAX_RESULTS = 50
LIST_TABLE_METADATA_MAX_RESULTS = 50
property connection: Connection[Any]
abstract execute(operation: str, parameters: Dict[str, Any] | List[str] | None = None, **kwargs)
static get_default_converter(unload: bool = False) DefaultTypeConverter | Any

Get the default type converter for this cursor class.

Parameters:

unload – Whether the converter is for UNLOAD operations. Some cursor types may return different converters for UNLOAD operations.

Returns:

The default type converter instance for this cursor type.

get_table_metadata(table_name: str, catalog_name: str | None = None, schema_name: str | None = None, logging_: bool = True) AthenaTableMetadata
list_databases(catalog_name: str | None, max_results: int | None = None) List[AthenaDatabase]
list_table_metadata(catalog_name: str | None = None, schema_name: str | None = None, expression: str | None = None, max_results: int | None = None) List[AthenaTableMetadata]
setinputsizes(sizes)

Does nothing by default

setoutputsize(size, column=None)

Does nothing by default

class pyathena.spark.common.WithCalculationExecution[source]

Mixin class providing access to Spark calculation execution properties.

This mixin provides property accessors for calculation execution metadata and status information. It’s designed to be mixed into cursor classes that execute Spark calculations on Athena.

Properties:
  • description: Human-readable description of the calculation

  • working_directory: S3 path where calculation files are stored

  • state: Current execution state (COMPLETED, FAILED, etc.)

  • state_change_reason: Explanation for state changes

  • submission_date_time: When the calculation was submitted

  • completion_date_time: When the calculation completed

  • dpu_execution_in_millis: DPU execution time in milliseconds

  • progress: Current execution progress information

  • std_out_s3_uri: S3 URI for standard output

  • std_error_s3_uri: S3 URI for standard error

  • result_s3_uri: S3 URI for calculation results

  • result_type: Type of result produced by the calculation

Note

This class requires that the implementing class provides calculation_execution, session_id, and calculation_id properties.
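
The state values follow the Athena calculation lifecycle (CREATING, CREATED, QUEUED, RUNNING, CANCELING, CANCELED, COMPLETED, FAILED). A small illustrative helper, not part of the library:

>>> TERMINAL_STATES = {'COMPLETED', 'FAILED', 'CANCELED'}
>>>
>>> def is_finished(cursor):
...     """True once the latest calculation has reached a terminal state."""
...     return cursor.state in TERMINAL_STATES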

__init__()[source]
abstract property calculation_execution: AthenaCalculationExecution | None
abstract property session_id: str
abstract property calculation_id: str | None
property description: str | None
property working_directory: str | None
property state: str | None
property state_change_reason: str | None
property submission_date_time: datetime | None
property completion_date_time: datetime | None
property dpu_execution_in_millis: int | None
property progress: str | None
property std_out_s3_uri: str | None
property std_error_s3_uri: str | None
property result_s3_uri: str | None
property result_type: str | None