
Batch Mapper

The Batch Mapper module offers tools for building BatchMap UDFs, allowing you to process multiple messages simultaneously. This enables more efficient handling of workloads such as bulk API requests or batch database operations by grouping messages and processing them together in a single operation.

Classes

Message dataclass

Message(
    value: bytes,
    keys: Optional[list[str]] = None,
    tags: Optional[list[str]] = None,
)

Basic datatype for passing data to the next vertex/vertices.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `value` | `bytes` | data in bytes | *required* |
| `keys` | `Optional[list[str]]` | list of keys for vertex (optional) | `None` |
| `tags` | `Optional[list[str]]` | list of tags for conditional forwarding (optional) | `None` |
Source code in pynumaflow/batchmapper/_dtypes.py
def __init__(
    self, value: bytes, keys: Optional[list[str]] = None, tags: Optional[list[str]] = None
):
    """
    Creates a Message object to send value to a vertex.
    """
    self._keys = keys or []
    self._tags = tags or []
    self._value = value or b""
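
The constructor above normalizes missing arguments to empty defaults. A minimal standalone reproduction (a sketch mirroring the `__init__` shown here, not the installed `pynumaflow` class) makes that defaulting visible:

```python
from typing import Optional

class Message:
    """Stand-in mirroring the __init__ shown above (sketch only)."""
    def __init__(self, value: bytes, keys: Optional[list[str]] = None,
                 tags: Optional[list[str]] = None):
        self._keys = keys or []       # None -> []
        self._tags = tags or []       # None -> []
        self._value = value or b""    # falsy value -> empty bytes

m = Message(b"payload", keys=["user-1"])
print(m._value, m._keys, m._tags)  # b'payload' ['user-1'] []
```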

Datum dataclass

Datum(
    id: str,
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
)

Class to define the important information for the event.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `id` | `str` | the unique ID for this request | *required* |
| `keys` | `list[str]` | the keys of the event | *required* |
| `value` | `bytes` | the payload of the event | *required* |
| `event_time` | `datetime` | the event time of the event | *required* |
| `watermark` | `datetime` | the watermark of the event | *required* |
| `headers` | `Optional[dict[str, str]]` | the headers of the event | `None` |
Source code in pynumaflow/batchmapper/_dtypes.py
def __init__(
    self,
    id: str,
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
):
    self._id = id
    self._keys = keys or list()
    self._value = value or b""
    if not isinstance(event_time, datetime):
        raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time")
    self._event_time = event_time
    if not isinstance(watermark, datetime):
        raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")
    self._watermark = watermark
    self._headers = headers or {}
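
The constructor rejects non-`datetime` values for `event_time` and `watermark`. A standalone reproduction of the `__init__` shown above (a sketch, not the installed class) demonstrates both the defaulting and the type check:

```python
from datetime import datetime, timezone
from typing import Optional

class Datum:
    """Stand-in mirroring the __init__ shown above (sketch only)."""
    def __init__(self, id: str, keys: list[str], value: bytes,
                 event_time: datetime, watermark: datetime,
                 headers: Optional[dict[str, str]] = None):
        self._id = id
        self._keys = keys or list()
        self._value = value or b""
        if not isinstance(event_time, datetime):
            raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time")
        self._event_time = event_time
        if not isinstance(watermark, datetime):
            raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")
        self._watermark = watermark
        self._headers = headers or {}

now = datetime.now(timezone.utc)
d = Datum(id="req-1", keys=["k"], value=b"payload", event_time=now, watermark=now)
try:
    Datum(id="req-2", keys=[], value=b"", event_time="not-a-datetime", watermark=now)
except TypeError as e:
    print("rejected:", e)
```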

keys property

keys: list[str]

Returns the keys of the event

value property

value: bytes

Returns the value of the event.

event_time property

event_time: datetime

Returns the event time of the event.

watermark property

watermark: datetime

Returns the watermark of the event.

headers property

headers: dict[str, str]

Returns the headers of the event.

id property

id: str

Returns the id of the event.

BatchMapper

Provides an interface to write a Batch Mapper which will be exposed over a gRPC server.


handler abstractmethod async

handler(datums: AsyncIterable[Datum]) -> BatchResponses

Implement this handler function, which conforms to the BatchMapAsyncCallable interface.

Source code in pynumaflow/batchmapper/_dtypes.py
@abstractmethod
async def handler(self, datums: AsyncIterable[Datum]) -> BatchResponses:
    """
    Implement this handler function which implements the BatchMapAsyncCallable interface.
    """
    pass

BatchResponses

BatchResponses(*responses: B)

Bases: Sequence[B]

Class to define a list of Batch Response objects.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `responses` | `B` | list of Batch Response objects | `()` |
Source code in pynumaflow/batchmapper/_dtypes.py
def __init__(self, *responses: B):
    self._responses = list(responses) or []
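
Since BatchResponses is a `Sequence`, the collection supports indexing and `len()` in addition to `append`. A minimal standalone sketch (a stand-in class modeled on the constructor above, with illustrative `Sequence` methods that are assumptions here) shows typical use:

```python
from collections.abc import Sequence

class BatchResponses(Sequence):
    """Stand-in mirroring the constructor shown above (sketch only)."""
    def __init__(self, *responses):
        self._responses = list(responses) or []
    def __len__(self):
        return len(self._responses)
    def __getitem__(self, index):
        return self._responses[index]
    def append(self, response):
        self._responses.append(response)

batch = BatchResponses("resp-a")
batch.append("resp-b")
print(len(batch), batch[1])  # 2 resp-b
```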

BatchResponse dataclass

BatchResponse(_id: str, messages: list[M])

Basic datatype for Batch map response.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `id` | `str` | the id of the request | *required* |
| `messages` | `list[M]` | list of messages corresponding to the request id | *required* |

BatchMapAsyncServer

BatchMapAsyncServer(
    batch_mapper_instance: BatchMapCallable,
    sock_path=BATCH_MAP_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

Class for a new Batch Map Async Server instance.

The server instance is returned.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `batch_mapper_instance` | `BatchMapCallable` | The batch mapper instance to be used for the Batch Map UDF | *required* |
| `sock_path` | | The UNIX socket path to be used for the server | `BATCH_MAP_SOCK_PATH` |
| `max_message_size` | | The max message size in bytes the server can receive and send | `MAX_MESSAGE_SIZE` |
| `max_threads` | | The max number of threads to be spawned; defaults to 4 and is capped at 16 | `NUM_THREADS_DEFAULT` |
| `server_info_file` | | The path to the server info file | `MAP_SERVER_INFO_FILE_PATH` |

Example invocation:

```py
class Flatmap(BatchMapper):
    async def handler(
        self,
        datums: AsyncIterable[Datum],
    ) -> BatchResponses:
        batch_responses = BatchResponses()
        async for datum in datums:
            val = datum.value
            _ = datum.event_time
            _ = datum.watermark
            strs = val.decode("utf-8").split(",")
            batch_response = BatchResponse.from_id(datum.id)
            if len(strs) == 0:
                batch_response.append(Message.to_drop())
            else:
                for s in strs:
                    batch_response.append(Message(str.encode(s)))
            batch_responses.append(batch_response)

        return batch_responses

if __name__ == "__main__":
    grpc_server = BatchMapAsyncServer(Flatmap())
    grpc_server.start()
```
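
The core of the Flatmap handler above can be exercised outside the server with stand-in stubs for the pynumaflow types (the `Message`, `Datum`, and `BatchResponse` classes below are minimal assumptions for illustration, not the installed classes). Each incoming datum id gets its own `BatchResponse` carrying the split-out messages:

```python
import asyncio
from dataclasses import dataclass, field

# Minimal stand-ins for the pynumaflow types (sketch only).
@dataclass
class Message:
    value: bytes

@dataclass
class Datum:
    id: str
    value: bytes

@dataclass
class BatchResponse:
    id: str
    messages: list = field(default_factory=list)

    @classmethod
    def from_id(cls, id_):
        return cls(id=id_)

    def append(self, msg):
        self.messages.append(msg)

async def as_aiter(items):
    # Wrap a plain list as the AsyncIterable the handler consumes.
    for item in items:
        yield item

async def flatmap(datums):
    responses = []
    async for datum in datums:
        resp = BatchResponse.from_id(datum.id)
        for s in datum.value.decode("utf-8").split(","):
            resp.append(Message(s.encode()))
        responses.append(resp)
    return responses

datums = [Datum(id="1", value=b"a,b"), Datum(id="2", value=b"c")]
responses = asyncio.run(flatmap(as_aiter(datums)))
print([len(r.messages) for r in responses])  # → [2, 1]
```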

Source code in pynumaflow/batchmapper/async_server.py
def __init__(
    self,
    batch_mapper_instance: BatchMapCallable,
    sock_path=BATCH_MAP_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
):
    """
    Create a new grpc Async Batch Map Server instance.
    A new servicer instance is created and attached to the server.
    The server instance is returned.

    Args:
        batch_mapper_instance: The batch map stream instance to be used for Batch Map UDF
        sock_path: The UNIX socket path to be used for the server
        max_message_size: The max message size in bytes the server can receive and send
        max_threads: The max number of threads to be spawned;
                        defaults to 4 and max capped at 16

    Example invocation:
    ```py
    class Flatmap(BatchMapper):
       async def handler(
           self,
           datums: AsyncIterable[Datum],
       ) -> BatchResponses:
           batch_responses = BatchResponses()
           async for datum in datums:
               val = datum.value
               _ = datum.event_time
               _ = datum.watermark
               strs = val.decode("utf-8").split(",")
               batch_response = BatchResponse.from_id(datum.id)
               if len(strs) == 0:
                   batch_response.append(Message.to_drop())
               else:
                   for s in strs:
                       batch_response.append(Message(str.encode(s)))
               batch_responses.append(batch_response)

           return batch_responses
    if __name__ == "__main__":
        grpc_server = BatchMapAsyncServer(Flatmap())
        grpc_server.start()
    ```
    """
    self.batch_mapper_instance: BatchMapCallable = batch_mapper_instance
    self.sock_path = f"unix://{sock_path}"
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.max_message_size = max_message_size
    self.server_info_file = server_info_file

    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
    ]

    self.servicer = AsyncBatchMapServicer(handler=self.batch_mapper_instance)

start

start()

Starter function for the Async Batch Map server. A separate caller to aexec is needed so that all the async coroutines can be started from a single context.

Source code in pynumaflow/batchmapper/async_server.py
def start(self):
    """
    Starter function for the Async Batch Map server, we need a separate caller
    to the aexec so that all the async coroutines can be started from a single context
    """
    aiorun.run(self.aexec(), use_uvloop=True)
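
In spirit, handing the `aexec` coroutine to `aiorun.run` is close to the stdlib pattern below (an assumption that ignores aiorun's signal handling and the uvloop event-loop policy; `aexec` here is a trivial stand-in for the server coroutine):

```python
import asyncio

async def aexec():
    # Stand-in for the server coroutine that binds the socket and serves.
    await asyncio.sleep(0)
    return "serving"

def start():
    # Roughly what aiorun.run(self.aexec(), use_uvloop=True) does, minus
    # signal handling and the uvloop event-loop policy.
    return asyncio.run(aexec())

print(start())  # serving
```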

aexec async

aexec()

Starts the Async gRPC server on the given UNIX socket with given max threads.

Source code in pynumaflow/batchmapper/async_server.py
async def aexec(self):
    """
    Starts the Async gRPC server on the given UNIX socket with
    given max threads.
    """
    # As the server is async, we need to create a new server instance in the
    # same thread as the event loop so that all the async calls are made in the
    # same context
    # Create a new async server instance and add the servicer to it
    server = grpc.aio.server(options=self._server_options)
    server.add_insecure_port(self.sock_path)
    map_pb2_grpc.add_MapServicer_to_server(
        self.servicer,
        server,
    )
    _LOGGER.info("Starting Batch Map Server")
    serv_info = ServerInfo.get_default_server_info()
    serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Mapper]
    # Add the MAP_MODE metadata to the server info for the correct map mode
    serv_info.metadata[MAP_MODE_KEY] = MapMode.BatchMap

    # Start the async server
    await start_async_server(
        server_async=server,
        sock_path=self.sock_path,
        max_threads=self.max_threads,
        cleanup_coroutines=list(),
        server_info_file=self.server_info_file,
        server_info=serv_info,
    )