Source code for pyathena.filesystem.s3_executor

from __future__ import annotations

import asyncio
from abc import ABCMeta, abstractmethod
from collections.abc import Callable
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Any, TypeVar

# Return type of the callable passed to ``submit``; links ``Callable[..., T]``
# to the ``Future[T]`` that ``submit`` hands back.
T = TypeVar("T")


class S3Executor(metaclass=ABCMeta):
    """Common interface for running S3 operations in parallel.

    ``S3File`` and ``S3FileSystem`` use an implementation of this class to
    submit work that should run in parallel and to shut the executor down
    once they are finished with it.

    ``submit`` and ``shutdown`` deliberately follow the signatures of
    ``concurrent.futures.Executor`` so that ``as_completed()`` and
    ``Future.cancel()`` keep working without modification.

    Instances can be used as context managers; leaving the ``with`` block
    calls ``shutdown(wait=True)``.
    """

    def __enter__(self) -> S3Executor:
        """Enter a ``with`` block and yield the executor itself."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Leave the ``with`` block by calling ``shutdown(wait=True)``.

        Returns ``None``, so an exception raised inside the block is not
        suppressed.
        """
        self.shutdown(wait=True)

    @abstractmethod
    def submit(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
        """Schedule ``fn(*args, **kwargs)`` and return a Future for its result."""

    @abstractmethod
    def shutdown(self, wait: bool = True) -> None:
        """Stop the executor and release any resources it holds."""
class S3ThreadPoolExecutor(S3Executor):
    """``S3Executor`` backed by a ``ThreadPoolExecutor``.

    ``S3File`` and ``S3FileSystem`` use this executor by default for
    synchronous parallel operations.
    """

    def __init__(self, max_workers: int) -> None:
        """Create the underlying thread pool.

        Args:
            max_workers: Passed to ``ThreadPoolExecutor`` as its
                ``max_workers`` argument.
        """
        self._executor = ThreadPoolExecutor(max_workers)

    def submit(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
        """Forward ``fn(*args, **kwargs)`` to the pool and return its Future."""
        future = self._executor.submit(fn, *args, **kwargs)
        return future

    def shutdown(self, wait: bool = True) -> None:
        """Shut down the underlying pool, forwarding ``wait`` unchanged."""
        self._executor.shutdown(wait)
class S3AioExecutor(S3Executor):
    """``S3Executor`` that schedules work through an asyncio event loop.

    Each submitted callable is wrapped as ``asyncio.to_thread(fn, ...)`` and
    handed to the loop with ``asyncio.run_coroutine_threadsafe``. The result
    is a ``concurrent.futures.Future``, so ``as_completed()`` and
    ``Future.cancel()`` can be used on it.

    This avoids thread-in-thread nesting when ``S3File`` is used from within
    ``asyncio.to_thread()`` calls (the pattern used by ``AioS3FileSystem``).

    Args:
        loop: The asyncio event loop to schedule work on. It must be running
            by the time ``submit`` is called. ``None`` is accepted, but then
            every ``submit`` call raises.

    Raises:
        RuntimeError: From ``submit`` when no loop was given or the loop is
            not running.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop | None = None) -> None:
        """Remember the event loop; nothing is scheduled at construction."""
        self._loop = loop

    def submit(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
        """Schedule ``fn(*args, **kwargs)`` via the event loop.

        Returns:
            The ``concurrent.futures.Future`` produced by
            ``asyncio.run_coroutine_threadsafe``.

        Raises:
            RuntimeError: If there is no loop or the loop is not running.
        """
        loop = self._loop
        # Guard first: without a running loop the coroutine would never run.
        if loop is None or not loop.is_running():
            raise RuntimeError(
                "S3AioExecutor requires a running event loop. "
                "Use S3ThreadPoolExecutor for synchronous usage."
            )
        pending = asyncio.to_thread(fn, *args, **kwargs)
        return asyncio.run_coroutine_threadsafe(pending, loop)

    def shutdown(self, wait: bool = True) -> None:
        """Do nothing; ``wait`` is accepted only for interface compatibility."""
        # Work is dispatched to the event loop, so this executor holds no
        # resources of its own that would need releasing.