S3FS Integration¶
This section covers lightweight S3FS-based cursors, CSV readers, and data converters.
S3FS Cursors¶
- class pyathena.s3fs.cursor.S3FSCursor(s3_staging_dir: str | None = None, schema_name: str | None = None, catalog_name: str | None = None, work_group: str | None = None, poll_interval: float = 1, encryption_option: str | None = None, kms_key: str | None = None, kill_on_interrupt: bool = True, result_reuse_enable: bool = False, result_reuse_minutes: int = 60, on_start_query_execution: Callable[[str], None] | None = None, csv_reader: type[DefaultCSVReader] | type[AthenaCSVReader] | None = None, **kwargs)[source]¶
Cursor for reading CSV results via S3FileSystem without pandas/pyarrow.
This cursor uses Python’s standard csv module and PyAthena’s S3FileSystem to read query results from S3. It provides a lightweight alternative to pandas and arrow cursors when those dependencies are not needed.
- The cursor is especially useful for:
Environments where pandas/pyarrow installation is not desired
Simple queries where advanced data processing is not required
Memory-constrained environments
- description¶
Sequence of column descriptions for the last query.
- rowcount¶
Number of rows affected by the last query (-1 for SELECT queries).
- arraysize¶
Default number of rows to fetch with fetchmany().
Example
>>> from pyathena.s3fs.cursor import S3FSCursor
>>> cursor = connection.cursor(S3FSCursor)
>>> cursor.execute("SELECT * FROM my_table")
>>> rows = cursor.fetchall()  # Returns list of tuples
>>>
>>> # Iterate over results
>>> for row in cursor.execute("SELECT * FROM my_table"):
...     print(row)
>>> # Use with SQLAlchemy
>>> from sqlalchemy import create_engine
>>> engine = create_engine("awsathena+s3fs://…")
- __init__(s3_staging_dir: str | None = None, schema_name: str | None = None, catalog_name: str | None = None, work_group: str | None = None, poll_interval: float = 1, encryption_option: str | None = None, kms_key: str | None = None, kill_on_interrupt: bool = True, result_reuse_enable: bool = False, result_reuse_minutes: int = 60, on_start_query_execution: Callable[[str], None] | None = None, csv_reader: type[DefaultCSVReader] | type[AthenaCSVReader] | None = None, **kwargs) None[source]¶
Initialize an S3FSCursor.
- Parameters:
s3_staging_dir – S3 location for query results.
schema_name – Default schema name.
catalog_name – Default catalog name.
work_group – Athena workgroup name.
poll_interval – Query status polling interval in seconds.
encryption_option – S3 encryption option (SSE_S3, SSE_KMS, CSE_KMS).
kms_key – KMS key ARN for encryption.
kill_on_interrupt – Cancel running query on keyboard interrupt.
result_reuse_enable – Enable Athena query result reuse.
result_reuse_minutes – Minutes to reuse cached results.
on_start_query_execution – Callback invoked when query starts.
csv_reader – CSV reader class to use for parsing results. Use AthenaCSVReader (default) to distinguish between NULL (unquoted empty) and empty string (quoted empty ""). Use DefaultCSVReader for backward compatibility where empty strings are treated as NULL.
**kwargs – Additional connection parameters.
Example
>>> cursor = connection.cursor(S3FSCursor)
>>> cursor.execute("SELECT * FROM my_table")
>>>
>>> # Use DefaultCSVReader for backward compatibility
>>> from pyathena.s3fs.reader import DefaultCSVReader
>>> cursor = connection.cursor(S3FSCursor, csv_reader=DefaultCSVReader)
- static get_default_converter(unload: bool = False) DefaultS3FSTypeConverter[source]¶
Get the default type converter for S3FS cursor.
- Parameters:
unload – Unused. S3FS cursor does not support UNLOAD operations.
- Returns:
DefaultS3FSTypeConverter instance.
- execute(operation: str, parameters: dict[str, Any] | list[str] | None = None, work_group: str | None = None, s3_staging_dir: str | None = None, cache_size: int | None = 0, cache_expiration_time: int | None = 0, result_reuse_enable: bool | None = None, result_reuse_minutes: int | None = None, paramstyle: str | None = None, on_start_query_execution: Callable[[str], None] | None = None, result_set_type_hints: dict[str | int, str] | None = None, **kwargs) S3FSCursor[source]¶
Execute a SQL query and return results.
Executes the SQL query on Amazon Athena and configures the result set for CSV-based output via S3FileSystem.
- Parameters:
operation – SQL query string to execute.
parameters – Query parameters for parameterized queries.
work_group – Athena workgroup to use for this query.
s3_staging_dir – S3 location for query results.
cache_size – Number of queries to check for result caching.
cache_expiration_time – Cache expiration time in seconds.
result_reuse_enable – Enable Athena result reuse for this query.
result_reuse_minutes – Minutes to reuse cached results.
paramstyle – Parameter style ('qmark' or 'pyformat').
on_start_query_execution – Callback called when query starts.
result_set_type_hints – Optional dictionary mapping column names to Athena DDL type signatures for precise type conversion within complex types.
**kwargs – Additional execution parameters.
- Returns:
Self reference for method chaining.
Example
>>> cursor.execute("SELECT * FROM my_table WHERE id = %(id)s", {"id": 123})
>>> rows = cursor.fetchall()
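The shape of the result_set_type_hints argument can be sketched as follows; the column names and type signatures here are hypothetical, chosen only to illustrate the mapping:

```python
# Hypothetical hints mapping column names to Athena DDL type signatures.
# These would be passed as cursor.execute(sql, result_set_type_hints=type_hints)
# so that values inside complex types are converted precisely.
type_hints = {
    "tags": "array(varchar)",            # column of string arrays
    "attributes": "map(varchar, integer)",  # column of string-to-int maps
}
print(sorted(type_hints))  # ['attributes', 'tags']
```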
- DEFAULT_RESULT_REUSE_MINUTES = 60¶
- LIST_DATABASES_MAX_RESULTS = 50¶
- LIST_QUERY_EXECUTIONS_MAX_RESULTS = 50¶
- LIST_TABLE_METADATA_MAX_RESULTS = 50¶
- cancel() None¶
Cancel the currently executing query.
- Raises:
ProgrammingError – If no query is currently executing.
- property connection: Connection[Any]¶
- executemany(operation: str, seq_of_parameters: list[dict[str, Any] | list[str] | None], **kwargs) None¶
Execute a SQL query multiple times with different parameters.
- Parameters:
operation – SQL query string to execute.
seq_of_parameters – Sequence of parameter sets, one per execution.
**kwargs – Additional keyword arguments passed to each
execute().
- fetchall() list[tuple[Any | None, ...] | dict[Any, Any | None]]¶
Fetch all remaining rows from the result set.
- Returns:
List of tuples representing all remaining rows.
- Raises:
ProgrammingError – If no result set is available.
- fetchmany(size: int | None = None) list[tuple[Any | None, ...] | dict[Any, Any | None]]¶
Fetch multiple rows from the result set.
- Parameters:
size – Maximum number of rows to fetch. Defaults to arraysize.
- Returns:
List of tuples representing the fetched rows.
- Raises:
ProgrammingError – If no result set is available.
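A common use of fetchmany() is batched iteration over a large result set. The sketch below models only the batching loop; FakeCursor is a stand-in for a connected S3FSCursor, since a live Athena connection is not assumed here:

```python
# Minimal sketch of batched iteration with fetchmany(). FakeCursor is a
# stand-in for a connected S3FSCursor; only fetchmany() is modeled.
class FakeCursor:
    def __init__(self, rows):
        self._rows = list(rows)

    def fetchmany(self, size):
        batch, self._rows = self._rows[:size], self._rows[size:]
        return batch

def iter_batches(cursor, size):
    """Yield rows batch by batch until the result set is exhausted."""
    while True:
        batch = cursor.fetchmany(size)
        if not batch:
            break
        yield from batch

rows = list(iter_batches(FakeCursor([(1,), (2,), (3,), (4,), (5,)]), size=2))
print(rows)  # [(1,), (2,), (3,), (4,), (5,)]
```

Fetching in batches keeps memory bounded compared with a single fetchall() on large results.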
- fetchone() tuple[Any | None, ...] | dict[Any, Any | None] | None¶
Fetch the next row of the result set.
- Returns:
A tuple representing the next row, or None if no more rows.
- Raises:
ProgrammingError – If no result set is available.
- get_table_metadata(table_name: str, catalog_name: str | None = None, schema_name: str | None = None, logging_: bool = True) AthenaTableMetadata¶
- list_table_metadata(catalog_name: str | None = None, schema_name: str | None = None, expression: str | None = None, max_results: int | None = None) list[AthenaTableMetadata]¶
- property result_set: AthenaResultSet | None¶
- property rowcount: int¶
Get the number of rows affected by the last operation.
For SELECT statements, this returns -1 per the DB API 2.0 specification. For DML operations (INSERT, UPDATE, DELETE) and CTAS, this returns the number of affected rows.
- Returns:
The number of rows, or -1 if not applicable or unknown.
- setinputsizes(sizes)¶
Does nothing by default
- setoutputsize(size, column=None)¶
Does nothing by default
- class pyathena.s3fs.async_cursor.AsyncS3FSCursor(s3_staging_dir: str | None = None, schema_name: str | None = None, catalog_name: str | None = None, work_group: str | None = None, poll_interval: float = 1, encryption_option: str | None = None, kms_key: str | None = None, kill_on_interrupt: bool = True, max_workers: int = 20, arraysize: int = 1000, result_reuse_enable: bool = False, result_reuse_minutes: int = 60, csv_reader: type[DefaultCSVReader] | type[AthenaCSVReader] | None = None, **kwargs)[source]¶
Asynchronous cursor that reads CSV results via S3FileSystem.
This cursor extends AsyncCursor to provide asynchronous query execution with results read via PyAthena’s S3FileSystem. It’s a lightweight alternative when pandas/pyarrow are not needed.
- Features:
Asynchronous query execution with concurrent futures
Lightweight CSV parsing via pluggable readers
Uses PyAthena’s S3FileSystem for S3 access
No external dependencies beyond boto3
Memory-efficient streaming for large datasets
- arraysize¶
Number of rows to fetch per batch (configurable).
Example
>>> from pyathena.s3fs.async_cursor import AsyncS3FSCursor
>>>
>>> cursor = connection.cursor(AsyncS3FSCursor)
>>> query_id, future = cursor.execute("SELECT * FROM my_table")
>>>
>>> # Get result when ready
>>> result_set = future.result()
>>> rows = result_set.fetchall()
Note
This cursor does not require pandas or pyarrow.
- __init__(s3_staging_dir: str | None = None, schema_name: str | None = None, catalog_name: str | None = None, work_group: str | None = None, poll_interval: float = 1, encryption_option: str | None = None, kms_key: str | None = None, kill_on_interrupt: bool = True, max_workers: int = 20, arraysize: int = 1000, result_reuse_enable: bool = False, result_reuse_minutes: int = 60, csv_reader: type[DefaultCSVReader] | type[AthenaCSVReader] | None = None, **kwargs) None[source]¶
Initialize an AsyncS3FSCursor.
- Parameters:
s3_staging_dir – S3 location for query results.
schema_name – Default schema name.
catalog_name – Default catalog name.
work_group – Athena workgroup name.
poll_interval – Query status polling interval in seconds.
encryption_option – S3 encryption option (SSE_S3, SSE_KMS, CSE_KMS).
kms_key – KMS key ARN for encryption.
kill_on_interrupt – Cancel running query on keyboard interrupt.
max_workers – Maximum number of workers for concurrent execution.
arraysize – Number of rows to fetch per batch.
result_reuse_enable – Enable Athena query result reuse.
result_reuse_minutes – Minutes to reuse cached results.
csv_reader – CSV reader class to use for parsing results. Use AthenaCSVReader (default) to distinguish between NULL (unquoted empty) and empty string (quoted empty ""). Use DefaultCSVReader for backward compatibility where empty strings are treated as NULL.
**kwargs – Additional connection parameters.
Example
>>> cursor = connection.cursor(AsyncS3FSCursor)
>>> query_id, future = cursor.execute("SELECT * FROM my_table")
- static get_default_converter(unload: bool = False) DefaultS3FSTypeConverter[source]¶
Get the default type converter for S3FS cursor.
- Parameters:
unload – Unused. S3FS cursor does not support UNLOAD operations.
- Returns:
DefaultS3FSTypeConverter instance.
- LIST_DATABASES_MAX_RESULTS = 50¶
- LIST_QUERY_EXECUTIONS_MAX_RESULTS = 50¶
- LIST_TABLE_METADATA_MAX_RESULTS = 50¶
- cancel(query_id: str) Future[None]¶
Cancel a running query asynchronously.
Submits a cancellation request for the specified query. The cancellation itself runs asynchronously in the background.
- Parameters:
query_id – The Athena query execution ID to cancel.
- Returns:
Future object that completes when the cancellation request finishes.
Example
>>> query_id, future = cursor.execute("SELECT * FROM huge_table")
>>> # Later, cancel the query
>>> cancel_future = cursor.cancel(query_id)
>>> cancel_future.result()  # Wait for cancellation to complete
- property connection: Connection[Any]¶
- execute(operation: str, parameters: dict[str, Any] | list[str] | None = None, work_group: str | None = None, s3_staging_dir: str | None = None, cache_size: int | None = 0, cache_expiration_time: int | None = 0, result_reuse_enable: bool | None = None, result_reuse_minutes: int | None = None, paramstyle: str | None = None, result_set_type_hints: dict[str | int, str] | None = None, **kwargs) tuple[str, Future[AthenaS3FSResultSet | Any]][source]¶
Execute a SQL query asynchronously.
Submits the query to Athena and returns immediately with a query ID and a Future that will contain the result set when complete.
- Parameters:
operation – SQL query string to execute.
parameters – Query parameters for parameterized queries.
work_group – Athena workgroup to use for this query.
s3_staging_dir – S3 location for query results.
cache_size – Number of queries to check for result caching.
cache_expiration_time – Cache expiration time in seconds.
result_reuse_enable – Enable Athena result reuse for this query.
result_reuse_minutes – Minutes to reuse cached results.
paramstyle – Parameter style ('qmark' or 'pyformat').
result_set_type_hints – Optional dictionary mapping column names to Athena DDL type signatures for precise type conversion within complex types.
**kwargs – Additional execution parameters.
- Returns:
Tuple of (query_id, Future[AthenaS3FSResultSet]).
Example
>>> query_id, future = cursor.execute("SELECT * FROM my_table")
>>> result_set = future.result()
>>> rows = result_set.fetchall()
- executemany(operation: str, seq_of_parameters: list[dict[str, Any] | list[str] | None], **kwargs) None¶
Execute multiple queries asynchronously (not supported).
This method is not supported for asynchronous cursors because managing multiple concurrent queries would be complex and resource-intensive.
- Parameters:
operation – SQL query string.
seq_of_parameters – Sequence of parameter sets.
**kwargs – Additional arguments.
- Raises:
NotSupportedError – Always raised as this operation is not supported.
Note
For bulk operations, consider using execute() with parameterized queries or batch processing patterns instead.
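One such batch pattern is simply looping over execute() and collecting the returned futures. The sketch below models only the loop shape; submit() is a stand-in for AsyncS3FSCursor.execute(), which returns a (query_id, future) pair:

```python
# Sketch of a bulk pattern in place of executemany(): submit each parameter
# set via execute() and collect the futures. submit() here is a stand-in for
# AsyncS3FSCursor.execute(); a real cursor would start an Athena query.
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def submit(operation, parameters):
    future = executor.submit(lambda p: f"ran {operation} with {p}", parameters)
    return f"query-{id(parameters)}", future  # stand-in query ID

param_sets = [{"id": 1}, {"id": 2}, {"id": 3}]
futures = [submit("SELECT * FROM t WHERE id = %(id)s", p) for p in param_sets]
results = [future.result() for _, future in futures]
executor.shutdown()
```

Collecting futures first and resolving them afterwards lets the queries run concurrently rather than one at a time.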
- get_table_metadata(table_name: str, catalog_name: str | None = None, schema_name: str | None = None, logging_: bool = True) AthenaTableMetadata¶
- list_table_metadata(catalog_name: str | None = None, schema_name: str | None = None, expression: str | None = None, max_results: int | None = None) list[AthenaTableMetadata]¶
- poll(query_id: str) Future[AthenaQueryExecution]¶
Poll for query completion asynchronously.
Waits for the query to complete (succeed, fail, or be cancelled) and returns the final execution status. This method blocks until completion but runs the polling in a background thread.
- Parameters:
query_id – The Athena query execution ID to poll.
- Returns:
Future object containing the final AthenaQueryExecution status.
Note
This method performs polling internally, so it will take time proportional to your query execution duration.
- query_execution(query_id: str) Future[AthenaQueryExecution]¶
Get query execution details asynchronously.
Retrieves the current execution status and metadata for a query. This is useful for monitoring query progress without blocking.
- Parameters:
query_id – The Athena query execution ID.
- Returns:
Future object containing AthenaQueryExecution with query details.
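The non-blocking monitoring loop this enables can be sketched with a plain Future; slow_status() below is a stand-in for whatever work backs cursor.query_execution(query_id):

```python
# Sketch of non-blocking progress monitoring: check whether a Future has
# completed without blocking on it. slow_status() stands in for a query whose
# final state would come from AthenaQueryExecution.
import time
from concurrent.futures import ThreadPoolExecutor

def slow_status():
    time.sleep(0.2)       # simulate query execution time
    return "SUCCEEDED"    # stand-in for the final query state

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_status)
    while not future.done():  # non-blocking completion check
        time.sleep(0.05)      # other work could happen here
    state = future.result()
print(state)  # SUCCEEDED
```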
- setinputsizes(sizes)¶
Does nothing by default
- setoutputsize(size, column=None)¶
Does nothing by default
S3FS CSV Readers¶
S3FSCursor supports pluggable CSV reader implementations to control how NULL values and empty strings are handled when parsing Athena’s CSV output.
- class pyathena.s3fs.reader.AthenaCSVReader(file_obj: Any, delimiter: str = ',')[source]¶
CSV reader that distinguishes between NULL and empty string.
This is the default reader for S3FSCursor.
Athena’s CSV output format distinguishes NULL values from empty strings:
- NULL: unquoted empty field (e.g., ,, or ,field)
- Empty string: quoted empty field (e.g., ,"", or ,"",field)
Python’s standard csv module parses both as empty strings, losing this distinction. This reader preserves the difference by returning None for NULL values and empty string for quoted empty values.
Example
>>> from io import StringIO
>>> reader = AthenaCSVReader(StringIO(',"",text'))
>>> list(reader)
[[None, '', 'text']]  # NULL and empty string are distinguished
Note
Use DefaultCSVReader if you need backward compatibility where both NULL and empty string are treated as empty string.
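The distinction AthenaCSVReader preserves can be illustrated with a stdlib-only toy parser. This is not PyAthena's implementation; it handles only simple fields (no embedded delimiters or escaped quotes), but it shows the rule: unquoted empty means NULL, quoted empty means '':

```python
# Toy illustration of the NULL vs empty-string rule in Athena CSV output.
# Not PyAthena's parser: simple fields only, no embedded commas or escapes.
def parse_simple_line(line, delimiter=","):
    out = []
    for field in line.rstrip("\r\n").split(delimiter):
        if field == "":
            out.append(None)         # unquoted empty field -> NULL
        elif field.startswith('"') and field.endswith('"') and len(field) >= 2:
            out.append(field[1:-1])  # strip quotes; '""' -> empty string
        else:
            out.append(field)
    return out

print(parse_simple_line(',"",text'))  # [None, '', 'text']
```

Python's csv.reader would return ['', '', 'text'] for the same input, which is why a custom reader is needed to keep the distinction.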
- __init__(file_obj: Any, delimiter: str = ',') None[source]¶
Initialize the reader.
- Parameters:
file_obj – File-like object to read from.
delimiter – Field delimiter character.
- __iter__() AthenaCSVReader[source]¶
Iterate over rows in the CSV file.
- __next__() list[str | None][source]¶
Read and parse the next line.
- Returns:
List of field values, with None for NULL and ‘’ for empty string.
- Raises:
StopIteration – When end of file is reached or reader is closed.
- __enter__() AthenaCSVReader[source]¶
Enter context manager.
- class pyathena.s3fs.reader.DefaultCSVReader(file_obj: Any, delimiter: str = ',')[source]¶
CSV reader using Python’s standard csv module.
This reader wraps Python’s standard csv.reader and treats empty fields as empty strings. It does not distinguish between NULL and empty strings in Athena’s CSV output - both become empty strings.
Use this reader when you need backward compatibility with the behavior where empty strings are treated the same as NULL values.
Example
>>> from io import StringIO
>>> reader = DefaultCSVReader(StringIO(',"",text'))
>>> list(reader)
[['', '', 'text']]  # Both NULL and empty string become ''
Note
The default reader for S3FSCursor is AthenaCSVReader, which distinguishes between NULL and empty string values.
- __init__(file_obj: Any, delimiter: str = ',') None[source]¶
Initialize the reader.
- Parameters:
file_obj – File-like object to read from.
delimiter – Field delimiter character.
- __iter__() DefaultCSVReader[source]¶
Iterate over rows in the CSV file.
- __next__() list[str][source]¶
Read and parse the next line.
- Returns:
List of field values as strings.
- Raises:
StopIteration – When end of file is reached or reader is closed.
- __enter__() DefaultCSVReader[source]¶
Enter context manager.
S3FS Data Converters¶
- class pyathena.s3fs.converter.DefaultS3FSTypeConverter[source]¶
Type converter for S3FS Cursor results.
This converter is specifically designed for the S3FSCursor and provides type conversion for CSV-based result files read via the S3 FileSystem. It converts Athena data types to Python types using the standard converter mappings.
The converter uses the same mappings as DefaultTypeConverter, providing consistent behavior with the standard Cursor while using the S3FileSystem for file access.
Example
>>> from pyathena.s3fs.converter import DefaultS3FSTypeConverter
>>> converter = DefaultS3FSTypeConverter()
>>>
>>> # Used automatically by S3FSCursor
>>> cursor = connection.cursor(S3FSCursor)
>>> # converter is applied automatically to results
Note
This converter is used by default in S3FSCursor. Most users don’t need to instantiate it directly.
- convert(type_: str, value: str | None, type_hint: str | None = None) Any | None[source]¶
Convert a string value to the appropriate Python type.
Looks up the converter function for the given Athena type and applies it to the value. If the value is None, returns None without conversion.
- Parameters:
type_ – The Athena data type name (e.g., "integer", "varchar", "date").
value – The string value to convert, or None.
type_hint – Optional Athena DDL type signature for precise complex type conversion (e.g., "array(varchar)").
- Returns:
The converted Python value, or None if the input value was None.
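The lookup-and-apply pattern behind convert() can be sketched with a plain dictionary of conversion functions. The mapping below is illustrative only, not PyAthena's actual converter table:

```python
# Minimal sketch of type-converter dispatch: look up a conversion function by
# Athena type name, pass None through unchanged, and fall back to the raw
# string for unknown types. The mapping is illustrative, not PyAthena's table.
from datetime import date

_CONVERTERS = {
    "integer": int,
    "double": float,
    "boolean": lambda v: v == "true",
    "date": date.fromisoformat,
}

def convert(type_, value):
    if value is None:
        return None                                  # NULL stays None
    func = _CONVERTERS.get(type_, lambda v: v)       # default: keep string
    return func(value)

print(convert("integer", "42"))       # 42
print(convert("date", "2024-01-31"))  # datetime.date(2024, 1, 31)
print(convert("varchar", None))       # None
```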
S3FS Result Set¶
- class pyathena.s3fs.result_set.AthenaS3FSResultSet(connection: Connection[Any], converter: Converter, query_execution: AthenaQueryExecution, arraysize: int, retry_config: RetryConfig, block_size: int | None = None, csv_reader: CSVReaderType | None = None, filesystem_class: type[AbstractFileSystem] | None = None, result_set_type_hints: dict[str | int, str] | None = None, **kwargs)[source]¶
Result set that reads CSV results via S3FileSystem without pandas/pyarrow.
This result set uses PyAthena’s S3FileSystem to read query results from S3. It provides a lightweight alternative to pandas and arrow cursors when those dependencies are not needed.
- Features:
Lightweight CSV parsing via pluggable readers
Uses PyAthena’s S3FileSystem for S3 access
No external dependencies beyond boto3
Memory-efficient streaming for large datasets
- DEFAULT_BLOCK_SIZE¶
Default block size for S3 operations (128MB).
Example
>>> # Used automatically by S3FSCursor
>>> cursor = connection.cursor(S3FSCursor)
>>> cursor.execute("SELECT * FROM my_table")
>>>
>>> # Fetch results
>>> rows = cursor.fetchall()
Note
This class is used internally by S3FSCursor and typically not instantiated directly by users.
- DEFAULT_BLOCK_SIZE = 134217728¶
- __init__(connection: Connection[Any], converter: Converter, query_execution: AthenaQueryExecution, arraysize: int, retry_config: RetryConfig, block_size: int | None = None, csv_reader: CSVReaderType | None = None, filesystem_class: type[AbstractFileSystem] | None = None, result_set_type_hints: dict[str | int, str] | None = None, **kwargs) None[source]¶
- fetchone() tuple[Any | None, ...] | dict[Any, Any | None] | None[source]¶
Fetch the next row of the result set.
- Returns:
A tuple representing the next row, or None if no more rows.
- fetchmany(size: int | None = None) list[tuple[Any | None, ...] | dict[Any, Any | None]][source]¶
Fetch the next set of rows of the result set.
- Parameters:
size – Maximum number of rows to fetch. Defaults to arraysize.
- Returns:
A list of tuples representing the rows.
- fetchall() list[tuple[Any | None, ...] | dict[Any, Any | None]][source]¶
Fetch all remaining rows of the result set.
- Returns:
A list of tuples representing all remaining rows.
- DEFAULT_RESULT_REUSE_MINUTES = 60¶
- property connection: Connection[Any]¶