S3FS Integration

This section covers lightweight S3FS-based cursors, CSV readers, and data converters.

S3FS Cursors

class pyathena.s3fs.cursor.S3FSCursor(s3_staging_dir: str | None = None, schema_name: str | None = None, catalog_name: str | None = None, work_group: str | None = None, poll_interval: float = 1, encryption_option: str | None = None, kms_key: str | None = None, kill_on_interrupt: bool = True, result_reuse_enable: bool = False, result_reuse_minutes: int = 60, on_start_query_execution: Callable[[str], None] | None = None, csv_reader: type[DefaultCSVReader] | type[AthenaCSVReader] | None = None, **kwargs)[source]

Cursor for reading CSV results via S3FileSystem without pandas/pyarrow.

This cursor uses Python’s standard csv module and PyAthena’s S3FileSystem to read query results from S3. It provides a lightweight alternative to pandas and arrow cursors when those dependencies are not needed.

The cursor is especially useful for:
  • Environments where pandas/pyarrow installation is not desired

  • Simple queries where advanced data processing is not required

  • Memory-constrained environments

description

Sequence of column descriptions for the last query.

rowcount

Number of rows affected by the last query (-1 for SELECT queries).

arraysize

Default number of rows to fetch with fetchmany().

Example

>>> from pyathena.s3fs.cursor import S3FSCursor
>>> cursor = connection.cursor(S3FSCursor)
>>> cursor.execute("SELECT * FROM my_table")
>>> rows = cursor.fetchall()  # Returns list of tuples
>>>
>>> # Iterate over results
>>> for row in cursor.execute("SELECT * FROM my_table"):
...     print(row)

>>> # Use with SQLAlchemy
>>> from sqlalchemy import create_engine
>>> engine = create_engine("awsathena+s3fs://…")

__init__(s3_staging_dir: str | None = None, schema_name: str | None = None, catalog_name: str | None = None, work_group: str | None = None, poll_interval: float = 1, encryption_option: str | None = None, kms_key: str | None = None, kill_on_interrupt: bool = True, result_reuse_enable: bool = False, result_reuse_minutes: int = 60, on_start_query_execution: Callable[[str], None] | None = None, csv_reader: type[DefaultCSVReader] | type[AthenaCSVReader] | None = None, **kwargs) None[source]

Initialize an S3FSCursor.

Parameters:
  • s3_staging_dir – S3 location for query results.

  • schema_name – Default schema name.

  • catalog_name – Default catalog name.

  • work_group – Athena workgroup name.

  • poll_interval – Query status polling interval in seconds.

  • encryption_option – S3 encryption option (SSE_S3, SSE_KMS, CSE_KMS).

  • kms_key – KMS key ARN for encryption.

  • kill_on_interrupt – Cancel running query on keyboard interrupt.

  • result_reuse_enable – Enable Athena query result reuse.

  • result_reuse_minutes – Minutes to reuse cached results.

  • on_start_query_execution – Callback invoked when query starts.

  • csv_reader – CSV reader class to use for parsing results. Use AthenaCSVReader (default) to distinguish between NULL (unquoted empty) and empty string (quoted empty ""). Use DefaultCSVReader for backward compatibility where empty strings are treated as NULL.

  • **kwargs – Additional connection parameters.

Example

>>> cursor = connection.cursor(S3FSCursor)
>>> cursor.execute("SELECT * FROM my_table")
>>>
>>> # Use DefaultCSVReader for backward compatibility
>>> from pyathena.s3fs.reader import DefaultCSVReader
>>> cursor = connection.cursor(S3FSCursor, csv_reader=DefaultCSVReader)
static get_default_converter(unload: bool = False) DefaultS3FSTypeConverter[source]

Get the default type converter for S3FS cursor.

Parameters:

unload – Unused. S3FS cursor does not support UNLOAD operations.

Returns:

DefaultS3FSTypeConverter instance.

execute(operation: str, parameters: dict[str, Any] | list[str] | None = None, work_group: str | None = None, s3_staging_dir: str | None = None, cache_size: int | None = 0, cache_expiration_time: int | None = 0, result_reuse_enable: bool | None = None, result_reuse_minutes: int | None = None, paramstyle: str | None = None, on_start_query_execution: Callable[[str], None] | None = None, result_set_type_hints: dict[str | int, str] | None = None, **kwargs) S3FSCursor[source]

Execute a SQL query and return results.

Executes the SQL query on Amazon Athena and configures the result set for CSV-based output via S3FileSystem.

Parameters:
  • operation – SQL query string to execute.

  • parameters – Query parameters for parameterized queries.

  • work_group – Athena workgroup to use for this query.

  • s3_staging_dir – S3 location for query results.

  • cache_size – Number of queries to check for result caching.

  • cache_expiration_time – Cache expiration time in seconds.

  • result_reuse_enable – Enable Athena result reuse for this query.

  • result_reuse_minutes – Minutes to reuse cached results.

  • paramstyle – Parameter style (‘qmark’ or ‘pyformat’).

  • on_start_query_execution – Callback called when query starts.

  • result_set_type_hints – Optional dictionary mapping column names to Athena DDL type signatures for precise type conversion within complex types.

  • **kwargs – Additional execution parameters.

Returns:

Self reference for method chaining.

Example

>>> cursor.execute("SELECT * FROM my_table WHERE id = %(id)s", {"id": 123})
>>> rows = cursor.fetchall()
DEFAULT_FETCH_SIZE: int = 1000
DEFAULT_RESULT_REUSE_MINUTES = 60
LIST_DATABASES_MAX_RESULTS = 50
LIST_QUERY_EXECUTIONS_MAX_RESULTS = 50
LIST_TABLE_METADATA_MAX_RESULTS = 50
property arraysize: int
cancel() None

Cancel the currently executing query.

Raises:

ProgrammingError – If no query is currently executing.

property catalog: str | None
close() None

Close the cursor and release associated resources.

property completion_date_time: datetime | None
property connection: Connection[Any]
property data_manifest_location: str | None
property data_scanned_in_bytes: int | None
property database: str | None
property description: list[tuple[str, str, None, None, int, int, str]] | None
property effective_engine_version: str | None
property encryption_option: str | None
property engine_execution_time_in_millis: int | None
property error_category: int | None
property error_message: str | None
property error_type: int | None
executemany(operation: str, seq_of_parameters: list[dict[str, Any] | list[str] | None], **kwargs) None

Execute a SQL query multiple times with different parameters.

Parameters:
  • operation – SQL query string to execute.

  • seq_of_parameters – Sequence of parameter sets, one per execution.

  • **kwargs – Additional keyword arguments passed to each execute().

property execution_parameters: list[str]
property expected_bucket_owner: str | None
fetchall() list[tuple[Any | None, ...] | dict[Any, Any | None]]

Fetch all remaining rows from the result set.

Returns:

List of tuples representing all remaining rows.

Raises:

ProgrammingError – If no result set is available.

fetchmany(size: int | None = None) list[tuple[Any | None, ...] | dict[Any, Any | None]]

Fetch multiple rows from the result set.

Parameters:

size – Maximum number of rows to fetch. Defaults to arraysize.

Returns:

List of tuples representing the fetched rows.

Raises:

ProgrammingError – If no result set is available.

fetchone() tuple[Any | None, ...] | dict[Any, Any | None] | None

Fetch the next row of the result set.

Returns:

A tuple representing the next row, or None if no more rows.

Raises:

ProgrammingError – If no result set is available.

get_table_metadata(table_name: str, catalog_name: str | None = None, schema_name: str | None = None, logging_: bool = True) AthenaTableMetadata
property has_result_set: bool
property kms_key: str | None
list_databases(catalog_name: str | None, max_results: int | None = None) list[AthenaDatabase]
list_table_metadata(catalog_name: str | None = None, schema_name: str | None = None, expression: str | None = None, max_results: int | None = None) list[AthenaTableMetadata]
property output_location: str | None
property query: str | None
property query_id: str | None
property query_planning_time_in_millis: int | None
property query_queue_time_in_millis: int | None
property result_reuse_enabled: bool | None
property result_reuse_minutes: int | None
property result_set: AthenaResultSet | None
property retryable: bool | None
property reused_previous_result: bool | None
property rowcount: int

Get the number of rows affected by the last operation.

For SELECT statements, this returns -1 as per DB API 2.0 specification. For DML operations (INSERT, UPDATE, DELETE) and CTAS, this returns the number of affected rows.

Returns:

The number of rows, or -1 if not applicable or unknown.

property rownumber: int | None
property s3_acl_option: str | None
property selected_engine_version: str | None
property service_processing_time_in_millis: int | None
setinputsizes(sizes)

Does nothing by default

setoutputsize(size, column=None)

Does nothing by default

property state: str | None
property state_change_reason: str | None
property statement_type: str | None
property submission_date_time: datetime | None
property substatement_type: str | None
property total_execution_time_in_millis: int | None
property work_group: str | None
class pyathena.s3fs.async_cursor.AsyncS3FSCursor(s3_staging_dir: str | None = None, schema_name: str | None = None, catalog_name: str | None = None, work_group: str | None = None, poll_interval: float = 1, encryption_option: str | None = None, kms_key: str | None = None, kill_on_interrupt: bool = True, max_workers: int = 20, arraysize: int = 1000, result_reuse_enable: bool = False, result_reuse_minutes: int = 60, csv_reader: type[DefaultCSVReader] | type[AthenaCSVReader] | None = None, **kwargs)[source]

Asynchronous cursor that reads CSV results via S3FileSystem.

This cursor extends AsyncCursor to provide asynchronous query execution with results read via PyAthena’s S3FileSystem. It’s a lightweight alternative when pandas/pyarrow are not needed.

Features:
  • Asynchronous query execution with concurrent futures

  • Lightweight CSV parsing via pluggable readers

  • Uses PyAthena’s S3FileSystem for S3 access

  • No external dependencies beyond boto3

  • Memory-efficient streaming for large datasets

arraysize

Number of rows to fetch per batch (configurable).

Example

>>> from pyathena.s3fs.async_cursor import AsyncS3FSCursor
>>>
>>> cursor = connection.cursor(AsyncS3FSCursor)
>>> query_id, future = cursor.execute("SELECT * FROM my_table")
>>>
>>> # Get result when ready
>>> result_set = future.result()
>>> rows = result_set.fetchall()

Note

This cursor does not require pandas or pyarrow.

__init__(s3_staging_dir: str | None = None, schema_name: str | None = None, catalog_name: str | None = None, work_group: str | None = None, poll_interval: float = 1, encryption_option: str | None = None, kms_key: str | None = None, kill_on_interrupt: bool = True, max_workers: int = 20, arraysize: int = 1000, result_reuse_enable: bool = False, result_reuse_minutes: int = 60, csv_reader: type[DefaultCSVReader] | type[AthenaCSVReader] | None = None, **kwargs) None[source]

Initialize an AsyncS3FSCursor.

Parameters:
  • s3_staging_dir – S3 location for query results.

  • schema_name – Default schema name.

  • catalog_name – Default catalog name.

  • work_group – Athena workgroup name.

  • poll_interval – Query status polling interval in seconds.

  • encryption_option – S3 encryption option (SSE_S3, SSE_KMS, CSE_KMS).

  • kms_key – KMS key ARN for encryption.

  • kill_on_interrupt – Cancel running query on keyboard interrupt.

  • max_workers – Maximum number of workers for concurrent execution.

  • arraysize – Number of rows to fetch per batch.

  • result_reuse_enable – Enable Athena query result reuse.

  • result_reuse_minutes – Minutes to reuse cached results.

  • csv_reader – CSV reader class to use for parsing results. Use AthenaCSVReader (default) to distinguish between NULL (unquoted empty) and empty string (quoted empty ""). Use DefaultCSVReader for backward compatibility where empty strings are treated as NULL.

  • **kwargs – Additional connection parameters.

Example

>>> cursor = connection.cursor(AsyncS3FSCursor)
>>> query_id, future = cursor.execute("SELECT * FROM my_table")
static get_default_converter(unload: bool = False) DefaultS3FSTypeConverter[source]

Get the default type converter for S3FS cursor.

Parameters:

unload – Unused. S3FS cursor does not support UNLOAD operations.

Returns:

DefaultS3FSTypeConverter instance.

property arraysize: int

Get the number of rows to fetch at a time.

LIST_DATABASES_MAX_RESULTS = 50
LIST_QUERY_EXECUTIONS_MAX_RESULTS = 50
LIST_TABLE_METADATA_MAX_RESULTS = 50
cancel(query_id: str) Future[None]

Cancel a running query asynchronously.

Submits a cancellation request for the specified query. The cancellation itself runs asynchronously in the background.

Parameters:

query_id – The Athena query execution ID to cancel.

Returns:

Future object that completes when the cancellation request finishes.

Example

>>> query_id, future = cursor.execute("SELECT * FROM huge_table")
>>> # Later, cancel the query
>>> cancel_future = cursor.cancel(query_id)
>>> cancel_future.result()  # Wait for cancellation to complete
close(wait: bool = False) None
property connection: Connection[Any]
description(query_id: str) Future[list[tuple[str, str, None, None, int, int, str]] | None]
execute(operation: str, parameters: dict[str, Any] | list[str] | None = None, work_group: str | None = None, s3_staging_dir: str | None = None, cache_size: int | None = 0, cache_expiration_time: int | None = 0, result_reuse_enable: bool | None = None, result_reuse_minutes: int | None = None, paramstyle: str | None = None, result_set_type_hints: dict[str | int, str] | None = None, **kwargs) tuple[str, Future[AthenaS3FSResultSet | Any]][source]

Execute a SQL query asynchronously.

Submits the query to Athena and returns immediately with a query ID and a Future that will contain the result set when complete.

Parameters:
  • operation – SQL query string to execute.

  • parameters – Query parameters for parameterized queries.

  • work_group – Athena workgroup to use for this query.

  • s3_staging_dir – S3 location for query results.

  • cache_size – Number of queries to check for result caching.

  • cache_expiration_time – Cache expiration time in seconds.

  • result_reuse_enable – Enable Athena result reuse for this query.

  • result_reuse_minutes – Minutes to reuse cached results.

  • paramstyle – Parameter style (‘qmark’ or ‘pyformat’).

  • result_set_type_hints – Optional dictionary mapping column names to Athena DDL type signatures for precise type conversion within complex types.

  • **kwargs – Additional execution parameters.

Returns:

Tuple of (query_id, Future[AthenaS3FSResultSet]).

Example

>>> query_id, future = cursor.execute("SELECT * FROM my_table")
>>> result_set = future.result()
>>> rows = result_set.fetchall()
executemany(operation: str, seq_of_parameters: list[dict[str, Any] | list[str] | None], **kwargs) None

Execute multiple queries asynchronously (not supported).

This method is not supported for asynchronous cursors because managing multiple concurrent queries would be complex and resource-intensive.

Parameters:
  • operation – SQL query string.

  • seq_of_parameters – Sequence of parameter sets.

  • **kwargs – Additional arguments.

Raises:

NotSupportedError – Always raised as this operation is not supported.

Note

For bulk operations, consider using execute() with parameterized queries or batch processing patterns instead.

get_table_metadata(table_name: str, catalog_name: str | None = None, schema_name: str | None = None, logging_: bool = True) AthenaTableMetadata
list_databases(catalog_name: str | None, max_results: int | None = None) list[AthenaDatabase]
list_table_metadata(catalog_name: str | None = None, schema_name: str | None = None, expression: str | None = None, max_results: int | None = None) list[AthenaTableMetadata]
poll(query_id: str) Future[AthenaQueryExecution]

Poll for query completion asynchronously.

Waits for the query to complete (succeed, fail, or be cancelled) and returns the final execution status. This method blocks until completion but runs the polling in a background thread.

Parameters:

query_id – The Athena query execution ID to poll.

Returns:

Future object containing the final AthenaQueryExecution status.

Note

This method performs polling internally, so it will take time proportional to your query execution duration.
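The blocking behaviour poll() describes is the standard wait-until-terminal-state loop. A self-contained sketch of that pattern (generic code, not PyAthena internals; the terminal state names match Athena's QueryExecutionState values):

```python
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}

def wait_for_completion(get_state, poll_interval=1.0, max_polls=1000):
    """Call get_state() until it reports a terminal Athena state."""
    for _ in range(max_polls):
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_interval)
    raise TimeoutError("query did not reach a terminal state")

# Simulate a query that completes on the third status check.
states = iter(["QUEUED", "RUNNING", "SUCCEEDED"])
print(wait_for_completion(lambda: next(states), poll_interval=0.0))  # SUCCEEDED
```

poll() wraps this kind of loop in a background thread, which is why the returned Future resolves only after the query reaches a terminal state.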

query_execution(query_id: str) Future[AthenaQueryExecution]

Get query execution details asynchronously.

Retrieves the current execution status and metadata for a query. This is useful for monitoring query progress without blocking.

Parameters:

query_id – The Athena query execution ID.

Returns:

Future object containing AthenaQueryExecution with query details.

setinputsizes(sizes)

Does nothing by default

setoutputsize(size, column=None)

Does nothing by default

S3FS CSV Readers

S3FSCursor supports pluggable CSV reader implementations to control how NULL values and empty strings are handled when parsing Athena’s CSV output.

class pyathena.s3fs.reader.AthenaCSVReader(file_obj: Any, delimiter: str = ',')[source]

CSV reader that distinguishes between NULL and empty string.

This is the default reader for S3FSCursor.

Athena’s CSV output format distinguishes NULL values from empty strings:
  • NULL: unquoted empty field (e.g., ,, or ,field)

  • Empty string: quoted empty field (e.g., ,"", or ,"",field)

Python’s standard csv module parses both as empty strings, losing this distinction. This reader preserves the difference by returning None for NULL values and empty string for quoted empty values.

Example

>>> from io import StringIO
>>> reader = AthenaCSVReader(StringIO(',"",text'))
>>> list(reader)
[[None, '', 'text']]  # NULL and empty string are distinguished

Note

Use DefaultCSVReader if you need backward compatibility where both NULL and empty string are treated as empty string.
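The parsing rule can also be illustrated with a small hand-rolled field splitter. This is a simplified sketch of the rule (unquoted empty field → None, quoted empty field → ''), not the library's implementation; it handles basic quoting and escaped quotes only:

```python
import csv
from io import StringIO

line = ',"",text'

# Python's standard csv module collapses both empty forms to '':
print(next(csv.reader(StringIO(line))))  # ['', '', 'text']

def parse_athena_row(line, delimiter=","):
    """Split one CSV line, mapping unquoted empty fields to None."""
    fields, buf, quoted, in_quotes = [], [], False, False
    i = 0
    while i < len(line):
        ch = line[i]
        if in_quotes:
            if ch == '"':
                if i + 1 < len(line) and line[i + 1] == '"':
                    buf.append('"')  # escaped quote ("") inside a quoted field
                    i += 1
                else:
                    in_quotes = False
            else:
                buf.append(ch)
        elif ch == '"':
            in_quotes, quoted = True, True
        elif ch == delimiter:
            fields.append("".join(buf) if (buf or quoted) else None)
            buf, quoted = [], False
        else:
            buf.append(ch)
        i += 1
    fields.append("".join(buf) if (buf or quoted) else None)
    return fields

print(parse_athena_row(line))  # [None, '', 'text']
```

Tracking whether a field was ever quoted is what preserves the NULL-versus-empty-string distinction that csv.reader discards.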

__init__(file_obj: Any, delimiter: str = ',') None[source]

Initialize the reader.

Parameters:
  • file_obj – File-like object to read from.

  • delimiter – Field delimiter character.

__iter__() AthenaCSVReader[source]

Iterate over rows in the CSV file.

__next__() list[str | None][source]

Read and parse the next line.

Returns:

List of field values, with None for NULL and ‘’ for empty string.

Raises:

StopIteration – When end of file is reached or reader is closed.

close() None[source]

Close the underlying file object.

__enter__() AthenaCSVReader[source]

Enter context manager.

__exit__(exc_type: Any, exc_val: Any, exc_tb: Any) None[source]

Exit context manager and close resources.

class pyathena.s3fs.reader.DefaultCSVReader(file_obj: Any, delimiter: str = ',')[source]

CSV reader using Python’s standard csv module.

This reader wraps Python’s standard csv.reader and treats empty fields as empty strings. It does not distinguish between NULL and empty strings in Athena’s CSV output; both become empty strings.

Use this reader when you need backward compatibility with the behavior where empty strings are treated the same as NULL values.

Example

>>> from io import StringIO
>>> reader = DefaultCSVReader(StringIO(',"",text'))
>>> list(reader)
[['', '', 'text']]  # Both NULL and empty string become ''

Note

The default reader for S3FSCursor is AthenaCSVReader, which distinguishes between NULL and empty string values.

__init__(file_obj: Any, delimiter: str = ',') None[source]

Initialize the reader.

Parameters:
  • file_obj – File-like object to read from.

  • delimiter – Field delimiter character.

__iter__() DefaultCSVReader[source]

Iterate over rows in the CSV file.

__next__() list[str][source]

Read and parse the next line.

Returns:

List of field values as strings.

Raises:

StopIteration – When end of file is reached or reader is closed.

close() None[source]

Close the underlying file object.

__enter__() DefaultCSVReader[source]

Enter context manager.

__exit__(exc_type: Any, exc_val: Any, exc_tb: Any) None[source]

Exit context manager and close resources.

S3FS Data Converters

class pyathena.s3fs.converter.DefaultS3FSTypeConverter[source]

Type converter for S3FS Cursor results.

This converter is specifically designed for the S3FSCursor and provides type conversion for CSV-based result files read via S3FileSystem. It converts Athena data types to Python types using the standard converter mappings.

The converter uses the same mappings as DefaultTypeConverter, providing consistent behavior with the standard Cursor while using the S3FileSystem for file access.

Example

>>> from pyathena.s3fs.converter import DefaultS3FSTypeConverter
>>> converter = DefaultS3FSTypeConverter()
>>>
>>> # Used automatically by S3FSCursor
>>> cursor = connection.cursor(S3FSCursor)
>>> # converter is applied automatically to results

Note

This converter is used by default in S3FSCursor. Most users don’t need to instantiate it directly.

__init__() None[source]
convert(type_: str, value: str | None, type_hint: str | None = None) Any | None[source]

Convert a string value to the appropriate Python type.

Looks up the converter function for the given Athena type and applies it to the value. If the value is None, returns None without conversion.

Parameters:
  • type_ – The Athena data type name (e.g., “integer”, “varchar”, “date”).

  • value – The string value to convert, or None.

  • type_hint – Optional Athena DDL type signature for precise complex type conversion (e.g., “array(varchar)”).

Returns:

The converted Python value, or None if the input value was None.
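The lookup-and-apply behaviour described above can be sketched with a plain dictionary of converter functions. The type names mirror common Athena types, but the mapping below is illustrative, not DefaultS3FSTypeConverter's actual table:

```python
from datetime import date
from decimal import Decimal

# Hypothetical converter registry keyed by Athena type name.
_CONVERTERS = {
    "boolean": lambda v: v == "true",
    "tinyint": int,
    "smallint": int,
    "integer": int,
    "bigint": int,
    "double": float,
    "decimal": Decimal,
    "date": date.fromisoformat,
    "varchar": str,
}

def convert(type_, value, type_hint=None):
    """Apply the converter registered for type_, passing None through."""
    if value is None:
        return None
    return _CONVERTERS.get(type_, str)(value)

print(convert("integer", "123"))       # 123
print(convert("boolean", "true"))      # True
print(convert("date", "2024-01-15"))   # 2024-01-15
print(convert("varchar", None))        # None
print(convert("unknown_type", "raw"))  # raw (falls back to str)
```

Passing None through unchanged is what lets the cursor represent SQL NULL faithfully regardless of the column's declared type.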

S3FS Result Set

class pyathena.s3fs.result_set.AthenaS3FSResultSet(connection: Connection[Any], converter: Converter, query_execution: AthenaQueryExecution, arraysize: int, retry_config: RetryConfig, block_size: int | None = None, csv_reader: CSVReaderType | None = None, filesystem_class: type[AbstractFileSystem] | None = None, result_set_type_hints: dict[str | int, str] | None = None, **kwargs)[source]

Result set that reads CSV results via S3FileSystem without pandas/pyarrow.

This result set uses PyAthena’s S3FileSystem to read query results from S3. It provides a lightweight alternative to pandas and arrow cursors when those dependencies are not needed.

Features:
  • Lightweight CSV parsing via pluggable readers

  • Uses PyAthena’s S3FileSystem for S3 access

  • No external dependencies beyond boto3

  • Memory-efficient streaming for large datasets

DEFAULT_BLOCK_SIZE

Default block size for S3 operations (128MB).

Example

>>> # Used automatically by S3FSCursor
>>> cursor = connection.cursor(S3FSCursor)
>>> cursor.execute("SELECT * FROM my_table")
>>>
>>> # Fetch results
>>> rows = cursor.fetchall()

Note

This class is used internally by S3FSCursor and typically not instantiated directly by users.

DEFAULT_FETCH_SIZE: int = 1000
DEFAULT_BLOCK_SIZE = 134217728
__init__(connection: Connection[Any], converter: Converter, query_execution: AthenaQueryExecution, arraysize: int, retry_config: RetryConfig, block_size: int | None = None, csv_reader: CSVReaderType | None = None, filesystem_class: type[AbstractFileSystem] | None = None, result_set_type_hints: dict[str | int, str] | None = None, **kwargs) None[source]
fetchone() tuple[Any | None, ...] | dict[Any, Any | None] | None[source]

Fetch the next row of the result set.

Returns:

A tuple representing the next row, or None if no more rows.

fetchmany(size: int | None = None) list[tuple[Any | None, ...] | dict[Any, Any | None]][source]

Fetch the next set of rows of the result set.

Parameters:

size – Maximum number of rows to fetch. Defaults to arraysize.

Returns:

A list of tuples representing the rows.

fetchall() list[tuple[Any | None, ...] | dict[Any, Any | None]][source]

Fetch all remaining rows of the result set.

Returns:

A list of tuples representing all remaining rows.

close() None[source]

Close the result set and release resources.

DEFAULT_RESULT_REUSE_MINUTES = 60
property arraysize: int
property catalog: str | None
property completion_date_time: datetime | None
property connection: Connection[Any]
property data_manifest_location: str | None
property data_scanned_in_bytes: int | None
property database: str | None
property description: list[tuple[str, str, None, None, int, int, str]] | None
property effective_engine_version: str | None
property encryption_option: str | None
property engine_execution_time_in_millis: int | None
property error_category: int | None
property error_message: str | None
property error_type: int | None
property execution_parameters: list[str]
property expected_bucket_owner: str | None
property is_closed: bool
property is_unload: bool

Check if the query is an UNLOAD statement.

Returns:

True if the query is an UNLOAD statement, False otherwise.

property kms_key: str | None
property output_location: str | None
property query: str | None
property query_id: str | None
property query_planning_time_in_millis: int | None
property query_queue_time_in_millis: int | None
property result_reuse_enabled: bool | None
property result_reuse_minutes: int | None
property retryable: bool | None
property reused_previous_result: bool | None
property rowcount: int
property rownumber: int | None
property s3_acl_option: str | None
property selected_engine_version: str | None
property service_processing_time_in_millis: int | None
property state: str | None
property state_change_reason: str | None
property statement_type: str | None
property submission_date_time: datetime | None
property substatement_type: str | None
property total_execution_time_in_millis: int | None
property work_group: str | None