Spark Integration
This section covers Apache Spark-specific cursors and execution management.
Spark Cursors
- class pyathena.spark.cursor.SparkCursor(session_id: str | None = None, description: str | None = None, engine_configuration: Dict[str, Any] | None = None, notebook_version: str | None = None, session_idle_timeout_minutes: int | None = None, **kwargs)
Cursor for executing PySpark code on Amazon Athena for Apache Spark.
This cursor allows you to execute PySpark code directly on Athena’s managed Spark environment. It’s designed for big data processing, ETL operations, and machine learning workloads that require Spark’s distributed computing capabilities.
The cursor manages Spark sessions automatically and provides an interface similar to other PyAthena cursors but optimized for Spark calculations rather than SQL queries.
- session_id
The Athena Spark session ID.
- description
Optional description for the Spark session.
- engine_configuration
Spark engine configuration settings.
- calculation_id
ID of the current calculation being executed.
Example
>>> from pyathena.spark.cursor import SparkCursor
>>> cursor = connection.cursor(SparkCursor)
>>>
>>> # Execute PySpark code
>>> spark_code = '''
... df = spark.read.table("my_database.my_table")
... result = df.groupBy("category").count()
... result.show()
... '''
>>> cursor.execute(spark_code)
>>> # Output written by result.show() is read back from standard output
>>> print(cursor.get_std_out())
>>>
>>> # Configure Spark session
>>> cursor = connection.cursor(
...     SparkCursor,
...     engine_configuration={
...         'CoordinatorDpuSize': 1,
...         'MaxConcurrentDpus': 20,
...         'DefaultExecutorDpuSize': 1
...     }
... )
Note
Requires an Athena workgroup configured for Spark calculations. Spark sessions have associated costs and idle timeout settings.
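The engine_configuration keys used in the example above can be assembled by a small helper. The following is a minimal sketch, not part of PyAthena: build_engine_configuration is a hypothetical name, and the positive-integer check is an assumption made for illustration rather than a documented Athena constraint.

```python
from typing import Any, Dict


def build_engine_configuration(
    max_concurrent_dpus: int,
    coordinator_dpu_size: int = 1,
    default_executor_dpu_size: int = 1,
) -> Dict[str, Any]:
    """Return a dict shaped like the engine_configuration argument.

    Hypothetical helper: it only assembles and sanity-checks the dict;
    it never contacts Athena.
    """
    for name, value in (
        ("max_concurrent_dpus", max_concurrent_dpus),
        ("coordinator_dpu_size", coordinator_dpu_size),
        ("default_executor_dpu_size", default_executor_dpu_size),
    ):
        # Assumed rule for this sketch: every size must be at least 1.
        if value < 1:
            raise ValueError(f"{name} must be a positive integer, got {value}")
    return {
        "CoordinatorDpuSize": coordinator_dpu_size,
        "MaxConcurrentDpus": max_concurrent_dpus,
        "DefaultExecutorDpuSize": default_executor_dpu_size,
    }


config = build_engine_configuration(max_concurrent_dpus=20)
print(config)
```

The resulting dictionary would then be passed as engine_configuration=config when creating the cursor, as in the example above.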
- __init__(session_id: str | None = None, description: str | None = None, engine_configuration: Dict[str, Any] | None = None, notebook_version: str | None = None, session_idle_timeout_minutes: int | None = None, **kwargs) -> None
- property calculation_execution: AthenaCalculationExecution | None
- get_std_out() -> str | None
Get the standard output from the Spark calculation execution.
Retrieves and returns the contents of the standard output generated during the Spark calculation execution, if available.
- Returns:
The standard output as a string, or None if no output is available or the calculation has not been executed.
- get_std_error() -> str | None
Get the standard error from the Spark calculation execution.
Retrieves and returns the contents of the standard error generated during the Spark calculation execution, if available. This is useful for debugging failed or problematic Spark operations.
- Returns:
The standard error as a string, or None if no error output is available or the calculation has not been executed.
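Because both accessors may return None, calling code usually needs to handle the missing-output case explicitly. The sketch below illustrates one way to do that. summarize_output, HasCalculationOutput, and FakeCursor are hypothetical names introduced here; FakeCursor only stands in for a real SparkCursor so the snippet runs without an Athena session.

```python
from typing import List, Optional, Protocol


class HasCalculationOutput(Protocol):
    """Anything exposing the two accessors documented above."""

    def get_std_out(self) -> Optional[str]: ...

    def get_std_error(self) -> Optional[str]: ...


def summarize_output(cursor: HasCalculationOutput) -> str:
    """Combine stdout and stderr, treating None as 'nothing captured'."""
    std_out = cursor.get_std_out()
    std_error = cursor.get_std_error()
    parts: List[str] = []
    if std_out is not None:
        parts.append(f"stdout:\n{std_out}")
    if std_error is not None:
        parts.append(f"stderr:\n{std_error}")
    return "\n".join(parts) if parts else "no output available"


class FakeCursor:
    """Stand-in used only for this illustration; not part of PyAthena."""

    def __init__(self, std_out: Optional[str], std_error: Optional[str]) -> None:
        self._std_out = std_out
        self._std_error = std_error

    def get_std_out(self) -> Optional[str]:
        return self._std_out

    def get_std_error(self) -> Optional[str]:
        return self._std_error


print(summarize_output(FakeCursor("category=a count=3", None)))
print(summarize_output(FakeCursor(None, None)))
```

With a real cursor, the same function could be called right after cursor.execute(...), assuming the cursor exposes the accessors with the signatures listed above.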
- execute(operation: str, parameters: Dict[str, Any] | List[str] | None = None, session_id: str | None = None, description: str | None = None, client_request_token: str | None = None, work_group: str | None = None, **kwargs) -> SparkCursor
- LIST_DATABASES_MAX_RESULTS = 50
- LIST_QUERY_EXECUTIONS_MAX_RESULTS = 50
- LIST_TABLE_METADATA_MAX_RESULTS = 50
- property connection: Connection[Any]
- executemany(operation: str, seq_of_parameters: List[Dict[str, Any] | List[str] | None], **kwargs) -> None
- static get_default_converter(unload: bool = False) -> DefaultTypeConverter | Any
Get the default type converter for this cursor class.
- Parameters:
unload – Whether the converter is for UNLOAD operations. Some cursor types may return different converters for UNLOAD operations.
- Returns:
The default type converter instance for this cursor type.
- get_table_metadata(table_name: str, catalog_name: str | None = None, schema_name: str | None = None, logging_: bool = True) -> AthenaTableMetadata
- list_table_metadata(catalog_name: str | None = None, schema_name: str | None = None, expression: str | None = None, max_results: int | None = None) -> List[AthenaTableMetadata]
- setinputsizes(sizes)
Does nothing by default.
- setoutputsize(size, column=None)
Does nothing by default.
- class pyathena.spark.async_cursor.AsyncSparkCursor(session_id: str | None = None, description: str | None = None, engine_configuration: Dict[str, Any] | None = None, notebook_version: str | None = None, session_idle_timeout_minutes: int | None = None, max_workers: int = 20, **kwargs)
Asynchronous cursor for executing PySpark code on Amazon Athena for Apache Spark.
This cursor provides asynchronous execution of PySpark code on Athena’s managed Spark environment. It’s designed for non-blocking big data processing, ETL operations, and machine learning workloads that require Spark’s distributed computing capabilities without blocking the main thread.
- Features:
  - Asynchronous PySpark code execution with concurrent futures
  - Non-blocking query submission and result polling
  - Managed Spark sessions with configurable resources
  - Access to standard output and error streams asynchronously
  - Automatic session lifecycle management
  - Thread pool executor for concurrent operations
- max_workers
Maximum number of worker threads for async operations.
- session_id
The Athena Spark session ID.
- engine_configuration
Spark engine configuration settings.
Example
>>> from pyathena.spark.async_cursor import AsyncSparkCursor
>>>
>>> cursor = connection.cursor(
...     AsyncSparkCursor,
...     engine_configuration={
...         'CoordinatorDpuSize': 1,
...         'MaxConcurrentDpus': 20
...     }
... )
>>>
>>> # Execute PySpark code asynchronously
>>> spark_code = '''
... df = spark.read.table("my_database.my_table")
... result = df.groupBy("category").count()
... result.show()
... '''
>>> calculation_id, future = cursor.execute(spark_code)
>>>
>>> # Block until the calculation finishes, then read its standard output
>>> calc_execution = future.result()
>>> stdout_future = cursor.get_std_out(calc_execution)
>>> if stdout_future:
...     output = stdout_future.result()
...     print(output)
Note
Requires an Athena workgroup configured for Spark calculations. Spark sessions have associated costs and idle timeout settings. The cursor manages a thread pool for asynchronous operations.
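The pattern of receiving an identifier together with a future, and of an accessor that may return None instead of a future, can be tried without an Athena session. The following is a minimal sketch assuming the Future type in this class's signatures is concurrent.futures.Future, as the thread pool wording suggests. fake_execute, fake_get_std_out, and the "calc-0001" identifier are hypothetical stand-ins, not PyAthena APIs.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Optional, Tuple

# Stand-in pool; max_workers mirrors the documented default of 20.
executor = ThreadPoolExecutor(max_workers=20)


def fake_execute(code: str) -> Tuple[str, "Future[str]"]:
    """Hypothetical stand-in for an execute() returning (id, future)."""
    calculation_id = "calc-0001"  # made-up identifier for illustration
    line_count = len(code.splitlines())
    future = executor.submit(lambda: f"ran {line_count} line(s)")
    return calculation_id, future


def fake_get_std_out(has_output: bool) -> Optional["Future[str]"]:
    """Hypothetical stand-in: a future, or None when nothing was captured."""
    if not has_output:
        return None
    return executor.submit(lambda: "hello from spark")


calculation_id, future = fake_execute("df = 1\nprint(df)")
# result() blocks the calling thread until the worker thread finishes.
result = future.result(timeout=10)

stdout_future = fake_get_std_out(has_output=True)
# Check for None before waiting, since no future may have been created.
output = stdout_future.result(timeout=10) if stdout_future is not None else None

print(calculation_id, result, output)
executor.shutdown()
```

Other work can run between submitting the calculation and calling result(), which is the main benefit of the future-based interface.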
- __init__(session_id: str | None = None, description: str | None = None, engine_configuration: Dict[str, Any] | None = None, notebook_version: str | None = None, session_idle_timeout_minutes: int | None = None, max_workers: int = 20, **kwargs)
- calculation_execution(query_id: str) -> Future[AthenaCalculationExecution]
- get_std_out(calculation_execution: AthenaCalculationExecution) -> Future[str] | None
- get_std_error(calculation_execution: AthenaCalculationExecution) -> Future[str] | None
- poll(query_id: str) -> Future[AthenaCalculationExecution]
- execute(operation: str, parameters: Dict[str, Any] | List[str] | None = None, session_id: str | None = None, description: str | None = None, client_request_token: str | None = None, work_group: str | None = None, **kwargs) -> Tuple[str, Future[AthenaQueryExecution | AthenaCalculationExecution]]
- LIST_DATABASES_MAX_RESULTS = 50
- LIST_QUERY_EXECUTIONS_MAX_RESULTS = 50
- LIST_TABLE_METADATA_MAX_RESULTS = 50
- property connection: Connection[Any]
- executemany(operation: str, seq_of_parameters: List[Dict[str, Any] | List[str] | None], **kwargs) -> None
- static get_default_converter(unload: bool = False) -> DefaultTypeConverter | Any
Get the default type converter for this cursor class.
- Parameters:
unload – Whether the converter is for UNLOAD operations. Some cursor types may return different converters for UNLOAD operations.
- Returns:
The default type converter instance for this cursor type.
- get_table_metadata(table_name: str, catalog_name: str | None = None, schema_name: str | None = None, logging_: bool = True) -> AthenaTableMetadata
- list_table_metadata(catalog_name: str | None = None, schema_name: str | None = None, expression: str | None = None, max_results: int | None = None) -> List[AthenaTableMetadata]
- setinputsizes(sizes)
Does nothing by default.
- setoutputsize(size, column=None)
Does nothing by default.
Spark Base Classes
- class pyathena.spark.common.SparkBaseCursor(session_id: str | None = None, description: str | None = None, engine_configuration: Dict[str, Any] | None = None, notebook_version: str | None = None, session_idle_timeout_minutes: int | None = None, **kwargs)
Abstract base class for Spark-enabled cursor implementations.
This class provides the foundational functionality for executing PySpark code on Amazon Athena for Apache Spark. It manages Spark sessions, handles calculation execution lifecycle, and provides utilities for reading results from S3.
- Features:
  - Automatic Spark session management and lifecycle
  - Configurable engine resources (DPU allocation)
  - Session idle timeout and automatic cleanup
  - Standard output and error stream access via S3
  - Calculation execution status monitoring
  - Session validation and error handling
- session_id
The Athena Spark session identifier.
- calculation_id
ID of the current calculation being executed.
- engine_configuration
DPU and resource configuration for Spark.
Note
This is an abstract base class used by concrete Spark cursor implementations like SparkCursor and AsyncSparkCursor. It should not be instantiated directly.
- __init__(session_id: str | None = None, description: str | None = None, engine_configuration: Dict[str, Any] | None = None, notebook_version: str | None = None, session_idle_timeout_minutes: int | None = None, **kwargs) -> None
- executemany(operation: str, seq_of_parameters: List[Dict[str, Any] | List[str] | None], **kwargs) -> None
- LIST_DATABASES_MAX_RESULTS = 50
- LIST_QUERY_EXECUTIONS_MAX_RESULTS = 50
- LIST_TABLE_METADATA_MAX_RESULTS = 50
- property connection: Connection[Any]
- static get_default_converter(unload: bool = False) -> DefaultTypeConverter | Any
Get the default type converter for this cursor class.
- Parameters:
unload – Whether the converter is for UNLOAD operations. Some cursor types may return different converters for UNLOAD operations.
- Returns:
The default type converter instance for this cursor type.
- get_table_metadata(table_name: str, catalog_name: str | None = None, schema_name: str | None = None, logging_: bool = True) -> AthenaTableMetadata
- list_table_metadata(catalog_name: str | None = None, schema_name: str | None = None, expression: str | None = None, max_results: int | None = None) -> List[AthenaTableMetadata]
- setinputsizes(sizes)
Does nothing by default.
- setoutputsize(size, column=None)
Does nothing by default.
- class pyathena.spark.common.WithCalculationExecution
Mixin class providing access to Spark calculation execution properties.
This mixin provides property accessors for calculation execution metadata and status information. It’s designed to be mixed with cursor classes that execute Spark calculations on Athena.
- Properties:
  - description: Human-readable description of the calculation
  - working_directory: S3 path where calculation files are stored
  - state: Current execution state (COMPLETED, FAILED, etc.)
  - state_change_reason: Explanation for state changes
  - submission_date_time: When the calculation was submitted
  - completion_date_time: When the calculation completed
  - dpu_execution_in_millis: DPU execution time in milliseconds
  - progress: Current execution progress information
  - std_out_s3_uri: S3 URI for standard output
  - std_error_s3_uri: S3 URI for standard error
  - result_s3_uri: S3 URI for calculation results
  - result_type: Type of result produced by the calculation
Note
This class requires that the implementing class provides calculation_execution, session_id, and calculation_id properties.
- abstract property calculation_execution: AthenaCalculationExecution | None
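The mixin arrangement described above, in which read-only properties delegate to an abstract calculation_execution property supplied by the concrete class, can be sketched in a few lines. This is a simplified illustration under stated assumptions, not PyAthena's implementation: FakeCalculationExecution, WithExecutionSketch, DemoCursor, and the example S3 URI are all hypothetical, and only two of the listed properties are shown.

```python
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeCalculationExecution:
    """Hypothetical stand-in for AthenaCalculationExecution."""

    state: Optional[str] = None
    std_out_s3_uri: Optional[str] = None


class WithExecutionSketch(metaclass=ABCMeta):
    """Simplified mixin: each property delegates to calculation_execution."""

    @property
    @abstractmethod
    def calculation_execution(self) -> Optional[FakeCalculationExecution]:
        raise NotImplementedError

    @property
    def state(self) -> Optional[str]:
        execution = self.calculation_execution
        return execution.state if execution else None

    @property
    def std_out_s3_uri(self) -> Optional[str]:
        execution = self.calculation_execution
        return execution.std_out_s3_uri if execution else None


class DemoCursor(WithExecutionSketch):
    """Concrete class supplying the property the mixin requires."""

    def __init__(self) -> None:
        self._execution: Optional[FakeCalculationExecution] = None

    @property
    def calculation_execution(self) -> Optional[FakeCalculationExecution]:
        return self._execution


cursor = DemoCursor()
before = cursor.state  # None: nothing has been executed yet
cursor._execution = FakeCalculationExecution("COMPLETED", "s3://example-bucket/stdout")
print(before, cursor.state, cursor.std_out_s3_uri)
```

Declaring the property abstract means a class that mixes in the accessors without providing calculation_execution cannot be instantiated, which surfaces the missing requirement early.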