Source code for datalad_next.itertools.decode_bytes

"""Get strings decoded from chunks of bytes """

from __future__ import annotations

from typing import (
    Generator,
    Iterable,
)


# Public API of this module: the only name exported by a star-import.
__all__ = ['decode_bytes']


def decode_bytes(
    iterable: Iterable[bytes],
    encoding: str = 'utf-8',
    backslash_replace: bool = True,
) -> Generator[str, None, None]:
    """Decode bytes in an ``iterable`` into strings

    This function decodes ``bytes`` or ``bytearray`` into ``str`` objects,
    using the specified encoding. Importantly, the decoding input can
    be spread across multiple chunks of heterogeneous sizes, for example
    output read from a process or pieces of a download.

    Multi-byte encodings that are spread over multiple byte chunks are
    supported, and chunks are joined as necessary. For example, the utf-8
    encoding for ö is ``b'\\xc3\\xb6'``.  If the encoding is split in the
    middle because a chunk ends with ``b'\\xc3'`` and the next chunk starts
    with ``b'\\xb6'``, a naive decoding approach like the following would
    fail:

    .. code-block:: python

        >>> [chunk.decode() for chunk in [b'\\xc3', b'\\xb6']]   # doctest: +SKIP
        Traceback (most recent call last):
          File "<stdin>", line 1, in <module>
          File "<stdin>", line 1, in <listcomp>
        UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 0: unexpected end of data

    Compared to:

    .. code-block:: python

        >>> from datalad_next.itertools import decode_bytes
        >>> tuple(decode_bytes([b'\\xc3', b'\\xb6']))
        ('ö',)

    Input chunks are only joined, if it is necessary to properly decode
    bytes:

    .. code-block:: python

        >>> from datalad_next.itertools import decode_bytes
        >>> tuple(decode_bytes([b'\\xc3', b'\\xb6', b'a']))
        ('ö', 'a')

    If ``backslash_replace`` is ``True``, undecodable bytes will be
    replaced with a backslash-substitution. Otherwise,
    undecodable bytes will raise a ``UnicodeDecodeError``:

    .. code-block:: python

        >>> tuple(decode_bytes([b'\\xc3']))
        ('\\\\xc3',)
        >>> tuple(decode_bytes([b'\\xc3'], backslash_replace=False))    # doctest: +SKIP
        Traceback (most recent call last):
            ...
        UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 0: unexpected end of data

    Backslash-replacement of undecodable bytes is an ambiguous mapping,
    because the literal text that the replacement produces (for example
    the four characters ``\\\\xc3``) can already be present in the input.

    Parameters
    ----------
    iterable: Iterable[bytes]
        Iterable that yields bytes that should be decoded
    encoding: str (default: ``'utf-8'``)
        Encoding to be used for decoding.
    backslash_replace: bool (default: ``True``)
        If ``True``, backslash-escapes are used for undecodable bytes. If
        ``False``, a ``UnicodeDecodeError`` is raised if a byte sequence
        cannot be decoded.

    Yields
    ------
    str
        Decoded strings that are generated by decoding the data yielded by
        ``iterable`` with the specified ``encoding``

    Raises
    ------
    UnicodeDecodeError
        If ``backslash_replace`` is ``False`` and the data yielded by
        ``iterable`` cannot be decoded with the specified ``encoding``
    """

    def handle_decoding_error(position: int,
                              exc: UnicodeDecodeError
                              ) -> tuple[int, str]:
        """Resolve a decoding error raised for ``joined_data[position:]``

        Returns the buffer offset right behind the undecodable bytes, and
        the text for everything from ``position`` up to that offset (the
        valid prefix followed by the backslash-escaped bad bytes).
        Re-raises ``exc`` if backslash-replacement is disabled.
        """
        if not backslash_replace:
            # Signal the error to the caller
            raise exc
        # `exc.start`/`exc.end` are relative to the slice that was decoded,
        # i.e. to `joined_data[position:]`; translate to buffer offsets.
        error_start = position + exc.start
        error_end = position + exc.end
        # Only decode the not-yet-consumed bytes. Everything before
        # `position` was already yielded (and may contain bad bytes that
        # must not be decoded strictly again).
        valid_prefix = joined_data[position:error_start].decode(encoding)
        escaped = joined_data[error_start:error_end].decode(
            encoding,
            errors='backslashreplace',
        )
        return error_end, valid_prefix + escaped

    # Bytes received so far that were not completely decoded yet
    joined_data = b''
    # Decoding error that reaches the end of `joined_data`. It might be
    # resolved by the next chunk (split multi-byte sequence).
    pending_error = None
    # Offset of the first not-yet-consumed byte in `joined_data`
    position = 0
    for chunk in iterable:
        joined_data += chunk
        while position < len(joined_data):
            try:
                decoded = joined_data[position:].decode(encoding)
            except UnicodeDecodeError as e:
                # If an encoding error occurs, we first check whether it was
                # in the middle of `joined_data` or whether it extends until
                # the end of `joined_data`.
                if position + e.end == len(joined_data):
                    # Wait for the next chunk, which might fix the problem
                    pending_error = e
                    break
                # The error is in the middle of `joined_data`: replace it
                # with backslash encoding or re-raise the decoding error.
                pending_error = None
                position, decoded = handle_decoding_error(position, e)
            else:
                # Everything was decoded: start over with an empty buffer.
                # Resetting `position` and `pending_error` together with
                # the buffer keeps all three consistent for the next chunk.
                joined_data = b''
                position = 0
                pending_error = None
            yield decoded

    if pending_error is not None:
        # If the last chunk has a decoding error at the end, process it.
        position, string = handle_decoding_error(position, pending_error)
        if string:
            yield string