Source code for pyathena.sqlalchemy.pandas

from typing import TYPE_CHECKING

from pyathena.sqlalchemy.base import AthenaDialect
from pyathena.util import strtobool

if TYPE_CHECKING:
    from types import ModuleType


[docs] class AthenaPandasDialect(AthenaDialect): """SQLAlchemy dialect for Amazon Athena with pandas DataFrame result format. This dialect extends AthenaDialect to use PandasCursor, which returns query results as pandas DataFrames. This integration enables seamless use of Athena data in data analysis and machine learning workflows. Connection URL Format: ``awsathena+pandas://{access_key}:{secret_key}@athena.{region}.amazonaws.com/{schema}`` Query Parameters: In addition to the base dialect parameters: - unload: If "true", use UNLOAD for Parquet output (better performance for large datasets) - engine: CSV parsing engine ("c", "python", or "pyarrow") - chunksize: Number of rows per chunk for memory-efficient processing Example: >>> from sqlalchemy import create_engine >>> engine = create_engine( ... "awsathena+pandas://:@athena.us-west-2.amazonaws.com/default" ... "?s3_staging_dir=s3://my-bucket/athena-results/" ... "&unload=true&chunksize=10000" ... ) See Also: :class:`~pyathena.pandas.cursor.PandasCursor`: The underlying cursor implementation. :class:`~pyathena.sqlalchemy.base.AthenaDialect`: Base dialect class. """ driver = "pandas" supports_statement_cache = True
[docs] def create_connect_args(self, url): from pyathena.pandas.cursor import PandasCursor opts = super()._create_connect_args(url) opts.update({"cursor_class": PandasCursor}) cursor_kwargs = {} if "unload" in opts: cursor_kwargs.update({"unload": bool(strtobool(opts.pop("unload")))}) if "engine" in opts: cursor_kwargs.update({"engine": opts.pop("engine")}) if "chunksize" in opts: cursor_kwargs.update({"chunksize": int(opts.pop("chunksize"))}) # type: ignore[dict-item] if cursor_kwargs: opts.update({"cursor_kwargs": cursor_kwargs}) return [[], opts]
[docs] @classmethod def import_dbapi(cls) -> "ModuleType": return super().import_dbapi()