
Mapper

The Mapper module provides classes and functions for implementing Map UDFs. Map is the most common UDF type: its handler receives one message at a time and can return:

  • One message (1:1 transformation)
  • Multiple messages (fan-out)
  • No messages (filter/drop)
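
The three outcomes above can be sketched in a single function handler. The routing rules (empty payloads dropped, comma-separated payloads split, everything else upper-cased) are hypothetical and only illustrate the three return shapes. Following the list above, the drop case returns an empty Messages container. The try/except fallback defines minimal stand-ins so the sketch also runs without pynumaflow installed; route_handler and make_datum are illustrative names, not library functions.

```python
from datetime import datetime, timezone

try:
    from pynumaflow.mapper import Datum, Message, Messages
except ImportError:
    # Hypothetical stand-ins so the sketch runs without pynumaflow; they
    # mirror only the constructor arguments and properties used below.
    class Message:
        def __init__(self, value: bytes, keys=None, tags=None):
            self.value, self.keys, self.tags = value or b"", keys or [], tags or []

    class Messages(list):
        def __init__(self, *messages):
            super().__init__(messages)

    class Datum:
        def __init__(self, keys, value, event_time, watermark, headers=None):
            self.keys, self.value = keys or [], value or b""
            self.event_time, self.watermark = event_time, watermark
            self.headers = headers or {}


def route_handler(keys: list[str], datum: Datum) -> Messages:
    """Hypothetical routing rules that exercise all three return shapes."""
    payload = datum.value
    if not payload:
        # Filter/drop: return no messages at all
        return Messages()
    if b"," in payload:
        # Fan-out: one output message per comma-separated part
        out = Messages()
        for part in payload.split(b","):
            out.append(Message(part, keys=keys))
        return out
    # 1:1 transformation: exactly one output message
    return Messages(Message(payload.upper(), keys=keys))


def make_datum(value: bytes) -> Datum:
    # Helper for local experiments; event time and watermark are "now"
    now = datetime.now(timezone.utc)
    return Datum(keys=["k"], value=value, event_time=now, watermark=now)
```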

Classes

MapAsyncServer

MapAsyncServer(
    mapper_instance: MapAsyncCallable,
    sock_path=MAP_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

Create a new gRPC asynchronous Map server instance.

Args:
    mapper_instance: The mapper instance to be used for Map UDF
    sock_path: The UNIX socket path to be used for the server
    max_message_size: The max message size in bytes the server can receive and send
    max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16

Example invocation:

from pynumaflow.mapper import Messages, Message, Datum, MapAsyncServer

async def async_map_handler(keys: list[str], datum: Datum) -> Messages:
    val = datum.value
    msg = (
        f"payload:{val.decode('utf-8')} "
        f"event_time:{datum.event_time} "
        f"watermark:{datum.watermark}"
    )
    return Messages(Message(value=msg.encode('utf-8'), keys=keys))

if __name__ == "__main__":
    grpc_server = MapAsyncServer(async_map_handler)
    grpc_server.start()

The server instance is returned.
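
Because the async handler is an ordinary coroutine function, it can be exercised without starting MapAsyncServer, which is convenient for unit tests. The sketch below drives the example handler with asyncio.run. The `await asyncio.sleep(0)` line is a hypothetical placeholder for real non-blocking I/O, and the fallback stand-ins exist only so the sketch runs without pynumaflow installed.

```python
import asyncio
from datetime import datetime, timezone

try:
    from pynumaflow.mapper import Datum, Message, Messages
except ImportError:
    # Hypothetical stand-ins mirroring only what this sketch uses
    class Message:
        def __init__(self, value: bytes, keys=None, tags=None):
            self.value, self.keys, self.tags = value or b"", keys or [], tags or []

    class Messages(list):
        def __init__(self, *messages):
            super().__init__(messages)

    class Datum:
        def __init__(self, keys, value, event_time, watermark, headers=None):
            self.keys, self.value = keys or [], value or b""
            self.event_time, self.watermark = event_time, watermark
            self.headers = headers or {}


async def async_map_handler(keys: list[str], datum: Datum) -> Messages:
    # Placeholder for a non-blocking call (for example a network request);
    # asyncio.sleep(0) only yields control back to the event loop.
    await asyncio.sleep(0)
    val = datum.value
    msg = (
        f"payload:{val.decode('utf-8')} "
        f"event_time:{datum.event_time} "
        f"watermark:{datum.watermark}"
    )
    return Messages(Message(value=msg.encode("utf-8"), keys=keys))


ts = datetime.fromtimestamp(1662998400, timezone.utc)
datum = Datum(keys=["k"], value=b"hello", event_time=ts, watermark=ts)
# Drive the coroutine directly; no gRPC server or socket is involved
result = asyncio.run(async_map_handler(["k"], datum))
```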

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mapper_instance | MapAsyncCallable | The mapper instance to be used for Map UDF | required |
| sock_path | | The UNIX socket path to be used for the server | MAP_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |

Source code in pynumaflow/mapper/async_server.py
def __init__(
    self,
    mapper_instance: MapAsyncCallable,
    sock_path=MAP_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
):
    """
    Create a new grpc Asynchronous Map Server instance.
    A new servicer instance is created and attached to the server.
    The server instance is returned.

    Args:
        mapper_instance: The mapper instance to be used for Map UDF
        sock_path: The UNIX socket path to be used for the server
        max_message_size: The max message size in bytes the server can receive and send
        max_threads: The max number of threads to be spawned;
                    defaults to 4 and max capped at 16
    """
    self.sock_path = f"unix://{sock_path}"
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.max_message_size = max_message_size
    self.server_info_file = server_info_file

    self.mapper_instance = mapper_instance

    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
    ]
    # Get the servicer instance for the async server
    self.servicer = AsyncMapServicer(handler=mapper_instance)
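
The constructor's handling of max_threads and max_message_size reduces to a clamp plus a pair of gRPC server options. The sketch below restates that logic in isolation. resolve_server_settings is a hypothetical helper, and the constant values 4 and 16 are assumptions taken from the parameter description, not read from pynumaflow.

```python
# Assumed constant values, taken from the parameter description
# ("defaults to 4 and max capped at 16"), not read from pynumaflow.
NUM_THREADS_DEFAULT = 4
MAX_NUM_THREADS = 16


def resolve_server_settings(max_message_size: int,
                            max_threads: int = NUM_THREADS_DEFAULT):
    """Hypothetical helper restating the constructor's settings logic."""
    # Requests above the cap are silently clamped, not rejected
    threads = min(max_threads, MAX_NUM_THREADS)
    # The same size limit applies to both sending and receiving
    options = [
        ("grpc.max_send_message_length", max_message_size),
        ("grpc.max_receive_message_length", max_message_size),
    ]
    return threads, options


threads, options = resolve_server_settings(max_message_size=8 * 1024 * 1024,
                                           max_threads=32)
```

The practical consequence is that passing max_threads=32 does not raise an error; the server simply runs with the capped value.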

start

start() -> None

Starts the async server. A separate entry point is needed so that all the async coroutines are started from a single event-loop context.

Source code in pynumaflow/mapper/async_server.py
def start(self) -> None:
    """
    Starter function for the Async server class, need a separate caller
    so that all the async coroutines can be started from a single context
    """
    aiorun.run(self.aexec(), use_uvloop=True)

aexec async

aexec() -> None

Starts the Async gRPC server on the given UNIX socket with given max threads.

Source code in pynumaflow/mapper/async_server.py
async def aexec(self) -> None:
    """
    Starts the Async gRPC server on the given UNIX socket with
    given max threads.
    """

    # As the server is async, we need to create a new server instance in the
    # same thread as the event loop so that all the async calls are made in the
    # same context

    server_new = grpc.aio.server(options=self._server_options)
    server_new.add_insecure_port(self.sock_path)
    map_pb2_grpc.add_MapServicer_to_server(self.servicer, server_new)

    serv_info = ServerInfo.get_default_server_info()
    serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Mapper]
    # Add the MAP_MODE metadata to the server info for the correct map mode
    serv_info.metadata[MAP_MODE_KEY] = MapMode.UnaryMap

    # Start the async server
    await start_async_server(
        server_async=server_new,
        sock_path=self.sock_path,
        max_threads=self.max_threads,
        cleanup_coroutines=list(),
        server_info_file=self.server_info_file,
        server_info=serv_info,
    )

MapMultiprocServer

MapMultiprocServer(
    mapper_instance: MapSyncCallable,
    server_count: int = _PROCESS_COUNT,
    sock_path=MAP_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

Create a new grpc Multiproc Map Server instance.

The server instance is returned.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mapper_instance | MapSyncCallable | The mapper instance to be used for Map UDF | required |
| server_count | int | The number of grpc server instances to be forked for multiproc | _PROCESS_COUNT |
| sock_path | | The UNIX socket path to be used for the server | MAP_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |

Example invocation:

import math
from pynumaflow.mapper import Messages, Message, Datum, Mapper, MapMultiprocServer

def is_prime(n):
    # Trial division by every candidate up to and including isqrt(n)
    if n < 2:
        return False
    for i in range(2, math.isqrt(n) + 1):
        if n % i == 0:
            return False
    return True

class PrimeMap(Mapper):
    def handler(self, keys: list[str], datum: Datum) -> Messages:
        val = datum.value
        _ = datum.event_time
        _ = datum.watermark
        messages = Messages()
        for i in range(2, 100000):
            is_prime(i)
        messages.append(Message(val, keys=keys))
        return messages

if __name__ == "__main__":
    server_count = 2
    prime_class = PrimeMap()
    # Server count is the number of server processes to start
    grpc_server = MapMultiprocServer(prime_class, server_count=server_count)
    grpc_server.start()

Source code in pynumaflow/mapper/multiproc_server.py
def __init__(
    self,
    mapper_instance: MapSyncCallable,
    server_count: int = _PROCESS_COUNT,
    sock_path=MAP_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
):
    """
    Create a new grpc Multiproc Map Server instance.
    A new servicer instance is created and attached to the server.
    The server instance is returned.

    Args:
        mapper_instance: The mapper instance to be used for Map UDF
        server_count: The number of grpc server instances to be forked for multiproc
        sock_path: The UNIX socket path to be used for the server
        max_message_size: The max message size in bytes the server can receive and send
        max_threads: The max number of threads to be spawned;
                    defaults to 4 and max capped at 16

    Example invocation:
    ```py
    import math
    from pynumaflow.mapper import Messages, Message, Datum, Mapper, MapMultiprocServer

    def is_prime(n):
        # Trial division by every candidate up to and including isqrt(n)
        if n < 2:
            return False
        for i in range(2, math.isqrt(n) + 1):
            if n % i == 0:
                return False
        return True

    class PrimeMap(Mapper):
        def handler(self, keys: list[str], datum: Datum) -> Messages:
            val = datum.value
            _ = datum.event_time
            _ = datum.watermark
            messages = Messages()
            for i in range(2, 100000):
                is_prime(i)
            messages.append(Message(val, keys=keys))
            return messages

    if __name__ == "__main__":
        server_count = 2
        prime_class = PrimeMap()
        # Server count is the number of server processes to start
        grpc_server = MapMultiprocServer(prime_class, server_count=server_count)
        grpc_server.start()
    ```
    """
    self.sock_path = f"unix://{sock_path}"
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.max_message_size = max_message_size
    self.server_info_file = server_info_file

    self.mapper_instance = mapper_instance

    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
        ("grpc.so_reuseport", 1),
        ("grpc.so_reuseaddr", 1),
    ]
    # Set the number of processes to be spawned to the number of CPUs or
    # the value of the parameter server_count defined by the user
    # Setting the max value to 2 * CPU count
    # Used for multiproc server
    self._process_count = min(server_count, 2 * _PROCESS_COUNT)
    self.servicer = SyncMapServicer(handler=mapper_instance, multiproc=True)
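
The process-count rule in the constructor can be restated as a small pure function. This is a sketch that assumes _PROCESS_COUNT equals os.cpu_count(), as the comment about "the number of CPUs" suggests; effective_process_count is a hypothetical helper, not part of pynumaflow.

```python
import os
from typing import Optional


def effective_process_count(server_count: Optional[int] = None,
                            cpu_count: Optional[int] = None) -> int:
    """Restates the capping rule described above (assumed, not library code)."""
    cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    # server_count defaults to the CPU count, mirroring _PROCESS_COUNT
    requested = server_count if server_count is not None else cpus
    # Never start more than twice as many servers as there are CPUs
    return min(requested, 2 * cpus)
```

As with max_threads, an oversized server_count is reduced rather than rejected: on a 4-CPU machine, asking for 100 servers yields 8.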

start

start() -> None

Starts N gRPC servers, each using the given max threads. Here N is the value of the server_count parameter supplied by the user, which defaults to the number of CPUs. N is capped at 2 * CPU count.

Source code in pynumaflow/mapper/multiproc_server.py
def start(self) -> None:
    """
    Starts N gRPC servers, each using the given max threads.
    Here N is the value of server_count defined by the user (defaults to
    the number of CPUs). The max value is capped to 2 * CPU count.
    """

    # Create the server info file
    server_info = ServerInfo.get_default_server_info()
    server_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Mapper]
    server_info.metadata = get_metadata_env(envs=METADATA_ENVS)
    # Add the MULTIPROC metadata using the number of servers to use
    server_info.metadata[MULTIPROC_KEY] = str(self._process_count)
    # Add the MAP_MODE metadata to the server info for the correct map mode
    server_info.metadata[MAP_MODE_KEY] = MapMode.UnaryMap

    # Start the multiproc server
    start_multiproc_server(
        max_threads=self.max_threads,
        servicer=self.servicer,
        process_count=self._process_count,
        server_info_file=self.server_info_file,
        server_options=self._server_options,
        udf_type=UDFType.Map,
        server_info=server_info,
    )

MapServer

MapServer(
    mapper_instance: MapSyncCallable,
    sock_path=MAP_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

Create a new synchronous gRPC Map server instance.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mapper_instance | MapSyncCallable | The mapper instance to be used for Map UDF | required |
| sock_path | | The UNIX socket path to be used for the server | MAP_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |

Example invocation:

import os
from pynumaflow.mapper import Messages, Message, Datum, MapServer, Mapper

class MessageForwarder(Mapper):
    def handler(self, keys: list[str], datum: Datum) -> Messages:
        val = datum.value
        _ = datum.event_time
        _ = datum.watermark
        return Messages(Message(value=val, keys=keys))

def my_handler(keys: list[str], datum: Datum) -> Messages:
    val = datum.value
    _ = datum.event_time
    _ = datum.watermark
    return Messages(Message(value=val, keys=keys))


if __name__ == "__main__":
    # Use the class based approach or function based handler based on the env variable
    # Both can be used and passed directly to the server class

    invoke = os.getenv("INVOKE", "func_handler")
    if invoke == "class":
        handler = MessageForwarder()
    else:
        handler = my_handler
    grpc_server = MapServer(handler)
    grpc_server.start()
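
Both handler styles from the example are plain Python callables, so they can be unit-tested without starting MapServer or binding a socket. The sketch below calls each one directly with a hand-built Datum. The fallback stand-ins (including a minimal Mapper ABC) are hypothetical and exist only so the sketch runs without pynumaflow installed.

```python
from datetime import datetime, timezone

try:
    from pynumaflow.mapper import Datum, Mapper, Message, Messages
except ImportError:
    from abc import ABC, abstractmethod

    # Hypothetical stand-ins mirroring only what this sketch uses
    class Message:
        def __init__(self, value: bytes, keys=None, tags=None):
            self.value, self.keys, self.tags = value or b"", keys or [], tags or []

    class Messages(list):
        def __init__(self, *messages):
            super().__init__(messages)

    class Datum:
        def __init__(self, keys, value, event_time, watermark, headers=None):
            self.keys, self.value = keys or [], value or b""
            self.event_time, self.watermark = event_time, watermark
            self.headers = headers or {}

    class Mapper(ABC):
        @abstractmethod
        def handler(self, keys, datum): ...


class MessageForwarder(Mapper):
    def handler(self, keys: list[str], datum: Datum) -> Messages:
        return Messages(Message(value=datum.value, keys=keys))


def my_handler(keys: list[str], datum: Datum) -> Messages:
    return Messages(Message(value=datum.value, keys=keys))


ts = datetime.fromtimestamp(1662998400, timezone.utc)
datum = Datum(keys=["k1"], value=b"payload", event_time=ts, watermark=ts)

# Call both handler styles directly, bypassing MapServer entirely
class_out = MessageForwarder().handler(["k1"], datum)
func_out = my_handler(["k1"], datum)
```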

Source code in pynumaflow/mapper/sync_server.py
def __init__(
    self,
    mapper_instance: MapSyncCallable,
    sock_path=MAP_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
):
    self.sock_path = f"unix://{sock_path}"
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.max_message_size = max_message_size
    self.server_info_file = server_info_file

    self.mapper_instance = mapper_instance

    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
    ]
    # Get the servicer instance for the sync server
    self.servicer = SyncMapServicer(handler=mapper_instance)

start

start() -> None

Starts the Synchronous gRPC server on the given UNIX socket with given max threads.

Source code in pynumaflow/mapper/sync_server.py
def start(self) -> None:
    """
    Starts the Synchronous gRPC server on the given UNIX socket with given max threads.
    """
    _LOGGER.info(
        "Sync GRPC Server listening on: %s with max threads: %s",
        self.sock_path,
        self.max_threads,
    )

    serv_info = ServerInfo.get_default_server_info()
    serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Mapper]
    # Add the MAP_MODE metadata to the server info for the correct map mode
    serv_info.metadata[MAP_MODE_KEY] = MapMode.UnaryMap
    # Start the server
    sync_server_start(
        servicer=self.servicer,
        bind_address=self.sock_path,
        max_threads=self.max_threads,
        server_info_file=self.server_info_file,
        server_options=self._server_options,
        udf_type=UDFType.Map,
        server_info=serv_info,
    )

Message dataclass

Message(
    value: bytes,
    keys: Optional[list[str]] = None,
    tags: Optional[list[str]] = None,
    user_metadata: Optional[UserMetadata] = None,
)

Basic datatype for passing data to the next vertex or vertices.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| value | bytes | data in bytes | required |
| keys | Optional[list[str]] | list of keys for the vertex (optional) | None |
| tags | Optional[list[str]] | list of tags for conditional forwarding (optional) | None |
| user_metadata | Optional[UserMetadata] | metadata for the message (optional) | None |

Source code in pynumaflow/mapper/_dtypes.py
def __init__(
    self,
    value: bytes,
    keys: Optional[list[str]] = None,
    tags: Optional[list[str]] = None,
    user_metadata: Optional[UserMetadata] = None,
):
    """
    Creates a Message object to send value to a vertex.
    """
    self._keys = keys or []
    self._tags = tags or []
    self._value = value or b""
    self._user_metadata = user_metadata or UserMetadata()
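
Tags are how a Map UDF marks messages for conditional forwarding. The sketch below tags numeric payloads as "even" or "odd" and also shows the defaults from the constructor above, where omitted keys and tags become empty lists. tag_by_parity is a hypothetical helper, and the edge conditions that would act on these tags are configured in the pipeline, outside this module. The fallback stand-in runs the sketch without pynumaflow installed.

```python
try:
    from pynumaflow.mapper import Message
except ImportError:
    class Message:
        """Hypothetical stand-in mirroring the defaults in __init__ above."""
        def __init__(self, value: bytes, keys=None, tags=None, user_metadata=None):
            self.value = value or b""
            self.keys = keys or []
            self.tags = tags or []


def tag_by_parity(value: bytes, keys: list[str]) -> Message:
    # Hypothetical rule: tag each number so that conditional-forwarding
    # rules configured on the pipeline can route on "even" / "odd".
    tag = "even" if int(value) % 2 == 0 else "odd"
    return Message(value, keys=keys, tags=[tag])


plain = Message(b"raw")  # keys and tags fall back to empty lists
even = tag_by_parity(b"42", ["numbers"])
odd = tag_by_parity(b"7", ["numbers"])
```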

Messages

Messages(*messages: M)

Bases: Sequence[M]

Class to define a list of Message objects.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| messages | M | list of Message objects. | () |

Source code in pynumaflow/mapper/_dtypes.py
def __init__(self, *messages: M):
    self._messages = list(messages) or []
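
Messages can be filled either through its positional constructor arguments or incrementally with append, and because it is a Sequence it supports len(), indexing, and iteration. The sketch below shows both construction styles. The stand-in Messages (a list subclass) is hypothetical and used only when pynumaflow is not installed.

```python
from collections.abc import Sequence

try:
    from pynumaflow.mapper import Message, Messages
except ImportError:
    class Message:
        def __init__(self, value: bytes, keys=None, tags=None):
            self.value, self.keys, self.tags = value or b"", keys or [], tags or []

    class Messages(list):
        """Hypothetical stand-in: a list subclass, which is also a Sequence."""
        def __init__(self, *messages):
            super().__init__(messages)


# Pass messages up front as positional arguments...
batch = Messages(Message(b"a"), Message(b"b"))
# ...or add them one at a time, for example inside a fan-out loop
batch.append(Message(b"c"))

# As a Sequence, the container supports len(), indexing and iteration
values = [m.value for m in batch]
```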

Datum dataclass

Datum(
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
    user_metadata: Optional[UserMetadata] = None,
    system_metadata: Optional[SystemMetadata] = None,
)

Class to define the important information for the event.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| keys | list[str] | the keys of the event. | required |
| value | bytes | the payload of the event. | required |
| event_time | datetime | the event time of the event. | required |
| watermark | datetime | the watermark of the event. | required |
| headers | Optional[dict[str, str]] | the headers of the event. | None |

Example usage:

from datetime import datetime, timezone
from pynumaflow.mapper import Datum

d = Datum(
    keys=["test_key"],
    value=b"test_mock_message",
    event_time=datetime.fromtimestamp(1662998400, timezone.utc),
    watermark=datetime.fromtimestamp(1662998460, timezone.utc),
    headers={"key1": "value1", "key2": "value2"},
)

Source code in pynumaflow/mapper/_dtypes.py
def __init__(
    self,
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
    user_metadata: Optional[UserMetadata] = None,
    system_metadata: Optional[SystemMetadata] = None,
):
    self._keys = keys or list()
    self._value = value or b""
    if not isinstance(event_time, datetime):
        raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time")
    self._event_time = event_time
    if not isinstance(watermark, datetime):
        raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")
    self._watermark = watermark
    self._headers = headers or {}
    self._user_metadata = user_metadata or UserMetadata()
    self._system_metadata = system_metadata or SystemMetadata()
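
The constructor checks event_time and watermark with isinstance, so raw epoch timestamps must be converted to datetime objects before a Datum is built (for example in tests). The sketch below shows the rejected and accepted forms. The fallback Datum is a hypothetical stand-in that copies the type checks shown above, used only when pynumaflow is not installed.

```python
from datetime import datetime, timezone

try:
    from pynumaflow.mapper import Datum
except ImportError:
    class Datum:
        """Hypothetical stand-in mirroring the type checks in __init__ above."""
        def __init__(self, keys, value, event_time, watermark, headers=None):
            if not isinstance(event_time, datetime):
                raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time")
            if not isinstance(watermark, datetime):
                raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")
            self.keys, self.value = keys or [], value or b""
            self.event_time, self.watermark = event_time, watermark
            self.headers = headers or {}


epoch_seconds = 1662998400
ts = datetime.fromtimestamp(epoch_seconds, timezone.utc)

# Passing raw epoch seconds is rejected by the isinstance check
try:
    Datum(keys=["k"], value=b"v", event_time=epoch_seconds, watermark=ts)
    rejected = False
except TypeError:
    rejected = True

# Converting to a timezone-aware datetime first is accepted
ok = Datum(keys=["k"], value=b"v", event_time=ts, watermark=ts)
```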

keys property

keys: list[str]

Returns the keys of the event.

value property

value: bytes

Returns the value of the event.

event_time property

event_time: datetime

Returns the event time of the event.

watermark property

watermark: datetime

Returns the watermark of the event.

headers property

headers: dict[str, str]

Returns the headers of the event.

user_metadata property

user_metadata: UserMetadata

Returns the user metadata of the event.

system_metadata property

system_metadata: SystemMetadata

Returns the system metadata of the event.

Mapper

Provides an interface for writing a class-based handler for SyncMapServicer, which is exposed over a synchronous gRPC server.

handler abstractmethod

handler(keys: list[str], datum: Datum) -> Messages

Implement this handler function which implements the MapSyncCallable interface.

Source code in pynumaflow/mapper/_dtypes.py
@abstractmethod
def handler(self, keys: list[str], datum: Datum) -> Messages:
    """
    Implement this handler function which implements the MapSyncCallable interface.
    """
    pass
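
Because a Mapper subclass is an ordinary object, its handler can keep state on self between calls. KeyCounter below is a hypothetical example that emits a running count per key. The lock is a precaution, assuming the server may call the handler from several threads (it is configured with max_threads). Such state is also local to one process: under MapMultiprocServer each server process would hold its own copy, so counts are not aggregated across processes. The fallback stand-ins are used only when pynumaflow is not installed.

```python
import threading
from datetime import datetime, timezone

try:
    from pynumaflow.mapper import Datum, Mapper, Message, Messages
except ImportError:
    from abc import ABC, abstractmethod

    # Hypothetical stand-ins mirroring only what this sketch uses
    class Message:
        def __init__(self, value: bytes, keys=None, tags=None):
            self.value, self.keys, self.tags = value or b"", keys or [], tags or []

    class Messages(list):
        def __init__(self, *messages):
            super().__init__(messages)

    class Datum:
        def __init__(self, keys, value, event_time, watermark, headers=None):
            self.keys, self.value = keys or [], value or b""
            self.event_time, self.watermark = event_time, watermark
            self.headers = headers or {}

    class Mapper(ABC):
        @abstractmethod
        def handler(self, keys, datum): ...


class KeyCounter(Mapper):
    """Hypothetical stateful mapper: emits a running count per key."""

    def __init__(self):
        self._counts: dict[str, int] = {}
        # Guard the dict in case the handler is called from several threads
        self._lock = threading.Lock()

    def handler(self, keys: list[str], datum: Datum) -> Messages:
        key = keys[0] if keys else ""
        with self._lock:
            self._counts[key] = self._counts.get(key, 0) + 1
            count = self._counts[key]
        return Messages(Message(value=str(count).encode(), keys=keys))


counter = KeyCounter()
ts = datetime.fromtimestamp(1662998400, timezone.utc)
datum = Datum(keys=["a"], value=b"x", event_time=ts, watermark=ts)
for _ in range(3):
    last = counter.handler(["a"], datum)
first_b = counter.handler(["b"], datum)
```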

UserMetadata dataclass

UserMetadata(_data: dict[str, dict[str, bytes]] = dict())

UserMetadata wraps the user-generated metadata groups for each message. UDFs can both read and modify it.

groups

groups() -> list[str]

Returns the list of group names for the user metadata.

Source code in pynumaflow/_metadata.py
def groups(self) -> list[str]:
    """
    Returns the list of group names for the user metadata.
    """
    return list(self._data.keys())

keys

keys(group: str) -> list[str]

Returns the list of keys for a given group.

Source code in pynumaflow/_metadata.py
def keys(self, group: str) -> list[str]:
    """
    Returns the list of keys for a given group.
    """
    keys = self._data.get(group) or {}
    return list(keys.keys())

value

value(group: str, key: str) -> Optional[bytes]

Returns the value for a given group and key. If the group or key does not exist, returns None.

Source code in pynumaflow/_metadata.py
def value(self, group: str, key: str) -> Optional[bytes]:
    """
    Returns the value for a given group and key.
    If the group or key does not exist, returns None.
    """
    value = self._data.get(group)
    if value is None:
        return None
    return value.get(key)

add_key

add_key(group: str, key: str, value: bytes)

Adds the value for a given group and key.

Source code in pynumaflow/_metadata.py
def add_key(self, group: str, key: str, value: bytes):
    """
    Adds the value for a given group and key.
    """
    self._data.setdefault(group, {})[key] = value

remove_key

remove_key(group: str, key: str) -> Optional[bytes]

Removes the key and its value for a given group and returns the value. If this key is the only key in the group, the group will be removed. Returns None if the group or key does not exist.

Source code in pynumaflow/_metadata.py
def remove_key(self, group: str, key: str) -> Optional[bytes]:
    """
    Removes the key and its value for a given group and returns the value.
    If this key is the only key in the group, the group will be removed.
    Returns None if the group or key does not exist.
    """
    group_data = self._data.pop(group, None)
    if group_data is None:
        return None
    value = group_data.pop(key, None)
    if group_data:
        self._data[group] = group_data
    return value

remove_group

remove_group(group: str) -> Optional[dict[str, bytes]]

Removes the group and all its keys and values and returns the data. Returns None if the group does not exist.

Source code in pynumaflow/_metadata.py
def remove_group(self, group: str) -> Optional[dict[str, bytes]]:
    """
    Removes the group and all its keys and values and returns the data.
    Returns None if the group does not exist.
    """
    return self._data.pop(group, None)

clear

clear()

Clears all the groups and all their keys and values.

Source code in pynumaflow/_metadata.py
def clear(self):
    """
    Clears all the groups and all their keys and values.
    """
    self._data.clear()
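
The sketch below exercises the UserMetadata methods documented above. It also makes visible a side effect of the remove_key source: the group is popped and, if keys remain, re-inserted, so it moves to the end of groups(). Removing the last key drops the group entirely. Group and key names are hypothetical; the fallback stand-in copies the method bodies shown above and is used only when pynumaflow is not installed.

```python
from typing import Optional

try:
    from pynumaflow._metadata import UserMetadata
except ImportError:
    from dataclasses import dataclass, field

    @dataclass
    class UserMetadata:
        """Hypothetical stand-in copying the method bodies shown above."""
        _data: dict = field(default_factory=dict)

        def groups(self) -> list:
            return list(self._data.keys())

        def keys(self, group: str) -> list:
            return list((self._data.get(group) or {}).keys())

        def value(self, group: str, key: str) -> Optional[bytes]:
            group_data = self._data.get(group)
            return None if group_data is None else group_data.get(key)

        def add_key(self, group: str, key: str, value: bytes) -> None:
            self._data.setdefault(group, {})[key] = value

        def remove_key(self, group: str, key: str) -> Optional[bytes]:
            group_data = self._data.pop(group, None)
            if group_data is None:
                return None
            value = group_data.pop(key, None)
            if group_data:
                self._data[group] = group_data
            return value


# Hypothetical group and key names
md = UserMetadata()
md.add_key("tracing", "trace_id", b"abc123")
md.add_key("tracing", "span_id", b"span-1")
md.add_key("routing", "region", b"eu")

removed_span = md.remove_key("tracing", "span_id")
groups_after_partial_remove = md.groups()  # "tracing" was popped and re-inserted
removed_trace = md.remove_key("tracing", "trace_id")  # last key: group disappears
```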

SystemMetadata dataclass

SystemMetadata(_data: dict[str, dict[str, bytes]] = dict())

SystemMetadata maps each group name to that group's key-value pairs. It wraps the system-generated metadata groups for each message and is read-only to UDFs.

groups

groups() -> list[str]

Returns the list of group names for the system metadata.

Source code in pynumaflow/_metadata.py
def groups(self) -> list[str]:
    """
    Returns the list of group names for the system metadata.
    """
    return list(self._data.keys())

keys

keys(group: str) -> list[str]

Returns the list of keys for a given group.

Source code in pynumaflow/_metadata.py
def keys(self, group: str) -> list[str]:
    """
    Returns the list of keys for a given group.
    """
    return list(self._data.get(group, {}).keys())

value

value(group: str, key: str) -> Optional[bytes]

Returns the value for a given group and key.

Source code in pynumaflow/_metadata.py
def value(self, group: str, key: str) -> Optional[bytes]:
    """
    Returns the value for a given group and key.
    """
    return self._data.get(group, {}).get(key)
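
Since value() returns None for a missing group or key, a UDF reading system metadata usually wants an explicit fallback. In a real handler the object would come from datum.system_metadata; the sketch builds one by hand only for illustration. The group and key names and the read_or_default helper are hypothetical, and the fallback stand-in copies the read-only accessors shown above for use without pynumaflow.

```python
from typing import Optional

try:
    from pynumaflow._metadata import SystemMetadata
except ImportError:
    from dataclasses import dataclass, field

    @dataclass
    class SystemMetadata:
        """Hypothetical stand-in copying the read-only accessors shown above."""
        _data: dict = field(default_factory=dict)

        def groups(self) -> list:
            return list(self._data.keys())

        def keys(self, group: str) -> list:
            return list(self._data.get(group, {}).keys())

        def value(self, group: str, key: str) -> Optional[bytes]:
            return self._data.get(group, {}).get(key)


# Built by hand only for illustration; inside a UDF it would come from
# datum.system_metadata. Group and key names are hypothetical.
sys_md = SystemMetadata(_data={"example_group": {"example_key": b"v1"}})


def read_or_default(md, group: str, key: str, default: bytes = b"") -> bytes:
    # value() returns None for a missing group or key, so fall back explicitly
    found = md.value(group, key)
    return default if found is None else found
```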