Module bytewax.serde

Serialization for recovery and transport.

Expand source code
"""Serialization for recovery and transport."""

from abc import ABC, abstractmethod
from typing import Any

import jsonpickle

# Names exported by `from bytewax.serde import *`; both are classes
# defined in this module.
__all__ = [
    "JsonPickleSerde",
    "Serde",
]


class Serde(ABC):
    """Interface for a serialization format.

    Implementations must be able to serialize arbitrary Python objects
    and reconstitute them exactly. Calling `json.dumps` and
    `json.loads` directly is therefore not sufficient: they do not
    support things like datetimes, integer keys, etc.

    This holds even when all of your dataflow's own state is
    serializable by a given format. Bytewax generates Python objects
    to store internal data, and those must round-trip correctly too
    or there will be errors.

    """

    @staticmethod
    @abstractmethod
    def ser(obj: Any) -> str:
        """Serialize the given object.

        Args:

          obj: Object to serialize.

        Returns:

          The serialized form of `obj`.

        """

    @staticmethod
    @abstractmethod
    def de(s: str) -> Any:
        """Deserialize the given string.

        Args:

          s: Serialized form, as produced by `ser`.

        Returns:

          The reconstituted object.

        """


class JsonPickleSerde(Serde):
    """Serialize objects using `jsonpickle`.

    See [`jsonpickle`](https://github.com/jsonpickle/jsonpickle) for
    more info.

    """

    @staticmethod
    def ser(obj: Any) -> str:
        """Serialize the given object with `jsonpickle.encode`.

        Args:

          obj: Object to serialize.

        Returns:

          The `jsonpickle` encoding of `obj`.

        """
        # Enable `keys`, otherwise non-string dictionary keys are
        # coerced to strings and would not round-trip as their
        # original type.
        return jsonpickle.encode(obj, keys=True)

    @staticmethod
    def de(s: str) -> Any:
        """Deserialize the given string with `jsonpickle.decode`.

        Args:

          s: String to decode.

        Returns:

          The decoded object.

        """
        # `keys=True` mirrors the setting used in `ser` so that
        # encoded non-string dictionary keys are restored.
        # NOTE(review): jsonpickle's documentation warns against
        # decoding untrusted data; presumably `s` only ever comes from
        # data this process or its peers wrote -- confirm.
        return jsonpickle.decode(s, keys=True)

Classes

class JsonPickleSerde

Serialize objects using jsonpickle.

See jsonpickle for more info.

Expand source code
class JsonPickleSerde(Serde):
    """Serialize objects using `jsonpickle`.

    See [`jsonpickle`](https://github.com/jsonpickle/jsonpickle) for
    more info.

    """

    @staticmethod
    def ser(obj: Any) -> str:
        """Serialize the given object with `jsonpickle.encode`.

        Args:

          obj: Object to serialize.

        Returns:

          The `jsonpickle` encoding of `obj`.

        """
        # Enable `keys`, otherwise non-string dictionary keys are
        # coerced to strings and would not round-trip as their
        # original type.
        return jsonpickle.encode(obj, keys=True)

    @staticmethod
    def de(s: str) -> Any:
        """Deserialize the given string with `jsonpickle.decode`.

        Args:

          s: String to decode.

        Returns:

          The decoded object.

        """
        # `keys=True` mirrors the setting used in `ser` so that
        # encoded non-string dictionary keys are restored.
        # NOTE(review): jsonpickle's documentation warns against
        # decoding untrusted data; presumably `s` only ever comes from
        # data this process or its peers wrote -- confirm.
        return jsonpickle.decode(s, keys=True)

Ancestors

Static methods

def de(s)

See ABC docstring.

Expand source code
@staticmethod
def de(s: str) -> Any:
    """Deserialize the given string with `jsonpickle.decode`.

    Args:

      s: String to decode.

    Returns:

      The decoded object.

    """
    # `keys=True` mirrors the setting used in `ser` so that encoded
    # non-string dictionary keys are restored.
    # NOTE(review): jsonpickle's documentation warns against decoding
    # untrusted data; presumably `s` only ever comes from data this
    # process or its peers wrote -- confirm.
    return jsonpickle.decode(s, keys=True)
def ser(obj)

See ABC docstring.

Expand source code
@staticmethod
def ser(obj: Any) -> str:
    """Serialize the given object with `jsonpickle.encode`.

    Args:

      obj: Object to serialize.

    Returns:

      The `jsonpickle` encoding of `obj`.

    """
    # Enable `keys`, otherwise non-string dictionary keys are coerced
    # to strings and would not round-trip as their original type.
    return jsonpickle.encode(obj, keys=True)
class Serde

A serialization format.

An implementation must support serializing arbitrary Python objects and reconstituting them exactly. This means using things like json.dumps and json.loads directly will not work, as they do not support things like datetimes, integer keys, etc.

Even if all of your dataflow's state is serializable by a format, Bytewax generates Python objects to store internal data, and they must round-trip correctly or there will be errors.

Expand source code
class Serde(ABC):
    """Interface for a serialization format.

    Implementations must be able to serialize arbitrary Python objects
    and reconstitute them exactly. Calling `json.dumps` and
    `json.loads` directly is therefore not sufficient: they do not
    support things like datetimes, integer keys, etc.

    This holds even when all of your dataflow's own state is
    serializable by a given format. Bytewax generates Python objects
    to store internal data, and those must round-trip correctly too
    or there will be errors.

    """

    @staticmethod
    @abstractmethod
    def ser(obj: Any) -> str:
        """Serialize the given object.

        Args:

          obj: Object to serialize.

        Returns:

          The serialized form of `obj`.

        """

    @staticmethod
    @abstractmethod
    def de(s: str) -> Any:
        """Deserialize the given string.

        Args:

          s: Serialized form, as produced by `ser`.

        Returns:

          The reconstituted object.

        """

Ancestors

  • abc.ABC

Subclasses

Static methods

def de(s: str) ‑> Any

Deserialize the given object.

Expand source code
@staticmethod
@abstractmethod
def de(s: str) -> Any:
    """Deserialize the given string.

    Args:

      s: Serialized form to reconstitute.

    Returns:

      The reconstituted object.

    """
def ser(obj: Any) ‑> str

Serialize the given object.

Expand source code
@staticmethod
@abstractmethod
def ser(obj: Any) -> str:
    """Serialize the given object.

    Args:

      obj: Object to serialize.

    Returns:

      The serialized form of `obj`.

    """