Mapper¶
The Mapper module provides classes and functions for implementing Map UDFs. Map is the most common UDF type: the handler receives one message at a time and can return:
- One message (1:1 transformation)
- Multiple messages (fan-out)
- No messages (filter/drop)
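The three outcomes can be sketched without the SDK by modelling a handler's result as a plain list of payloads. The function `route_payload` and its splitting rule are hypothetical and exist only for illustration; in a real handler each payload would be wrapped in a `Message` and collected in `Messages`, and how a drop is signalled to the platform is SDK-specific and not shown here.

```python
def route_payload(value: bytes) -> list[bytes]:
    """Hypothetical stand-in for a Map handler body: returns the payloads to emit."""
    text = value.decode("utf-8")
    if not text.strip():
        # No messages: the input is filtered out (dropped).
        return []
    if "," in text:
        # Multiple messages: fan out one input into one output per field.
        return [part.strip().encode("utf-8") for part in text.split(",")]
    # One message: a 1:1 transformation (here, upper-casing the payload).
    return [text.upper().encode("utf-8")]


if __name__ == "__main__":
    for payload in (b"hello", b"a, b, c", b"   "):
        print(payload, "->", route_payload(payload))
```

Returning a list keeps the three cases uniform: the caller only needs to iterate over whatever comes back.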
Classes¶
MapAsyncServer
¶
MapAsyncServer(
    mapper_instance: MapAsyncCallable,
    sock_path=MAP_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
)
Bases: NumaflowServer
Create a new gRPC Map Server instance.
Args:
- mapper_instance: The mapper instance to be used for Map UDF
- sock_path: The UNIX socket path to be used for the server
- max_message_size: The max message size in bytes the server can receive and send
- max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16
Example invocation:
from pynumaflow.mapper import Messages, Message, Datum, MapAsyncServer
async def async_map_handler(keys: list[str], datum: Datum) -> Messages:
    val = datum.value
    # Build a text payload that echoes the input together with its timing fields
    msg = (
        f"payload:{val.decode('utf-8')} "
        f"event_time:{datum.event_time} "
        f"watermark:{datum.watermark}"
    )
    return Messages(Message(value=msg.encode('utf-8'), keys=keys))
if __name__ == "__main__":
    grpc_server = MapAsyncServer(async_map_handler)
    grpc_server.start()
The server instance is returned.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mapper_instance | MapAsyncCallable | The mapper instance to be used for Map UDF | required |
| sock_path | | The UNIX socket path to be used for the server | MAP_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |
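The `max_threads` description above can be read as a simple clamp. The sketch below is a minimal illustration under that reading; the names `DEFAULT_THREADS`, `THREAD_CAP`, and `effective_threads` are hypothetical, and the values 4 and 16 are taken from the description rather than from the SDK's constants.

```python
DEFAULT_THREADS = 4  # assumed from "defaults to 4"
THREAD_CAP = 16      # assumed from "max capped at 16"


def effective_threads(requested: int = DEFAULT_THREADS) -> int:
    """Return the thread count a server would use under the described cap."""
    # Requests above the cap are reduced to the cap; smaller values pass through.
    return min(requested, THREAD_CAP)


if __name__ == "__main__":
    print(effective_threads())    # default request
    print(effective_threads(64))  # request above the cap
```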
Source code in pynumaflow/mapper/async_server.py
start
¶
Starter function for the async server class. A separate caller is needed so that all the async coroutines can be started from a single context.
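The split between a synchronous `start` and an asynchronous `aexec` can be illustrated with the standard library alone. The class below is a hypothetical stand-in, not the SDK class, and it uses `asyncio.run` as the event-loop runner; the SDK may use a different runner.

```python
import asyncio


class SketchAsyncServer:
    """Hypothetical stand-in showing the start()/aexec() split."""

    def __init__(self) -> None:
        self.started = False

    async def aexec(self) -> None:
        # A real server would create and serve the gRPC server here.
        await asyncio.sleep(0)
        self.started = True

    def start(self) -> None:
        # Synchronous entry point: creates one event loop and runs aexec() in it,
        # so every coroutine the server spawns shares that single context.
        asyncio.run(self.aexec())


if __name__ == "__main__":
    server = SketchAsyncServer()
    server.start()
    print(server.started)
```

Keeping `start` synchronous lets the usual `if __name__ == "__main__":` block launch the server without the caller managing an event loop.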
aexec
async
¶
Starts the async gRPC server on the given UNIX socket with the given max threads.
MapMultiprocServer
¶
MapMultiprocServer(
    mapper_instance: MapSyncCallable,
    server_count: int = _PROCESS_COUNT,
    sock_path=MAP_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
)
Bases: NumaflowServer
Create a new gRPC Multiproc Map Server instance.
The server instance is returned.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mapper_instance | MapSyncCallable | The mapper instance to be used for Map UDF | required |
| server_count | int | The number of gRPC server instances to be forked for multiproc | _PROCESS_COUNT |
| sock_path | | The UNIX socket path to be used for the server | MAP_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |
Example invocation:
import math
from pynumaflow.mapper import Messages, Message, Datum, Mapper, MapMultiprocServer
def is_prime(n):
    # Trial division up to and including floor(sqrt(n)), so perfect squares are rejected
    if n < 2:
        return False
    for i in range(2, math.isqrt(n) + 1):
        if n % i == 0:
            return False
    return True
class PrimeMap(Mapper):
    def handler(self, keys: list[str], datum: Datum) -> Messages:
        val = datum.value
        _ = datum.event_time
        _ = datum.watermark
        messages = Messages()
        # CPU-bound busy work to exercise the multiprocess server
        for i in range(2, 100000):
            is_prime(i)
        messages.append(Message(val, keys=keys))
        return messages
if __name__ == "__main__":
    server_count = 2
    prime_class = PrimeMap()
    # Server count is the number of server processes to start
    grpc_server = MapMultiprocServer(prime_class, server_count=server_count)
    grpc_server.start()
Source code in pynumaflow/mapper/multiproc_server.py
start
¶
Starts N gRPC servers, each with the given max threads. Here N is the number of CPUs or the value of the user-defined server_count parameter. The max value is capped at 2 * CPU count.
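The rule for N can be sketched as a small helper. This is an illustration of the description only, not the SDK's code: `effective_server_count` and its `cpu_count` parameter are hypothetical, the latter added so the behaviour can be checked independently of the host machine.

```python
import os
from typing import Optional


def effective_server_count(
    server_count: Optional[int] = None,
    cpu_count: Optional[int] = None,
) -> int:
    """Return the number of server processes under the described rule."""
    # Fall back to the host CPU count (or 1 if it cannot be determined).
    cpus = cpu_count or os.cpu_count() or 1
    # Default to one server per CPU when the caller gives no explicit count.
    requested = cpus if server_count is None else server_count
    # Never start more than twice as many servers as there are CPUs.
    return min(requested, 2 * cpus)


if __name__ == "__main__":
    print(effective_server_count(cpu_count=4))                    # default request
    print(effective_server_count(server_count=100, cpu_count=4))  # capped request
```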
MapServer
¶
MapServer(
    mapper_instance: MapSyncCallable,
    sock_path=MAP_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=MAP_SERVER_INFO_FILE_PATH,
)
Bases: NumaflowServer
Create a new gRPC Map Server instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mapper_instance | MapSyncCallable | The mapper instance to be used for Map UDF | required |
| sock_path | | The UNIX socket path to be used for the server | MAP_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |
Example invocation:
import os
from pynumaflow.mapper import Messages, Message, Datum, MapServer, Mapper
class MessageForwarder(Mapper):
    def handler(self, keys: list[str], datum: Datum) -> Messages:
        val = datum.value
        _ = datum.event_time
        _ = datum.watermark
        return Messages(Message(value=val, keys=keys))
def my_handler(keys: list[str], datum: Datum) -> Messages:
    val = datum.value
    _ = datum.event_time
    _ = datum.watermark
    return Messages(Message(value=val, keys=keys))
if __name__ == "__main__":
    # Use the class based approach or function based handler based on the env variable
    # Both can be used and passed directly to the server class
    invoke = os.getenv("INVOKE", "func_handler")
    if invoke == "class":
        handler = MessageForwarder()
    else:
        handler = my_handler
    grpc_server = MapServer(handler)
    grpc_server.start()
Source code in pynumaflow/mapper/sync_server.py
start
¶
Starts the synchronous gRPC server on the given UNIX socket with the given max threads.
Message
dataclass
¶
Message(
    value: bytes,
    keys: Optional[list[str]] = None,
    tags: Optional[list[str]] = None,
    user_metadata: Optional[UserMetadata] = None,
)
Basic datatype for data passing to the next vertex/vertices.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | bytes | data in bytes | required |
| keys | Optional[list[str]] | list of keys for the vertex (optional) | None |
| tags | Optional[list[str]] | list of tags for conditional forwarding (optional) | None |
| user_metadata | Optional[UserMetadata] | metadata for the message (optional) | None |
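Since `tags` drive conditional forwarding, a handler typically derives them from the payload. The helper below is a minimal sketch of that step using only the standard library: `tags_for` and the tag names are hypothetical, and how the pipeline matches tags to downstream vertices is configured outside this module and not shown.

```python
def tags_for(value: bytes) -> list[str]:
    """Hypothetical rule: tag integer payloads by parity, everything else as invalid."""
    try:
        number = int(value.decode("utf-8"))
    except ValueError:
        # Covers both undecodable bytes and non-integer text.
        return ["invalid"]
    return ["even"] if number % 2 == 0 else ["odd"]


# Inside a handler, the result would be passed through the tags argument, e.g.
# Message(value=datum.value, keys=keys, tags=tags_for(datum.value)).
if __name__ == "__main__":
    for payload in (b"4", b"7", b"not-a-number"):
        print(payload, "->", tags_for(payload))
```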
Source code in pynumaflow/mapper/_dtypes.py
Messages
¶
Datum
dataclass
¶
Datum(
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
    user_metadata: Optional[UserMetadata] = None,
    system_metadata: Optional[SystemMetadata] = None,
)
Class to define the important information for the event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | list[str] | the keys of the event. | required |
| value | bytes | the payload of the event. | required |
| event_time | datetime | the event time of the event. | required |
| watermark | datetime | the watermark of the event. | required |
| headers | Optional[dict[str, str]] | the headers of the event. | None |
Example usage:
from pynumaflow.mapper import Datum
from datetime import datetime, timezone
d = Datum(
    keys=["test_key"],
    value=b'test_mock_message',
    event_time=datetime.fromtimestamp(1662998400, timezone.utc),
    watermark=datetime.fromtimestamp(1662998460, timezone.utc),
    headers={"key1": "value1", "key2": "value2"},
)
Source code in pynumaflow/mapper/_dtypes.py
system_metadata
property
¶
system_metadata: SystemMetadata
Returns the system metadata of the event.
Mapper
¶
Provides an interface to write a SyncMapServicer which will be exposed over a Synchronous gRPC server.
handler
abstractmethod
¶
Implement this handler function which implements the MapSyncCallable interface.
UserMetadata
dataclass
¶
UserMetadata wraps the user-generated metadata groups per message. It is read-write to UDFs.
groups
¶
keys
¶
value
¶
Returns the value for a given group and key. If the group or key does not exist, returns None.
Source code in pynumaflow/_metadata.py
add_key
¶
remove_key
¶
Removes the key and its value for a given group and returns the value. If this key is the only key in the group, the group will be removed. Returns None if the group or key does not exist.
Source code in pynumaflow/_metadata.py
remove_group
¶
Removes the group and all its keys and values and returns the data. Returns None if the group does not exist.
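The group and key semantics described for `value`, `remove_key`, and `remove_group` can be modelled with a nested dictionary. The class below is a hypothetical stand-in written for illustration, not the SDK implementation; the argument order, the `bytes` value type, and the `add_key` signature are assumptions.

```python
from typing import Optional


class SketchMetadata:
    """Hypothetical model of the described group/key semantics."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, bytes]] = {}

    def groups(self) -> list[str]:
        return list(self._data)

    def add_key(self, group: str, key: str, value: bytes) -> None:
        # Create the group on first use, then set or overwrite the key.
        self._data.setdefault(group, {})[key] = value

    def value(self, group: str, key: str) -> Optional[bytes]:
        # None when either the group or the key is missing.
        return self._data.get(group, {}).get(key)

    def remove_key(self, group: str, key: str) -> Optional[bytes]:
        keys = self._data.get(group)
        if keys is None:
            return None
        removed = keys.pop(key, None)
        if not keys:
            # The last key was removed, so the group goes with it.
            del self._data[group]
        return removed

    def remove_group(self, group: str) -> Optional[dict[str, bytes]]:
        # Returns the group's data, or None if the group does not exist.
        return self._data.pop(group, None)


if __name__ == "__main__":
    meta = SketchMetadata()
    meta.add_key("audit", "source", b"sensor-1")
    print(meta.value("audit", "source"))
    print(meta.remove_key("audit", "source"), meta.groups())
```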