# -*- coding: utf-8 -*-
from __future__ import annotations
import logging
from concurrent.futures import Future
from multiprocessing import cpu_count
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast
from pyathena import ProgrammingError
from pyathena.async_cursor import AsyncCursor
from pyathena.common import CursorIterator
from pyathena.model import AthenaCompression, AthenaFileFormat, AthenaQueryExecution
from pyathena.pandas.converter import (
DefaultPandasTypeConverter,
DefaultPandasUnloadTypeConverter,
)
from pyathena.pandas.result_set import AthenaPandasResultSet

_logger = logging.getLogger(__name__)  # type: ignore


class AsyncPandasCursor(AsyncCursor):
"""Asynchronous cursor that returns results as pandas DataFrames.
This cursor extends AsyncCursor to provide asynchronous query execution
with results returned as pandas DataFrames. It's designed for data analysis
workflows where pandas integration is required and non-blocking query
execution is beneficial.

    Features:
        - Asynchronous query execution with concurrent futures
        - Direct pandas DataFrame results for data analysis
        - Configurable CSV and Parquet engines for optimal performance
        - Support for chunked processing of large datasets
        - UNLOAD operations for improved performance with large results
        - Memory optimization through configurable chunking

    Attributes:
        arraysize: Number of rows to fetch per batch.
        engine: Parsing engine ('auto', 'c', 'python', 'pyarrow').
        chunksize: Number of rows per chunk for large datasets.

    Example:
        >>> from pyathena.pandas.async_cursor import AsyncPandasCursor
        >>>
        >>> cursor = connection.cursor(AsyncPandasCursor, chunksize=10000)
        >>> query_id, future = cursor.execute("SELECT * FROM large_table")
        >>>
        >>> # Block until the query completes, then fetch the result
        >>> result_set = future.result()
        >>> df = result_set.as_pandas()
        >>>
        >>> # Or, since chunksize is set, iterate over DataFrame chunks
        >>> for chunk_df in result_set.as_pandas():
        ...     process_chunk(chunk_df)
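
    Example (UNLOAD):
        >>> # A minimal sketch: assumes ``connection`` is an open pyathena
        >>> # Connection whose workgroup can write to its s3_staging_dir
        >>> cursor = connection.cursor(AsyncPandasCursor, unload=True)
        >>> query_id, future = cursor.execute("SELECT * FROM large_table")
        >>> df = future.result().as_pandas()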

    Note:
Requires pandas to be installed. For large datasets, consider
using chunksize or UNLOAD operations for better memory efficiency.
"""

    def __init__(
self,
s3_staging_dir: Optional[str] = None,
schema_name: Optional[str] = None,
catalog_name: Optional[str] = None,
work_group: Optional[str] = None,
poll_interval: float = 1,
encryption_option: Optional[str] = None,
kms_key: Optional[str] = None,
kill_on_interrupt: bool = True,
max_workers: int = (cpu_count() or 1) * 5,
arraysize: int = CursorIterator.DEFAULT_FETCH_SIZE,
unload: bool = False,
engine: str = "auto",
chunksize: Optional[int] = None,
result_reuse_enable: bool = False,
result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES,
**kwargs,
) -> None:
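        """Create an AsyncPandasCursor.

        In addition to the AsyncCursor arguments, this accepts
        pandas-specific options: ``unload`` wraps SELECT statements in
        UNLOAD and reads the results back from ``s3_staging_dir`` as
        Parquet, ``engine`` selects the pandas parsing engine ('auto',
        'c', 'python', or 'pyarrow'), and ``chunksize`` makes
        ``as_pandas()`` return an iterator of DataFrame chunks instead
        of a single DataFrame.
        """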
super().__init__(
s3_staging_dir=s3_staging_dir,
schema_name=schema_name,
catalog_name=catalog_name,
work_group=work_group,
poll_interval=poll_interval,
encryption_option=encryption_option,
kms_key=kms_key,
kill_on_interrupt=kill_on_interrupt,
max_workers=max_workers,
arraysize=arraysize,
result_reuse_enable=result_reuse_enable,
result_reuse_minutes=result_reuse_minutes,
**kwargs,
)
self._unload = unload
self._engine = engine
self._chunksize = chunksize

    @staticmethod
def get_default_converter(
unload: bool = False,
) -> Union[DefaultPandasTypeConverter, Any]:
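        """Return the default type converter for this cursor.

        A DefaultPandasUnloadTypeConverter is used for UNLOAD results,
        where types are already preserved in the Parquet output;
        otherwise a DefaultPandasTypeConverter is used.
        """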
if unload:
return DefaultPandasUnloadTypeConverter()
return DefaultPandasTypeConverter()

    @property
def arraysize(self) -> int:
return self._arraysize

    @arraysize.setter
def arraysize(self, value: int) -> None:
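        """Set the number of rows to fetch per batch.

        Any positive integer is accepted; unlike the base cursor's
        setter, no upper bound is enforced here.
        """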
if value <= 0:
raise ProgrammingError("arraysize must be a positive integer value.")
self._arraysize = value

    def _collect_result_set(
self,
query_id: str,
keep_default_na: bool = False,
na_values: Optional[Iterable[str]] = ("",),
quoting: int = 1,
unload_location: Optional[str] = None,
kwargs: Optional[Dict[str, Any]] = None,
) -> AthenaPandasResultSet:
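        """Wait for the query to complete and wrap it in a result set.

        Runs on an executor thread: polls until ``query_id`` finishes,
        then builds an AthenaPandasResultSet. Entries in ``kwargs`` may
        override ``engine`` and ``chunksize`` on a per-query basis.
        """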
if kwargs is None:
kwargs = {}
query_execution = cast(AthenaQueryExecution, self._poll(query_id))
return AthenaPandasResultSet(
connection=self._connection,
converter=self._converter,
query_execution=query_execution,
arraysize=self._arraysize,
retry_config=self._retry_config,
keep_default_na=keep_default_na,
na_values=na_values,
quoting=quoting,
unload=self._unload,
unload_location=unload_location,
engine=kwargs.pop("engine", self._engine),
chunksize=kwargs.pop("chunksize", self._chunksize),
**kwargs,
)

    def execute(
self,
operation: str,
parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
work_group: Optional[str] = None,
s3_staging_dir: Optional[str] = None,
cache_size: Optional[int] = 0,
cache_expiration_time: Optional[int] = 0,
result_reuse_enable: Optional[bool] = None,
result_reuse_minutes: Optional[int] = None,
paramstyle: Optional[str] = None,
keep_default_na: bool = False,
na_values: Optional[Iterable[str]] = ("",),
quoting: int = 1,
**kwargs,
) -> Tuple[str, "Future[Union[AthenaPandasResultSet, Any]]"]:
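        """Execute a query asynchronously.

        If the cursor was created with ``unload=True``, the statement is
        wrapped in UNLOAD so that results are written to
        ``s3_staging_dir`` as Snappy-compressed Parquet. Returns the
        Athena query ID together with a Future that resolves to an
        AthenaPandasResultSet.
        """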
if self._unload:
s3_staging_dir = s3_staging_dir if s3_staging_dir else self._s3_staging_dir
assert s3_staging_dir, "If the unload option is used, s3_staging_dir is required."
operation, unload_location = self._formatter.wrap_unload(
operation,
s3_staging_dir=s3_staging_dir,
format_=AthenaFileFormat.FILE_FORMAT_PARQUET,
compression=AthenaCompression.COMPRESSION_SNAPPY,
)
else:
unload_location = None
query_id = self._execute(
operation,
parameters=parameters,
work_group=work_group,
s3_staging_dir=s3_staging_dir,
cache_size=cache_size,
cache_expiration_time=cache_expiration_time,
result_reuse_enable=result_reuse_enable,
result_reuse_minutes=result_reuse_minutes,
paramstyle=paramstyle,
)
return (
query_id,
self._executor.submit(
self._collect_result_set,
query_id,
keep_default_na,
na_values,
quoting,
unload_location,
kwargs,
),
)