Skip to content

Accumulator

This module offers tools for accumulating and processing data while managing state. With it, you can:

  • Accumulate data over time
  • Maintain state across messages
  • Process accumulated data

Classes

Message dataclass

Message(
    value: bytes,
    keys: list[str] = None,
    tags: list[str] = None,
    watermark: datetime = None,
    event_time: datetime = None,
    headers: dict[str, str] = None,
    id: str = None,
)

Basic datatype for data passing to the next vertex/vertices.

Parameters:

Name Type Description Default
value bytes

data in bytes

required
keys list[str]

[]string keys for vertex (optional)

None
tags list[str]

[]string tags for conditional forwarding (optional)

None
watermark datetime

watermark for this message (optional)

None
event_time datetime

event time for this message (optional)

None
headers dict[str, str]

headers for this message (optional)

None
id str

message id (optional)

None
Source code in pynumaflow/accumulator/_dtypes.py
def __init__(
    self,
    value: bytes,
    keys: list[str] = None,
    tags: list[str] = None,
    watermark: datetime = None,
    event_time: datetime = None,
    headers: dict[str, str] = None,
    id: str = None,
):
    """
    Create a Message carrying *value* to the next vertex/vertices.

    All metadata fields are optional; falsy container-typed arguments are
    normalized to their empty counterparts so the accessors never return None
    for them.
    """
    # Normalize payload and container fields: None (or any falsy value)
    # collapses to the empty bytes/list/dict/str equivalent.
    self._value = value if value else b""
    self._keys = keys if keys else []
    self._tags = tags if tags else []
    self._headers = headers if headers else {}
    self._id = id if id else ""
    # Timestamps are stored as given; None simply means "not set".
    self._watermark = watermark
    self._event_time = event_time

value property

value: bytes

Returns the message payload value.

Returns:

Type Description
bytes

The message payload data as bytes.

keys property

keys: list[str]

Returns the message keys.

Returns:

Type Description
list[str]

A list of string keys associated with this message.

tags property

tags: list[str]

Returns the message tags for conditional forwarding.

Returns:

Type Description
list[str]

A list of string tags used for conditional forwarding.

watermark property

watermark: datetime

Returns the watermark timestamp for this message.

Returns:

Type Description
datetime

The watermark timestamp, or None if not set.

event_time property

event_time: datetime

Returns the event time for this message.

Returns:

Type Description
datetime

The event time timestamp, or None if not set.

headers property

headers: dict[str, str]

Returns the message headers.

Returns:

Type Description
dict[str, str]

A dictionary containing header key-value pairs for this message.

id property

id: str

Returns the message ID.

Returns:

Type Description
str

The unique identifier for this message.

to_drop classmethod

to_drop() -> M

Creates a Message instance that indicates the message should be dropped.

Returns:

Type Description
M

A Message instance with empty value and DROP tag indicating the message should be dropped.

Source code in pynumaflow/accumulator/_dtypes.py
@classmethod
def to_drop(cls: type[M]) -> M:
    """Build a sentinel Message signalling that this datum should be dropped.

    Returns:
        M: a Message with an empty payload, no keys, and the DROP tag;
           downstream forwarding treats the DROP tag as "discard".
    """
    return cls(value=b"", keys=None, tags=[DROP])

from_datum classmethod

from_datum(datum: Datum)

Create a Message instance from a Datum object.

Parameters:

Name Type Description Default
datum Datum

The Datum object to convert

required

Returns:

Type Description
Message

A new Message instance with data from the datum

Source code in pynumaflow/accumulator/_dtypes.py
@classmethod
def from_datum(cls, datum: Datum):
    """Build a Message that carries the same payload and metadata as *datum*.

    Args:
        datum: the Datum object to convert

    Returns:
        Message: a new instance populated from the datum's fields
    """
    # Copy every field the Message constructor understands (tags have no
    # counterpart on Datum, so they are left at their default).
    fields = {
        "value": datum.value,
        "keys": datum.keys,
        "watermark": datum.watermark,
        "event_time": datum.event_time,
        "headers": datum.headers,
        "id": datum.id,
    }
    return cls(**fields)

Datum dataclass

Datum(
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    id_: str,
    headers: Optional[dict[str, str]] = None,
)

Class to define the important information for the event.

Parameters:

Name Type Description Default
keys list[str]

the keys of the event.

required
value bytes

the payload of the event.

required
event_time datetime

the event time of the event.

required
watermark datetime

the watermark of the event.

required

Example usage

from pynumaflow.accumulator import Datum
from datetime import datetime, timezone

d = Datum(
      keys=["test_key"],
      value=b"test_mock_message",
      event_time=datetime.fromtimestamp(1662998400, timezone.utc),
      watermark=datetime.fromtimestamp(1662998460, timezone.utc),
      headers={"key1": "value1", "key2": "value2"},
   )

Source code in pynumaflow/accumulator/_dtypes.py
def __init__(
    self,
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    id_: str,
    headers: Optional[dict[str, str]] = None,
):
    """
    Capture the important information for one event.

    Raises:
        TypeError: if event_time or watermark is not a datetime instance.
    """
    self._keys = keys or list()
    self._value = value or b""
    # Validate and store the two timestamps in declaration order so the
    # first offending field is the one reported.
    for attr, ts in (("event_time", event_time), ("watermark", watermark)):
        if not isinstance(ts, datetime):
            raise TypeError(f"Wrong data type: {type(ts)} for Datum.{attr}")
        setattr(self, f"_{attr}", ts)
    self._headers = headers or {}
    self._id = id_

keys property

keys: list[str]

Returns the keys of the event.

Returns:

Type Description
list[str]

A list of string keys associated with this event.

value property

value: bytes

Returns the value of the event.

Returns:

Type Description
bytes

The payload data of the event as bytes.

event_time property

event_time: datetime

Returns the event time of the event.

Returns:

Type Description
datetime

The timestamp when the event occurred.

watermark property

watermark: datetime

Returns the watermark of the event.

Returns:

Type Description
datetime

The watermark timestamp indicating the progress of event time.

headers property

headers: dict[str, str]

Returns the headers of the event.

Returns:

Type Description
dict[str, str]

A dictionary containing header key-value pairs for this event.

id property

id: str

Returns the id of the event.

Returns:

Type Description
str

The unique identifier for this event.

IntervalWindow dataclass

IntervalWindow(start: datetime, end: datetime)

Defines the start and end of the interval window for the event.

Source code in pynumaflow/accumulator/_dtypes.py
def __init__(self, start: datetime, end: datetime):
    """Record the start and end timestamps bounding the interval window."""
    self._start, self._end = start, end

start property

start: datetime

Returns the start point of the interval window.

Returns:

Type Description
datetime

The start timestamp of the interval window.

end property

end: datetime

Returns the end point of the interval window.

Returns:

Type Description
datetime

The end timestamp of the interval window.

KeyedWindow dataclass

KeyedWindow(
    start: datetime,
    end: datetime,
    slot: str = "",
    keys: list[str] = [],
)

Defines the window for an accumulator operation, which includes the interval window along with the slot.

Source code in pynumaflow/accumulator/_dtypes.py
def __init__(self, start: datetime, end: datetime, slot: str = "", keys: list[str] = None):
    """Define the window for an accumulator operation.

    Combines the interval window (start/end) with the slot identifier and
    the keys the window is scoped to.

    Args:
        start: start timestamp of the interval window
        end: end timestamp of the interval window
        slot: slot identifier for the window (optional)
        keys: keys associated with the window (optional)
    """
    self._window = IntervalWindow(start=start, end=end)
    self._slot = slot
    # BUG FIX: the previous default was a mutable `[]`, a single list object
    # shared by every instance constructed without keys — mutating one
    # window's keys would leak into all of them. Default to None and
    # materialize a fresh list, matching Message.__init__ in this module.
    self._keys = keys or []

start property

start: datetime

Returns the start point of the interval window.

Returns:

Type Description
datetime

The start timestamp of the interval window.

end property

end: datetime

Returns the end point of the interval window.

Returns:

Type Description
datetime

The end timestamp of the interval window.

slot property

slot: str

Returns the slot from the window.

Returns:

Type Description
str

The slot identifier for this window.

window property

Returns the interval window.

Returns:

Type Description
IntervalWindow

The underlying interval window object.

keys property

keys: list[str]

Returns the keys for window.

Returns:

Type Description
list[str]

A list of keys associated with this window.

Accumulator

An Accumulator can read unordered data from the input stream and emit ordered data to the output stream. Once the watermark (WM) of the output stream progresses, the data in the WAL up to that WM will be garbage collected. NOTE: A message can be silently dropped if need be, and it will be cleared from the WAL when the WM progresses.

handler abstractmethod async

handler(
    datums: AsyncIterable[Datum],
    output: NonBlockingIterator,
)

Implement this handler function which implements the AccumulatorStreamCallable interface.

Source code in pynumaflow/accumulator/_dtypes.py
@abstractmethod
async def handler(
    self,
    datums: AsyncIterable[Datum],
    output: NonBlockingIterator,
):
    """Process the stream of input datums and emit results on *output*.

    Subclasses implement this to satisfy the AccumulatorStreamCallable
    interface: read datums from the async iterable and put result
    Message objects onto the output iterator.
    """
    ...

AccumulatorAsyncServer

AccumulatorAsyncServer(
    accumulator_instance: AccumulatorStreamCallable,
    init_args: tuple = (),
    init_kwargs: Optional[dict] = None,
    sock_path=ACCUMULATOR_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=ACCUMULATOR_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

Class for a new Accumulator Server instance. A new servicer instance is created and attached to the server. The server instance is returned.

Parameters:

Name Type Description Default
accumulator_instance AccumulatorStreamCallable

The accumulator instance to be used for Accumulator UDF

required
init_args tuple

The arguments to be passed to the accumulator_handler

()
init_kwargs Optional[dict]

The keyword arguments to be passed to the accumulator_handler

None
sock_path

The UNIX socket path to be used for the server

ACCUMULATOR_SOCK_PATH
max_message_size

The max message size in bytes the server can receive and send

MAX_MESSAGE_SIZE
max_threads

The max number of threads to be spawned;

NUM_THREADS_DEFAULT
server_info_file

The path to the server info file

ACCUMULATOR_SERVER_INFO_FILE_PATH

Example invocation:

import os
from collections.abc import AsyncIterable
from datetime import datetime

from pynumaflow.accumulator import Accumulator, AccumulatorAsyncServer
from pynumaflow.accumulator import Message, Datum
from pynumaflow.shared.asynciter import NonBlockingIterator

class StreamSorter(Accumulator):
    """Example accumulator that re-orders an unordered stream by event time.

    Incoming datums are kept in an event-time-sorted buffer; whenever the
    watermark advances, every buffered datum covered by the new watermark
    is flushed downstream in order.
    """

    def __init__(self, counter=0):
        # BUG FIX: `counter` previously had no default, so the example
        # invocation `AccumulatorAsyncServer(StreamSorter)` (empty
        # init_args) would fail with a TypeError. It only demonstrates
        # passing constructor args and is unused by the sorting logic.
        self.latest_wm = datetime.fromtimestamp(-1)
        self.sorted_buffer: list[Datum] = []

    async def handler(
        self,
        datums: AsyncIterable[Datum],
        output: NonBlockingIterator,
    ):
        # BUG FIX: the original looped `async for _ in datums:` but then
        # referenced an undefined name `datum`; bind the loop variable.
        async for datum in datums:
            # A watermark advance means everything at or below it is final:
            # flush before inserting the new datum.
            if datum.watermark and datum.watermark > self.latest_wm:
                self.latest_wm = datum.watermark
                await self.flush_buffer(output)

            self.insert_sorted(datum)

    def insert_sorted(self, datum: Datum):
        # Binary insert to keep sorted buffer in order
        left, right = 0, len(self.sorted_buffer)
        while left < right:
            mid = (left + right) // 2
            if self.sorted_buffer[mid].event_time > datum.event_time:
                right = mid
            else:
                left = mid + 1
        self.sorted_buffer.insert(left, datum)

    async def flush_buffer(self, output: NonBlockingIterator):
        # Emit every buffered datum whose event time is covered by the
        # current watermark, preserving sorted order.
        i = 0
        for datum in self.sorted_buffer:
            if datum.event_time > self.latest_wm:
                break
            await output.put(Message.from_datum(datum))
            i += 1
        # Remove flushed items
        self.sorted_buffer = self.sorted_buffer[i:]


if __name__ == "__main__":
    grpc_server = AccumulatorAsyncServer(StreamSorter)
    grpc_server.start()

Source code in pynumaflow/accumulator/async_server.py
def __init__(
    self,
    accumulator_instance: AccumulatorStreamCallable,
    init_args: tuple = (),
    init_kwargs: Optional[dict] = None,
    sock_path=ACCUMULATOR_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=ACCUMULATOR_SERVER_INFO_FILE_PATH,
):
    """Create a new Accumulator server instance.

    Resolves the user-supplied accumulator into a handler, records the
    transport settings, and builds the servicer that will be attached
    to the gRPC server.
    """
    init_kwargs = init_kwargs or {}
    self.accumulator_handler = get_handler(accumulator_instance, init_args, init_kwargs)

    # Transport configuration.
    self.sock_path = f"unix://{sock_path}"
    self.max_message_size = max_message_size
    self.max_threads = min(max_threads, MAX_NUM_THREADS)  # cap the thread pool
    self.server_info_file = server_info_file

    # Apply the configured size limit to both send and receive directions.
    self._server_options = [
        (option, self.max_message_size)
        for option in ("grpc.max_send_message_length", "grpc.max_receive_message_length")
    ]
    # Servicer instance that bridges gRPC calls onto the user handler.
    self.servicer = AsyncAccumulatorServicer(self.accumulator_handler)

start

start()

Starter function for the Async server class, need a separate caller so that all the async coroutines can be started from a single context

Source code in pynumaflow/accumulator/async_server.py
def start(self):
    """Launch the async Accumulator server.

    Kept separate from construction so that every coroutine is created
    and driven from a single event-loop context (aiorun, with uvloop).
    """
    _LOGGER.info("Starting Async Accumulator Server")
    aiorun.run(self.aexec(), use_uvloop=True)

aexec async

aexec()

Starts the Async gRPC server on the given UNIX socket with given max threads.

Source code in pynumaflow/accumulator/async_server.py
async def aexec(self):
    """Start the async gRPC server on the configured UNIX socket.

    The server object is created here — inside the running event loop —
    so that all async calls share the same context.
    """
    # Advertise the minimum Numaflow version required for this container type.
    serv_info = ServerInfo.get_default_server_info()
    serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Accumulator]

    # Fresh server instance per invocation, wired to the shared servicer.
    grpc_aio_server = grpc.aio.server(options=self._server_options)
    grpc_aio_server.add_insecure_port(self.sock_path)
    accumulator_pb2_grpc.add_AccumulatorServicer_to_server(self.servicer, grpc_aio_server)

    await start_async_server(
        server_async=grpc_aio_server,
        sock_path=self.sock_path,
        max_threads=self.max_threads,
        cleanup_coroutines=[],
        server_info_file=self.server_info_file,
        server_info=serv_info,
    )