Batch Mapper
The Batch Mapper module offers tools for building BatchMap UDFs, which receive a whole batch of messages in a single handler call instead of one message at a time. This enables more efficient handling of workloads such as bulk API requests or batch database operations, because the messages in a batch can be grouped and processed together in a single operation.
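To make the batching benefit concrete, here is a minimal sketch that does not use pynumaflow at all. It contrasts one backend call per message with a single bulk call for the whole batch. `FakeBackend`, `single_lookup` and `bulk_lookup` are hypothetical stand-ins for an external API or database; the only point is the number of round trips.

```python
import asyncio
from collections.abc import AsyncIterator


class FakeBackend:
    """Hypothetical backend that counts how many round trips it receives."""

    def __init__(self) -> None:
        self.calls = 0

    async def single_lookup(self, key: str) -> str:
        self.calls += 1  # one round trip per message
        return key.upper()

    async def bulk_lookup(self, keys: list[str]) -> dict[str, str]:
        self.calls += 1  # one round trip for the whole batch
        return {k: k.upper() for k in keys}


async def stream(items: list[str]) -> AsyncIterator[str]:
    # Stands in for the async iterable of messages a batch handler receives.
    for item in items:
        yield item


async def per_message(backend: FakeBackend, items: list[str]) -> list[str]:
    # One backend call per message.
    return [await backend.single_lookup(p) async for p in stream(items)]


async def batched(backend: FakeBackend, items: list[str]) -> list[str]:
    collected = [p async for p in stream(items)]  # drain the batch first
    results = await backend.bulk_lookup(collected)  # then one bulk call
    return [results[p] for p in collected]


if __name__ == "__main__":
    one_by_one, bulk = FakeBackend(), FakeBackend()
    items = ["a", "b", "c"]
    assert asyncio.run(per_message(one_by_one, items)) == asyncio.run(batched(bulk, items))
    print(one_by_one.calls, bulk.calls)  # 3 1
```

Both functions return the same results; the batched version simply replaces N round trips with one, which is the kind of saving a BatchMap UDF is meant to enable.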
Classes
Message (dataclass)
Basic datatype for data passed to the next vertex or vertices.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | bytes | the payload, in bytes | required |
| keys | Optional[list[str]] | list of keys for the vertex | None |
| tags | Optional[list[str]] | list of tags for conditional forwarding | None |
Source code in pynumaflow/batchmapper/_dtypes.py
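The parameter table can be read as the following constructor shape. This is a minimal stand-in dataclass written only for illustration, not the pynumaflow class itself (the real class may store its fields differently); it shows which argument is required and which ones default to `None`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Message:
    """Illustrative stand-in mirroring the documented parameters."""

    value: bytes                      # required payload
    keys: Optional[list[str]] = None  # optional keys for the vertex
    tags: Optional[list[str]] = None  # optional tags for conditional forwarding


# Payload only: keys and tags fall back to None.
plain = Message(b"hello")
# Keyed message.
keyed = Message(b"hello", keys=["user-42"])
# Tagged message; a conditional-forwarding edge could match on the "even" tag.
tagged = Message(b"2", tags=["even"])
```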
Datum (dataclass)
```python
Datum(
    id: str,
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
)
```
Class that holds the information for a single event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | list[str] | the keys of the event. | required |
| value | bytes | the payload of the event. | required |
| event_time | datetime | the event time of the event. | required |
| watermark | datetime | the watermark of the event. | required |
| headers | Optional[dict[str, str]] | the headers of the event. | None |
| id | str | the unique ID for this request | required |
Source code in pynumaflow/batchmapper/_dtypes.py
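When unit-testing a handler it is convenient to fabricate `Datum` objects. The sketch below uses a local stand-in dataclass that follows the documented `Datum(...)` signature; `make_datum` is a hypothetical helper, and the choice of timezone-aware UTC timestamps with the watermark one second behind the event time is our assumption, not a documented requirement.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class Datum:
    """Stand-in following the documented Datum(...) signature."""

    id: str
    keys: list[str]
    value: bytes
    event_time: datetime
    watermark: datetime
    headers: Optional[dict[str, str]] = None


def make_datum(idx: int, payload: str) -> Datum:
    # Hypothetical helper: event time is "now", watermark trails it by one second.
    now = datetime.now(timezone.utc)
    return Datum(
        id=f"req-{idx}",
        keys=[],
        value=payload.encode("utf-8"),
        event_time=now,
        watermark=now - timedelta(seconds=1),
        headers={"source": "unit-test"},
    )


# A small batch of two datums with ids "req-0" and "req-1".
batch = [make_datum(i, p) for i, p in enumerate(["a,b", "c"])]
```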
BatchMapper
Provides an interface to write a Batch Mapper which will be exposed over a gRPC server.
handler (abstractmethod, async)
```python
handler(datums: AsyncIterable[Datum]) -> BatchResponses
```
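Implement this handler to satisfy the BatchMapAsyncCallable interface. It receives the datums of a batch as an async iterable and returns a single BatchResponses object for the whole batch.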
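The shape of this contract (an abstract async method that consumes an async iterable and returns one result for the whole batch) can be sketched without pynumaflow. Everything below is a stand-in: `BatchMapperSketch` and `Upper` are hypothetical names, a datum is simplified to an `(id, value)` tuple and a response to an `(id, [outputs])` tuple, and `feed` simulates the server streaming a batch in.

```python
import asyncio
from abc import ABC, abstractmethod
from collections.abc import AsyncIterable, AsyncIterator

# Stand-in types: a datum is (id, value); a response is (id, [output values]).
DatumT = tuple[str, bytes]
ResponseT = tuple[str, list[bytes]]


class BatchMapperSketch(ABC):
    """Mirrors the documented shape: one abstract async handler per batch."""

    @abstractmethod
    async def handler(self, datums: AsyncIterable[DatumT]) -> list[ResponseT]:
        ...


class Upper(BatchMapperSketch):
    async def handler(self, datums: AsyncIterable[DatumT]) -> list[ResponseT]:
        responses: list[ResponseT] = []
        async for datum_id, value in datums:
            # One response per input id, carrying zero or more outputs.
            responses.append((datum_id, [value.upper()]))
        return responses


async def feed(items: list[DatumT]) -> AsyncIterator[DatumT]:
    # Simulates the server streaming a batch into the handler.
    for item in items:
        yield item


result = asyncio.run(Upper().handler(feed([("1", b"a"), ("2", b"b")])))
```

Because `handler` is abstract, the base class cannot be instantiated on its own; only a subclass that implements it can be passed around.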
BatchResponses
BatchResponse (dataclass)
Basic datatype for a batch map response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| id | | the id of the request. | required |
| messages | list[M] | list of responses corresponding to the request id | required |
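The `BatchMapAsyncServer` example on this page builds each response with `BatchResponse.from_id(...)` and then calls `append(...)` once per output message. A minimal stand-in of that pattern follows; it is not the pynumaflow class, and `unanswered_ids` is a hypothetical check added here under the assumption that every request id in a batch should receive a response.

```python
from dataclasses import dataclass, field


@dataclass
class Message:
    """Stand-in message holding only a payload."""

    value: bytes


@dataclass
class BatchResponse:
    """Stand-in: the request id plus the messages produced for it."""

    id: str
    messages: list[Message] = field(default_factory=list)

    @classmethod
    def from_id(cls, id_: str) -> "BatchResponse":
        # Start with an empty message list for this request id.
        return cls(id=id_)

    def append(self, message: Message) -> None:
        self.messages.append(message)


def unanswered_ids(request_ids: list[str], responses: list[BatchResponse]) -> set[str]:
    """Hypothetical check: request ids that did not get a BatchResponse."""
    return set(request_ids) - {r.id for r in responses}


resp = BatchResponse.from_id("req-1")
resp.append(Message(b"x"))
resp.append(Message(b"y"))
```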
BatchMapAsyncServer
```python
BatchMapAsyncServer(
    batch_mapper_instance: BatchMapCallable,
    sock_path=BATCH_MAP_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
)
```
Bases: NumaflowServer
Class for creating a new Batch Map Async Server instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_mapper_instance | BatchMapCallable | The batch mapper instance to be used for the Batch Map UDF | required |
| sock_path | | The UNIX socket path to be used for the server | BATCH_MAP_SOCK_PATH |
| max_message_size | | The maximum message size, in bytes, that the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The maximum number of threads to be spawned; defaults to 4 and is capped at 16 | NUM_THREADS_DEFAULT |
Example invocation:
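```python
from collections.abc import AsyncIterable

from pynumaflow.batchmapper import (
    BatchMapAsyncServer,
    BatchMapper,
    BatchResponse,
    BatchResponses,
    Datum,
    Message,
)


class Flatmap(BatchMapper):
    async def handler(
        self,
        datums: AsyncIterable[Datum],
    ) -> BatchResponses:
        batch_responses = BatchResponses()
        async for datum in datums:
            val = datum.value
            # Event time and watermark are available but unused in this example.
            _ = datum.event_time
            _ = datum.watermark
            # "".split(",") returns [""], so treat an empty payload as "no strings".
            strs = val.decode("utf-8").split(",") if val else []
            # One BatchResponse per input datum, identified by its id.
            batch_response = BatchResponse.from_id(datum.id)
            if len(strs) == 0:
                batch_response.append(Message.to_drop())
            else:
                for s in strs:
                    batch_response.append(Message(s.encode("utf-8")))
            batch_responses.append(batch_response)
        return batch_responses


if __name__ == "__main__":
    grpc_server = BatchMapAsyncServer(Flatmap())
    grpc_server.start()
```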
Source code in pynumaflow/batchmapper/async_server.py
start
Starter function for the Async Batch Map server. A separate caller to `aexec` is needed so that all the async coroutines can be started from a single context.
aexec (async)
Starts the async gRPC server on the given UNIX socket with the given maximum number of threads.
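The split between a synchronous `start` and an async `aexec` follows a common asyncio pattern: the synchronous entry point runs one top-level coroutine, so every coroutine the server needs is started inside that single event loop. The sketch below illustrates the pattern with a hypothetical `SketchServer` and plain `asyncio.run`; it involves no gRPC, and the real pynumaflow server may use a different runner and add socket setup and shutdown handling.

```python
import asyncio


class SketchServer:
    """Hypothetical server illustrating the start()/aexec() split."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def start(self) -> None:
        # Synchronous entry point: one event loop for all async work below.
        asyncio.run(self.aexec())

    async def aexec(self) -> None:
        # All coroutines (here, two fake tasks) are started from this single context.
        self.events.append("serving")
        await asyncio.gather(self._task("a"), self._task("b"))
        self.events.append("stopped")

    async def _task(self, name: str) -> None:
        await asyncio.sleep(0)  # yield to the loop, as real I/O would
        self.events.append(f"task-{name}")


server = SketchServer()
server.start()
```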