Sinker¶
The Sinker module provides classes and functions for implementing User Defined Sinks that write processed data to external systems (databases, Kafka topics, etc.).
Classes¶
SinkAsyncServer
¶
SinkAsyncServer(
sinker_instance: SinkAsyncCallable,
sock_path=SINK_SOCK_PATH,
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=SINK_SERVER_INFO_FILE_PATH,
)
Bases: NumaflowServer
SinkAsyncServer is the main class for starting an asynchronous gRPC server for a sinker. Constructing it creates a new servicer instance, attaches it to the server, and returns the server instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sinker_instance | SinkAsyncCallable | The sinker instance to be used for Sink UDF | required |
| sock_path | | The UNIX socket path to be used for the server | SINK_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |
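The `max_threads` description implies a clamp: a requested value is honored only up to the cap of 16. A minimal sketch of that rule, assuming the cap is applied with a plain `min`. The names `MAX_NUM_THREADS` and `effective_threads` are illustrative and do not come from this page, and the numeric values simply mirror the description in the table.

```python
# Values mirror the parameter description: default of 4, cap of 16.
NUM_THREADS_DEFAULT = 4
MAX_NUM_THREADS = 16  # assumed name for the documented cap


def effective_threads(requested: int = NUM_THREADS_DEFAULT) -> int:
    """Return the thread count a server would use under the documented cap."""
    return min(requested, MAX_NUM_THREADS)


print(effective_threads())    # default request
print(effective_threads(64))  # request above the cap
```

Under this assumption, asking for more than 16 threads silently yields 16 rather than raising an error.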
Example invocation:
import os
import logging
from collections.abc import AsyncIterable
from pynumaflow.sinker import Datum, Responses, Response, Sinker
from pynumaflow.sinker import SinkAsyncServer
from pynumaflow._constants import _LOGGER

logging.basicConfig(level=logging.INFO)


# Class-based handler: subclass Sinker and implement handler().
class UserDefinedSink(Sinker):
    async def handler(self, datums: AsyncIterable[Datum]) -> Responses:
        responses = Responses()
        async for msg in datums:
            logging.info("User Defined Sink %s", msg.value.decode("utf-8"))
            responses.append(Response.as_success(msg.id))
        return responses


# Function-based handler with the same signature.
async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses:
    responses = Responses()
    async for msg in datums:
        logging.info("User Defined Sink %s", msg.value.decode("utf-8"))
        responses.append(Response.as_success(msg.id))
    return responses


if __name__ == "__main__":
    # INVOKE=class selects the class-based handler; anything else uses the function.
    invoke = os.getenv("INVOKE", "func_handler")
    if invoke == "class":
        sink_handler = UserDefinedSink()
    else:
        sink_handler = udsink_handler
    grpc_server = SinkAsyncServer(sink_handler)
    grpc_server.start()
Source code in pynumaflow/sinker/async_server.py
start
¶
Starter function for the async server class. A separate caller is needed so that all the async coroutines can be started from a single context.
aexec
async
¶
Starts the asynchronous gRPC server on the given UNIX socket with the given maximum number of threads.
Source code in pynumaflow/sinker/async_server.py
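The split between `start` and `aexec` follows a common asyncio pattern: a synchronous entry point owns the event loop, and a single top-level coroutine launches everything else inside it. The sketch below illustrates that pattern only. `SketchAsyncServer` and `_serve` are hypothetical stand-ins, and the use of `asyncio.run` is an assumption; the real server may drive its loop differently.

```python
import asyncio


class SketchAsyncServer:
    """Hypothetical stand-in; not the real SinkAsyncServer."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def aexec(self) -> None:
        # Every coroutine is created and awaited inside this one coroutine,
        # so they all share a single event loop.
        self.events.append("server started")
        await asyncio.gather(self._serve("a"), self._serve("b"))
        self.events.append("server stopped")

    async def _serve(self, name: str) -> None:
        await asyncio.sleep(0)  # placeholder for real gRPC work
        self.events.append(f"handled {name}")

    def start(self) -> None:
        # Synchronous entry point: creates the loop and runs aexec to completion.
        asyncio.run(self.aexec())


server = SketchAsyncServer()
server.start()
print(server.events)
```

Keeping `start` synchronous lets a plain `if __name__ == "__main__":` block launch the server without the caller managing an event loop.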
SinkServer
¶
SinkServer(
sinker_instance: SinkSyncCallable,
sock_path=SINK_SOCK_PATH,
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=SINK_SERVER_INFO_FILE_PATH,
)
Bases: NumaflowServer
SinkServer is the main class for starting a synchronous gRPC server for a sinker. The constructor returns the server instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sinker_instance | SinkSyncCallable | The sinker instance to be used for Sink UDF | required |
| sock_path | | The UNIX socket path to be used for the server | SINK_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |
Example invocation:
import os
import logging
from collections.abc import Iterator
from pynumaflow.sinker import Datum, Responses, Response, SinkServer
from pynumaflow.sinker import Sinker
from pynumaflow._constants import _LOGGER

logging.basicConfig(level=logging.INFO)


# Class-based handler: subclass Sinker and implement handler().
class UserDefinedSink(Sinker):
    def handler(self, datums: Iterator[Datum]) -> Responses:
        responses = Responses()
        for msg in datums:
            logging.info("User Defined Sink %s", msg.value.decode("utf-8"))
            responses.append(Response.as_success(msg.id))
        return responses


# Function-based handler with the same signature.
def udsink_handler(datums: Iterator[Datum]) -> Responses:
    responses = Responses()
    for msg in datums:
        logging.info("User Defined Sink %s", msg.value.decode("utf-8"))
        responses.append(Response.as_success(msg.id))
    return responses


if __name__ == "__main__":
    # INVOKE=class selects the class-based handler; anything else uses the function.
    invoke = os.getenv("INVOKE", "func_handler")
    if invoke == "class":
        sink_handler = UserDefinedSink()
    else:
        sink_handler = udsink_handler
    grpc_server = SinkServer(sink_handler)
    grpc_server.start()
Source code in pynumaflow/sinker/server.py
start
¶
Starts the synchronous gRPC server on the given UNIX socket with the given maximum number of threads.
Source code in pynumaflow/sinker/server.py
UserMetadata
dataclass
¶
UserMetadata wraps the user-generated metadata groups per message. It is read-write to UDFs.
groups
¶
keys
¶
value
¶
Returns the value for a given group and key. If the group or key does not exist, returns None.
Source code in pynumaflow/_metadata.py
add_key
¶
remove_key
¶
Removes the key and its value for a given group and returns the value. If this key is the only key in the group, the group will be removed. Returns None if the group or key does not exist.
Source code in pynumaflow/_metadata.py
remove_group
¶
Removes the group and all its keys and values and returns the data. Returns None if the group does not exist.
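The lookup and removal rules described for `value`, `remove_key`, and `remove_group` can be sketched with a nested dictionary. `SketchMetadata` is a hypothetical stand-in written for illustration. Its method signatures, including the `(group, key, value)` argument order for `add_key` and the use of `bytes` values, are assumptions, since this page does not list them.

```python
from typing import Optional


class SketchMetadata:
    """Hypothetical dict-backed stand-in for the semantics described above."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, bytes]] = {}

    def add_key(self, group: str, key: str, value: bytes) -> None:
        self._data.setdefault(group, {})[key] = value

    def groups(self) -> list[str]:
        return list(self._data)

    def value(self, group: str, key: str) -> Optional[bytes]:
        # None when either the group or the key is missing.
        return self._data.get(group, {}).get(key)

    def remove_key(self, group: str, key: str) -> Optional[bytes]:
        keys = self._data.get(group)
        if keys is None:
            return None
        removed = keys.pop(key, None)
        if not keys:
            # Removing the last key also removes the group.
            del self._data[group]
        return removed

    def remove_group(self, group: str) -> Optional[dict[str, bytes]]:
        return self._data.pop(group, None)


md = SketchMetadata()
md.add_key("audit", "user", b"alice")
md.add_key("trace", "id", b"42")
last = md.remove_key("audit", "user")  # "audit" had one key, so the group goes too
print(last, md.groups(), md.value("audit", "user"))
```

The point to notice is that none of the three operations raises on a missing group or key; callers check for `None` instead.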
SystemMetadata
dataclass
¶
Response
dataclass
¶
Response(
id: str,
success: bool,
err: Optional[str],
fallback: bool,
on_success: bool,
on_success_msg: Optional[Message],
)
Basic datatype for UDSink response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| id | str | The id of the event. | required |
| success | bool | Boolean indicating whether the event was successfully processed. | required |
| err | Optional[str] | Error message if the event was not successfully processed. | required |
| fallback | bool | True if the message is to be sent to the fallback sink. | required |
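A sink typically reports one response per message, so that a failure on one message does not hide the success of the others. The sketch below shows that bookkeeping with a stand-in type. `SketchResponse` is hypothetical and carries only the four fields documented in the table, with defaults added for brevity. `write_all` and the in-memory `store` are likewise illustrative, not part of the library.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchResponse:
    """Hypothetical stand-in with the four fields documented in the table."""

    id: str
    success: bool
    err: Optional[str] = None
    fallback: bool = False


def write_all(messages: dict[str, bytes], store: list[str]) -> list[SketchResponse]:
    """Write each payload to `store`, reporting one response per message id."""
    responses = []
    for msg_id, payload in messages.items():
        try:
            store.append(payload.decode("utf-8"))
            responses.append(SketchResponse(id=msg_id, success=True))
        except UnicodeDecodeError as exc:
            # Record the failure against this id and keep processing the rest.
            responses.append(SketchResponse(id=msg_id, success=False, err=str(exc)))
    return responses


store: list[str] = []
results = write_all({"m1": b"ok", "m2": b"\xff"}, store)
for r in results:
    print(r.id, r.success, r.err)
```

In a real handler the same shape applies with the library's own `Response` and `Responses` types, as in the example invocations on this page.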
Responses
¶
Datum
dataclass
¶
Datum(
keys: list[str],
sink_msg_id: str,
value: bytes,
event_time: datetime,
watermark: datetime,
headers: Optional[dict[str, str]] = None,
user_metadata: Optional[UserMetadata] = None,
system_metadata: Optional[SystemMetadata] = None,
)
Class to define the important information for the event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | list[str] | the keys of the event. | required |
| value | bytes | the payload of the event. | required |
| event_time | datetime | the event time of the event. | required |
| watermark | datetime | the watermark of the event. | required |
| headers | Optional[dict[str, str]] | the headers of the event. | None |
Example usage
from pynumaflow.sinker import Datum
from datetime import datetime, timezone

d = Datum(
    keys=["test_key"],
    sink_msg_id="test_id",
    value=b"test_mock_message",
    event_time=datetime.fromtimestamp(1662998400, timezone.utc),
    watermark=datetime.fromtimestamp(1662998460, timezone.utc),
    headers={"key1": "value1", "key2": "value2"},
)
Source code in pynumaflow/sinker/_dtypes.py
system_metadata
property
¶
system_metadata: SystemMetadata
Returns the system metadata of the event.
Sinker
¶
Provides an interface to write a Sinker which will be exposed over a gRPC server.
handler
abstractmethod
¶
Implement this handler function; it must conform to the SinkCallable interface.
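The example invocations on this page pass either a `Sinker` subclass instance or a bare function to the server. One way an interface like this can work is a base class whose `__call__` forwards to `handler`, so an instance behaves like a function. Whether `Sinker` does exactly this is an assumption; `SketchSinker`, `UpperSink`, and `run` below are hypothetical names used only to illustrate the pattern.

```python
from abc import ABC, abstractmethod
from collections.abc import Iterator


class SketchSinker(ABC):
    """Hypothetical stand-in for a handler base class; not the real Sinker."""

    def __call__(self, datums: Iterator[bytes]) -> list[str]:
        # Forwarding to handler makes an instance usable wherever a plain
        # handler function is accepted.
        return self.handler(datums)

    @abstractmethod
    def handler(self, datums: Iterator[bytes]) -> list[str]:
        """Process the datums and return one result per datum."""


class UpperSink(SketchSinker):
    def handler(self, datums: Iterator[bytes]) -> list[str]:
        return [d.decode("utf-8").upper() for d in datums]


def upper_handler(datums: Iterator[bytes]) -> list[str]:
    return [d.decode("utf-8").upper() for d in datums]


def run(sink_callable, payloads: list[bytes]) -> list[str]:
    # The caller only needs something callable: a function or an instance.
    return sink_callable(iter(payloads))


print(run(UpperSink(), [b"a", b"b"]))
print(run(upper_handler, [b"a", b"b"]))
```

Because `handler` is abstract, instantiating the base class directly raises `TypeError`, which catches a missing implementation before the server starts.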
Message
dataclass
¶
Message(
value: bytes,
keys: Optional[list[str]] = None,
user_metadata: Optional[UserMetadata] = None,
)
Basic datatype for OnSuccess UDSink message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | Optional[list[str]] | list of keys for the on_success message. | None |
| value | bytes | payload of the on_success message. | required |
| user_metadata | Optional[UserMetadata] | user metadata of the on_success message. | None |