Reduce Streamer¶
The Reduce Streamer module provides classes and functions for implementing Reduce Stream UDFs that emit results incrementally during reduction. Unlike a regular Reduce, which produces output only when the window closes, a Reduce Streamer can emit results as they are computed, which is useful for early alerts or real-time dashboards.
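For example, a streaming reducer can flush partial results while the window is still open and emit a final value once all datums for the window have been consumed. The sketch below illustrates the idea; it assumes NonBlockingIterator can be imported from pynumaflow.shared.asynciter, and the handler name and flush threshold are illustrative (a fuller, runnable example appears under ReduceStreamAsyncServer below).
from collections.abc import AsyncIterable

from pynumaflow.reducestreamer import Datum, Message, Metadata
from pynumaflow.shared.asynciter import NonBlockingIterator  # assumed import path

async def early_count_handler(
    keys: list[str],
    datums: AsyncIterable[Datum],
    output: NonBlockingIterator,
    md: Metadata,
):
    count = 0
    async for _ in datums:
        count += 1
        if count % 10 == 0:
            # Emit a partial result while the window is still open.
            await output.put(Message(str.encode(f"partial:{count}"), keys=keys))
    # Emit the final result once all datums for the window have been consumed.
    await output.put(Message(str.encode(f"final:{count}"), keys=keys))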
Classes¶
Message
dataclass
¶
Basic datatype for data passing to the next vertex/vertices.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | bytes | data in bytes | required |
| keys | list[str] | []string keys for vertex (optional) | None |
| tags | list[str] | []string tags for conditional forwarding (optional) | None |
Source code in pynumaflow/reducestreamer/_dtypes.py
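Example usage (the key and tag values here are illustrative):
from pynumaflow.reducestreamer import Message

# Forward a payload with a key; tags are only needed for conditional forwarding.
msg = Message(value=b"test_payload", keys=["my_key"], tags=["even"])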
Datum
dataclass
¶
Datum(
keys: list[str],
value: bytes,
event_time: datetime,
watermark: datetime,
headers: Optional[dict[str, str]] = None,
)
Class to define the important information for the event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | list[str] | the keys of the event. | required |
| value | bytes | the payload of the event. | required |
| event_time | datetime | the event time of the event. | required |
| watermark | datetime | the watermark of the event. | required |
| headers | Optional[dict[str, str]] | the headers of the event (optional). | None |
Example usage
from pynumaflow.reducestreamer import Datum
from datetime import datetime, timezone
d = Datum(
keys=["test_key"],
value="test_mock_message".encode(),
event_time=datetime.fromtimestamp(1662998400, timezone.utc),
watermark=datetime.fromtimestamp(1662998460, timezone.utc),
headers={"key1": "value1", "key2": "value2"}
)
Source code in pynumaflow/reducestreamer/_dtypes.py
IntervalWindow
dataclass
¶
Defines the start and end of the interval window for the event.
Source code in pynumaflow/reducestreamer/_dtypes.py
Metadata
dataclass
¶
Metadata(interval_window: IntervalWindow)
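The Metadata passed to a handler carries the IntervalWindow for the window being reduced, so the handler can read the window boundaries. A minimal sketch, assuming NonBlockingIterator is importable from pynumaflow.shared.asynciter and that interval_window exposes start and end as shown:
from collections.abc import AsyncIterable

from pynumaflow.reducestreamer import Datum, Message, Metadata
from pynumaflow.shared.asynciter import NonBlockingIterator  # assumed import path

async def window_aware_handler(
    keys: list[str],
    datums: AsyncIterable[Datum],
    output: NonBlockingIterator,
    md: Metadata,
):
    # Read the boundaries of the window this invocation is reducing over.
    start = md.interval_window.start
    end = md.interval_window.end
    count = 0
    async for _ in datums:
        count += 1
    await output.put(
        Message(str.encode(f"window {start} - {end}: {count}"), keys=keys)
    )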
ReduceStreamer
¶
Provides an interface to write a ReduceStreamer which will be exposed over a gRPC server.
ReduceWindow
dataclass
¶
Defines the window for a reduce operation which includes the interval window along with the slot.
Source code in pynumaflow/reducestreamer/_dtypes.py
ReduceStreamAsyncServer
¶
ReduceStreamAsyncServer(
reduce_stream_instance: ReduceStreamCallable,
init_args: tuple = (),
init_kwargs: Optional[dict] = None,
sock_path=REDUCE_STREAM_SOCK_PATH,
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=REDUCE_STREAM_SERVER_INFO_FILE_PATH,
)
Bases: NumaflowServer
Class for a new Reduce Stream Server instance. A new servicer instance is created and attached to the server. The server instance is returned.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| reduce_stream_instance | ReduceStreamCallable | The reducer instance to be used for the Reduce Streaming UDF | required |
| init_args | tuple | The arguments to be passed to the reduce_stream_handler | () |
| init_kwargs | Optional[dict] | The keyword arguments to be passed to the reduce_stream_handler | None |
| sock_path |  | The UNIX socket path to be used for the server | REDUCE_STREAM_SOCK_PATH |
| max_message_size |  | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads |  | The max number of threads to be spawned | NUM_THREADS_DEFAULT |
| server_info_file |  | The path to the server info file | REDUCE_STREAM_SERVER_INFO_FILE_PATH |
Example invocation:
import os
from collections.abc import AsyncIterable

from pynumaflow.shared.asynciter import NonBlockingIterator  # output iterator used by the handlers (import path assumed)
from pynumaflow.reducestreamer import (
    Messages,
    Message,
    Datum,
    Metadata,
    ReduceStreamAsyncServer,
    ReduceStreamer,
)


class ReduceCounter(ReduceStreamer):
    def __init__(self, counter):
        self.counter = counter

    async def handler(
        self,
        keys: list[str],
        datums: AsyncIterable[Datum],
        output: NonBlockingIterator,
        md: Metadata,
    ):
        async for _ in datums:
            self.counter += 1
            if self.counter > 20:
                msg = f"counter:{self.counter}"
                await output.put(Message(str.encode(msg), keys=keys))
                self.counter = 0
        msg = f"counter:{self.counter}"
        await output.put(Message(str.encode(msg), keys=keys))


async def reduce_handler(
    keys: list[str],
    datums: AsyncIterable[Datum],
    output: NonBlockingIterator,
    md: Metadata,
):
    counter = 0
    async for _ in datums:
        counter += 1
        if counter > 20:
            msg = f"counter:{counter}"
            await output.put(Message(str.encode(msg), keys=keys))
            counter = 0
    msg = f"counter:{counter}"
    await output.put(Message(str.encode(msg), keys=keys))


if __name__ == "__main__":
    invoke = os.getenv("INVOKE", "func_handler")
    if invoke == "class":
        # Here we are using the class instance as the reducer_instance
        # which will be used to invoke the handler function.
        # We are passing the init_args for the class instance.
        grpc_server = ReduceStreamAsyncServer(ReduceCounter, init_args=(0,))
    else:
        # Here we are using the handler function directly as the reducer_instance.
        grpc_server = ReduceStreamAsyncServer(reduce_handler)
    grpc_server.start()
Source code in pynumaflow/reducestreamer/async_server.py
start
¶
Starter function for the async server class. A separate caller is needed so that all the async coroutines can be started from a single context.
Source code in pynumaflow/reducestreamer/async_server.py
aexec
async
¶
Starts the async gRPC server on the given UNIX socket with the given max number of threads.