Source code for pyathena.aio.connection
from __future__ import annotations
import asyncio
from typing import Any
from pyathena.aio.cursor import AioCursor
from pyathena.connection import Connection
[docs]
class AioConnection(Connection[AioCursor]):
"""Async-aware connection to Amazon Athena.
Wraps the synchronous ``Connection`` with async context manager support
and provides ``create()`` for non-blocking initialization.
Example:
>>> async with await AioConnection.create(
... s3_staging_dir="s3://bucket/path/",
... region_name="us-east-1",
... ) as conn:
... async with conn.cursor() as cursor:
... await cursor.execute("SELECT 1")
... print(await cursor.fetchone())
"""
[docs]
def __init__(self, **kwargs: Any) -> None:
if "cursor_class" not in kwargs:
kwargs["cursor_class"] = AioCursor
super().__init__(**kwargs)
[docs]
@classmethod
async def create(
cls,
**kwargs: Any,
) -> AioConnection:
"""Async factory for creating an ``AioConnection``.
Runs the (potentially blocking) ``__init__`` in a thread so that
STS calls (``role_arn`` / ``serial_number``) do not block the loop.
Args:
**kwargs: Arguments forwarded to ``AioConnection.__init__``.
Returns:
A fully initialized ``AioConnection``.
"""
return await asyncio.to_thread(cls, **kwargs)
async def __aenter__(self) -> AioConnection:
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()