# Source code for pyathena.filesystem.s3_executor
from __future__ import annotations
import asyncio
from abc import ABCMeta, abstractmethod
from collections.abc import Callable
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Any, TypeVar
T = TypeVar("T")
class S3Executor(metaclass=ABCMeta):
    """Abstract executor for parallel S3 operations.

    Defines the interface used by ``S3File`` and ``S3FileSystem`` for submitting
    work to run in parallel and for shutting down the executor when done.

    Both ``submit`` and ``shutdown`` mirror the ``concurrent.futures.Executor``
    interface so that ``as_completed()`` and ``Future.cancel()`` work unchanged.
    """

    @abstractmethod
    def submit(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
        """Submit a callable for execution and return a Future."""
        ...

    @abstractmethod
    def shutdown(self, wait: bool = True) -> None:
        """Shut down the executor, freeing any resources."""
        ...

    def __enter__(self) -> S3Executor:
        """Enter a ``with`` block, returning the executor itself."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Leave a ``with`` block, shutting down and waiting for pending work."""
        self.shutdown(wait=True)
[docs]
class S3ThreadPoolExecutor(S3Executor):
"""Executor that delegates to a ``ThreadPoolExecutor``.
This is the default executor used by ``S3File`` and ``S3FileSystem``
for synchronous parallel operations.
"""
[docs]
def __init__(self, max_workers: int) -> None:
self._executor = ThreadPoolExecutor(max_workers=max_workers)
[docs]
def submit(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
return self._executor.submit(fn, *args, **kwargs)
[docs]
def shutdown(self, wait: bool = True) -> None:
self._executor.shutdown(wait=wait)
[docs]
class S3AioExecutor(S3Executor):
"""Executor that schedules work on an asyncio event loop.
Uses ``asyncio.run_coroutine_threadsafe(asyncio.to_thread(fn), loop)`` to
dispatch blocking functions onto the event loop's thread pool, returning
``concurrent.futures.Future`` objects that are compatible with
``as_completed()`` and ``Future.cancel()``.
This avoids thread-in-thread nesting when ``S3File`` is used from within
``asyncio.to_thread()`` calls (the pattern used by ``AioS3FileSystem``).
Args:
loop: A running asyncio event loop.
Raises:
RuntimeError: If the event loop is not running when ``submit`` is called.
"""
[docs]
def __init__(self, loop: asyncio.AbstractEventLoop | None = None) -> None:
self._loop = loop
[docs]
def submit(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
if self._loop is not None and self._loop.is_running():
return asyncio.run_coroutine_threadsafe(
asyncio.to_thread(fn, *args, **kwargs), self._loop
)
raise RuntimeError(
"S3AioExecutor requires a running event loop. "
"Use S3ThreadPoolExecutor for synchronous usage."
)
[docs]
def shutdown(self, wait: bool = True) -> None:
# No resources to release — work is dispatched to the event loop.
pass