Sinker

The Sinker module provides classes and functions for implementing User Defined Sinks that write processed data to external systems (databases, Kafka topics, etc.).

Classes

SinkAsyncServer

SinkAsyncServer(
    sinker_instance: SinkAsyncCallable,
    sock_path=SINK_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SINK_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

SinkAsyncServer is the main class for starting an asynchronous gRPC server for a sinker. Constructing it creates a new servicer instance for the given sinker and stores it on the server object; the gRPC server itself is created and started when start() is called.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sinker_instance | SinkAsyncCallable | The sinker instance to be used for Sink UDF | required |
| sock_path | | The UNIX socket path to be used for the server | SINK_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |

Example invocation:

import os
import logging
from collections.abc import AsyncIterable
from pynumaflow.sinker import Datum, Responses, Response, Sinker
from pynumaflow.sinker import SinkAsyncServer

logging.basicConfig(level=logging.INFO)

class UserDefinedSink(Sinker):
    async def handler(self, datums: AsyncIterable[Datum]) -> Responses:
        responses = Responses()
        async for msg in datums:
            logging.info("User Defined Sink %s", msg.value.decode("utf-8"))
            responses.append(Response.as_success(msg.id))
        return responses


async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses:
    responses = Responses()
    async for msg in datums:
        logging.info("User Defined Sink %s", msg.value.decode("utf-8"))
        responses.append(Response.as_success(msg.id))
    return responses


if __name__ == "__main__":
    invoke = os.getenv("INVOKE", "func_handler")
    if invoke == "class":
        sink_handler = UserDefinedSink()
    else:
        sink_handler = udsink_handler
    grpc_server = SinkAsyncServer(sink_handler)
    grpc_server.start()

Source code in pynumaflow/sinker/async_server.py
def __init__(
    self,
    sinker_instance: SinkAsyncCallable,
    sock_path=SINK_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SINK_SERVER_INFO_FILE_PATH,
):
    # If the container type is fallback sink, then use the fallback sink address and path.
    if os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_FALLBACK_SINK:
        _LOGGER.info("Using Fallback Sink")
        sock_path = FALLBACK_SINK_SOCK_PATH
        server_info_file = FALLBACK_SINK_SERVER_INFO_FILE_PATH
    elif os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_ON_SUCCESS_SINK:
        _LOGGER.info("Using On Success Sink")
        sock_path = ON_SUCCESS_SINK_SOCK_PATH
        server_info_file = ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH

    self.sock_path = f"unix://{sock_path}"
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.max_message_size = max_message_size
    self.server_info_file = server_info_file

    self.sinker_instance = sinker_instance

    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
    ]

    self.servicer = AsyncSinkServicer(sinker_instance)
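
The constructor above makes two adjustments before storing its arguments: the container type read from the environment can override sock_path and server_info_file, and max_threads is capped at MAX_NUM_THREADS. A minimal standard-library sketch of that logic follows. Apart from the cap of 16 stated in the parameter description, every constant value here is a placeholder invented for illustration; the real values are defined inside pynumaflow and may differ. resolve_sock_path and effective_threads are hypothetical helper names, not part of the library.

```python
import os

# Placeholder values (assumptions): the real constants live in pynumaflow.
ENV_UD_CONTAINER_TYPE = "EXAMPLE_UD_CONTAINER_TYPE"
UD_CONTAINER_FALLBACK_SINK = "example-fallback"
UD_CONTAINER_ON_SUCCESS_SINK = "example-on-success"
SINK_SOCK_PATH = "/tmp/example/sink.sock"
FALLBACK_SINK_SOCK_PATH = "/tmp/example/fb-sink.sock"
ON_SUCCESS_SINK_SOCK_PATH = "/tmp/example/ons-sink.sock"
MAX_NUM_THREADS = 16  # "max capped at 16" per the parameter description


def resolve_sock_path(sock_path: str = SINK_SOCK_PATH) -> str:
    """Mirror the branch in __init__: the container type overrides sock_path."""
    container_type = os.getenv(ENV_UD_CONTAINER_TYPE, "")
    if container_type == UD_CONTAINER_FALLBACK_SINK:
        sock_path = FALLBACK_SINK_SOCK_PATH
    elif container_type == UD_CONTAINER_ON_SUCCESS_SINK:
        sock_path = ON_SUCCESS_SINK_SOCK_PATH
    return f"unix://{sock_path}"


def effective_threads(max_threads: int) -> int:
    """Mirror min(max_threads, MAX_NUM_THREADS)."""
    return min(max_threads, MAX_NUM_THREADS)


# With no container type set, the caller's (or default) path is used.
os.environ.pop(ENV_UD_CONTAINER_TYPE, None)
default_path = resolve_sock_path()

# In a fallback container, even an explicitly passed path is replaced.
os.environ[ENV_UD_CONTAINER_TYPE] = UD_CONTAINER_FALLBACK_SINK
overridden_path = resolve_sock_path("/tmp/example/custom.sock")
os.environ.pop(ENV_UD_CONTAINER_TYPE, None)
```

One consequence of this ordering is that a custom sock_path passed by the caller is replaced, not merged, when the process runs as a fallback or on-success sink container.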

start

start()

Starter function for the async server class. A separate synchronous caller is needed so that all the async coroutines can be started from a single context.

Source code in pynumaflow/sinker/async_server.py
def start(self):
    """
    Starter function for the Async server class, need a separate caller
    so that all the async coroutines can be started from a single context
    """
    aiorun.run(self.aexec(), use_uvloop=True)
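
The split between a synchronous start() and an async aexec() can be sketched as follows. This is an illustration of the pattern only: asyncio.run from the standard library stands in for aiorun.run, so it does not reproduce aiorun's behaviour, and AsyncServerSketch is a hypothetical class, not the library's.

```python
import asyncio


class AsyncServerSketch:
    """Hypothetical stand-in showing a sync entry point for async startup."""

    def __init__(self) -> None:
        self.started = False

    async def aexec(self) -> None:
        # Anything that must live on the event loop is created inside the
        # coroutine, so it shares one loop and one context.
        await asyncio.sleep(0)
        self.started = True

    def start(self) -> None:
        # Single synchronous caller: it owns the loop for the whole run.
        asyncio.run(self.aexec())


server = AsyncServerSketch()
server.start()
```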

aexec async

aexec()

Starts the Asynchronous gRPC server on the given UNIX socket with given max threads.

Source code in pynumaflow/sinker/async_server.py
async def aexec(self):
    """
    Starts the Asynchronous gRPC server on the given UNIX socket with given max threads.
    """
    # As the server is async, we need to create a new server instance in the
    # same thread as the event loop so that all the async calls are made in the
    # same context
    # Create a new server instance, add the servicer to it and start the server
    server = grpc.aio.server(options=self._server_options)
    server.add_insecure_port(self.sock_path)
    sink_pb2_grpc.add_SinkServicer_to_server(self.servicer, server)
    serv_info = ServerInfo.get_default_server_info()
    serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Sinker]
    await start_async_server(
        server_async=server,
        sock_path=self.sock_path,
        max_threads=self.max_threads,
        cleanup_coroutines=list(),
        server_info_file=self.server_info_file,
        server_info=serv_info,
    )

SinkServer

SinkServer(
    sinker_instance: SinkSyncCallable,
    sock_path=SINK_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SINK_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

SinkServer is the main class for starting a synchronous gRPC server for a sinker. Constructing it creates a new servicer instance for the given sinker; the server is started by calling start().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sinker_instance | SinkSyncCallable | The sinker instance to be used for Sink UDF | required |
| sock_path | | The UNIX socket path to be used for the server | SINK_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |

Example invocation:

import os
import logging
from collections.abc import Iterator

from pynumaflow.sinker import Datum, Responses, Response, SinkServer
from pynumaflow.sinker import Sinker

logging.basicConfig(level=logging.INFO)

class UserDefinedSink(Sinker):
    def handler(self, datums: Iterator[Datum]) -> Responses:
        responses = Responses()
        for msg in datums:
            logging.info("User Defined Sink %s", msg.value.decode("utf-8"))
            responses.append(Response.as_success(msg.id))
        return responses

def udsink_handler(datums: Iterator[Datum]) -> Responses:
    responses = Responses()
    for msg in datums:
        logging.info("User Defined Sink %s", msg.value.decode("utf-8"))
        responses.append(Response.as_success(msg.id))
    return responses

if __name__ == "__main__":
    invoke = os.getenv("INVOKE", "func_handler")
    if invoke == "class":
        sink_handler = UserDefinedSink()
    else:
        sink_handler = udsink_handler
    grpc_server = SinkServer(sink_handler)
    grpc_server.start()

Source code in pynumaflow/sinker/server.py
def __init__(
    self,
    sinker_instance: SinkSyncCallable,
    sock_path=SINK_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SINK_SERVER_INFO_FILE_PATH,
):
    """
    Create a new grpc Sink Server instance.
    A new servicer instance is created and attached to the server.
    The server instance is returned.

    Args:
        sinker_instance: The sinker instance to be used for Sink UDF
        sock_path: The UNIX socket path to be used for the server
        max_message_size: The max message size in bytes the server can receive and send
        max_threads: The max number of threads to be spawned;
                        defaults to 4 and max capped at 16
    Example invocation:
    ```py
    import os
    import logging
    from collections.abc import Iterator

    from pynumaflow.sinker import Datum, Responses, Response, SinkServer
    from pynumaflow.sinker import Sinker
    from pynumaflow._constants import _LOGGER

    logging.basicConfig(level=logging.INFO)

    class UserDefinedSink(Sinker):
        def handler(self, datums: Iterator[Datum]) -> Responses:
            responses = Responses()
            for msg in datums:
                logging.info("User Defined Sink %s", msg.value.decode("utf-8"))
                responses.append(Response.as_success(msg.id))
            return responses

    def udsink_handler(datums: Iterator[Datum]) -> Responses:
        responses = Responses()
        for msg in datums:
            logging.info("User Defined Sink %s", msg.value.decode("utf-8"))
            responses.append(Response.as_success(msg.id))
        return responses

    if __name__ == "__main__":
        invoke = os.getenv("INVOKE", "func_handler")
        if invoke == "class":
            sink_handler = UserDefinedSink()
        else:
            sink_handler = udsink_handler
        grpc_server = SinkServer(sink_handler)
        grpc_server.start()
    ```
    """
    # If the container type is fallback sink, then use the fallback sink address and path.
    if os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_FALLBACK_SINK:
        _LOGGER.info("Using Fallback Sink")
        sock_path = FALLBACK_SINK_SOCK_PATH
        server_info_file = FALLBACK_SINK_SERVER_INFO_FILE_PATH
    elif os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_ON_SUCCESS_SINK:
        _LOGGER.info("Using On Success Sink")
        sock_path = ON_SUCCESS_SINK_SOCK_PATH
        server_info_file = ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH

    self.sock_path = f"unix://{sock_path}"
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.max_message_size = max_message_size
    self.server_info_file = server_info_file

    self.sinker_instance = sinker_instance

    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
    ]
    self.servicer = SyncSinkServicer(sinker_instance)

start

start()

Starts the Synchronous gRPC server on the given UNIX socket with given max threads.

Source code in pynumaflow/sinker/server.py
def start(self):
    """
    Starts the Synchronous gRPC server on the
    given UNIX socket with given max threads.
    """
    _LOGGER.info(
        "Sync GRPC Sink listening on: %s with max threads: %s",
        self.sock_path,
        self.max_threads,
    )
    serv_info = ServerInfo.get_default_server_info()
    serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Sinker]
    # Start the server
    sync_server_start(
        servicer=self.servicer,
        bind_address=self.sock_path,
        max_threads=self.max_threads,
        server_info_file=self.server_info_file,
        server_options=self._server_options,
        udf_type=UDFType.Sink,
        server_info=serv_info,
    )

UserMetadata dataclass

UserMetadata(_data: dict[str, dict[str, bytes]] = dict())

UserMetadata wraps the user-generated metadata groups per message. It is read-write to UDFs.

groups

groups() -> list[str]

Returns the list of group names for the user metadata.

Source code in pynumaflow/_metadata.py
def groups(self) -> list[str]:
    """
    Returns the list of group names for the user metadata.
    """
    return list(self._data.keys())

keys

keys(group: str) -> list[str]

Returns the list of keys for a given group.

Source code in pynumaflow/_metadata.py
def keys(self, group: str) -> list[str]:
    """
    Returns the list of keys for a given group.
    """
    keys = self._data.get(group) or {}
    return list(keys.keys())

value

value(group: str, key: str) -> Optional[bytes]

Returns the value for a given group and key. If the group or key does not exist, returns None.

Source code in pynumaflow/_metadata.py
def value(self, group: str, key: str) -> Optional[bytes]:
    """
    Returns the value for a given group and key.
    If the group or key does not exist, returns None.
    """
    value = self._data.get(group)
    if value is None:
        return None
    return value.get(key)

add_key

add_key(group: str, key: str, value: bytes)

Adds the value for a given group and key.

Source code in pynumaflow/_metadata.py
def add_key(self, group: str, key: str, value: bytes):
    """
    Adds the value for a given group and key.
    """
    self._data.setdefault(group, {})[key] = value

remove_key

remove_key(group: str, key: str) -> Optional[bytes]

Removes the key and its value for a given group and returns the value. If this key is the only key in the group, the group will be removed. Returns None if the group or key does not exist.

Source code in pynumaflow/_metadata.py
def remove_key(self, group: str, key: str) -> Optional[bytes]:
    """
    Removes the key and its value for a given group and returns the value.
    If this key is the only key in the group, the group will be removed.
    Returns None if the group or key does not exist.
    """
    group_data = self._data.pop(group, None)
    if group_data is None:
        return None
    value = group_data.pop(key, None)
    if group_data:
        self._data[group] = group_data
    return value
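
The group-removal behaviour of remove_key can be seen in a small standalone sketch. MetadataSketch is a hypothetical stand-in that copies the groups, add_key and remove_key bodies shown above so the example runs without pynumaflow installed; the group and key names are made up.

```python
from typing import Optional


class MetadataSketch:
    """Stand-in reusing the method bodies listed above (not the real class)."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, bytes]] = {}

    def groups(self) -> list[str]:
        return list(self._data.keys())

    def add_key(self, group: str, key: str, value: bytes) -> None:
        self._data.setdefault(group, {})[key] = value

    def remove_key(self, group: str, key: str) -> Optional[bytes]:
        group_data = self._data.pop(group, None)
        if group_data is None:
            return None
        value = group_data.pop(key, None)
        if group_data:
            self._data[group] = group_data
        return value


md = MetadataSketch()
md.add_key("audit", "user", b"alice")
md.add_key("audit", "region", b"eu")
md.add_key("trace", "id", b"42")

removed_first = md.remove_key("audit", "user")     # "audit" keeps "region"
groups_after_first = md.groups()
removed_second = md.remove_key("audit", "region")  # last key: group is dropped
groups_after_second = md.groups()
missing = md.remove_key("audit", "user")           # group no longer exists
```

Because the group is popped and then re-inserted when keys remain, it moves to the end of the dict's insertion order, so groups_after_first is ["trace", "audit"] and not ["audit", "trace"]. Code that relies on the order returned by groups() should account for this.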

remove_group

remove_group(group: str) -> Optional[dict[str, bytes]]

Removes the group and all its keys and values and returns the data. Returns None if the group does not exist.

Source code in pynumaflow/_metadata.py
def remove_group(self, group: str) -> Optional[dict[str, bytes]]:
    """
    Removes the group and all its keys and values and returns the data.
    Returns None if the group does not exist.
    """
    return self._data.pop(group, None)

clear

clear()

Clears all the groups and all their keys and values.

Source code in pynumaflow/_metadata.py
def clear(self):
    """
    Clears all the groups and all their keys and values.
    """
    self._data.clear()

SystemMetadata dataclass

SystemMetadata(_data: dict[str, dict[str, bytes]] = dict())

SystemMetadata wraps the system-generated metadata groups per message, as a mapping of group name to that group's key-value pairs. It is read-only to UDFs.

groups

groups() -> list[str]

Returns the list of group names for the system metadata.

Source code in pynumaflow/_metadata.py
def groups(self) -> list[str]:
    """
    Returns the list of group names for the system metadata.
    """
    return list(self._data.keys())

keys

keys(group: str) -> list[str]

Returns the list of keys for a given group.

Source code in pynumaflow/_metadata.py
def keys(self, group: str) -> list[str]:
    """
    Returns the list of keys for a given group.
    """
    return list(self._data.get(group, {}).keys())

value

value(group: str, key: str) -> Optional[bytes]

Returns the value for a given group and key. If the group or key does not exist, returns None.

Source code in pynumaflow/_metadata.py
def value(self, group: str, key: str) -> Optional[bytes]:
    """
    Returns the value for a given group and key.
    """
    return self._data.get(group, {}).get(key)

Response dataclass

Response(
    id: str,
    success: bool,
    err: Optional[str],
    fallback: bool,
    on_success: bool,
    on_success_msg: Optional[Message],
)

Basic datatype for UDSink response.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| id | str | the id of the event. | required |
| success | bool | boolean indicating whether the event was successfully processed. | required |
| err | Optional[str] | error message if the event was not successfully processed. | required |
| fallback | bool | true if the message is to be sent to the fallback sink. | required |

Responses

Responses(*responses: R)

Bases: Sequence[R]

Container to hold a list of Response instances.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| responses | R | list of Response instances. | () |

Source code in pynumaflow/sinker/_dtypes.py
def __init__(self, *responses: R):
    self._responses = list(responses) or []
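
As a rough illustration of what "Bases: Sequence[R]" with a variadic constructor implies for callers, the sketch below builds a comparable container from the standard library alone. ResponseSketch and ResponsesSketch are hypothetical stand-ins; the __getitem__, __len__ and append bodies are assumptions about how such a container is typically written, not the library's source.

```python
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Optional


@dataclass
class ResponseSketch:
    """Hypothetical stand-in carrying only the fields this example needs."""

    id: str
    success: bool
    err: Optional[str] = None


class ResponsesSketch(Sequence[ResponseSketch]):
    def __init__(self, *responses: ResponseSketch):
        # Same initialisation as the __init__ shown above.
        self._responses = list(responses) or []

    def __getitem__(self, index):
        return self._responses[index]

    def __len__(self) -> int:
        return len(self._responses)

    def append(self, response: ResponseSketch) -> None:
        self._responses.append(response)


# Seed the container through the constructor, then append as messages finish.
batch = ResponsesSketch(ResponseSketch("m-1", True))
batch.append(ResponseSketch("m-2", False, "write timed out"))

# Sequence supplies iteration, so the batch can be filtered like a list.
failed_ids = [r.id for r in batch if not r.success]
```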

Datum dataclass

Datum(
    keys: list[str],
    sink_msg_id: str,
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
    user_metadata: Optional[UserMetadata] = None,
    system_metadata: Optional[SystemMetadata] = None,
)

Class to define the important information for the event.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| keys | list[str] | the keys of the event. | required |
| value | bytes | the payload of the event. | required |
| event_time | datetime | the event time of the event. | required |
| watermark | datetime | the watermark of the event. | required |
| headers | Optional[dict[str, str]] | the headers of the event. | None |

Example usage

from pynumaflow.sinker import Datum
from datetime import datetime, timezone

d = Datum(
    keys=["test_key"],
    sink_msg_id="test_id",
    value=b"test_mock_message",
    event_time=datetime.fromtimestamp(1662998400, timezone.utc),
    watermark=datetime.fromtimestamp(1662998460, timezone.utc),
    headers={"key1": "value1", "key2": "value2"},
)

Source code in pynumaflow/sinker/_dtypes.py
def __init__(
    self,
    keys: list[str],
    sink_msg_id: str,
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
    user_metadata: Optional[UserMetadata] = None,
    system_metadata: Optional[SystemMetadata] = None,
):
    self._keys = keys
    self._id = sink_msg_id or ""
    self._value = value or b""
    if not isinstance(event_time, datetime):
        raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time")
    self._event_time = event_time
    if not isinstance(watermark, datetime):
        raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")
    self._watermark = watermark
    self._headers = headers or {}
    self._user_metadata = user_metadata or UserMetadata()
    self._system_metadata = system_metadata or SystemMetadata()
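
The constructor rejects anything other than a datetime for event_time and watermark. The sketch below isolates that check so its effect can be seen without pynumaflow installed; check_timestamp is a hypothetical helper that reuses the isinstance test and error message from the listing above.

```python
from datetime import datetime, timezone


def check_timestamp(value, field: str) -> datetime:
    """Same isinstance check as Datum.__init__, factored out for the sketch."""
    if not isinstance(value, datetime):
        raise TypeError(f"Wrong data type: {type(value)} for Datum.{field}")
    return value


# A datetime passes through unchanged.
ok = check_timestamp(datetime.fromtimestamp(1662998400, timezone.utc), "event_time")

# A raw epoch integer is rejected, so convert it before constructing a Datum.
try:
    check_timestamp(1662998400, "event_time")
    error_message = None
except TypeError as exc:
    error_message = str(exc)
```

By contrast, the same constructor silently normalises falsy values for the other fields: sink_msg_id becomes "", value becomes b"", and headers becomes {}.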

id property

id: str

Returns the id of the event.

keys property

keys: list[str]

Returns the keys of the event.

value property

value: bytes

Returns the value of the event.

event_time property

event_time: datetime

Returns the event time of the event.

watermark property

watermark: datetime

Returns the watermark of the event.

headers property

headers: dict[str, str]

Returns the headers of the event.

user_metadata property

user_metadata: UserMetadata

Returns the user metadata of the event.

system_metadata property

system_metadata: SystemMetadata

Returns the system metadata of the event.

Sinker

Provides an interface to write a Sinker which will be exposed over a gRPC server.

handler abstractmethod

handler(datums: Iterator[Datum]) -> Responses

Implement this handler function which implements the SinkCallable interface.

Source code in pynumaflow/sinker/_dtypes.py
@abstractmethod
def handler(self, datums: Iterator[Datum]) -> Responses:
    """
    Implement this handler function which implements the SinkCallable interface.
    """
    pass

Message dataclass

Message(
    value: bytes,
    keys: Optional[list[str]] = None,
    user_metadata: Optional[UserMetadata] = None,
)

Basic datatype for OnSuccess UDSink message.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| keys | Optional[list[str]] | list of keys for the on_success message. | None |
| value | bytes | payload of the on_success message. | required |
| user_metadata | Optional[UserMetadata] | user metadata of the on_success message. | None |

Source code in pynumaflow/sinker/_dtypes.py
def __init__(
    self,
    value: bytes,
    keys: Optional[list[str]] = None,
    user_metadata: Optional[UserMetadata] = None,
):
    self._value = value
    self._keys = keys
    self._user_metadata = user_metadata

keys property

keys: Optional[list[str]]

Returns the keys of the on_success message.

value property

value: bytes

Returns the payload of the on_success message.

user_metadata property

user_metadata: Optional[UserMetadata]

Returns the user metadata of the on_success message.