Map Streamer
The Map Streamer module provides classes and functions for implementing MapStream UDFs, which stream results as they are produced. Unlike regular Map, which returns all messages at once, Map Stream yields each message as soon as it is ready, reducing latency for downstream consumers.
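The difference can be illustrated with plain `asyncio`, without any Numaflow dependency. In the sketch below, `process`, `map_all`, `map_stream`, and `first_streamed` are hypothetical stand-ins (not pynumaflow APIs): the batch version returns its results only after all of them are computed, while the streaming version is an async generator whose first result reaches the consumer as soon as it is produced.

```python
import asyncio
from collections.abc import AsyncIterator


async def process(part: str) -> bytes:
    """Pretend each part takes some time to process."""
    await asyncio.sleep(0.01)
    return part.upper().encode("utf-8")


async def map_all(payload: bytes) -> list[bytes]:
    """Batch style: nothing is visible until every part is processed."""
    return [await process(p) for p in payload.decode("utf-8").split(",")]


async def map_stream(payload: bytes) -> AsyncIterator[bytes]:
    """Streaming style: each part is yielded as soon as it is ready."""
    for p in payload.decode("utf-8").split(","):
        yield await process(p)


async def first_streamed(payload: bytes) -> bytes:
    """Take only the first streamed result; later parts are never computed."""
    async for out in map_stream(payload):
        return out
    raise ValueError("empty stream")


batch = asyncio.run(map_all(b"a,b,c"))
first = asyncio.run(first_streamed(b"a,b,c"))
```

With the streaming form, a consumer that only needs the first result waits for one `process` call rather than three.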
Classes
Message dataclass
Basic datatype for data passing to the next vertex/vertices.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `value` | `bytes` | data in bytes | required |
| `keys` | `Optional[list[str]]` | list of keys for vertex (optional) | `None` |
| `tags` | `Optional[list[str]]` | list of tags for conditional forwarding (optional) | `None` |
Source code in pynumaflow/mapstreamer/_dtypes.py
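A short sketch of how the three fields might be used together. To keep it runnable without Numaflow installed, `Message` here is a local stand-in dataclass that mirrors only the parameters documented above; it is not the pynumaflow class. The tag names `even` and `odd`, the key `sensor-1`, and the helper `route_by_parity` are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Message:
    """Stand-in mirroring the documented parameters (not the pynumaflow class)."""

    value: bytes
    keys: Optional[list[str]] = None
    tags: Optional[list[str]] = None


def route_by_parity(number: int, keys: list[str]) -> Message:
    """Build a message whose tag a downstream edge condition could match on."""
    tag = "even" if number % 2 == 0 else "odd"
    return Message(value=str(number).encode("utf-8"), keys=keys, tags=[tag])


msg = route_by_parity(42, keys=["sensor-1"])
plain = Message(b"payload")  # keys and tags fall back to None
```

Only `value` is required; `keys` and `tags` can be omitted when no keyed routing or conditional forwarding is needed.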
Messages
Datum dataclass
Datum(
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
)
Class to define the important information for the event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `keys` | `list[str]` | the keys of the event. | required |
| `value` | `bytes` | the payload of the event. | required |
| `event_time` | `datetime` | the event time of the event. | required |
| `watermark` | `datetime` | the watermark of the event. | required |
| `headers` | `Optional[dict[str, str]]` | the headers of the event. | `None` |
Example:
from pynumaflow.mapstreamer import Datum
from datetime import datetime, timezone

d = Datum(
    keys=["test_key"],
    value=b"test_mock_message",
    event_time=datetime.fromtimestamp(1662998400, timezone.utc),
    watermark=datetime.fromtimestamp(1662998460, timezone.utc),
    headers={"key1": "value1", "key2": "value2"},
)
Source code in pynumaflow/mapstreamer/_dtypes.py
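As an illustration of reading these fields inside a handler, the sketch below decodes the payload and computes the difference between the watermark and the event time. `Datum` here is a local stand-in dataclass with the same fields as the signature above (so the snippet runs without Numaflow installed), and `watermark_minus_event_time` is a hypothetical helper.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Datum:
    """Stand-in with the documented fields (not the pynumaflow class)."""

    keys: list[str]
    value: bytes
    event_time: datetime
    watermark: datetime
    headers: Optional[dict[str, str]] = None


def watermark_minus_event_time(d: Datum) -> float:
    """Difference between watermark and event time, in seconds."""
    return (d.watermark - d.event_time).total_seconds()


d = Datum(
    keys=["test_key"],
    value=b"test_mock_message",
    event_time=datetime.fromtimestamp(1662998400, timezone.utc),
    watermark=datetime.fromtimestamp(1662998460, timezone.utc),
)
delta = watermark_minus_event_time(d)
text = d.value.decode("utf-8")  # payload is bytes, so decode before use
```

Both timestamps are timezone-aware, so the subtraction is well defined; mixing naive and aware `datetime` values would raise a `TypeError`.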
MapStreamer
MapStreamAsyncServer
MapStreamAsyncServer(
    map_stream_instance: MapStreamCallable,
    sock_path=MAP_STREAM_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
)
Bases: NumaflowServer
Creates a new Map Stream server instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `map_stream_instance` | `MapStreamCallable` | The map stream instance to be used for Map Stream UDF | required |
| `sock_path` | | The UNIX socket path to be used for the server | `MAP_STREAM_SOCK_PATH` |
| `max_message_size` | | The max message size in bytes the server can receive and send | `MAX_MESSAGE_SIZE` |
| `max_threads` | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | `NUM_THREADS_DEFAULT` |
| `server_type` | | The type of server to be used | required |
Example invocation:
import os
from collections.abc import AsyncIterable
from pynumaflow.mapstreamer import Message, Datum, MapStreamAsyncServer, MapStreamer


class FlatMapStream(MapStreamer):
    async def handler(self, keys: list[str], datum: Datum) -> AsyncIterable[Message]:
        val = datum.value
        _ = datum.event_time
        _ = datum.watermark
        strs = val.decode("utf-8").split(",")

        if len(strs) == 0:
            yield Message.to_drop()
            return
        for s in strs:
            yield Message(str.encode(s))


async def map_stream_handler(_: list[str], datum: Datum) -> AsyncIterable[Message]:
    val = datum.value
    _ = datum.event_time
    _ = datum.watermark
    strs = val.decode("utf-8").split(",")

    if len(strs) == 0:
        yield Message.to_drop()
        return
    for s in strs:
        yield Message(str.encode(s))


if __name__ == "__main__":
    invoke = os.getenv("INVOKE", "func_handler")
    if invoke == "class":
        handler = FlatMapStream()
    else:
        handler = map_stream_handler
    grpc_server = MapStreamAsyncServer(handler)
    grpc_server.start()
Source code in pynumaflow/mapstreamer/async_server.py
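Because a map stream handler is an ordinary async generator, its splitting logic can be exercised without starting the gRPC server. The sketch below uses stand-in `Message` and `Datum` dataclasses that carry only the `value` field (they are not the pynumaflow classes), and `collect` is a hypothetical helper that drains the stream into a list.

```python
import asyncio
from collections.abc import AsyncIterable
from dataclasses import dataclass


@dataclass
class Message:
    """Stand-in carrying only the payload (not the pynumaflow class)."""

    value: bytes


@dataclass
class Datum:
    """Stand-in carrying only the payload (not the pynumaflow class)."""

    value: bytes


async def map_stream_handler(_: list[str], datum: Datum) -> AsyncIterable[Message]:
    # Same shape as the function-style handler: one message per comma-separated part.
    for s in datum.value.decode("utf-8").split(","):
        yield Message(s.encode("utf-8"))


async def collect(stream: AsyncIterable[Message]) -> list[bytes]:
    """Drain an async stream of messages into a list of payloads."""
    return [m.value async for m in stream]


out = asyncio.run(collect(map_stream_handler(["k"], Datum(b"a,b,c"))))
```

Draining the generator this way checks what is yielded and in which order, but not the incremental delivery that the real server provides.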
start
Starter function for the Async Map Stream server. A separate caller to aexec is needed so that all the async coroutines can be started from a single context.
Source code in pynumaflow/mapstreamer/async_server.py
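The pattern described here, a synchronous entry point that hands one top-level coroutine to the event loop, can be sketched with the standard library alone. `SketchServer`, its `_serve` method, and the task names are hypothetical, and `asyncio.run` is used as a stand-in for whatever event-loop runner the library actually uses.

```python
import asyncio


class SketchServer:
    """Hypothetical server illustrating the start()/aexec() split."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def _serve(self, name: str) -> None:
        # Placeholder for a long-running task such as serving requests.
        await asyncio.sleep(0)
        self.events.append(name)

    async def aexec(self) -> None:
        """Single async context from which all coroutines are launched."""
        await asyncio.gather(self._serve("task-a"), self._serve("task-b"))

    def start(self) -> None:
        """Synchronous caller: creates the event loop and runs aexec()."""
        asyncio.run(self.aexec())


server = SketchServer()
server.start()
```

Keeping `start` synchronous lets a plain `if __name__ == "__main__":` block launch the server, while every coroutine still shares the one event loop created for `aexec`.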
aexec async
Starts the Async gRPC server on the given UNIX socket with the given maximum number of threads.