Reduce Streamer

The Reduce Streamer module provides classes and functions for implementing ReduceStream UDFs that emit results incrementally during reduction. Unlike regular Reduce, which outputs only when the window closes, Reduce Stream emits results as they are computed. This is useful for early alerts or real-time dashboards.

Classes

Message dataclass

Message(
    value: bytes,
    keys: list[str] = None,
    tags: list[str] = None,
)

Basic datatype for passing data to the next vertex/vertices.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| value | bytes | data in bytes | required |
| keys | list[str] | keys for the vertex (optional) | None |
| tags | list[str] | tags for conditional forwarding (optional) | None |
Source code in pynumaflow/reducestreamer/_dtypes.py
def __init__(
    self,
    value: bytes,
    keys: list[str] = None,
    tags: list[str] = None,
):
    """
    Creates a Message object to send value to a vertex.
    """
    self._keys = keys or []
    self._tags = tags or []
    self._value = value or b""
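As the constructor above shows, omitted keys and tags default to empty lists and a falsy value is normalized to b"". A minimal standalone sketch mirroring that constructor (illustration only; in a pipeline the class is imported from pynumaflow.reducestreamer):

```python
class Message:
    # Standalone mirror of the pynumaflow.reducestreamer.Message
    # constructor shown above, for illustration only.
    def __init__(self, value: bytes, keys: list[str] = None, tags: list[str] = None):
        self._keys = keys or []
        self._tags = tags or []
        self._value = value or b""

# Typical construction: payload plus keys, with an optional tag
# for conditional forwarding.
m = Message(b"alert", keys=["sensor-1"], tags=["out-of-range"])

# A falsy value and omitted keys/tags fall back to b"" and [].
empty = Message(None)
```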

Datum dataclass

Datum(
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
)

Class to define the important information for the event.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| keys | list[str] | the keys of the event | required |
| value | bytes | the payload of the event | required |
| event_time | datetime | the event time of the event | required |
| watermark | datetime | the watermark of the event | required |
| headers | Optional[dict[str, str]] | the headers of the event (optional) | None |

Example usage

from pynumaflow.reducestreamer import Datum
from datetime import datetime, timezone

d = Datum(
    keys=["test_key"],
    value="test_mock_message".encode(),
    event_time=datetime.fromtimestamp(1662998400, timezone.utc),
    watermark=datetime.fromtimestamp(1662998460, timezone.utc),
    headers={"key1": "value1", "key2": "value2"},
)
Source code in pynumaflow/reducestreamer/_dtypes.py
def __init__(
    self,
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
):
    self._keys = keys or list()
    self._value = value or b""
    if not isinstance(event_time, datetime):
        raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time")
    self._event_time = event_time
    if not isinstance(watermark, datetime):
        raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")
    self._watermark = watermark
    self._headers = headers or {}

keys property

keys: list[str]

Returns the keys of the event

value property

value: bytes

Returns the value of the event.

event_time property

event_time: datetime

Returns the event time of the event.

watermark property

watermark: datetime

Returns the watermark of the event.

headers property

headers: dict[str, str]

Returns the headers of the event.

IntervalWindow dataclass

IntervalWindow(start: datetime, end: datetime)

Defines the start and end of the interval window for the event.

Source code in pynumaflow/reducestreamer/_dtypes.py
def __init__(self, start: datetime, end: datetime):
    self._start = start
    self._end = end

start property

start

Returns the start point of the interval window.

end property

end

Returns the end point of the interval window.

Metadata dataclass

Metadata(interval_window: IntervalWindow)

Defines the metadata for the event.

Source code in pynumaflow/reducestreamer/_dtypes.py
def __init__(self, interval_window: IntervalWindow):
    self._interval_window = interval_window

interval_window property

interval_window

Returns the interval window for the event.
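Inside a handler, md.interval_window exposes the window bounds, so derived values such as the window length follow directly from start and end. A self-contained sketch with stand-in classes mirroring the constructors shown above (illustration only; the real classes come from pynumaflow.reducestreamer):

```python
from datetime import datetime, timezone

class IntervalWindow:
    # Mirror of the IntervalWindow constructor shown above (illustration only).
    def __init__(self, start: datetime, end: datetime):
        self._start = start
        self._end = end

    @property
    def start(self):
        return self._start

    @property
    def end(self):
        return self._end

class Metadata:
    # Mirror of the Metadata constructor shown above (illustration only).
    def __init__(self, interval_window: IntervalWindow):
        self._interval_window = interval_window

    @property
    def interval_window(self):
        return self._interval_window

md = Metadata(IntervalWindow(
    start=datetime.fromtimestamp(1662998400, timezone.utc),
    end=datetime.fromtimestamp(1662998460, timezone.utc),
))

# The window length is simply end - start (a timedelta).
length = md.interval_window.end - md.interval_window.start
```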

ReduceStreamer

Provides an interface to write a ReduceStreamer which will be exposed over a gRPC server.

handler abstractmethod async

handler(
    keys: list[str],
    datums: AsyncIterable[Datum],
    output: NonBlockingIterator,
    md: Metadata,
)

Implement this handler function which implements the ReduceStreamCallable interface.

Source code in pynumaflow/reducestreamer/_dtypes.py
@abstractmethod
async def handler(
    self,
    keys: list[str],
    datums: AsyncIterable[Datum],
    output: NonBlockingIterator,
    md: Metadata,
):
    """
    Implement this handler function which implements the ReduceStreamCallable interface.
    """
    pass

ReduceWindow dataclass

ReduceWindow(
    start: datetime, end: datetime, slot: str = ""
)

Defines the window for a reduce operation which includes the interval window along with the slot.

Source code in pynumaflow/reducestreamer/_dtypes.py
def __init__(self, start: datetime, end: datetime, slot: str = ""):
    self._window = IntervalWindow(start=start, end=end)
    self._slot = slot

start property

start

Returns the start point of the interval window.

end property

end

Returns the end point of the interval window.

slot property

slot

Returns the slot from the window.

window property

window

Returns the interval window.

ReduceStreamAsyncServer

ReduceStreamAsyncServer(
    reduce_stream_instance: ReduceStreamCallable,
    init_args: tuple = (),
    init_kwargs: Optional[dict] = None,
    sock_path=REDUCE_STREAM_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=REDUCE_STREAM_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

Class for a new Reduce Stream Server instance. A new servicer instance is created and attached to the server, and the server instance is returned.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| reduce_stream_instance | ReduceStreamCallable | The reducer instance to be used for the Reduce Streaming UDF | required |
| init_args | tuple | The positional arguments passed when instantiating the reducer class | () |
| init_kwargs | Optional[dict] | The keyword arguments passed when instantiating the reducer class | None |
| sock_path | | The UNIX socket path to be used for the server | REDUCE_STREAM_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned | NUM_THREADS_DEFAULT |
| server_info_file | | The path to the server info file | REDUCE_STREAM_SERVER_INFO_FILE_PATH |

Example invocation:

import os
from collections.abc import AsyncIterable
from pynumaflow.shared.asynciter import NonBlockingIterator
from pynumaflow.reducestreamer import (
    Message,
    Datum,
    Metadata,
    ReduceStreamAsyncServer,
    ReduceStreamer,
)

class ReduceCounter(ReduceStreamer):
    def __init__(self, counter):
        self.counter = counter

    async def handler(
        self,
        keys: list[str],
        datums: AsyncIterable[Datum],
        output: NonBlockingIterator,
        md: Metadata,
    ):
        async for _ in datums:
            self.counter += 1
            if self.counter > 20:
                msg = f"counter:{self.counter}"
                await output.put(Message(str.encode(msg), keys=keys))
                self.counter = 0
        msg = f"counter:{self.counter}"
        await output.put(Message(str.encode(msg), keys=keys))

async def reduce_handler(
    keys: list[str],
    datums: AsyncIterable[Datum],
    output: NonBlockingIterator,
    md: Metadata,
):
    counter = 0
    async for _ in datums:
        counter += 1
        if counter > 20:
            msg = f"counter:{counter}"
            await output.put(Message(str.encode(msg), keys=keys))
            counter = 0
    msg = f"counter:{counter}"
    await output.put(Message(str.encode(msg), keys=keys))

if __name__ == "__main__":
    invoke = os.getenv("INVOKE", "func_handler")
    if invoke == "class":
        # Here we are using the class instance as the reducer_instance
        # which will be used to invoke the handler function.
        # We are passing the init_args for the class instance.
        grpc_server = ReduceStreamAsyncServer(ReduceCounter, init_args=(0,))
    else:
        # Here we are using the handler function directly as the reducer_instance.
        grpc_server = ReduceStreamAsyncServer(reduce_handler)
    grpc_server.start()
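The handler logic above can also be sanity-checked without a gRPC server by driving it directly with asyncio. The sketch below uses stand-ins for Message and NonBlockingIterator (a simple collecting output, an assumption for illustration) and feeds 45 simulated datums through the functional handler:

```python
import asyncio

class Message:
    # Stand-in for pynumaflow.reducestreamer.Message (illustration only).
    def __init__(self, value, keys=None, tags=None):
        self.value = value or b""
        self.keys = keys or []
        self.tags = tags or []

class CollectingOutput:
    # Stand-in for NonBlockingIterator: records every emitted message.
    def __init__(self):
        self.items = []

    async def put(self, msg):
        self.items.append(msg)

async def reduce_handler(keys, datums, output, md):
    # Same logic as the functional handler above: emit an intermediate
    # result every time the counter crosses 20, then a final result.
    counter = 0
    async for _ in datums:
        counter += 1
        if counter > 20:
            await output.put(Message(f"counter:{counter}".encode(), keys=keys))
            counter = 0
    await output.put(Message(f"counter:{counter}".encode(), keys=keys))

async def datum_stream(n):
    # Simulate n incoming datums; the handler only counts them.
    for i in range(n):
        yield i

async def main():
    out = CollectingOutput()
    await reduce_handler(["test_key"], datum_stream(45), out, md=None)
    return [m.value for m in out.items]

emitted = asyncio.run(main())
# 45 datums -> two intermediate emissions at 21 and a final one at 3.
```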

Source code in pynumaflow/reducestreamer/async_server.py
def __init__(
    self,
    reduce_stream_instance: ReduceStreamCallable,
    init_args: tuple = (),
    init_kwargs: Optional[dict] = None,
    sock_path=REDUCE_STREAM_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=REDUCE_STREAM_SERVER_INFO_FILE_PATH,
):
    init_kwargs = init_kwargs or {}
    self.reduce_stream_handler = get_handler(reduce_stream_instance, init_args, init_kwargs)
    self.sock_path = f"unix://{sock_path}"
    self.max_message_size = max_message_size
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.server_info_file = server_info_file

    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
    ]
    # Get the servicer instance for the async server
    self.servicer = AsyncReduceStreamServicer(self.reduce_stream_handler)

start

start()

Starter function for the Async server class; a separate caller is needed so that all the async coroutines can be started from a single context.

Source code in pynumaflow/reducestreamer/async_server.py
def start(self):
    """
    Starter function for the Async server class, need a separate caller
    so that all the async coroutines can be started from a single context
    """
    _LOGGER.info(
        "Starting Async Reduce Stream Server",
    )
    aiorun.run(self.aexec(), use_uvloop=True)

aexec async

aexec()

Starts the Async gRPC server on the given UNIX socket with given max threads.

Source code in pynumaflow/reducestreamer/async_server.py
async def aexec(self):
    """
    Starts the Async gRPC server on the given UNIX socket with
    given max threads.
    """
    # As the server is async, we need to create a new server instance in the
    # same thread as the event loop so that all the async calls are made in the
    # same context
    # Create a new async server instance and add the servicer to it
    server = grpc.aio.server(options=self._server_options)
    server.add_insecure_port(self.sock_path)
    reduce_pb2_grpc.add_ReduceServicer_to_server(self.servicer, server)

    serv_info = ServerInfo.get_default_server_info()
    serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Reducestreamer]
    await start_async_server(
        server_async=server,
        sock_path=self.sock_path,
        max_threads=self.max_threads,
        cleanup_coroutines=list(),
        server_info_file=self.server_info_file,
        server_info=serv_info,
    )