Source code for pyathena.spark.async_cursor

# -*- coding: utf-8 -*-
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from multiprocessing import cpu_count
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast

from pyathena.model import AthenaCalculationExecution
from pyathena.spark.common import SparkBaseCursor

if TYPE_CHECKING:
    from pyathena.model import AthenaQueryExecution

_logger = logging.getLogger(__name__)  # type: ignore


class AsyncSparkCursor(SparkBaseCursor):
    """Asynchronous cursor for executing PySpark code on Amazon Athena for Apache Spark.

    This cursor provides asynchronous execution of PySpark code on Athena's
    managed Spark environment. It is designed for non-blocking big data
    processing, ETL operations, and machine learning workloads that require
    Spark's distributed computing capabilities without blocking the main
    thread.

    Features:
        - Asynchronous PySpark code execution with concurrent futures
        - Non-blocking query submission and result polling
        - Managed Spark sessions with configurable resources
        - Asynchronous access to standard output and error streams
        - Automatic session lifecycle management
        - Thread pool executor for concurrent operations

    Attributes:
        max_workers: Maximum number of worker threads for async operations.
        session_id: The Athena Spark session ID.
        engine_configuration: Spark engine configuration settings.

    Example:
        >>> from pyathena.spark.async_cursor import AsyncSparkCursor
        >>>
        >>> cursor = connection.cursor(
        ...     AsyncSparkCursor,
        ...     engine_configuration={
        ...         'CoordinatorDpuSize': 1,
        ...         'MaxConcurrentDpus': 20
        ...     }
        ... )
        >>>
        >>> # Execute PySpark code asynchronously
        >>> spark_code = '''
        ... df = spark.read.table("my_database.my_table")
        ... result = df.groupBy("category").count()
        ... result.show()
        ... '''
        >>> calculation_id, future = cursor.execute(spark_code)
        >>>
        >>> # Block until the calculation reaches a terminal state
        >>> calc_execution = future.result()
        >>> stdout_future = cursor.get_std_out(calc_execution)
        >>> if stdout_future:
        ...     print(stdout_future.result())

    Note:
        Requires an Athena workgroup configured for Spark calculations.
        Spark sessions have associated costs and idle timeout settings.
        The cursor manages a thread pool for asynchronous operations.
    """

    def __init__(
        self,
        session_id: Optional[str] = None,
        description: Optional[str] = None,
        engine_configuration: Optional[Dict[str, Any]] = None,
        notebook_version: Optional[str] = None,
        session_idle_timeout_minutes: Optional[int] = None,
        max_workers: int = (cpu_count() or 1) * 5,
        **kwargs,
    ):
        super().__init__(
            session_id=session_id,
            description=description,
            engine_configuration=engine_configuration,
            notebook_version=notebook_version,
            session_idle_timeout_minutes=session_idle_timeout_minutes,
            **kwargs,
        )
        self._max_workers = max_workers
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
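
    # A minimal construction sketch, assuming credentials are resolved from
    # the environment and that "my-spark-workgroup" is a placeholder for a
    # Spark-enabled Athena workgroup:
    #
    #     from pyathena import connect
    #     from pyathena.spark.async_cursor import AsyncSparkCursor
    #
    #     connection = connect(
    #         work_group="my-spark-workgroup",
    #         region_name="us-east-1",
    #     )
    #     cursor = connection.cursor(AsyncSparkCursor, max_workers=4)
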
    def close(self, wait: bool = False) -> None:
        super().close()
        self._executor.shutdown(wait=wait)
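
    # Shutdown sketch: ``close(wait=True)`` blocks until work already
    # submitted to the internal thread pool has finished, while the default
    # ``wait=False`` returns immediately:
    #
    #     cursor.close(wait=True)
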
    def calculation_execution(self, query_id: str) -> "Future[AthenaCalculationExecution]":
        return self._executor.submit(self._get_calculation_execution, query_id)
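
    # Unlike ``poll`` below, this returns a one-shot snapshot of the
    # calculation rather than waiting for a terminal state. A sketch,
    # assuming ``calculation_id`` came from a prior ``execute`` and that the
    # ``pyathena.model.AthenaCalculationExecution`` object exposes ``state``:
    #
    #     snapshot = cursor.calculation_execution(calculation_id).result()
    #     print(snapshot.state)
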
    def get_std_out(
        self, calculation_execution: AthenaCalculationExecution
    ) -> "Optional[Future[str]]":
        if not calculation_execution.std_out_s3_uri:
            return None
        return self._executor.submit(
            self._read_s3_file_as_text, calculation_execution.std_out_s3_uri
        )

    def get_std_error(
        self, calculation_execution: AthenaCalculationExecution
    ) -> "Optional[Future[str]]":
        if not calculation_execution.std_error_s3_uri:
            return None
        return self._executor.submit(
            self._read_s3_file_as_text, calculation_execution.std_error_s3_uri
        )
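
    # Sketch of collecting both output streams once a calculation has
    # finished; either stream may be absent, in which case the getter
    # returns ``None`` instead of a future:
    #
    #     calc_execution = cursor.poll(calculation_id).result()
    #     stdout_future = cursor.get_std_out(calc_execution)
    #     stderr_future = cursor.get_std_error(calc_execution)
    #     if stdout_future:
    #         print(stdout_future.result())
    #     if stderr_future:
    #         print(stderr_future.result())
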
    def poll(self, query_id: str) -> "Future[AthenaCalculationExecution]":
        return cast(
            "Future[AthenaCalculationExecution]", self._executor.submit(self._poll, query_id)
        )

    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        session_id: Optional[str] = None,
        description: Optional[str] = None,
        client_request_token: Optional[str] = None,
        work_group: Optional[str] = None,
        **kwargs,
    ) -> Tuple[str, "Future[Union[AthenaQueryExecution, AthenaCalculationExecution]]"]:
        # ``parameters`` and ``work_group`` are part of the cursor interface
        # but are not used by Spark calculations.
        calculation_id = self._calculate(
            session_id=session_id if session_id else self._session_id,
            code_block=operation,
            description=description,
            client_request_token=client_request_token,
        )
        return calculation_id, self._executor.submit(self._poll, calculation_id)
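
    # End-to-end sketch: submit PySpark code and block on the returned
    # future, which resolves once the calculation reaches a terminal state.
    # The ``spark`` object is provided by the Athena-managed session, and the
    # table name is a placeholder:
    #
    #     code = 'spark.read.table("my_database.my_table").count()'
    #     calculation_id, future = cursor.execute(code)
    #     calc_execution = future.result()
    #     print(calc_execution.state)
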
    def cancel(self, query_id: str) -> "Future[None]":
        return self._executor.submit(self._cancel, query_id)
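
    # Cancellation sketch: ``cancel`` is itself asynchronous, so block on the
    # returned future to wait until the cancel request has been processed
    # (``long_running_code`` is a placeholder):
    #
    #     calculation_id, exec_future = cursor.execute(long_running_code)
    #     cursor.cancel(calculation_id).result()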