Reducer
The Reducer module provides classes and functions for implementing Reduce UDFs that aggregate messages by key within time windows. It's used for operations like counting, summing, or computing statistics over groups of messages.
Classes
Message (dataclass)
Basic datatype for data passing to the next vertex/vertices.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | bytes | data in bytes | required |
| keys | list[str] | []string keys for vertex (optional) | None |
| tags | list[str] | []string tags for conditional forwarding (optional) | None |
Source code in pynumaflow/reducer/_dtypes.py
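Because `value` is typed as bytes, an aggregate has to be serialized before it is wrapped in a Message. The following is a minimal sketch of one way to do that, assuming JSON as the payload format (the library does not prescribe one). `encode_result` and `decode_result` are hypothetical helper names, not part of pynumaflow.

```python
import json


def encode_result(count: int, total: float) -> bytes:
    """Serialize an aggregate into bytes suitable for a Message value."""
    # sort_keys keeps the byte output deterministic across runs.
    return json.dumps({"count": count, "total": total}, sort_keys=True).encode("utf-8")


def decode_result(payload: bytes) -> dict:
    """Inverse of encode_result, e.g. for use in a downstream vertex."""
    return json.loads(payload.decode("utf-8"))


payload = encode_result(3, 12.5)
```

The resulting bytes could then be passed as the first argument of `Message`, in the same position where the invocation example on this page passes `str.encode(msg)`.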
Messages
Datum (dataclass)
Datum(
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
)
Class to define the important information for the event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | list[str] | the keys of the event. | required |
| value | bytes | the payload of the event. | required |
| event_time | datetime | the event time of the event. | required |
| watermark | datetime | the watermark of the event. | required |
| headers | Optional[dict[str, str]] | the headers of the event. | None |
Example usage:
from datetime import datetime, timezone

from pynumaflow.reducer import Datum

d = Datum(
    keys=["test_key"],
    value=b"test_mock_message",
    event_time=datetime.fromtimestamp(1662998400, timezone.utc),
    watermark=datetime.fromtimestamp(1662998460, timezone.utc),
    headers={"key1": "value1", "key2": "value2"},
)
Source code in pynumaflow/reducer/_dtypes.py
IntervalWindow (dataclass)
Defines the start and end of the interval window for the event.
Source code in pynumaflow/reducer/_dtypes.py
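To illustrate how the window bounds might be used inside a handler, here is a small sketch that works on any object exposing `start` and `end` datetimes, as `md.interval_window` does in the invocation example on this page. `WindowStandIn`, `window_length`, and `in_window` are hypothetical names introduced so the snippet runs without the library. Treating the window as half-open, `[start, end)`, is an assumption of this sketch.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class WindowStandIn:
    """Hypothetical stand-in exposing the same start/end attributes."""

    start: datetime
    end: datetime


def window_length(window: WindowStandIn) -> timedelta:
    """Duration covered by the window."""
    return window.end - window.start


def in_window(window: WindowStandIn, event_time: datetime) -> bool:
    """True if event_time falls inside the window."""
    # Assumption: start is inclusive and end is exclusive.
    return window.start <= event_time < window.end


w = WindowStandIn(
    start=datetime.fromtimestamp(1662998400, timezone.utc),
    end=datetime.fromtimestamp(1662998460, timezone.utc),
)
```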
Metadata (dataclass)
Metadata(interval_window: IntervalWindow)
Reducer
Provides an interface to write a Reducer which will be exposed over a gRPC server.
handler (abstractmethod, async)
Implement this handler function; it must conform to the ReduceCallable interface.
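The core of a handler body is usually a single pass over the asynchronous stream of datums. The sketch below shows that aggregation step in isolation, assuming each payload is a UTF-8 encoded integer. `DatumStandIn`, `sum_values`, and `fake_stream` are hypothetical names; the stand-in models only the `value` field of Datum so the snippet runs without the library.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator
from dataclasses import dataclass


@dataclass
class DatumStandIn:
    """Hypothetical stand-in: only the `value` field of Datum is modeled."""

    value: bytes


async def sum_values(datums: AsyncIterable[DatumStandIn]) -> tuple[int, int]:
    """Return (count, total) over all integer payloads in the stream."""
    count, total = 0, 0
    async for d in datums:
        count += 1
        # Assumption: every payload is a UTF-8 encoded integer.
        total += int(d.value.decode("utf-8"))
    return count, total


async def fake_stream() -> AsyncIterator[DatumStandIn]:
    """Yield three stand-in datums, mimicking the stream passed to a handler."""
    for raw in (b"1", b"2", b"3"):
        yield DatumStandIn(value=raw)


result = asyncio.run(sum_values(fake_stream()))
```

In a real handler the same loop would run over the `datums` argument, and the result would be encoded to bytes and returned inside `Messages`.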
ReduceAsyncServer
ReduceAsyncServer(
    reducer_instance: ReduceCallable,
    init_args: tuple = (),
    init_kwargs: Optional[dict] = None,
    sock_path=REDUCE_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=REDUCE_SERVER_INFO_FILE_PATH,
)
Bases: NumaflowServer
Class for a new Reduce Server instance. A new servicer instance is created and attached to the server. The server instance is returned.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| reducer_instance | ReduceCallable | The reducer instance to be used for Reduce UDF | required |
| sock_path | | The UNIX socket path to be used for the server | REDUCE_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |
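The `max_threads` description implies that a requested value above the cap is reduced to the cap. A minimal sketch of that rule follows, using the default of 4 and the cap of 16 stated in the table. `MAX_THREADS_CAP` and `effective_threads` are hypothetical names, and the way the library enforces the cap internally may differ.

```python
NUM_THREADS_DEFAULT = 4  # default stated in the parameter table
MAX_THREADS_CAP = 16  # hypothetical name for the documented cap


def effective_threads(requested: int = NUM_THREADS_DEFAULT) -> int:
    """Return the thread count after applying the documented upper bound."""
    return min(requested, MAX_THREADS_CAP)
```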
Example invocation:
import os
from collections.abc import AsyncIterable

from pynumaflow.reducer import (
    Messages,
    Message,
    Datum,
    Metadata,
    ReduceAsyncServer,
    Reducer,
)


class ReduceCounter(Reducer):
    def __init__(self, counter):
        self.counter = counter

    async def handler(
        self, keys: list[str], datums: AsyncIterable[Datum], md: Metadata
    ) -> Messages:
        interval_window = md.interval_window
        # Reset the count, then count every datum received for these keys.
        self.counter = 0
        async for _ in datums:
            self.counter += 1
        msg = (
            f"counter:{self.counter} interval_window_start:{interval_window.start} "
            f"interval_window_end:{interval_window.end}"
        )
        return Messages(Message(str.encode(msg), keys=keys))


async def reduce_handler(
    keys: list[str], datums: AsyncIterable[Datum], md: Metadata
) -> Messages:
    interval_window = md.interval_window
    counter = 0
    async for _ in datums:
        counter += 1
    msg = (
        f"counter:{counter} interval_window_start:{interval_window.start} "
        f"interval_window_end:{interval_window.end}"
    )
    return Messages(Message(str.encode(msg), keys=keys))


if __name__ == "__main__":
    invoke = os.getenv("INVOKE", "func_handler")
    if invoke == "class":
        # Pass the ReduceCounter class as the reducer_instance;
        # its handler method will be invoked for the reduce operation.
        # init_args supplies the arguments for the class constructor.
        grpc_server = ReduceAsyncServer(ReduceCounter, init_args=(0,))
    else:
        # Use the handler function directly as the reducer_instance.
        grpc_server = ReduceAsyncServer(reduce_handler)
    grpc_server.start()
Source code in pynumaflow/reducer/async_server.py
start
Starter function for the async server class. A separate caller is needed so that all the async coroutines can be started from a single context.
Source code in pynumaflow/reducer/async_server.py
aexec (async)
Starts the Async gRPC server on the given UNIX socket with given max threads.
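The split between a synchronous `start` and an asynchronous `aexec` follows a common asyncio pattern: one blocking entry point creates the event loop, and every coroutine runs inside it. The sketch below illustrates that pattern only. `ServerSketch` and `_serve` are hypothetical, no gRPC server is started, and the library's actual implementation may use a different runner.

```python
import asyncio


class ServerSketch:
    """Hypothetical class illustrating the pattern; not ReduceAsyncServer."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def start(self) -> None:
        # Single synchronous entry point: one event loop owns every coroutine.
        asyncio.run(self.aexec())

    async def aexec(self) -> None:
        self.events.append("server started")
        # Tasks are created inside the loop that start() opened.
        task = asyncio.create_task(self._serve())
        await task
        self.events.append("server stopped")

    async def _serve(self) -> None:
        # Placeholder for request handling; yields control once.
        await asyncio.sleep(0)
        self.events.append("served")


server = ServerSketch()
server.start()
```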