Skip to content

Map Streamer

The Map Streamer module provides classes and functions for implementing MapStream UDFs that stream results as they are produced. Unlike a regular Map, which returns all messages at once, a Map Stream yields messages one at a time as they become ready, reducing latency for downstream consumers.

Classes

Message dataclass

Message(
    value: bytes,
    keys: Optional[list[str]] = None,
    tags: Optional[list[str]] = None,
)

Basic datatype for data passed to the next vertex or vertices.

Parameters:

Name Type Description Default
value bytes

data in bytes

required
keys Optional[list[str]]

list of keys for vertex (optional)

None
tags Optional[list[str]]

list of tags for conditional forwarding (optional)

None
Source code in pynumaflow/mapstreamer/_dtypes.py
def __init__(
    self, value: bytes, keys: Optional[list[str]] = None, tags: Optional[list[str]] = None
):
    """
    Create a Message that carries ``value`` to the next vertex.

    Falsy arguments are normalized: a falsy ``value`` becomes ``b""`` and
    missing ``keys``/``tags`` become empty lists.
    """
    self._value = value if value else b""
    self._tags = tags if tags else []
    self._keys = keys if keys else []

Messages

Messages(*messages: M)

Bases: Sequence[M]

Class to define a list of Message objects.

Parameters:

Name Type Description Default
messages M

list of Message objects.

()
Source code in pynumaflow/mapstreamer/_dtypes.py
def __init__(self, *messages: M):
    """
    Collect the given Message objects, in order, into this sequence.

    Args:
        *messages: the Message objects to hold; may be empty.
    """
    # list() of an empty tuple is already [], so no `or []` fallback is needed.
    self._messages = list(messages)

Datum dataclass

Datum(
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
)

Class to define the important information for the event.

Parameters:

Name Type Description Default
keys list[str]

the keys of the event.

required
value bytes

the payload of the event.

required
event_time datetime

the event time of the event.

required
watermark datetime

the watermark of the event.

required
headers Optional[dict[str, str]]

the headers of the event.

None

Example:

from pynumaflow.mapstreamer import Datum
from datetime import datetime, timezone

# Timestamps are epoch seconds interpreted in UTC; the watermark here is
# 60 seconds after the event time.
d = Datum(
    keys=["test_key"],
    value=b"test_mock_message",
    event_time=datetime.fromtimestamp(1662998400, timezone.utc),
    watermark=datetime.fromtimestamp(1662998460, timezone.utc),
    headers={"key1": "value1", "key2": "value2"},
)

Source code in pynumaflow/mapstreamer/_dtypes.py
def __init__(
    self,
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
):
    """
    Build a Datum from its keys, payload, timestamps and headers.

    Falsy ``keys``/``value``/``headers`` are normalized to ``[]``/``b""``/``{}``.

    Raises:
        TypeError: if ``event_time`` or ``watermark`` is not a ``datetime``
            (``event_time`` is checked first).
    """
    self._keys = keys if keys else []
    self._value = value if value else b""

    if isinstance(event_time, datetime):
        self._event_time = event_time
    else:
        raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time")

    if isinstance(watermark, datetime):
        self._watermark = watermark
    else:
        raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")

    self._headers = headers if headers else {}

keys property

keys: list[str]

Returns the keys of the event

value property

value: bytes

Returns the value of the event.

event_time property

event_time: datetime

Returns the event time of the event.

watermark property

watermark: datetime

Returns the watermark of the event.

headers property

headers: dict[str, str]

Returns the headers of the event.

MapStreamer

Provides an interface to write a Map Streamer which will be exposed over a gRPC server.

Subclasses must implement the abstract `handler` method described below.

handler abstractmethod async

handler(
    keys: list[str], datum: Datum
) -> AsyncIterable[Message]

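Implement this handler as an async generator that yields Message objects one at a time; a MapStreamer instance can then be passed wherever a MapStreamCallable is expected (for example, to MapStreamAsyncServer).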

Source code in pynumaflow/mapstreamer/_dtypes.py
@abstractmethod
async def handler(self, keys: list[str], datum: Datum) -> AsyncIterable[Message]:
    """
    Process one datum and stream the resulting messages.

    Implement this as an async generator that ``yield``s Message objects one
    at a time as they become ready. This is the handler that lets a
    MapStreamer instance be used as a ``MapStreamCallable`` (for example,
    when passed to ``MapStreamAsyncServer``).

    Args:
        keys: the keys of the event.
        datum: the event to process.
    """
    pass

MapStreamAsyncServer

MapStreamAsyncServer(
    map_stream_instance: MapStreamCallable,
    sock_path=MAP_STREAM_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

Class for a new Map Stream Server instance.

Constructing the instance creates the servicer; call start() to create and run the gRPC server.

Parameters:

Name Type Description Default
map_stream_instance MapStreamCallable

The map stream instance to be used for Map Stream UDF

required
sock_path

The UNIX socket path to be used for the server

MAP_STREAM_SOCK_PATH
max_message_size

The max message size in bytes the server can receive and send

MAX_MESSAGE_SIZE
max_threads

The max number of threads to be spawned; defaults to 4 and max capped at 16

NUM_THREADS_DEFAULT
server_info_file

The path of the server info file, passed to the server startup routine along with the server info

MAP_SERVER_INFO_FILE_PATH

Example invocation:

import os
from collections.abc import AsyncIterable
from pynumaflow.mapstreamer import Message, Datum, MapStreamAsyncServer, MapStreamer

class FlatMapStream(MapStreamer):
    """Split a comma-separated payload and stream each piece as its own Message."""

    async def handler(self, keys: list[str], datum: Datum) -> AsyncIterable[Message]:
        val = datum.value
        # event_time and watermark are available on every datum; unused here.
        _ = datum.event_time
        _ = datum.watermark

        # Check the raw payload: str.split never returns an empty list
        # ("".split(",") == [""]), so testing the split result for emptiness
        # could never trigger the drop.
        if not val:
            yield Message.to_drop()
            return
        for s in val.decode("utf-8").split(","):
            yield Message(s.encode("utf-8"))

async def map_stream_handler(_: list[str], datum: Datum) -> AsyncIterable[Message]:
    """Function-style equivalent of FlatMapStream: stream one Message per comma-separated piece."""
    val = datum.value
    # event_time and watermark are available on every datum; unused here.
    _ = datum.event_time
    _ = datum.watermark

    # Check the raw payload: str.split never returns an empty list
    # ("".split(",") == [""]), so testing the split result for emptiness
    # could never trigger the drop.
    if not val:
        yield Message.to_drop()
        return
    for s in val.decode("utf-8").split(","):
        yield Message(s.encode("utf-8"))

if __name__ == "__main__":
    # INVOKE=class selects the class-based handler; anything else (default
    # "func_handler") selects the plain async-generator function.
    invoke = os.getenv("INVOKE", "func_handler")
    handler = FlatMapStream() if invoke == "class" else map_stream_handler
    grpc_server = MapStreamAsyncServer(handler)
    grpc_server.start()

Source code in pynumaflow/mapstreamer/async_server.py
def __init__(
    self,
    map_stream_instance: MapStreamCallable,
    sock_path=MAP_STREAM_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
):
    """
    Create a new grpc Async Map Stream Server instance.

    A servicer wrapping ``map_stream_instance`` is created here; the gRPC
    server itself is created, and the servicer attached to it, when
    ``start()`` runs.

    Args:
        map_stream_instance: The map stream instance to be used for Map Stream UDF
        sock_path: The UNIX socket path to be used for the server
        max_message_size: The max message size in bytes the server can receive and send
        max_threads: The max number of threads to be spawned;
                        defaults to 4 and is capped at MAX_NUM_THREADS (16)
        server_info_file: The path of the server info file, passed to the
                        server startup routine along with the server info

    Example invocation:
    ```py
    import os
    from collections.abc import AsyncIterable
    from pynumaflow.mapstreamer import Message, Datum, MapStreamAsyncServer, MapStreamer

    class FlatMapStream(MapStreamer):
        async def handler(self, keys: list[str], datum: Datum) -> AsyncIterable[Message]:
            val = datum.value
            _ = datum.event_time
            _ = datum.watermark

            # str.split never returns an empty list, so check the raw payload.
            if not val:
                yield Message.to_drop()
                return
            for s in val.decode("utf-8").split(","):
                yield Message(s.encode("utf-8"))

    async def map_stream_handler(_: list[str], datum: Datum) -> AsyncIterable[Message]:
        val = datum.value
        _ = datum.event_time
        _ = datum.watermark

        if not val:
            yield Message.to_drop()
            return
        for s in val.decode("utf-8").split(","):
            yield Message(s.encode("utf-8"))

    if __name__ == "__main__":
        invoke = os.getenv("INVOKE", "func_handler")
        if invoke == "class":
            handler = FlatMapStream()
        else:
            handler = map_stream_handler
        grpc_server = MapStreamAsyncServer(handler)
        grpc_server.start()
    ```
    """
    self.map_stream_instance: MapStreamCallable = map_stream_instance
    self.sock_path = f"unix://{sock_path}"
    # Never spawn more than MAX_NUM_THREADS, whatever the caller asks for.
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.max_message_size = max_message_size
    self.server_info_file = server_info_file

    # The same limit applies to both directions of the gRPC channel.
    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
    ]

    self.servicer = AsyncMapStreamServicer(handler=self.map_stream_instance)

start

start()

Starter function for the Async Map Stream server. A separate synchronous caller for aexec is needed so that all the async coroutines are started from a single context.

Source code in pynumaflow/mapstreamer/async_server.py
def start(self):
    """
    Start the Async Map Stream server.

    ``aexec`` is driven through a single ``aiorun.run`` call (with uvloop
    enabled) so that every async coroutine is started from one context.
    """
    server_coroutine = self.aexec()
    aiorun.run(server_coroutine, use_uvloop=True)

aexec async

aexec()

Starts the Async gRPC server on the given UNIX socket with given max threads.

Source code in pynumaflow/mapstreamer/async_server.py
async def aexec(self):
    """
    Start the Async gRPC server on the configured UNIX socket with the
    configured max threads.
    """
    # The aio server is built here, inside the already-running event loop,
    # so that all of its async calls happen in that same context.
    aio_server = grpc.aio.server(options=self._server_options)
    aio_server.add_insecure_port(self.sock_path)
    map_pb2_grpc.add_MapServicer_to_server(self.servicer, aio_server)
    _LOGGER.info("Starting Map Stream Server")

    # Server info: the minimum Numaflow version for mappers, plus MAP_MODE
    # metadata marking this server as a stream-map server.
    info = ServerInfo.get_default_server_info()
    info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Mapper]
    info.metadata[MAP_MODE_KEY] = MapMode.StreamMap

    await start_async_server(
        server_async=aio_server,
        sock_path=self.sock_path,
        max_threads=self.max_threads,
        cleanup_coroutines=[],
        server_info_file=self.server_info_file,
        server_info=info,
    )