Reducer

The Reducer module provides classes and functions for implementing Reduce UDFs that aggregate messages by key within time windows. It's used for operations like counting, summing, or computing statistics over groups of messages.

Classes

Message dataclass

Message(
    value: bytes,
    keys: list[str] = None,
    tags: list[str] = None,
)

Basic datatype for passing data to the next vertex or vertices.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `value` | `bytes` | data in bytes | *required* |
| `keys` | `list[str]` | keys for the vertex (optional) | `None` |
| `tags` | `list[str]` | tags for conditional forwarding (optional) | `None` |
Source code in pynumaflow/reducer/_dtypes.py
def __init__(self, value: bytes, keys: list[str] = None, tags: list[str] = None):
    """
    Creates a Message object to send value to a vertex.
    """
    self._keys = keys or []
    self._tags = tags or []
    self._value = value or b""

Messages

Messages(*messages: M)

Bases: Sequence[M]

Class to define a list of Message objects.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `messages` | `M` | list of Message objects. | `()` |
Source code in pynumaflow/reducer/_dtypes.py
def __init__(self, *messages: M):
    self._messages = list(messages) or []

Datum dataclass

Datum(
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
)

Class to define the important information for the event.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `keys` | `list[str]` | the keys of the event. | *required* |
| `value` | `bytes` | the payload of the event. | *required* |
| `event_time` | `datetime` | the event time of the event. | *required* |
| `watermark` | `datetime` | the watermark of the event. | *required* |
| `headers` | `Optional[dict[str, str]]` | the headers of the event. | `None` |

Example usage

from pynumaflow.reducer import Datum
from datetime import datetime, timezone

d = Datum(
    keys=["test_key"],
    value=b"test_mock_message",
    event_time=datetime.fromtimestamp(1662998400, timezone.utc),
    watermark=datetime.fromtimestamp(1662998460, timezone.utc),
    headers={"key1": "value1", "key2": "value2"},
)

Source code in pynumaflow/reducer/_dtypes.py
def __init__(
    self,
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
):
    self._keys = keys or list()
    self._value = value or b""
    if not isinstance(event_time, datetime):
        raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time")
    self._event_time = event_time
    if not isinstance(watermark, datetime):
        raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")
    self._watermark = watermark
    self._headers = headers or {}

keys property

keys: list[str]

Returns the keys of the event

value property

value: bytes

Returns the value of the event.

event_time property

event_time: datetime

Returns the event time of the event.

watermark property

watermark: datetime

Returns the watermark of the event.

headers property

headers: dict[str, str]

Returns the headers of the event.

IntervalWindow dataclass

IntervalWindow(start: datetime, end: datetime)

Defines the start and end of the interval window for the event.

Source code in pynumaflow/reducer/_dtypes.py
def __init__(self, start: datetime, end: datetime):
    self._start = start
    self._end = end

start property

start

Returns the start point of the interval window.

end property

end

Returns the end point of the interval window.

Metadata dataclass

Metadata(interval_window: IntervalWindow)

Defines the metadata for the event.

Source code in pynumaflow/reducer/_dtypes.py
def __init__(self, interval_window: IntervalWindow):
    self._interval_window = interval_window

interval_window property

interval_window

Returns the interval window for the event.

Reducer

Provides an interface to write a Reducer which will be exposed over a gRPC server.

handler abstractmethod async

handler(
    keys: list[str],
    datums: AsyncIterable[Datum],
    md: Metadata,
) -> Messages

Implement this handler function, which satisfies the ReduceCallable interface.

Source code in pynumaflow/reducer/_dtypes.py
@abstractmethod
async def handler(
    self, keys: list[str], datums: AsyncIterable[Datum], md: Metadata
) -> Messages:
    """
    Implement this handler function which implements the ReduceCallable interface.
    """
    pass

ReduceAsyncServer

ReduceAsyncServer(
    reducer_instance: ReduceCallable,
    init_args: tuple = (),
    init_kwargs: Optional[dict] = None,
    sock_path=REDUCE_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=REDUCE_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

Class for a new Reduce Server instance. A new servicer instance is created and attached to the server. The server instance is returned.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `reducer_instance` | `ReduceCallable` | The reducer instance to be used for Reduce UDF | *required* |
| `sock_path` | | The UNIX socket path to be used for the server | `REDUCE_SOCK_PATH` |
| `max_message_size` | | The max message size in bytes the server can receive and send | `MAX_MESSAGE_SIZE` |
| `max_threads` | | The max number of threads to be spawned; defaults to 4, capped at 16 | `NUM_THREADS_DEFAULT` |

Example invocation:

import os
from collections.abc import AsyncIterable
from pynumaflow.reducer import (
    Messages,
    Message,
    Datum,
    Metadata,
    ReduceAsyncServer,
    Reducer,
)

class ReduceCounter(Reducer):
    def __init__(self, counter):
        self.counter = counter

    async def handler(
        self, keys: list[str], datums: AsyncIterable[Datum], md: Metadata
    ) -> Messages:
        interval_window = md.interval_window
        self.counter = 0
        async for _ in datums:
            self.counter += 1
        msg = (
            f"counter:{self.counter} interval_window_start:{interval_window.start} "
            f"interval_window_end:{interval_window.end}"
        )
        return Messages(Message(str.encode(msg), keys=keys))

async def reduce_handler(
    keys: list[str], datums: AsyncIterable[Datum], md: Metadata
) -> Messages:
    interval_window = md.interval_window
    counter = 0
    async for _ in datums:
        counter += 1
    msg = (
        f"counter:{counter} interval_window_start:{interval_window.start} "
        f"interval_window_end:{interval_window.end}"
    )
    return Messages(Message(str.encode(msg), keys=keys))

if __name__ == "__main__":
    invoke = os.getenv("INVOKE", "func_handler")
    if invoke == "class":
        # Here we are using the class instance as the reducer_instance
        # which will be used to invoke the handler function.
        # We are passing the init_args for the class instance.
        grpc_server = ReduceAsyncServer(ReduceCounter, init_args=(0,))
    else:
        # Here we are using the handler function directly as the reducer_instance.
        grpc_server = ReduceAsyncServer(reduce_handler)
    grpc_server.start()

Source code in pynumaflow/reducer/async_server.py
def __init__(
    self,
    reducer_instance: ReduceCallable,
    init_args: tuple = (),
    init_kwargs: Optional[dict] = None,
    sock_path=REDUCE_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=REDUCE_SERVER_INFO_FILE_PATH,
):
    init_kwargs = init_kwargs or {}
    self.reducer_handler = get_handler(reducer_instance, init_args, init_kwargs)
    self.sock_path = f"unix://{sock_path}"
    self.max_message_size = max_message_size
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.server_info_file = server_info_file

    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
    ]
    # Get the servicer instance for the async server
    self.servicer = AsyncReduceServicer(self.reducer_handler)

start

start()

Starter function for the async server class; a separate caller is needed so that all async coroutines can be started from a single context.

Source code in pynumaflow/reducer/async_server.py
def start(self):
    """
    Starter function for the Async server class, need a separate caller
    so that all the async coroutines can be started from a single context
    """
    _LOGGER.info(
        "Starting Async Reduce Server",
    )
    aiorun.run(self.aexec(), use_uvloop=True)

aexec async

aexec()

Starts the Async gRPC server on the given UNIX socket with given max threads.

Source code in pynumaflow/reducer/async_server.py
async def aexec(self):
    """
    Starts the Async gRPC server on the given UNIX socket with
    given max threads.
    """
    # As the server is async, we need to create a new server instance in the
    # same thread as the event loop so that all the async calls are made in the
    # same context
    # Create a new async server instance and add the servicer to it
    server = grpc.aio.server(options=self._server_options)
    server.add_insecure_port(self.sock_path)
    reduce_servicer = self.servicer
    reduce_pb2_grpc.add_ReduceServicer_to_server(reduce_servicer, server)

    serv_info = ServerInfo.get_default_server_info()
    serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Reducer]
    # Start the async server
    await start_async_server(
        server_async=server,
        sock_path=self.sock_path,
        max_threads=self.max_threads,
        cleanup_coroutines=list(),
        server_info_file=self.server_info_file,
        server_info=serv_info,
    )