Source code for pyathena.arrow.converter

# -*- coding: utf-8 -*-
from __future__ import annotations

import logging
from copy import deepcopy
from datetime import date, datetime
from typing import Any, Callable, Dict, Optional, Type, Union

from pyathena.converter import (
    Converter,
    _to_binary,
    _to_decimal,
    _to_default,
    _to_json,
    _to_time,
)

_logger = logging.getLogger(__name__)  # type: ignore


def _to_date(value: Optional[Union[str, datetime]]) -> Optional[date]:
    if value is None:
        return None
    if isinstance(value, datetime):
        return value.date()
    return datetime.strptime(value, "%Y-%m-%d").date()


_DEFAULT_ARROW_CONVERTERS: Dict[str, Callable[[Optional[str]], Optional[Any]]] = {
    "date": _to_date,
    "time": _to_time,
    "decimal": _to_decimal,
    "varbinary": _to_binary,
    "json": _to_json,
}
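
# Illustrative behavior (a sketch, not part of the library source): each
# converter above takes the raw string value returned by Athena and produces
# the corresponding Python object. For example, _to_date, defined above, maps
# ISO date strings to datetime.date and passes None through unchanged:
#
#   >>> _to_date("2024-05-01")
#   datetime.date(2024, 5, 1)
#   >>> _to_date(None) is None
#   True
#   >>> _DEFAULT_ARROW_CONVERTERS["date"] is _to_date
#   True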


class DefaultArrowTypeConverter(Converter):
    """Optimized type converter for Apache Arrow Table results.

    This converter is specifically designed for the ArrowCursor and provides
    optimized type conversion for Apache Arrow's columnar data format. It
    converts Athena data types to Python types that are efficiently handled
    by Apache Arrow.

    The converter focuses on:

    - Converting date/time types to appropriate Python objects
    - Handling decimal and binary types for Arrow compatibility
    - Preserving JSON and complex types
    - Maintaining high performance for columnar operations

    Example:
        >>> from pyathena.arrow.converter import DefaultArrowTypeConverter
        >>> converter = DefaultArrowTypeConverter()
        >>>
        >>> # Used automatically by ArrowCursor
        >>> cursor = connection.cursor(ArrowCursor)
        >>> # converter is applied automatically to results

    Note:
        This converter is used by default in ArrowCursor.
        Most users don't need to instantiate it directly.
    """

    def __init__(self) -> None:
        super().__init__(
            mappings=deepcopy(_DEFAULT_ARROW_CONVERTERS),
            default=_to_default,
            types=self._dtypes,
        )

    @property
    def _dtypes(self) -> Dict[str, Type[Any]]:
        if not hasattr(self, "__dtypes"):
            import pyarrow as pa

            self.__dtypes = {
                "boolean": pa.bool_(),
                "tinyint": pa.int8(),
                "smallint": pa.int16(),
                "integer": pa.int32(),
                "bigint": pa.int64(),
                "float": pa.float32(),
                "real": pa.float64(),
                "double": pa.float64(),
                "char": pa.string(),
                "varchar": pa.string(),
                "string": pa.string(),
                "timestamp": pa.timestamp("ms"),
                "date": pa.timestamp("ms"),
                "time": pa.string(),
                "varbinary": pa.string(),
                "array": pa.string(),
                "map": pa.string(),
                "row": pa.string(),
                "decimal": pa.string(),
                "json": pa.string(),
            }
        return self.__dtypes

    def convert(self, type_: str, value: Optional[str]) -> Optional[Any]:
        converter = self.get(type_)
        return converter(value)
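
# Illustrative standalone use (a sketch; in practice ArrowCursor applies this
# converter automatically). Instantiating DefaultArrowTypeConverter evaluates
# the _dtypes property, which imports pyarrow, so pyarrow must be installed:
#
#   >>> converter = DefaultArrowTypeConverter()
#   >>> converter.convert("date", "2024-05-01")
#   datetime.date(2024, 5, 1)
#   >>> converter.convert("date", None) is None
#   True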


class DefaultArrowUnloadTypeConverter(Converter):
    """Type converter for Arrow UNLOAD operations.

    This converter is designed for use with UNLOAD queries that write results
    directly to Parquet files in S3. Since UNLOAD operations bypass the normal
    conversion process and write data in native Parquet format, this converter
    has minimal functionality.

    Note:
        Used automatically when ArrowCursor is configured with unload=True.
        UNLOAD results are read directly as Arrow tables from Parquet files.
    """

    def __init__(self) -> None:
        super().__init__(
            mappings={},
            default=_to_default,
        )

    def convert(self, type_: str, value: Optional[str]) -> Optional[Any]:
        pass
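
# Illustrative note (a sketch, not part of the library source): because UNLOAD
# results are read straight from Parquet into Arrow tables, the convert method
# of DefaultArrowUnloadTypeConverter is a no-op and returns None for any input:
#
#   >>> DefaultArrowUnloadTypeConverter().convert("varchar", "a") is None
#   True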