# Source code for pyathena.s3fs.reader

from __future__ import annotations

import csv
from collections.abc import Iterator
from typing import Any


class DefaultCSVReader(Iterator[list[str]]):
    """Row iterator built on top of the standard library ``csv.reader``.

    Every field is returned as a string. An unquoted empty field (Athena's
    NULL) and a quoted empty field (an empty string) are indistinguishable
    here: both come back as ``''``. Choose this reader when backward
    compatibility with that "NULL equals empty string" behavior is required.

    Example:
        >>> from io import StringIO
        >>> reader = DefaultCSVReader(StringIO(',"",text'))
        >>> list(reader)
        [['', '', 'text']]

        The first field was NULL and the second an empty string; both are
        reported as ``''``.

    Note:
        The default reader for S3FSCursor is AthenaCSVReader, which
        distinguishes between NULL and empty string values.
    """

    def __init__(self, file_obj: Any, delimiter: str = ",") -> None:
        """Set up the reader.

        Args:
            file_obj: File-like object to read from.
            delimiter: Field delimiter character.
        """
        # ``_file`` doubles as the "is open" flag: it becomes None on close().
        self._file: Any | None = file_obj
        self._reader = csv.reader(file_obj, delimiter=delimiter)

    def __iter__(self) -> DefaultCSVReader:
        """Return the reader itself; it is its own iterator."""
        return self

    def __next__(self) -> list[str]:
        """Return the next parsed row.

        Returns:
            List of field values as strings.

        Raises:
            StopIteration: When end of file is reached or reader is closed.
        """
        if self._file is None:
            raise StopIteration

        fields = next(self._reader)
        # csv.reader yields [] for a blank line; report it as one empty
        # field so a single-column blank value still produces a column.
        return fields or [""]

    def close(self) -> None:
        """Close the underlying file object (no-op when already closed)."""
        handle = self._file
        if handle is None:
            return
        handle.close()
        self._file = None

    def __enter__(self) -> DefaultCSVReader:
        """Enter the context manager and hand back the reader."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Leave the context manager, closing the underlying file."""
        self.close()
[docs] class AthenaCSVReader(Iterator[list[str | None]]): """CSV reader that distinguishes between NULL and empty string. This is the default reader for S3FSCursor. Athena's CSV output format distinguishes NULL values from empty strings: - NULL: unquoted empty field (e.g., `,,` or `,field`) - Empty string: quoted empty field (e.g., `,"",` or `,"",field`) Python's standard csv module parses both as empty strings, losing this distinction. This reader preserves the difference by returning None for NULL values and empty string for quoted empty values. Example: >>> from io import StringIO >>> reader = AthenaCSVReader(StringIO(',"",text')) >>> list(reader) [[None, '', 'text']] # NULL and empty string are distinguished Note: Use DefaultCSVReader if you need backward compatibility where both NULL and empty string are treated as empty string. """
[docs] def __init__(self, file_obj: Any, delimiter: str = ",") -> None: """Initialize the reader. Args: file_obj: File-like object to read from. delimiter: Field delimiter character. """ self._file: Any | None = file_obj self._delimiter = delimiter
[docs] def __iter__(self) -> AthenaCSVReader: """Iterate over rows in the CSV file.""" return self
[docs] def __next__(self) -> list[str | None]: """Read and parse the next line. Returns: List of field values, with None for NULL and '' for empty string. Raises: StopIteration: When end of file is reached or reader is closed. """ if self._file is None: raise StopIteration line = self._file.readline() if not line: raise StopIteration # Handle multi-line quoted fields: keep reading until quotes are balanced # Track quote state incrementally - only scan each new line once in_quotes = self._check_quote_state(line) while in_quotes: next_line = self._file.readline() if not next_line: # EOF reached with unclosed quote; parse what we have break line += next_line # Only scan the new line, passing current quote state in_quotes = self._check_quote_state(next_line, in_quotes) return self._parse_line(line.rstrip("\r\n"))
def _check_quote_state(self, text: str, starting_state: bool = False) -> bool: """Check quote state after processing text. Args: text: Text to scan for quotes. starting_state: Whether we start inside a quoted field. Returns: True if we end inside an unclosed quote. """ in_quotes = starting_state i = 0 while i < len(text): if text[i] == '"': if in_quotes and i + 1 < len(text) and text[i + 1] == '"': # Escaped quote inside quoted field, skip both i += 2 continue in_quotes = not in_quotes i += 1 return in_quotes def _parse_line(self, line: str) -> list[str | None]: """Parse a single CSV line preserving NULL vs empty string distinction. Args: line: Raw CSV line without trailing newline. Returns: List of field values. """ # Empty line = single NULL field (e.g., SELECT NULL produces empty data line) if not line: return [None] fields: list[str | None] = [] pos = 0 length = len(line) while pos < length: if line[pos] == '"': # Quoted field value, pos = self._parse_quoted_field(line, pos) fields.append(value) else: # Unquoted field value, pos = self._parse_unquoted_field(line, pos) # Unquoted empty field = NULL fields.append(None if value == "" else value) # Handle trailing empty field (line ends with delimiter) if line and line[-1] == self._delimiter: fields.append(None) return fields def _parse_quoted_field(self, line: str, pos: int) -> tuple[str, int]: """Parse a quoted field starting at pos. Args: line: The CSV line. pos: Starting position (at the opening quote). Returns: Tuple of (field value, next position after delimiter). 
""" pos += 1 # Skip opening quote value_parts = [] length = len(line) while pos < length: if line[pos] == '"': if pos + 1 < length and line[pos + 1] == '"': # Escaped quote value_parts.append('"') pos += 2 else: # End of quoted field pos += 1 # Skip closing quote break else: value_parts.append(line[pos]) pos += 1 # Skip delimiter if present if pos < length and line[pos] == self._delimiter: pos += 1 return "".join(value_parts), pos def _parse_unquoted_field(self, line: str, pos: int) -> tuple[str, int]: """Parse an unquoted field starting at pos. Args: line: The CSV line. pos: Starting position. Returns: Tuple of (field value, next position after delimiter). """ start = pos length = len(line) while pos < length and line[pos] != self._delimiter: pos += 1 value = line[start:pos] # Skip delimiter if present if pos < length and line[pos] == self._delimiter: pos += 1 return value, pos
[docs] def close(self) -> None: """Close the underlying file object.""" if self._file is not None: self._file.close() self._file = None
[docs] def __enter__(self) -> AthenaCSVReader: """Enter context manager.""" return self
[docs] def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Exit context manager and close resources.""" self.close()