
Source Transformer

The Source Transformer module provides classes and functions for implementing Source Transform UDFs that transform data immediately after it's read from a source. Source Transform is useful for:

  • Parsing/deserializing data at ingestion
  • Filtering messages early
  • Assigning event times
  • Adding metadata
  • Routing messages with tags
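The sketch below shows the kind of per-record logic a source transform runs for these use cases: it parses the payload, filters records, assigns an event time and adds a tag. It runs without pynumaflow. `Out` is a hypothetical stand-in for `Message`, and the JSON fields `ts` (epoch seconds) and `level` are an assumed payload schema. In a real UDF, this logic would live in a handler that receives a `Datum` and returns `Messages`.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Out:
    """Hypothetical stand-in for pynumaflow's Message."""
    value: bytes
    event_time: datetime
    tags: list[str] = field(default_factory=list)


def transform(raw: bytes) -> Optional[Out]:
    """Parse, filter, assign an event time, and tag a single record."""
    try:
        record = json.loads(raw)  # parse/deserialize at ingestion
    except json.JSONDecodeError:
        return None  # filter out malformed payloads early
    if record.get("level") == "debug":
        return None  # drop noise before it reaches later vertices
    # Assign the event time from the payload (assumed "ts" field, epoch seconds).
    event_time = datetime.fromtimestamp(record["ts"], timezone.utc)
    # Route with a tag that downstream conditional forwarding could match on.
    tag = "errors" if record.get("level") == "error" else "default"
    return Out(value=raw, event_time=event_time, tags=[tag])
```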

Classes

Message dataclass

Message(
    value: bytes,
    event_time: datetime,
    keys: list[str] = None,
    tags: list[str] = None,
    user_metadata: Optional[UserMetadata] = None,
)

Basic data type for passing data to the next vertex or vertices.

Parameters:

  • value (bytes, required): the message payload, in bytes.
  • event_time (datetime, required): the event time of the message, usually extracted from the payload.
  • keys (list[str], default None): keys for the vertex (optional).
  • tags (list[str], default None): tags used for conditional forwarding (optional).
  • user_metadata (Optional[UserMetadata], default None): user metadata for the message (optional).
Source code in pynumaflow/sourcetransformer/_dtypes.py
def __init__(
    self,
    value: bytes,
    event_time: datetime,
    keys: list[str] = None,
    tags: list[str] = None,
    user_metadata: Optional[UserMetadata] = None,
):
    """
    Creates a Message object to send value to a vertex.
    """
    self._tags = tags or []
    self._keys = keys or []

    # There is no year 0, so setting following as default event time.
    self._event_time = event_time or datetime(1, 1, 1, 0, 0)
    self._value = value or b""
    self._user_metadata = user_metadata or UserMetadata()

Messages

Messages(*messages: M)

Bases: Sequence[M]

Class to define a list of Message objects.

Parameters:

  • messages (M, default ()): the Message objects to include, passed as positional arguments.
Source code in pynumaflow/sourcetransformer/_dtypes.py
def __init__(self, *messages: M):
    self._messages = list(messages) or []
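Because `Messages` derives from `Sequence`, it supports indexing, `len()`, iteration and membership tests, and a handler can place several entries in it (for example, to split one payload into several records). The hypothetical stand-in below is built on `collections.abc.Sequence` to show what that base class provides. Its `append` method mirrors the one used in the examples on this page; the other names are assumptions.

```python
from collections.abc import Sequence


class MessagesSketch(Sequence):
    """Hypothetical stand-in for Messages: a Sequence backed by a list."""

    def __init__(self, *messages):
        self._messages = list(messages)

    # Defining __getitem__ and __len__ is enough for Sequence to supply
    # __iter__, __contains__, __reversed__, index() and count().
    def __getitem__(self, index):
        return self._messages[index]

    def __len__(self) -> int:
        return len(self._messages)

    def append(self, message) -> None:
        self._messages.append(message)


def split_lines(value: bytes) -> MessagesSketch:
    """Emit one entry per non-empty line of a multi-line payload."""
    out = MessagesSketch()
    for line in value.splitlines():
        if line.strip():
            out.append(line)
    return out
```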

Datum dataclass

Datum(
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
    user_metadata: Optional[UserMetadata] = None,
    system_metadata: Optional[SystemMetadata] = None,
)

Class to define the important information for the event.

Parameters:

  • keys (list[str], required): the keys of the event.
  • value (bytes, required): the payload of the event.
  • event_time (datetime, required): the event time of the event.
  • watermark (datetime, required): the watermark of the event.
  • headers (Optional[dict[str, str]], default None): the headers of the event.
  • user_metadata (Optional[UserMetadata], default None): the user metadata of the event.
  • system_metadata (Optional[SystemMetadata], default None): the system metadata of the event.

Example:

from datetime import datetime, timezone

from pynumaflow.sourcetransformer import Datum

d = Datum(
    keys=["test_key"],
    value=b"test_mock_message",
    event_time=datetime.fromtimestamp(1662998400, timezone.utc),
    watermark=datetime.fromtimestamp(1662998460, timezone.utc),
    headers={"key1": "value1", "key2": "value2"},
)

Source code in pynumaflow/sourcetransformer/_dtypes.py
def __init__(
    self,
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    headers: Optional[dict[str, str]] = None,
    user_metadata: Optional[UserMetadata] = None,
    system_metadata: Optional[SystemMetadata] = None,
):
    self._keys = keys or list()
    self._value = value or b""
    if not isinstance(event_time, datetime):
        raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time")
    self._event_time = event_time
    if not isinstance(watermark, datetime):
        raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")
    self._watermark = watermark
    self._headers = headers or {}
    self._user_metadata = user_metadata or UserMetadata()
    self._system_metadata = system_metadata or SystemMetadata()
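`Datum.__init__` raises `TypeError` unless `event_time` and `watermark` are `datetime` objects, so raw epoch values (for example, from a test fixture) must be converted first. The sketch below assumes the input is in epoch milliseconds. `to_event_time` and `check_event_time` are hypothetical helpers, and the second one reproduces the check shown above. Adding a `timedelta` to a UTC epoch keeps the arithmetic exact, whereas dividing by 1000 first goes through a float.

```python
from datetime import datetime, timedelta, timezone

# Timezone-aware Unix epoch used as the base for exact millisecond math.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_event_time(epoch_millis: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=epoch_millis)


def check_event_time(event_time) -> datetime:
    """Mirror of the type check Datum.__init__ performs on event_time."""
    if not isinstance(event_time, datetime):
        raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time")
    return event_time
```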

keys property

keys: list[str]

Returns the keys of the event.

value property

value: bytes

Returns the value of the event.

event_time property

event_time: datetime

Returns the event time of the event.

watermark property

watermark: datetime

Returns the watermark of the event.

headers property

headers: dict[str, str]

Returns the headers of the event.

user_metadata property

user_metadata: UserMetadata

Returns the user metadata of the event.

system_metadata property

system_metadata: SystemMetadata

Returns the system metadata of the event.
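A handler often combines these properties to make routing decisions. The sketch below derives conditional-forwarding tags from a header and from whether keys are present. `DatumSketch` is a hypothetical stand-in that exposes only `keys`, `value` and `headers`. The `x-priority` header name is an assumption; Numaflow does not set it.

```python
from dataclasses import dataclass, field


@dataclass
class DatumSketch:
    """Hypothetical stand-in exposing the Datum properties used below."""
    keys: list[str]
    value: bytes
    headers: dict[str, str] = field(default_factory=dict)


def tags_for(datum: DatumSketch) -> list[str]:
    """Derive conditional-forwarding tags from a (hypothetical) header."""
    priority = datum.headers.get("x-priority", "normal")
    tags = [f"priority_{priority}"]
    if datum.keys:
        tags.append("keyed")
    return tags
```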

SourceTransformer

Provides an interface for writing a Source Transformer that is exposed over a gRPC server.

handler abstractmethod

handler(keys: list[str], datum: Datum) -> Messages

Override this method to implement the transformation. It implements the SourceTransformCallable interface.

Source code in pynumaflow/sourcetransformer/_dtypes.py
@abstractmethod
def handler(self, keys: list[str], datum: Datum) -> Messages:
    """
    Implement this handler function which implements the
    SourceTransformCallable interface.
    """
    pass
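Because `handler` is declared with `@abstractmethod`, a subclass that does not override it cannot be instantiated. The sketch below shows that behaviour with a hypothetical stand-in base class. It also shows why a class can be handier than a plain function: it can hold per-instance configuration. To stay runnable without pynumaflow, the signature is simplified to take the raw `value` bytes and return a list, instead of a `Datum` and `Messages`.

```python
from abc import ABC, abstractmethod


class TransformerBase(ABC):
    """Hypothetical stand-in for the SourceTransformer base class."""

    @abstractmethod
    def handler(self, keys: list[str], value: bytes) -> list[bytes]:
        """Subclasses must override this."""


class PrefixUpperTransformer(TransformerBase):
    """Keeps per-instance configuration (a prefix) between calls."""

    def __init__(self, prefix: bytes):
        self.prefix = prefix

    def handler(self, keys: list[str], value: bytes) -> list[bytes]:
        return [self.prefix + value.upper()]


class Incomplete(TransformerBase):
    """Forgets to override handler, so instantiation fails."""
```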

SourceTransformMultiProcServer

SourceTransformMultiProcServer(
    source_transform_instance: SourceTransformCallable,
    server_count: int = _PROCESS_COUNT,
    sock_path=SOURCE_TRANSFORMER_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

Class for a multiprocess Source Transformer server. It forks several gRPC server instances (see server_count), all serving the same handler.

Parameters:

  • source_transform_instance (SourceTransformCallable, required): the source transformer instance to be used for the Source Transformer UDF.
  • sock_path (default SOURCE_TRANSFORMER_SOCK_PATH): the UNIX socket path to be used for the server.
  • server_count (int, default _PROCESS_COUNT): the number of gRPC server instances to fork.
  • max_message_size (default MAX_MESSAGE_SIZE): the maximum message size, in bytes, that the server can receive and send.
  • max_threads (default NUM_THREADS_DEFAULT): the maximum number of threads to spawn; defaults to 4 and is capped at 16.

Example invocation:

import datetime
import logging

from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformMultiProcServer

# This is a simple User Defined Function example which receives a message,
# applies the following data transformation, and returns the message.
# If the message event time is before year 2022, drop the message
# with event time unchanged.
# If it's within year 2022, update the tag to "within_year_2022" and
# update the message event time to Jan 1st 2022.
# Otherwise (exclusively after year 2022), update the tag to
# "after_year_2022" and update the message event time to Jan 1st 2023.


january_first_2022 = datetime.datetime.fromtimestamp(1640995200)
january_first_2023 = datetime.datetime.fromtimestamp(1672531200)


def my_handler(keys: list[str], datum: Datum) -> Messages:
    val = datum.value
    event_time = datum.event_time
    messages = Messages()

    if event_time < january_first_2022:
        logging.info("Got event time:%s, it is before 2022, so dropping", event_time)
        messages.append(Message.to_drop(event_time))
    elif event_time < january_first_2023:
        logging.info(
            "Got event time:%s, it is within year 2022, so forwarding to within_year_2022",
            event_time,
        )
        message = Message(
            value=val,
            event_time=january_first_2022,
            tags=["within_year_2022"],
        )
        messages.append(message)
    else:
        logging.info(
            "Got event time:%s, it is after year 2022, so forwarding to after_year_2022",
            event_time,
        )
        message = Message(
            value=val,
            event_time=january_first_2023,
            tags=["after_year_2022"],
        )
        messages.append(message)

    return messages

if __name__ == "__main__":
    grpc_server = SourceTransformMultiProcServer(
        source_transform_instance=my_handler,
        server_count=2,
    )
    grpc_server.start()

Source code in pynumaflow/sourcetransformer/multiproc_server.py
def __init__(
    self,
    source_transform_instance: SourceTransformCallable,
    server_count: int = _PROCESS_COUNT,
    sock_path=SOURCE_TRANSFORMER_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
):
    """
    Create a new grpc Source Transformer Multiproc Server instance.
    A new servicer instance is created and attached to the server.
    The server instance is returned.

    Args:
        source_transform_instance: The source transformer instance to be used for
        Source Transformer UDF
        sock_path: The UNIX socket path to be used for the server
        server_count: The number of grpc server instances to be forked for multiproc
        max_message_size: The max message size in bytes the server can receive and send
        max_threads: The max number of threads to be spawned;
                        defaults to 4 and max capped at 16

    Example invocation:
    ```py
    import datetime
    import logging

    from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformServer

    # This is a simple User Defined Function example which receives a message,
    # applies the following data transformation, and returns the message.
    # If the message event time is before year 2022, drop the message
    # with event time unchanged.
    # If it's within year 2022, update the tag to "within_year_2022" and
    # update the message event time to Jan 1st 2022.
    # Otherwise, (exclusively after year 2022), update the tag to
    # "after_year_2022" and update the


    january_first_2022 = datetime.datetime.fromtimestamp(1640995200)
    january_first_2023 = datetime.datetime.fromtimestamp(1672531200)


    def my_handler(keys: list[str], datum: Datum) -> Messages:
        val = datum.value
        event_time = datum.event_time
        messages = Messages()

        if event_time < january_first_2022:
            logging.info("Got event time:%s, it is before 2022, so dropping", event_time)
            messages.append(Message.to_drop(event_time))
        elif event_time < january_first_2023:
            logging.info(
                "Got event time:%s, it is within year 2022, so
                forwarding to within_year_2022",
                event_time,
            )
            message = Message(
                value=val,
                event_time=january_first_2022,
                tags=["within_year_2022"],
            )
            messages.append(message)
        else:
            logging.info(
                "Got event time:%s, it is after year 2022, so forwarding to after_year_2022",
                event_time,
            )
            message = Message(
                value=val,
                event_time=january_first_2023,
                tags=["after_year_2022"],
            )
            messages.append(message)

        return messages

    if __name__ == "__main__":
        grpc_server = SourceTransformMultiProcServer(
            source_transform_instance=my_handler,
            server_count=2,
        )
        grpc_server.start()
    ```
    """
    self.sock_path = f"unix://{sock_path}"
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.max_message_size = max_message_size
    self.server_info_file = server_info_file

    self.source_transform_instance = source_transform_instance

    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
        ("grpc.so_reuseport", 1),
        ("grpc.so_reuseaddr", 1),
    ]
    # Set the number of processes to be spawned to the number of CPUs or
    # the value of the parameter server_count defined by the user
    # Setting the max value to 2 * CPU count
    # Used for multiproc server
    self._process_count = min(server_count, 2 * _PROCESS_COUNT)
    self.servicer = SourceTransformServicer(handler=source_transform_instance, multiproc=True)

start

start()

Starts N gRPC servers on the given socket path, each with the given maximum number of threads. N is the value of the server_count parameter, which defaults to the number of CPUs, and is capped at 2 * CPU count.

Source code in pynumaflow/sourcetransformer/multiproc_server.py
def start(self):
    """
    Starts the N gRPC servers on the given socket path with given max threads.
    Here N = The number of CPUs or the value of the parameter `server_count`
    defined by the user. The max value is capped to 2 * CPU count.
    """

    serv_info = ServerInfo.get_default_server_info()
    serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[
        ContainerType.Sourcetransformer
    ]
    start_multiproc_server(
        max_threads=self.max_threads,
        servicer=self.servicer,
        process_count=self._process_count,
        server_info_file=self.server_info_file,
        server_options=self._server_options,
        udf_type=UDFType.SourceTransformer,
        server_info=serv_info,
    )
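The effective concurrency follows from two `min()` calls in `__init__`. The process count is `min(server_count, 2 * _PROCESS_COUNT)`, and the per-server thread count is `min(max_threads, MAX_NUM_THREADS)`. The sketch below reproduces that arithmetic. It assumes that `_PROCESS_COUNT` equals the CPU count, as the source comments suggest, and that `MAX_NUM_THREADS` is 16, as the parameter description states. The function names are hypothetical.

```python
import os

MAX_NUM_THREADS = 16  # assumption: the documented thread cap


def effective_process_count(server_count: int, cpu_count: int) -> int:
    """Number of forked gRPC servers: server_count, capped at 2 * CPUs."""
    return min(server_count, 2 * cpu_count)


def effective_threads(max_threads: int) -> int:
    """Threads per server: max_threads, capped at MAX_NUM_THREADS."""
    return min(max_threads, MAX_NUM_THREADS)


# os.cpu_count() can return None, so fall back to 1.
cpus = os.cpu_count() or 1
```

For example, on a 4-CPU host, server_count=100 yields 8 processes, and max_threads=32 yields 16 threads per server.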

SourceTransformServer

SourceTransformServer(
    source_transform_instance: SourceTransformCallable,
    sock_path=SOURCE_TRANSFORMER_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

Class for a synchronous Source Transformer server. A new servicer instance is created and attached to the server.

Parameters:

  • source_transform_instance (SourceTransformCallable, required): the source transformer instance to be used for the Source Transformer UDF.
  • sock_path (default SOURCE_TRANSFORMER_SOCK_PATH): the UNIX socket path to be used for the server.
  • max_message_size (default MAX_MESSAGE_SIZE): the maximum message size, in bytes, that the server can receive and send.
  • max_threads (default NUM_THREADS_DEFAULT): the maximum number of threads to spawn; defaults to 4 and is capped at 16.

Below is a simple User Defined Function example which receives a message, applies the following data transformation, and returns the message.

  • If the message event time is before year 2022, drop the message with event time unchanged.
  • If it's within year 2022, update the tag to within_year_2022 and update the message event time to Jan 1st 2022.
  • Otherwise, (exclusively after year 2022), update the tag to after_year_2022 and update the message event time to Jan 1st 2023.
import datetime
import logging

from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformServer


january_first_2022 = datetime.datetime.fromtimestamp(1640995200)
january_first_2023 = datetime.datetime.fromtimestamp(1672531200)


def my_handler(keys: list[str], datum: Datum) -> Messages:
    val = datum.value
    event_time = datum.event_time
    messages = Messages()

    if event_time < january_first_2022:
        logging.info("Got event time:%s, it is before 2022, so dropping", event_time)
        messages.append(Message.to_drop(event_time))
    elif event_time < january_first_2023:
        logging.info(
            "Got event time:%s, it is within year 2022, so forwarding to within_year_2022",
            event_time,
        )
        messages.append(
            Message(
                value=val,
                event_time=january_first_2022,
                tags=["within_year_2022"],
            )
        )
    else:
        logging.info(
            "Got event time:%s, it is after year 2022, so forwarding to after_year_2022",
            event_time,
        )
        messages.append(
            Message(
                value=val,
                event_time=january_first_2023,
                tags=["after_year_2022"],
            )
        )

    return messages


if __name__ == "__main__":
    grpc_server = SourceTransformServer(my_handler)
    grpc_server.start()
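In the example above, `datetime.datetime.fromtimestamp(1640995200)` returns a naive datetime in the host's local time zone. As a result, the year boundaries shift with the container's TZ setting. Also, ordering comparisons between a naive and a timezone-aware datetime raise `TypeError`. This page does not say whether `datum.event_time` is naive or aware, so the hypothetical helper below normalizes both sides to UTC before comparing. Treating naive values as UTC is an assumption.

```python
from datetime import datetime, timezone

# Year boundaries pinned to UTC instead of the host's local time zone.
JAN_1_2022 = datetime(2022, 1, 1, tzinfo=timezone.utc)
JAN_1_2023 = datetime(2023, 1, 1, tzinfo=timezone.utc)


def as_utc(ts: datetime) -> datetime:
    """Treat naive datetimes as UTC (assumption); convert aware ones to UTC."""
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)


def classify(event_time: datetime) -> str:
    """Return the bucket chosen by the example handler's if/elif/else."""
    ts = as_utc(event_time)
    if ts < JAN_1_2022:
        return "drop"
    if ts < JAN_1_2023:
        return "within_year_2022"
    return "after_year_2022"
```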
Source code in pynumaflow/sourcetransformer/server.py
def __init__(
    self,
    source_transform_instance: SourceTransformCallable,
    sock_path=SOURCE_TRANSFORMER_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
):
    """
    Create a new grpc Source Transformer Server instance.
    A new servicer instance is created and attached to the server.
    The server instance is returned.

    Args:
        source_transform_instance: The source transformer instance to be used for
        Source Transformer UDF
        sock_path: The UNIX socket path to be used for the server
        max_message_size: The max message size in bytes the server can receive and send
        max_threads: The max number of threads to be spawned;
                        defaults to 4 and max capped at 16

    Below is a simple User Defined Function example which receives a message, applies the
    following data transformation, and returns the message.

    - If the message event time is before year 2022, drop the message with event time unchanged.
    - If it's within year 2022, update the tag to `within_year_2022` and update the message
      event time to Jan 1st 2022.
    - Otherwise, (exclusively after year 2022), update the tag to `after_year_2022` and update
      the message event time to Jan 1st 2023.

    ```py
    import datetime
    import logging

    from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformServer


    january_first_2022 = datetime.datetime.fromtimestamp(1640995200)
    january_first_2023 = datetime.datetime.fromtimestamp(1672531200)


    def my_handler(keys: list[str], datum: Datum) -> Messages:
        val = datum.value
        event_time = datum.event_time
        messages = Messages()

        if event_time < january_first_2022:
            logging.info("Got event time:%s, it is before 2022, so dropping", event_time)
            messages.append(Message.to_drop(event_time))
        elif event_time < january_first_2023:
            logging.info(
                "Got event time:%s, it is within year 2022, so forwarding to within_year_2022",
                event_time,
            )
            messages.append(
                Message(value=val, event_time=january_first_2022,
                        tags=["within_year_2022"])
            )
        else:
            logging.info(
                "Got event time:%s, it is after year 2022, so forwarding to
                after_year_2022", event_time
            )
            messages.append(Message(value=val, event_time=january_first_2023,
                            tags=["after_year_2022"]))

        return messages


    if __name__ == "__main__":
        grpc_server = SourceTransformServer(my_handler)
        grpc_server.start()
    ```
    """
    self.sock_path = f"unix://{sock_path}"
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.max_message_size = max_message_size
    self.server_info_file = server_info_file

    self.source_transform_instance = source_transform_instance

    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
    ]
    self.servicer = SourceTransformServicer(handler=source_transform_instance)

start

start()

Starts the synchronous gRPC server on the given UNIX socket with the given maximum number of threads.

Source code in pynumaflow/sourcetransformer/server.py
def start(self):
    """
    Starts the Synchronous gRPC server on the given UNIX socket with given max threads.
    """
    _LOGGER.info(
        "Sync GRPC Server listening on: %s with max threads: %s",
        self.sock_path,
        self.max_threads,
    )
    serv_info = ServerInfo.get_default_server_info()
    serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[
        ContainerType.Sourcetransformer
    ]
    # Start the sync server
    sync_server_start(
        servicer=self.servicer,
        bind_address=self.sock_path,
        max_threads=self.max_threads,
        server_info_file=self.server_info_file,
        server_options=self._server_options,
        udf_type=UDFType.SourceTransformer,
        server_info=serv_info,
    )

SourceTransformAsyncServer

SourceTransformAsyncServer(
    source_transform_instance: SourceTransformAsyncCallable,
    sock_path=SOURCE_TRANSFORMER_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

Class for an asynchronous Source Transformer server. A new servicer instance is created and attached to the server.

Parameters:

  • source_transform_instance (SourceTransformAsyncCallable, required): the source transformer instance to be used for the Source Transformer UDF.
  • sock_path (default SOURCE_TRANSFORMER_SOCK_PATH): the UNIX socket path to be used for the server.
  • max_message_size (default MAX_MESSAGE_SIZE): the maximum message size, in bytes, that the server can receive and send.
  • max_threads (default NUM_THREADS_DEFAULT): the maximum number of threads to spawn; defaults to 4 and is capped at 16.

Below is a simple User Defined Function example which receives a message, applies the following data transformation, and returns the message.

  • If the message event time is before year 2022, drop the message with event time unchanged.
  • If it's within year 2022, update the tag to within_year_2022 and update the message event time to Jan 1st 2022.
  • Otherwise, (exclusively after year 2022), update the tag to after_year_2022 and update the message event time to Jan 1st 2023.
import datetime
import logging
from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformAsyncServer

january_first_2022 = datetime.datetime.fromtimestamp(1640995200)
january_first_2023 = datetime.datetime.fromtimestamp(1672531200)


async def my_handler(keys: list[str], datum: Datum) -> Messages:
    val = datum.value
    event_time = datum.event_time
    messages = Messages()

    if event_time < january_first_2022:
        logging.info("Got event time:%s, it is before 2022, so dropping", event_time)
        messages.append(Message.to_drop(event_time))
    elif event_time < january_first_2023:
        logging.info(
            "Got event time:%s, it is within year 2022, so forwarding to within_year_2022",
            event_time,
        )
        messages.append(
            Message(
                value=val,
                event_time=january_first_2022,
                tags=["within_year_2022"],
            )
        )
    else:
        logging.info(
            "Got event time:%s, it is after year 2022, so forwarding to after_year_2022",
            event_time,
        )
        messages.append(
            Message(
                value=val,
                event_time=january_first_2023,
                tags=["after_year_2022"],
            )
        )

    return messages


if __name__ == "__main__":
    grpc_server = SourceTransformAsyncServer(my_handler)
    grpc_server.start()
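The async variant is useful when the transform has to wait on I/O. The server is built on `grpc.aio` and started through `aiorun.run`, so handler calls presumably share one event loop. A blocking call inside an `async def` handler would stall every other in-flight request, whereas an awaited call yields control to the loop. In the sketch below, all names are hypothetical: `lookup_region` stands in for a non-blocking enrichment call, `OutSketch` stands in for `Message`, and the simplified signature takes raw bytes rather than a `Datum`.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class OutSketch:
    """Hypothetical stand-in for Message."""
    value: bytes
    tags: list[str] = field(default_factory=list)


async def lookup_region(key: str) -> str:
    """Hypothetical non-blocking enrichment call (e.g. a cache or HTTP lookup)."""
    await asyncio.sleep(0)  # yield to the event loop instead of blocking it
    return "eu" if key.startswith("eu-") else "us"


async def enrich(keys: list[str], value: bytes) -> list[OutSketch]:
    """Async handler shape: await I/O concurrently, then build the output."""
    regions = await asyncio.gather(*(lookup_region(k) for k in keys))
    return [OutSketch(value=value, tags=[f"region_{r}" for r in regions])]
```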
Source code in pynumaflow/sourcetransformer/async_server.py
def __init__(
    self,
    source_transform_instance: SourceTransformAsyncCallable,
    sock_path=SOURCE_TRANSFORMER_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
):
    self.sock_path = f"unix://{sock_path}"
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.max_message_size = max_message_size
    self.server_info_file = server_info_file

    self.source_transform_instance = source_transform_instance

    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
    ]
    self.servicer = SourceTransformAsyncServicer(handler=source_transform_instance)

start

start() -> None

Entry point for the async server. A separate synchronous caller is needed so that all async coroutines are started from a single event-loop context.

Source code in pynumaflow/sourcetransformer/async_server.py
def start(self) -> None:
    """
    Starter function for the Async server class, need a separate caller
    so that all the async coroutines can be started from a single context
    """
    aiorun.run(self.aexec(), use_uvloop=True)
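The `start()`/`aexec()` split is a common asyncio pattern. A synchronous entry point owns the event loop, and everything bound to that loop (here, the `grpc.aio` server) is created inside the coroutine, as the comment in `aexec` explains. The sketch below uses the standard library's `asyncio.run` in place of `aiorun.run`. Unlike `asyncio.run`, aiorun keeps the loop running until a shutdown signal arrives, which a long-lived server needs. `ServerSketch` is hypothetical.

```python
import asyncio


class ServerSketch:
    """Hypothetical stand-in showing the start()/aexec() split."""

    def __init__(self):
        self.loop_in_aexec = None
        self.received = None

    def start(self) -> None:
        # Synchronous entry point: creates, runs, and finally closes the loop.
        asyncio.run(self.aexec())

    async def aexec(self) -> None:
        # Loop-bound objects are created here, inside the running loop,
        # so every coroutine that uses them shares the same context.
        self.loop_in_aexec = asyncio.get_running_loop()
        queue = asyncio.Queue()
        await queue.put(1)
        self.received = await queue.get()
```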

aexec async

aexec() -> None

Starts the async gRPC server on the given UNIX socket with the given maximum number of threads.

Source code in pynumaflow/sourcetransformer/async_server.py
async def aexec(self) -> None:
    """
    Starts the Async gRPC server on the given UNIX socket with
    given max threads.
    """

    # As the server is async, we need to create a new server instance in the
    # same thread as the event loop so that all the async calls are made in the
    # same context

    server_new = grpc.aio.server(options=self._server_options)
    server_new.add_insecure_port(self.sock_path)
    transform_pb2_grpc.add_SourceTransformServicer_to_server(self.servicer, server_new)

    serv_info = ServerInfo.get_default_server_info()
    serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[
        ContainerType.Sourcetransformer
    ]

    # Start the async server
    await start_async_server(
        server_async=server_new,
        sock_path=self.sock_path,
        max_threads=self.max_threads,
        cleanup_coroutines=list(),
        server_info_file=self.server_info_file,
        server_info=serv_info,
    )

UserMetadata dataclass

UserMetadata(_data: dict[str, dict[str, bytes]] = dict())

UserMetadata wraps the user-generated metadata groups of each message. UDFs can both read and write it.

groups

groups() -> list[str]

Returns the list of group names for the user metadata.

Source code in pynumaflow/_metadata.py
def groups(self) -> list[str]:
    """
    Returns the list of group names for the user metadata.
    """
    return list(self._data.keys())

keys

keys(group: str) -> list[str]

Returns the list of keys for a given group.

Source code in pynumaflow/_metadata.py
def keys(self, group: str) -> list[str]:
    """
    Returns the list of keys for a given group.
    """
    keys = self._data.get(group) or {}
    return list(keys.keys())

value

value(group: str, key: str) -> Optional[bytes]

Returns the value for a given group and key. If the group or key does not exist, returns None.

Source code in pynumaflow/_metadata.py
def value(self, group: str, key: str) -> Optional[bytes]:
    """
    Returns the value for a given group and key.
    If the group or key does not exist, returns None.
    """
    value = self._data.get(group)
    if value is None:
        return None
    return value.get(key)

add_key

add_key(group: str, key: str, value: bytes)

Adds the value for a given group and key.

Source code in pynumaflow/_metadata.py
def add_key(self, group: str, key: str, value: bytes):
    """
    Adds the value for a given group and key.
    """
    self._data.setdefault(group, {})[key] = value

remove_key

remove_key(group: str, key: str) -> Optional[bytes]

Removes the key and its value for a given group and returns the value. If this key is the only key in the group, the group will be removed. Returns None if the group or key does not exist.

Source code in pynumaflow/_metadata.py
def remove_key(self, group: str, key: str) -> Optional[bytes]:
    """
    Removes the key and its value for a given group and returns the value.
    If this key is the only key in the group, the group will be removed.
    Returns None if the group or key does not exist.
    """
    group_data = self._data.pop(group, None)
    if group_data is None:
        return None
    value = group_data.pop(key, None)
    if group_data:
        self._data[group] = group_data
    return value

remove_group

remove_group(group: str) -> Optional[dict[str, bytes]]

Removes the group and all its keys and values and returns the data. Returns None if the group does not exist.

Source code in pynumaflow/_metadata.py
def remove_group(self, group: str) -> Optional[dict[str, bytes]]:
    """
    Removes the group and all its keys and values and returns the data.
    Returns None if the group does not exist.
    """
    return self._data.pop(group, None)

clear

clear()

Clears all the groups and all their keys and values.

Source code in pynumaflow/_metadata.py
def clear(self):
    """
    Clears all the groups and all their keys and values.
    """
    self._data.clear()
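The stand-in below copies the logic of the methods listed above to make the `remove_key` semantics concrete; it is not the library class. The rendered signature shows `_data ... = dict()`, but a dataclass field cannot take a mutable default directly, so the sketch uses `field(default_factory=dict)`. Because `remove_key` pops the group and then re-inserts it, removing a key that does not exist still moves the group to the end of `groups()`.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class UserMetadataSketch:
    """Stand-in reproducing the UserMetadata methods listed above."""
    _data: dict[str, dict[str, bytes]] = field(default_factory=dict)

    def groups(self) -> list[str]:
        return list(self._data.keys())

    def value(self, group: str, key: str) -> Optional[bytes]:
        return self._data.get(group, {}).get(key)

    def add_key(self, group: str, key: str, value: bytes) -> None:
        self._data.setdefault(group, {})[key] = value

    def remove_key(self, group: str, key: str) -> Optional[bytes]:
        group_data = self._data.pop(group, None)
        if group_data is None:
            return None
        value = group_data.pop(key, None)
        if group_data:  # put the group back only if keys remain
            self._data[group] = group_data
        return value
```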

SystemMetadata dataclass

SystemMetadata(_data: dict[str, dict[str, bytes]] = dict())

SystemMetadata maps each group name to that group's key-value pairs. It wraps the system-generated metadata groups of each message and is read-only to UDFs.

groups

groups() -> list[str]

Returns the list of group names for the system metadata.

Source code in pynumaflow/_metadata.py
def groups(self) -> list[str]:
    """
    Returns the list of group names for the system metadata.
    """
    return list(self._data.keys())

keys

keys(group: str) -> list[str]

Returns the list of keys for a given group.

Source code in pynumaflow/_metadata.py
def keys(self, group: str) -> list[str]:
    """
    Returns the list of keys for a given group.
    """
    return list(self._data.get(group, {}).keys())

value

value(group: str, key: str) -> Optional[bytes]

Returns the value for a given group and key. If the group or key does not exist, returns None.

Source code in pynumaflow/_metadata.py
def value(self, group: str, key: str) -> Optional[bytes]:
    """
    Returns the value for a given group and key.
    """
    return self._data.get(group, {}).get(key)