Side Input

Side Input lets you inject external data into your UDFs. It is useful for configuration, lookup tables, or any other data that UDFs need but that is not part of the main data stream.

Classes

Response dataclass

Response(value: bytes, no_broadcast: bool)

Holds the payload of a side input event and the flag that controls whether the event is broadcast.

Parameters:

Name | Type | Description | Default
value | bytes | The payload of the event. | required
no_broadcast | bool | If True, the event is not broadcast; if False, it is broadcast. | required

Example usage

from pynumaflow.sideinput import Response

Response.broadcast_message(b"hello")
Response.no_broadcast_message()

broadcast_message classmethod

broadcast_message(value: bytes) -> R

Returns a Response object with the given value and the no_broadcast flag set to False. This event will be broadcast.

Source code in pynumaflow/sideinput/_dtypes.py
@classmethod
def broadcast_message(cls: type[R], value: bytes) -> R:
    """
    Returns a SideInputResponse object with the given value,
    and the No broadcast flag set to False. This event will be broadcasted.
    """
    return Response(value=value, no_broadcast=False)

no_broadcast_message classmethod

no_broadcast_message() -> R

Returns a Response object with an empty value and the no_broadcast flag set to True. This event will not be broadcast.

Source code in pynumaflow/sideinput/_dtypes.py
@classmethod
def no_broadcast_message(cls: type[R]) -> R:
    """
    Returns a SideInputResponse object with the No broadcast flag set to True.
    This event will not be broadcasted.
    """
    return Response(value=b"", no_broadcast=True)
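To make the flag semantics concrete, here is a minimal sketch that uses a local stand-in dataclass rather than the library class. The stand-in only mirrors the two fields and two classmethods documented above, so it runs without pynumaflow installed; treat it as an illustration of the behaviour, not as the library's implementation.

```python
from dataclasses import dataclass


@dataclass
class Response:
    """Local stand-in mirroring the fields and classmethods documented above."""

    value: bytes
    no_broadcast: bool

    @classmethod
    def broadcast_message(cls, value: bytes) -> "Response":
        # The payload is kept; no_broadcast=False means "do broadcast".
        return cls(value=value, no_broadcast=False)

    @classmethod
    def no_broadcast_message(cls) -> "Response":
        # Empty payload; no_broadcast=True means "skip this round".
        return cls(value=b"", no_broadcast=True)


sent = Response.broadcast_message(b"hello")
skipped = Response.no_broadcast_message()
print(sent)
print(skipped)
```

Note that the flag is negative: a Response that should reach the UDFs carries no_broadcast=False.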

SideInput

Provides an interface for writing a SideInput class that will be exposed over gRPC.

retrieve_handler abstractmethod

retrieve_handler() -> Response

This function is called when a Side Input request is received.

Source code in pynumaflow/sideinput/_dtypes.py
@abstractmethod
def retrieve_handler(self) -> Response:
    """
    This function is called when a Side Input request is received.
    """
    pass
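A retrieve_handler implementation often needs to decide whether anything new is worth broadcasting. The sketch below shows one possible pattern: broadcast only when the payload differs from the last one sent. The names ChangeOnlySideInput and load_config are hypothetical and not part of pynumaflow. The fallback stand-ins exist only so the sketch runs where pynumaflow is not installed, and they mirror just the API documented on this page.

```python
import hashlib
from typing import Callable, Optional

try:
    from pynumaflow.sideinput import Response, SideInput
except ImportError:
    # Stand-ins so the sketch runs without pynumaflow (assumption: they
    # mirror only the parts of the API documented on this page).
    from abc import ABC, abstractmethod
    from dataclasses import dataclass

    @dataclass
    class Response:
        value: bytes
        no_broadcast: bool

        @classmethod
        def broadcast_message(cls, value: bytes) -> "Response":
            return cls(value=value, no_broadcast=False)

        @classmethod
        def no_broadcast_message(cls) -> "Response":
            return cls(value=b"", no_broadcast=True)

    class SideInput(ABC):
        @abstractmethod
        def retrieve_handler(self) -> Response: ...


class ChangeOnlySideInput(SideInput):
    """Hypothetical retriever that broadcasts only when the payload changes."""

    def __init__(self, load_config: Callable[[], bytes]):
        self._load_config = load_config          # hypothetical data source
        self._last_digest: Optional[str] = None  # digest of the last payload sent

    def retrieve_handler(self) -> Response:
        payload = self._load_config()
        digest = hashlib.sha256(payload).hexdigest()
        if digest == self._last_digest:
            # Nothing changed since the last broadcast, so skip this round.
            return Response.no_broadcast_message()
        self._last_digest = digest
        return Response.broadcast_message(payload)


# Simulate three consecutive retrieve requests against a changing source.
values = iter([b"v1", b"v1", b"v2"])
retriever = ChangeOnlySideInput(lambda: next(values))
results = [retriever.retrieve_handler() for _ in range(3)]
print([r.no_broadcast for r in results])  # [False, True, False]
```

Comparing digests instead of raw payloads keeps the stored state small when the payload is large; comparing the bytes directly would work equally well for small values.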

SideInputServer

SideInputServer(
    side_input_instance: RetrieverCallable,
    sock_path=SIDE_INPUT_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    side_input_dir_path=SIDE_INPUT_DIR_PATH,
    server_info_file=SIDE_INPUT_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

Class for a new Side Input Server instance.

Parameters:

Name | Type | Description | Default
side_input_instance | RetrieverCallable | The side input instance to be used for the Side Input UDF. | required
sock_path | | The UNIX socket path to be used for the server. | SIDE_INPUT_SOCK_PATH
max_message_size | | The max message size in bytes the server can receive and send. | MAX_MESSAGE_SIZE
max_threads | | The max number of threads to be spawned; capped at MAX_NUM_THREADS. | NUM_THREADS_DEFAULT

Example invocation:

import datetime
from pynumaflow.sideinput import Response, SideInputServer, SideInput

class ExampleSideInput(SideInput):
    def __init__(self):
        self.counter = 0

    def retrieve_handler(self) -> Response:
        time_now = datetime.datetime.now()
        # val is the value to be broadcasted
        val = f"an example: {str(time_now)}"
        self.counter += 1
        # broadcast every other time
        if self.counter % 2 == 0:
            # no_broadcast_message() is used to indicate that there is no broadcast
            return Response.no_broadcast_message()
        # broadcast_message() is used to indicate that there is a broadcast
        return Response.broadcast_message(val.encode("utf-8"))

if __name__ == "__main__":
    grpc_server = SideInputServer(ExampleSideInput())
    grpc_server.start()

Source code in pynumaflow/sideinput/server.py
def __init__(
    self,
    side_input_instance: RetrieverCallable,
    sock_path=SIDE_INPUT_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    side_input_dir_path=SIDE_INPUT_DIR_PATH,
    server_info_file=SIDE_INPUT_SERVER_INFO_FILE_PATH,
):
    self.sock_path = f"unix://{sock_path}"
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.max_message_size = max_message_size
    self.server_info_file = server_info_file

    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
    ]

    self.side_input_instance = side_input_instance
    self.side_input_dir_path = side_input_dir_path
    self.servicer = SideInputServicer(side_input_instance)
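The constructor normalizes three of its arguments before the server starts: it prefixes the socket path with unix://, caps max_threads at MAX_NUM_THREADS, and applies max_message_size to both gRPC message-length options. The sketch below reproduces that logic as a standalone function. The function name resolve_settings and the value given to MAX_NUM_THREADS are placeholders chosen for illustration; the library defines its own constant.

```python
# Placeholder value for illustration only; pynumaflow defines its own constant.
MAX_NUM_THREADS = 16


def resolve_settings(sock_path: str, max_message_size: int, max_threads: int):
    """Hypothetical helper mirroring the normalization done in __init__."""
    # gRPC expects a scheme-prefixed address for UNIX domain sockets.
    bind_address = f"unix://{sock_path}"
    # Requests for more threads than the cap are silently reduced.
    threads = min(max_threads, MAX_NUM_THREADS)
    # The same limit applies to outgoing and incoming messages.
    options = [
        ("grpc.max_send_message_length", max_message_size),
        ("grpc.max_receive_message_length", max_message_size),
    ]
    return bind_address, threads, options


addr, threads, options = resolve_settings("/tmp/example.sock", 4 * 1024 * 1024, 64)
print(addr)     # unix:///tmp/example.sock
print(threads)  # 16
print(options)
```

One practical consequence is that passing a max_threads value above the cap does not raise an error; the server simply runs with the capped value.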

start

start()

Starts the synchronous gRPC server on the given UNIX socket with the given maximum number of threads.

Source code in pynumaflow/sideinput/server.py
def start(self):
    """
    Starts the Synchronous gRPC server on the given UNIX socket with given max threads.
    """
    # Get the servicer instance based on the server type
    side_input_servicer = self.servicer

    _LOGGER.info(
        "Side Input GRPC Server listening on: %s with max threads: %s",
        self.sock_path,
        self.max_threads,
    )

    serv_info = ServerInfo.get_default_server_info()
    serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Sideinput]
    # Start the server
    sync_server_start(
        servicer=side_input_servicer,
        bind_address=self.sock_path,
        max_threads=self.max_threads,
        server_info_file=self.server_info_file,
        server_options=self._server_options,
        udf_type=UDFType.SideInput,
        server_info=serv_info,
    )