"""pyathena.s3fs.result_set: result set that reads Athena CSV output via S3FileSystem."""

from __future__ import annotations

import logging
from io import TextIOWrapper
from typing import TYPE_CHECKING, Any

from fsspec import AbstractFileSystem

from pyathena.converter import Converter
from pyathena.error import OperationalError, ProgrammingError
from pyathena.filesystem.s3 import S3FileSystem
from pyathena.model import AthenaQueryExecution
from pyathena.result_set import AthenaResultSet
from pyathena.s3fs.reader import AthenaCSVReader, DefaultCSVReader
from pyathena.util import RetryConfig, parse_output_location

if TYPE_CHECKING:
    from pyathena.connection import Connection

# Either CSV reader implementation may be injected into AthenaS3FSResultSet.
CSVReaderType = type[DefaultCSVReader] | type[AthenaCSVReader]

# Module-level logger, keyed by module name per stdlib logging convention.
_logger = logging.getLogger(__name__)


class AthenaS3FSResultSet(AthenaResultSet):
    """Result set that reads CSV query results through S3FileSystem.

    A lightweight alternative to the pandas/arrow result sets: query output
    is streamed from S3 and parsed with a pluggable CSV reader, so no
    dependencies beyond boto3 are required.

    Features:
        - Lightweight CSV parsing via pluggable readers
        - Uses PyAthena's S3FileSystem for S3 access
        - No external dependencies beyond boto3
        - Memory-efficient streaming for large datasets

    Attributes:
        DEFAULT_FETCH_SIZE: Default number of rows converted per batch.
        DEFAULT_BLOCK_SIZE: Default block size for S3 operations (128MB).

    Example:
        >>> # Used automatically by S3FSCursor
        >>> cursor = connection.cursor(S3FSCursor)
        >>> cursor.execute("SELECT * FROM my_table")
        >>>
        >>> # Fetch results
        >>> rows = cursor.fetchall()

    Note:
        This class is used internally by S3FSCursor and typically not
        instantiated directly by users.
    """

    # Rows converted per batch when no explicit arraysize is supplied.
    DEFAULT_FETCH_SIZE: int = 1000
    # S3 range-read block size: 128 MB.
    DEFAULT_BLOCK_SIZE: int = 1024 * 1024 * 128
[docs] def __init__( self, connection: Connection[Any], converter: Converter, query_execution: AthenaQueryExecution, arraysize: int, retry_config: RetryConfig, block_size: int | None = None, csv_reader: CSVReaderType | None = None, filesystem_class: type[AbstractFileSystem] | None = None, result_set_type_hints: dict[str | int, str] | None = None, **kwargs, ) -> None: super().__init__( connection=connection, converter=converter, query_execution=query_execution, arraysize=1, # Fetch one row to retrieve metadata retry_config=retry_config, result_set_type_hints=result_set_type_hints, ) # Save pre-fetched rows (from Athena API) in case CSV reading is not available pre_fetched_rows = list(self._rows) self._rows.clear() self._arraysize = arraysize self._block_size = block_size if block_size else self.DEFAULT_BLOCK_SIZE self._csv_reader_class: CSVReaderType = csv_reader or AthenaCSVReader self._filesystem_class: type[AbstractFileSystem] = filesystem_class or S3FileSystem self._fs = self._create_s3_file_system() self._csv_reader: Any | None = None if self.state == AthenaQueryExecution.STATE_SUCCEEDED and self.output_location: self._init_csv_reader() elif self.state == AthenaQueryExecution.STATE_SUCCEEDED: # Managed query result storage: no output_location, use API rows = self._fetch_all_rows() self._rows.extend(rows) # If CSV reader was not initialized (e.g., CTAS, DDL), # fall back to pre-fetched data from Athena API if not self._csv_reader and not self._rows and pre_fetched_rows: self._rows.extend(pre_fetched_rows)
def _create_s3_file_system(self) -> AbstractFileSystem: """Create S3FileSystem using connection settings.""" return self._filesystem_class( connection=self.connection, default_block_size=self._block_size, ) def _init_csv_reader(self) -> None: """Initialize CSV reader for the output file.""" if not self.output_location: raise ProgrammingError("OutputLocation is none or empty.") if not self.output_location.endswith((".csv", ".txt")): return # Skip for UPDATE/DELETE/MERGE/VACUUM operations if self.substatement_type and self.substatement_type.upper() in ( "UPDATE", "DELETE", "MERGE", "VACUUM_TABLE", ): return length = self._get_content_length() if not length: return bucket, key = parse_output_location(self.output_location) path = f"{bucket}/{key}" try: csv_file = self._fs._open(path, mode="rb") text_wrapper = TextIOWrapper(csv_file, encoding="utf-8") if self.output_location.endswith(".txt"): # Tab-separated format (no header row) self._csv_reader = self._csv_reader_class(text_wrapper, delimiter="\t") else: # Standard CSV format (has header row, skip it) self._csv_reader = self._csv_reader_class(text_wrapper, delimiter=",") next(self._csv_reader) except Exception as e: _logger.exception("Failed to open %s.", path) raise OperationalError(*e.args) from e def _fetch(self) -> None: """Fetch next batch of rows from CSV.""" if not self._csv_reader: return col_types = self._column_types if not col_types: description = self.description if self.description else [] col_types = tuple(d[1] for d in description) col_hints = self._column_type_hints rows_fetched = 0 while rows_fetched < self._arraysize: try: row = next(self._csv_reader) except StopIteration: break # Convert row values using converters # AthenaCSVReader returns None for NULL values directly, # DefaultCSVReader returns empty string which needs conversion if self._csv_reader_class is DefaultCSVReader: if col_hints: converted_row = tuple( self._converter.convert( col_type, value if value != "" else None, type_hint=hint ) 
if hint else self._converter.convert(col_type, value if value != "" else None) for col_type, value, hint in zip(col_types, row, col_hints, strict=False) ) else: converted_row = tuple( self._converter.convert(col_type, value if value != "" else None) for col_type, value in zip(col_types, row, strict=False) ) else: if col_hints: converted_row = tuple( self._converter.convert(col_type, value, type_hint=hint) if hint else self._converter.convert(col_type, value) for col_type, value, hint in zip(col_types, row, col_hints, strict=False) ) else: converted_row = tuple( self._converter.convert(col_type, value) for col_type, value in zip(col_types, row, strict=False) ) self._rows.append(converted_row) rows_fetched += 1
[docs] def fetchone( self, ) -> tuple[Any | None, ...] | dict[Any, Any | None] | None: """Fetch the next row of the result set. Returns: A tuple representing the next row, or None if no more rows. """ if not self._rows: self._fetch() if not self._rows: return None if self._rownumber is None: self._rownumber = 0 self._rownumber += 1 return self._rows.popleft()
[docs] def fetchmany( self, size: int | None = None ) -> list[tuple[Any | None, ...] | dict[Any, Any | None]]: """Fetch the next set of rows of the result set. Args: size: Maximum number of rows to fetch. Defaults to arraysize. Returns: A list of tuples representing the rows. """ if not size or size <= 0: size = self._arraysize rows = [] for _ in range(size): row = self.fetchone() if row: rows.append(row) else: break return rows
[docs] def fetchall( self, ) -> list[tuple[Any | None, ...] | dict[Any, Any | None]]: """Fetch all remaining rows of the result set. Returns: A list of tuples representing all remaining rows. """ rows = [] while True: row = self.fetchone() if row: rows.append(row) else: break return rows
[docs] def close(self) -> None: """Close the result set and release resources.""" super().close() if self._csv_reader: self._csv_reader.close() self._csv_reader = None