Native Asyncio Cursors
PyAthena provides native asyncio cursor implementations under pyathena.aio.
These cursors poll query status with asyncio.sleep and run blocking boto3 calls
through asyncio.to_thread, so the event loop stays free and concurrency comes from
asyncio tasks rather than a cursor-managed thread pool.
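The polling pattern described above can be sketched with the standard library alone. This is a minimal illustration, not PyAthena's implementation: `FakeAthenaClient`, `get_query_state`, and `wait_for_query` are hypothetical names standing in for a blocking boto3 client and the cursor's internal polling loop.

```python
import asyncio
import time


class FakeAthenaClient:
    """Hypothetical stand-in for a blocking boto3 client (not a PyAthena class)."""

    def __init__(self, polls_until_done: int) -> None:
        self._remaining = polls_until_done

    def get_query_state(self, query_id: str) -> str:
        time.sleep(0.01)  # simulate a blocking network round trip
        self._remaining -= 1
        return "SUCCEEDED" if self._remaining <= 0 else "RUNNING"


async def wait_for_query(client: FakeAthenaClient, query_id: str,
                         poll_interval: float = 0.01) -> str:
    """Poll until the query reaches a terminal state without blocking the loop."""
    while True:
        # The blocking call runs in a worker thread; the event loop keeps running.
        state = await asyncio.to_thread(client.get_query_state, query_id)
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return state
        # Yield to other tasks between polls instead of calling time.sleep().
        await asyncio.sleep(poll_interval)


final_state = asyncio.run(wait_for_query(FakeAthenaClient(polls_until_done=3), "q-1"))
print(final_state)
```

Note that asyncio.to_thread hands the call to the event loop's default executor, so threads are still involved; the difference is that the caller awaits the result instead of managing a pool.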
Why native asyncio?
PyAthena has two families of async cursors:
| | AsyncCursor | AioCursor |
|---|---|---|
| Concurrency model | | Native asyncio |
| Event loop | Blocks a thread per query | Non-blocking |
| Connection | | aio_connect() |
| execute() returns | | Awaitable cursor (self) |
| Fetch methods | Sync (via …) | await |
| Iteration | | async for |
| Context manager | | async with |
| Best for | Adding concurrency to sync code | Async frameworks (FastAPI, aiohttp, etc.) |
Choose AioCursor when your application already uses asyncio (e.g., web frameworks,
async pipelines). Choose AsyncCursor when you want simple parallel query execution
from synchronous code.
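As a rough sketch of what the native model allows inside an asyncio application, several queries can be awaited together with asyncio.gather. The helper name `run_queries` is hypothetical, and the sketch assumes (the text above does not confirm it) that separate cursors created from one connection may execute concurrently.

```python
import asyncio


async def run_queries(conn, queries):
    """Run each query on its own cursor and return the results in input order."""

    async def run_one(sql):
        cursor = conn.cursor()  # one cursor per query (assumed safe to run concurrently)
        await cursor.execute(sql)
        return await cursor.fetchall()

    # gather() schedules all queries concurrently on the current event loop.
    return await asyncio.gather(*(run_one(sql) for sql in queries))
```

From a plain script, a helper like this would be driven by asyncio.run(...) around a coroutine that opens the connection; inside a framework such as FastAPI it can be awaited directly from a request handler.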
Connection
Use the aio_connect() function to create an async connection.
It returns an AioConnection that produces AioCursor instances by default.
from pyathena import aio_connect
conn = await aio_connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                         region_name="us-west-2")
The connection supports the async context manager protocol:
from pyathena import aio_connect
async with await aio_connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                             region_name="us-west-2") as conn:
    cursor = conn.cursor()
    await cursor.execute("SELECT 1")
    print(await cursor.fetchone())
AioCursor
AioCursor is a native asyncio cursor that uses await for query execution and result fetching.
It follows the DB API 2.0 interface adapted for async usage.
from pyathena import aio_connect
from pyathena.aio.cursor import AioCursor
async with await aio_connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                             region_name="us-west-2") as conn:
    cursor = conn.cursor()
    await cursor.execute("SELECT * FROM many_rows")
    print(await cursor.fetchone())
    print(await cursor.fetchmany(10))
    print(await cursor.fetchall())
The cursor supports the async with context manager:
from pyathena import aio_connect
async with await aio_connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                             region_name="us-west-2") as conn:
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT * FROM many_rows")
        rows = await cursor.fetchall()
You can iterate over results with async for:
from pyathena import aio_connect
async with await aio_connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                             region_name="us-west-2") as conn:
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT * FROM many_rows")
        async for row in cursor:
            print(row)
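When rows are processed in chunks rather than one at a time, the same cursor can be wrapped in an async generator built on fetchmany(). The helper `iter_batches` below is a hypothetical convenience, not part of PyAthena; it relies only on the DB API 2.0 convention that fetchmany() returns an empty sequence once the result set is exhausted.

```python
async def iter_batches(cursor, batch_size=1000):
    """Yield lists of rows until the cursor is exhausted."""
    while True:
        batch = await cursor.fetchmany(batch_size)
        if not batch:  # an empty batch signals that no rows remain
            return
        yield batch
```

It would be consumed with `async for batch in iter_batches(cursor, 500): ...` after `await cursor.execute(...)`.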
Query execution information is available as cursor attributes:
from pyathena import aio_connect
async with await aio_connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                             region_name="us-west-2") as conn:
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT * FROM many_rows")
        print(cursor.state)
        print(cursor.state_change_reason)
        print(cursor.completion_date_time)
        print(cursor.submission_date_time)
        print(cursor.data_scanned_in_bytes)
        print(cursor.engine_execution_time_in_millis)
        print(cursor.query_queue_time_in_millis)
        print(cursor.total_execution_time_in_millis)
        print(cursor.query_planning_time_in_millis)
        print(cursor.service_processing_time_in_millis)
        print(cursor.output_location)
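For logging or metrics, these attributes can be condensed into a small dictionary. The function `summarize_execution` is a hypothetical helper; it reads only attributes listed above and assumes that numeric values may be None when a statistic is unavailable.

```python
def summarize_execution(cursor):
    """Collect a few execution statistics into a plain dict for logging."""
    scanned = cursor.data_scanned_in_bytes or 0          # treat None as 0 (assumption)
    total_ms = cursor.total_execution_time_in_millis or 0
    return {
        "state": cursor.state,
        "scanned_mib": scanned / (1024 * 1024),
        "total_seconds": total_ms / 1000,
        "output_location": cursor.output_location,
    }
```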
To cancel a running query:
from pyathena import aio_connect
async with await aio_connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                             region_name="us-west-2") as conn:
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT * FROM many_rows")
        await cursor.cancel()
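Because `await cursor.execute(...)` normally returns only after the query finishes, cancellation is most useful when something interrupts that wait. One possible pattern, sketched under assumptions, is a timeout: `execute_with_timeout` is a hypothetical helper, and it assumes the query has already been submitted when the timeout fires so that cancel() has a query to stop.

```python
import asyncio


async def execute_with_timeout(cursor, sql, timeout):
    """Run a query, cancelling it on the Athena side if it exceeds the timeout."""
    try:
        return await asyncio.wait_for(cursor.execute(sql), timeout)
    except asyncio.TimeoutError:
        # wait_for() has stopped the local wait; also ask the cursor to cancel
        # the query itself (assumes the query had been submitted by this point).
        await cursor.cancel()
        raise
```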
AioDictCursor
AioDictCursor is an AioCursor that returns rows as dictionaries with column names as keys.
from pyathena import aio_connect
from pyathena.aio.cursor import AioDictCursor
async with await aio_connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                             region_name="us-west-2") as conn:
    cursor = conn.cursor(AioDictCursor)
    await cursor.execute("SELECT * FROM many_rows LIMIT 10")
    async for row in cursor:
        print(row["a"])
To change the dictionary type (e.g., to use OrderedDict), pass dict_type when creating the cursor:
from collections import OrderedDict
from pyathena import aio_connect
from pyathena.aio.cursor import AioDictCursor
async with await aio_connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                             region_name="us-west-2") as conn:
    cursor = conn.cursor(AioDictCursor, dict_type=OrderedDict)
    await cursor.execute("SELECT * FROM many_rows LIMIT 10")
    async for row in cursor:
        print(row)
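Conceptually, a dictionary row pairs each column name with the value at the same position, and dict_type decides which mapping class holds the pairs. The sketch below illustrates that idea only; `to_dict_row` is a hypothetical function and not how PyAthena necessarily builds its rows.

```python
from collections import OrderedDict


def to_dict_row(column_names, row, dict_type=dict):
    """Pair column names with row values using the requested mapping type."""
    return dict_type(zip(column_names, row))


plain = to_dict_row(["a", "b"], (1, "x"))
ordered = to_dict_row(["a", "b"], (1, "x"), dict_type=OrderedDict)
print(plain["a"], type(ordered).__name__)
```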
Specialized Aio Cursors
Native asyncio versions are available for all cursor types:
| Cursor | Module | Result format |
|---|---|---|
| | | pandas DataFrame |
| | | pyarrow Table |
| | | polars DataFrame |
| AioS3FSCursor | pyathena.aio.s3fs.cursor | Row tuples (lightweight) |
| | | PySpark execution |
Fetch behavior
All aio cursors use await for fetch operations. The S3 download (CSV or Parquet)
happens inside execute(), wrapped in asyncio.to_thread(). Fetch methods are also
wrapped in asyncio.to_thread() to ensure the event loop is never blocked — this is
especially important when chunksize is set, as fetch calls trigger lazy S3 reads.
await cursor.execute("SELECT * FROM many_rows")
row = await cursor.fetchone()
rows = await cursor.fetchall()
df = cursor.as_pandas() # In-memory conversion, no await needed
The as_pandas(), as_arrow(), and as_polars() convenience methods operate on
already-loaded data and remain synchronous.
See each cursor’s documentation page for detailed usage examples.
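The effect of wrapping a synchronous fetch in asyncio.to_thread() can be observed with a small standard-library experiment. Everything here is illustrative: `blocking_fetch` is a hypothetical stand-in for a fetch that triggers an S3 read, and `heartbeat` is a task that can only make progress while the event loop is free.

```python
import asyncio
import time


def blocking_fetch():
    """Hypothetical stand-in for a synchronous fetch that reads from S3."""
    time.sleep(0.2)
    return [(1,), (2,)]


async def heartbeat(counter):
    """Tick while other work is in flight; only runs if the loop is free."""
    while True:
        counter["ticks"] += 1
        await asyncio.sleep(0.01)


async def main():
    counter = {"ticks": 0}
    ticker = asyncio.create_task(heartbeat(counter))
    # The blocking call runs in a worker thread, so the heartbeat keeps ticking.
    rows = await asyncio.to_thread(blocking_fetch)
    ticker.cancel()
    return rows, counter["ticks"]


rows, ticks = asyncio.run(main())
print(rows, ticks > 1)
```

Calling `blocking_fetch()` directly inside `main()` instead would stall the heartbeat for the full duration of the read.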
AioS3FileSystem
AioS3FileSystem is a native asyncio filesystem interface for Amazon S3, built on
fsspec’s AsyncFileSystem. It provides the same functionality as S3FileSystem but
uses asyncio.gather with asyncio.to_thread for parallel operations instead of
ThreadPoolExecutor.
Why AioS3FileSystem?
The synchronous S3FileSystem uses ThreadPoolExecutor for parallel S3 operations
(batch deletes, multipart uploads, range reads). When used from within an asyncio
application via AioS3FSCursor, this creates a thread-in-thread pattern:
the cursor wraps calls in asyncio.to_thread(), and inside that thread
S3FileSystem spawns additional threads via ThreadPoolExecutor.
AioS3FileSystem eliminates this inefficiency by dispatching all parallel
operations through the asyncio event loop.
| | S3FileSystem | AioS3FileSystem |
|---|---|---|
| Parallelism | ThreadPoolExecutor | asyncio.gather + asyncio.to_thread |
| File handles | | |
| Bulk delete | Thread pool per batch | |
| Multipart copy | Thread pool per part | |
| Best for | Synchronous applications | Async frameworks (FastAPI, aiohttp, etc.) |
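The asyncio.gather with asyncio.to_thread combination mentioned above can be illustrated without S3. In this sketch `read_range` is a hypothetical blocking call standing in for a ranged GET request; the real filesystem's internals are not shown in this document and may differ.

```python
import asyncio
import time


def read_range(start, end):
    """Hypothetical blocking range read; returns the range it was asked for."""
    time.sleep(0.05)
    return (start, end)


async def read_ranges(ranges):
    """Issue all range reads concurrently and return results in input order."""
    return await asyncio.gather(
        *(asyncio.to_thread(read_range, start, end) for start, end in ranges)
    )


parts = asyncio.run(read_ranges([(0, 99), (100, 199), (200, 299)]))
print(parts)
```

The calls are fanned out from the event loop in a single layer, rather than from a pool created inside a thread that is itself already a worker.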
Executor strategy
S3FileSystem and S3File use a pluggable executor abstraction (S3Executor) for
parallel operations. Two implementations are provided:
- S3ThreadPoolExecutor: wraps ThreadPoolExecutor (default for sync usage)
- S3AioExecutor: dispatches work via asyncio.run_coroutine_threadsafe + asyncio.to_thread
AioS3FileSystem automatically uses S3AioExecutor for file handles, so multipart
uploads and parallel range reads are scheduled through the event loop instead of
a nested ThreadPoolExecutor.
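To make the run_coroutine_threadsafe + to_thread combination concrete, here is a minimal sketch of an executor with that shape. `LoopBackedExecutor` and `upload_parts` are hypothetical names, and the class is not PyAthena's S3AioExecutor; it only shows how blocking code in a worker thread can hand work back to the event loop.

```python
import asyncio


class LoopBackedExecutor:
    """Hypothetical executor whose submit() may be called from a worker thread."""

    def __init__(self, loop):
        self._loop = loop

    def submit(self, fn, *args):
        # Schedule to_thread(fn, *args) on the loop from another thread;
        # the return value is a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(
            asyncio.to_thread(fn, *args), self._loop
        )


def upload_parts(executor, parts):
    """Blocking code running in a worker thread that fans out via the loop."""
    futures = [executor.submit(len, part) for part in parts]
    return [future.result() for future in futures]


async def main():
    executor = LoopBackedExecutor(asyncio.get_running_loop())
    # The worker blocks on results while the loop dispatches the submitted calls.
    return await asyncio.to_thread(upload_parts, executor, [b"ab", b"cde"])


sizes = asyncio.run(main())
print(sizes)
```

The submitted calls still execute in the loop's default thread pool, but scheduling is centralized in the event loop, so no additional pool has to be created per file handle.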
Usage with AioS3FSCursor
AioS3FSCursor automatically uses AioS3FileSystem internally. No additional
configuration is needed:
from pyathena import aio_connect
from pyathena.aio.s3fs.cursor import AioS3FSCursor
async with await aio_connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                             region_name="us-west-2") as conn:
    cursor = conn.cursor(AioS3FSCursor)
    await cursor.execute("SELECT * FROM many_rows")
    async for row in cursor:
        print(row)
Standalone usage
AioS3FileSystem can also be used directly for S3 operations:
from pyathena.filesystem.s3_async import AioS3FileSystem
# Async context
fs = AioS3FileSystem(asynchronous=True)
files = await fs._ls("s3://my-bucket/data/")
data = await fs._cat_file("s3://my-bucket/data/file.csv")
await fs._rm("s3://my-bucket/data/old/", recursive=True)
# Sync wrappers are auto-generated by fsspec
files = fs.ls("s3://my-bucket/data/")