Source code for pyathena.s3fs.async_cursor

from __future__ import annotations

import logging
from concurrent.futures import Future
from multiprocessing import cpu_count
from typing import Any, cast

from pyathena.async_cursor import AsyncCursor
from pyathena.common import CursorIterator
from pyathena.error import ProgrammingError
from pyathena.model import AthenaQueryExecution
from pyathena.s3fs.converter import DefaultS3FSTypeConverter
from pyathena.s3fs.result_set import AthenaS3FSResultSet, CSVReaderType

_logger = logging.getLogger(__name__)


[docs] class AsyncS3FSCursor(AsyncCursor): """Asynchronous cursor that reads CSV results via S3FileSystem. This cursor extends AsyncCursor to provide asynchronous query execution with results read via PyAthena's S3FileSystem. It's a lightweight alternative when pandas/pyarrow are not needed. Features: - Asynchronous query execution with concurrent futures - Lightweight CSV parsing via pluggable readers - Uses PyAthena's S3FileSystem for S3 access - No external dependencies beyond boto3 - Memory-efficient streaming for large datasets Attributes: arraysize: Number of rows to fetch per batch (configurable). Example: >>> from pyathena.s3fs.async_cursor import AsyncS3FSCursor >>> >>> cursor = connection.cursor(AsyncS3FSCursor) >>> query_id, future = cursor.execute("SELECT * FROM my_table") >>> >>> # Get result when ready >>> result_set = future.result() >>> rows = result_set.fetchall() Note: This cursor does not require pandas or pyarrow. """
[docs] def __init__( self, s3_staging_dir: str | None = None, schema_name: str | None = None, catalog_name: str | None = None, work_group: str | None = None, poll_interval: float = 1, encryption_option: str | None = None, kms_key: str | None = None, kill_on_interrupt: bool = True, max_workers: int = (cpu_count() or 1) * 5, arraysize: int = CursorIterator.DEFAULT_FETCH_SIZE, result_reuse_enable: bool = False, result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES, csv_reader: CSVReaderType | None = None, **kwargs, ) -> None: """Initialize an AsyncS3FSCursor. Args: s3_staging_dir: S3 location for query results. schema_name: Default schema name. catalog_name: Default catalog name. work_group: Athena workgroup name. poll_interval: Query status polling interval in seconds. encryption_option: S3 encryption option (SSE_S3, SSE_KMS, CSE_KMS). kms_key: KMS key ARN for encryption. kill_on_interrupt: Cancel running query on keyboard interrupt. max_workers: Maximum number of workers for concurrent execution. arraysize: Number of rows to fetch per batch. result_reuse_enable: Enable Athena query result reuse. result_reuse_minutes: Minutes to reuse cached results. csv_reader: CSV reader class to use for parsing results. Use AthenaCSVReader (default) to distinguish between NULL (unquoted empty) and empty string (quoted empty ""). Use DefaultCSVReader for backward compatibility where empty strings are treated as NULL. **kwargs: Additional connection parameters. Example: >>> cursor = connection.cursor(AsyncS3FSCursor) >>> query_id, future = cursor.execute("SELECT * FROM my_table") """ super().__init__( s3_staging_dir=s3_staging_dir, schema_name=schema_name, catalog_name=catalog_name, work_group=work_group, poll_interval=poll_interval, encryption_option=encryption_option, kms_key=kms_key, kill_on_interrupt=kill_on_interrupt, max_workers=max_workers, arraysize=arraysize, result_reuse_enable=result_reuse_enable, result_reuse_minutes=result_reuse_minutes, **kwargs, ) self._csv_reader = csv_reader
[docs] @staticmethod def get_default_converter( unload: bool = False, ) -> DefaultS3FSTypeConverter: """Get the default type converter for S3FS cursor. Args: unload: Unused. S3FS cursor does not support UNLOAD operations. Returns: DefaultS3FSTypeConverter instance. """ return DefaultS3FSTypeConverter()
@property def arraysize(self) -> int: """Get the number of rows to fetch at a time.""" return self._arraysize @arraysize.setter def arraysize(self, value: int) -> None: """Set the number of rows to fetch at a time. Args: value: Number of rows (must be positive). Raises: ProgrammingError: If value is not positive. """ if value <= 0: raise ProgrammingError("arraysize must be a positive integer value.") self._arraysize = value def _collect_result_set( self, query_id: str, result_set_type_hints: dict[str | int, str] | None = None, kwargs: dict[str, Any] | None = None, ) -> AthenaS3FSResultSet: """Collect result set after query execution. Args: query_id: The Athena query execution ID. result_set_type_hints: Optional dictionary mapping column names to Athena DDL type signatures for precise type conversion within complex types. kwargs: Additional keyword arguments for result set. Returns: AthenaS3FSResultSet containing the query results. """ if kwargs is None: kwargs = {} query_execution = cast(AthenaQueryExecution, self._poll(query_id)) return AthenaS3FSResultSet( connection=self._connection, converter=self._converter, query_execution=query_execution, arraysize=self._arraysize, retry_config=self._retry_config, csv_reader=self._csv_reader, result_set_type_hints=result_set_type_hints, **kwargs, )
[docs] def execute( self, operation: str, parameters: dict[str, Any] | list[str] | None = None, work_group: str | None = None, s3_staging_dir: str | None = None, cache_size: int | None = 0, cache_expiration_time: int | None = 0, result_reuse_enable: bool | None = None, result_reuse_minutes: int | None = None, paramstyle: str | None = None, result_set_type_hints: dict[str | int, str] | None = None, **kwargs, ) -> tuple[str, Future[AthenaS3FSResultSet | Any]]: """Execute a SQL query asynchronously. Submits the query to Athena and returns immediately with a query ID and a Future that will contain the result set when complete. Args: operation: SQL query string to execute. parameters: Query parameters for parameterized queries. work_group: Athena workgroup to use for this query. s3_staging_dir: S3 location for query results. cache_size: Number of queries to check for result caching. cache_expiration_time: Cache expiration time in seconds. result_reuse_enable: Enable Athena result reuse for this query. result_reuse_minutes: Minutes to reuse cached results. paramstyle: Parameter style ('qmark' or 'pyformat'). result_set_type_hints: Optional dictionary mapping column names to Athena DDL type signatures for precise type conversion within complex types. **kwargs: Additional execution parameters. Returns: Tuple of (query_id, Future[AthenaS3FSResultSet]). Example: >>> query_id, future = cursor.execute("SELECT * FROM my_table") >>> result_set = future.result() >>> rows = result_set.fetchall() """ query_id = self._execute( operation, parameters=parameters, work_group=work_group, s3_staging_dir=s3_staging_dir, cache_size=cache_size, cache_expiration_time=cache_expiration_time, result_reuse_enable=result_reuse_enable, result_reuse_minutes=result_reuse_minutes, paramstyle=paramstyle, ) return ( query_id, self._executor.submit( self._collect_result_set, query_id, result_set_type_hints, kwargs, ), )