Source Transformer¶
The Source Transformer module provides classes and functions for implementing Source Transform UDFs that transform data immediately after it's read from a source. Source Transform is useful for:
- Parsing/deserializing data at ingestion
- Filtering messages early
- Assigning event times
- Adding metadata
- Routing messages with tags
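Several of the uses listed above tend to occur together in one handler. The following is a minimal, standard-library-only sketch of parsing a payload, filtering out unusable records, and deriving an event time. The helper name `parse_and_time` and the payload convention (a JSON object with a `ts` field holding epoch seconds) are assumptions made for illustration, not part of pynumaflow; in a real UDF the result would be wrapped in the `Message` class documented below.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def parse_and_time(raw: bytes) -> Optional[tuple[dict, datetime]]:
    """Parse a JSON payload and derive its event time.

    Returns None when the record should be filtered out early.
    The "ts" field (epoch seconds) is a hypothetical payload convention.
    """
    try:
        record = json.loads(raw)
    except ValueError:
        # Covers both malformed JSON and undecodable bytes.
        return None
    if not isinstance(record, dict):
        return None
    ts = record.get("ts")
    if not isinstance(ts, (int, float)):
        # No usable timestamp, so no event time can be assigned.
        return None
    event_time = datetime.fromtimestamp(ts, tz=timezone.utc)
    return record, event_time


parsed = parse_and_time(b'{"ts": 1662998400, "id": 7}')
```

Returning `None` for unusable input keeps the filtering decision in one place, so the calling handler only has to decide between dropping and forwarding.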
Classes¶
Message
dataclass
¶
Message(
    value: bytes,
    event_time: datetime,
    keys: list[str] = None,
    tags: list[str] = None,
    user_metadata: Optional[UserMetadata] = None,
)
Basic datatype for passing data to the next vertex or vertices.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | bytes | data in bytes | required |
| event_time | datetime | event time of the message, usually extracted from the payload. | required |
| keys | list[str] | list of string keys for the vertex (optional) | None |
| tags | list[str] | list of string tags for conditional forwarding (optional) | None |
| user_metadata | Optional[UserMetadata] | metadata for the message (optional) | None |
Source code in pynumaflow/sourcetransformer/_dtypes.py
Messages
¶
Datum
dataclass
¶
Datum(
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
    user_metadata: Optional[UserMetadata] = None,
    system_metadata: Optional[SystemMetadata] = None,
)
Class to define the important information for the event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | list[str] | the keys of the event. | required |
| value | bytes | the payload of the event. | required |
| event_time | datetime | the event time of the event. | required |
| watermark | datetime | the watermark of the event. | required |
| headers | Optional[dict[str, str]] | the headers of the event. | None |
| user_metadata | Optional[UserMetadata] | the user metadata of the event. | None |
| system_metadata | Optional[SystemMetadata] | the system metadata of the event. | None |
Example:
from pynumaflow.sourcetransformer import Datum
from datetime import datetime, timezone

d = Datum(
    keys=["test_key"],
    value=b"test_mock_message",
    event_time=datetime.fromtimestamp(1662998400, timezone.utc),
    watermark=datetime.fromtimestamp(1662998460, timezone.utc),
    headers={"key1": "value1", "key2": "value2"},
)
Source code in pynumaflow/sourcetransformer/_dtypes.py
system_metadata
property
¶
system_metadata: SystemMetadata
Returns the system metadata of the event.
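The Datum example above builds timezone-aware datetimes. When a handler orders an event time against fixed boundaries, both sides need to agree on awareness, because Python raises `TypeError` when an aware datetime is compared with a naive one. The sketch below uses only the standard library. The names `JAN_1_2022` and `is_before_2022` are illustrative, and treating naive inputs as UTC is an assumption; whether a given runtime delivers aware or naive event times is worth verifying rather than assuming.

```python
from datetime import datetime, timezone

# Aware boundary: unambiguous regardless of the host's local time zone.
JAN_1_2022 = datetime.fromtimestamp(1640995200, tz=timezone.utc)


def is_before_2022(event_time: datetime) -> bool:
    """Compare against the boundary, treating naive inputs as UTC (an assumption)."""
    if event_time.tzinfo is None:
        event_time = event_time.replace(tzinfo=timezone.utc)
    return event_time < JAN_1_2022


aware = datetime.fromtimestamp(1662998400, tz=timezone.utc)
naive = datetime(2021, 6, 1)

try:
    aware < naive  # ordering an aware datetime against a naive one raises TypeError
    mixed_comparison_ok = True
except TypeError:
    mixed_comparison_ok = False
```

Normalizing once at the top of the handler avoids scattering awareness checks through the branching logic.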
SourceTransformer
¶
Provides an interface to write a Source Transformer which will be exposed over a gRPC server.
handler
abstractmethod
¶
Implement this handler method; it must conform to the SourceTransformCallable interface.
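A class-based transformer of this shape can be sketched with the standard library's `abc` module. `TransformerBase` below is a hypothetical stand-in, not the pynumaflow class, and the delegation from `__call__` to `handler` is an assumption about how such an interface is typically made callable. The datum is represented as a plain dict and the return value as a list of tuples purely to keep the sketch self-contained.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timezone


class TransformerBase(ABC):
    """Hypothetical stand-in for a class-based source transformer interface."""

    def __call__(self, keys, datum):
        # Delegate to handler so an instance can be passed where a function is expected.
        return self.handler(keys, datum)

    @abstractmethod
    def handler(self, keys, datum):
        """Transform one datum and return the resulting messages."""


class UppercaseValue(TransformerBase):
    def handler(self, keys, datum):
        # Return (keys, value, event_time) tuples as a stand-in for Messages.
        return [(keys, datum["value"].upper(), datum["event_time"])]


transformer = UppercaseValue()
result = transformer(
    ["k1"],
    {"value": b"hello", "event_time": datetime(2022, 9, 12, tzinfo=timezone.utc)},
)
```

A class is mainly useful when the transformer needs state or configuration set up once in `__init__`; for stateless logic a plain function is simpler.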
SourceTransformMultiProcServer
¶
SourceTransformMultiProcServer(
    source_transform_instance: SourceTransformCallable,
    server_count: int = _PROCESS_COUNT,
    sock_path=SOURCE_TRANSFORMER_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
)
Bases: NumaflowServer
Class for a new Source Transformer Server instance.
The server instance is returned.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_transform_instance | SourceTransformCallable | The source transformer instance to be used for | required |
| sock_path | | The UNIX socket path to be used for the server | SOURCE_TRANSFORMER_SOCK_PATH |
| server_count | int | The number of gRPC server instances to be forked for multiproc | _PROCESS_COUNT |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |
Example invocation:
import datetime
import logging

from pynumaflow.sourcetransformer import (
    Datum,
    Message,
    Messages,
    SourceTransformMultiProcServer,
)

# This is a simple User Defined Function example which receives a message,
# applies the following data transformation, and returns the message.
# If the message event time is before year 2022, drop the message
# with event time unchanged.
# If it's within year 2022, update the tag to "within_year_2022" and
# update the message event time to Jan 1st 2022.
# Otherwise, (exclusively after year 2022), update the tag to
# "after_year_2022" and update the message event time to Jan 1st 2023.
january_first_2022 = datetime.datetime.fromtimestamp(1640995200)
january_first_2023 = datetime.datetime.fromtimestamp(1672531200)


def my_handler(keys: list[str], datum: Datum) -> Messages:
    val = datum.value
    event_time = datum.event_time
    messages = Messages()
    if event_time < january_first_2022:
        logging.info("Got event time:%s, it is before 2022, so dropping", event_time)
        messages.append(Message.to_drop(event_time))
    elif event_time < january_first_2023:
        logging.info(
            "Got event time:%s, it is within year 2022, so forwarding to within_year_2022",
            event_time,
        )
        message = Message(
            value=val,
            event_time=january_first_2022,
            tags=["within_year_2022"],
        )
        messages.append(message)
    else:
        logging.info(
            "Got event time:%s, it is after year 2022, so forwarding to after_year_2022",
            event_time,
        )
        message = Message(
            value=val,
            event_time=january_first_2023,
            tags=["after_year_2022"],
        )
        messages.append(message)
    return messages


if __name__ == "__main__":
    # Fork two gRPC server processes that share the same handler.
    grpc_server = SourceTransformMultiProcServer(
        source_transform_instance=my_handler,
        server_count=2,
    )
    grpc_server.start()
Source code in pynumaflow/sourcetransformer/multiproc_server.py
start
¶
Starts N gRPC servers on the given socket path with the given max threads.
Here N is the number of CPUs, or the value of the server_count parameter
when the user defines it. N is capped at 2 * CPU count.
Source code in pynumaflow/sourcetransformer/multiproc_server.py
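The rule for choosing N described above can be written out as a small function. This is a sketch under stated assumptions, not the library's implementation: the function name `effective_server_count`, the explicit `cpus` parameter (added so the behavior can be checked deterministically), and the lower bound of one process are all illustrative choices.

```python
import os
from typing import Optional


def effective_server_count(requested: Optional[int] = None, cpus: Optional[int] = None) -> int:
    """Return how many server processes to fork.

    Defaults to the CPU count, honors a user-supplied value,
    and caps the result at 2 * CPU count.
    """
    if cpus is None:
        cpus = os.cpu_count() or 1  # os.cpu_count() may return None
    count = cpus if requested is None else requested
    # The floor of 1 is an assumption: forking zero servers would serve nothing.
    return max(1, min(count, 2 * cpus))
```

For example, on a 4-CPU host a request for 100 processes would resolve to 8 under this rule, while a request for 2 would be honored as is.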
SourceTransformServer
¶
SourceTransformServer(
    source_transform_instance: SourceTransformCallable,
    sock_path=SOURCE_TRANSFORMER_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
)
Bases: NumaflowServer
Class for a new Source Transformer Server instance.
The server instance is returned.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_transform_instance | SourceTransformCallable | The source transformer instance to be used for | required |
| sock_path | | The UNIX socket path to be used for the server | SOURCE_TRANSFORMER_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |
Below is a simple User Defined Function example which receives a message, applies the following data transformation, and returns the message.
- If the message event time is before year 2022, drop the message with event time unchanged.
- If it's within year 2022, update the tag to within_year_2022 and update the message event time to Jan 1st 2022.
- Otherwise, (exclusively after year 2022), update the tag to after_year_2022 and update the message event time to Jan 1st 2023.
import datetime
import logging

from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformServer

january_first_2022 = datetime.datetime.fromtimestamp(1640995200)
january_first_2023 = datetime.datetime.fromtimestamp(1672531200)


def my_handler(keys: list[str], datum: Datum) -> Messages:
    val = datum.value
    event_time = datum.event_time
    messages = Messages()
    if event_time < january_first_2022:
        logging.info("Got event time:%s, it is before 2022, so dropping", event_time)
        messages.append(Message.to_drop(event_time))
    elif event_time < january_first_2023:
        logging.info(
            "Got event time:%s, it is within year 2022, so forwarding to within_year_2022",
            event_time,
        )
        messages.append(
            Message(value=val, event_time=january_first_2022, tags=["within_year_2022"])
        )
    else:
        logging.info(
            "Got event time:%s, it is after year 2022, so forwarding to after_year_2022",
            event_time,
        )
        messages.append(
            Message(value=val, event_time=january_first_2023, tags=["after_year_2022"])
        )
    return messages


if __name__ == "__main__":
    grpc_server = SourceTransformServer(my_handler)
    grpc_server.start()
Source code in pynumaflow/sourcetransformer/server.py
start
¶
Starts the Synchronous gRPC server on the given UNIX socket with given max threads.
Source code in pynumaflow/sourcetransformer/server.py
SourceTransformAsyncServer
¶
SourceTransformAsyncServer(
    source_transform_instance: SourceTransformAsyncCallable,
    sock_path=SOURCE_TRANSFORMER_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
)
Bases: NumaflowServer
Create a new gRPC Source Transformer Server instance. A new servicer instance is created and attached to the server. The server instance is returned.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_transform_instance | SourceTransformAsyncCallable | The source transformer instance to be used for | required |
| sock_path | | The UNIX socket path to be used for the server | SOURCE_TRANSFORMER_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |
Below is a simple User Defined Function example which receives a message, applies the following data transformation, and returns the message.
- If the message event time is before year 2022, drop the message with event time unchanged.
- If it's within year 2022, update the tag to within_year_2022 and update the message event time to Jan 1st 2022.
- Otherwise, (exclusively after year 2022), update the tag to after_year_2022 and update the message event time to Jan 1st 2023.
import datetime
import logging

from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformAsyncServer

january_first_2022 = datetime.datetime.fromtimestamp(1640995200)
january_first_2023 = datetime.datetime.fromtimestamp(1672531200)


async def my_handler(keys: list[str], datum: Datum) -> Messages:
    val = datum.value
    event_time = datum.event_time
    messages = Messages()
    if event_time < january_first_2022:
        logging.info("Got event time:%s, it is before 2022, so dropping", event_time)
        messages.append(Message.to_drop(event_time))
    elif event_time < january_first_2023:
        logging.info(
            "Got event time:%s, it is within year 2022, so forwarding to within_year_2022",
            event_time,
        )
        messages.append(
            Message(value=val, event_time=january_first_2022, tags=["within_year_2022"])
        )
    else:
        logging.info(
            "Got event time:%s, it is after year 2022, so forwarding to after_year_2022",
            event_time,
        )
        messages.append(
            Message(value=val, event_time=january_first_2023, tags=["after_year_2022"])
        )
    return messages


if __name__ == "__main__":
    grpc_server = SourceTransformAsyncServer(my_handler)
    grpc_server.start()
Source code in pynumaflow/sourcetransformer/async_server.py
start
¶
Starter function for the async server class. A separate caller is needed so that all the async coroutines can be started from a single context.
aexec
async
¶
Starts the Async gRPC server on the given UNIX socket with given max threads.
Source code in pynumaflow/sourcetransformer/async_server.py
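The split between a synchronous `start` and an asynchronous `aexec` can be illustrated with plain `asyncio`. `AsyncServerSketch` is a hypothetical stand-in, and the use of `asyncio.run` as the single entry point is an assumption about the pattern rather than a statement of how the library runs its loop.

```python
import asyncio


class AsyncServerSketch:
    """Hypothetical stand-in showing the start/aexec split."""

    def __init__(self):
        self.events = []

    async def aexec(self):
        # A real server would create, start, and await the async gRPC server here.
        self.events.append("started")
        await asyncio.sleep(0)  # placeholder: yield control to the event loop once
        self.events.append("stopped")

    def start(self):
        # Synchronous entry point: one event loop owns every coroutine.
        asyncio.run(self.aexec())


server = AsyncServerSketch()
server.start()
```

Keeping `start` synchronous lets callers use the async server from an ordinary `if __name__ == "__main__":` block, exactly as with the synchronous server.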
UserMetadata
dataclass
¶
UserMetadata wraps the user-generated metadata groups per message. UDFs can both read and modify it.
groups
¶
keys
¶
value
¶
Returns the value for a given group and key. If the group or key does not exist, returns None.
Source code in pynumaflow/_metadata.py
add_key
¶
remove_key
¶
Removes the key and its value for a given group and returns the value. If this key is the only key in the group, the group will be removed. Returns None if the group or key does not exist.
Source code in pynumaflow/_metadata.py
remove_group
¶
Removes the group and all its keys and values and returns the data. Returns None if the group does not exist.
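The group and key semantics described for these methods can be modeled with a nested dictionary. `MetadataSketch` below is a hypothetical stand-in written for illustration, not the pynumaflow class; the argument order and the use of `bytes` for values are assumptions, while the return-`None` and remove-empty-group behavior follows the descriptions above.

```python
from typing import Optional


class MetadataSketch:
    """Hypothetical model of grouped key/value metadata."""

    def __init__(self):
        self._data: dict[str, dict[str, bytes]] = {}

    def groups(self) -> list[str]:
        return list(self._data)

    def keys(self, group: str) -> list[str]:
        return list(self._data.get(group, {}))

    def value(self, group: str, key: str) -> Optional[bytes]:
        # None when either the group or the key is missing.
        return self._data.get(group, {}).get(key)

    def add_key(self, group: str, key: str, value: bytes) -> None:
        # Creates the group on first use.
        self._data.setdefault(group, {})[key] = value

    def remove_key(self, group: str, key: str) -> Optional[bytes]:
        entries = self._data.get(group)
        if entries is None:
            return None
        removed = entries.pop(key, None)
        if not entries:
            # The last key is gone, so the group goes with it.
            del self._data[group]
        return removed

    def remove_group(self, group: str) -> Optional[dict[str, bytes]]:
        return self._data.pop(group, None)


meta = MetadataSketch()
meta.add_key("audit", "source", b"kafka")
meta.add_key("audit", "region", b"us-west")
first_removed = meta.remove_key("audit", "source")
second_removed = meta.remove_key("audit", "region")
```

After both keys are removed the `audit` group no longer appears in `groups()`, which mirrors the rule that a group disappears with its last key.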