Side Input
Side Input lets you inject external data into your UDFs (user-defined functions). This is useful for configuration, lookup tables, or any other data that UDFs need but that is not part of the main data stream.
Classes
Response (dataclass)
Class to define the important information for the event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | bytes | The payload of the event. | required |
| no_broadcast | bool | Flag that controls broadcasting: when True, the event is not broadcast. | required |
Example usage
from pynumaflow.sideinput import Response
Response.broadcast_message(b"hello")
Response.no_broadcast_message()
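Because `value` is typed as `bytes`, structured data such as a lookup table has to be serialized before it is handed to `broadcast_message`. The following is a minimal sketch of one way to do that, assuming JSON as the encoding; the library does not prescribe a format, and `lookup_table` and its contents are hypothetical.

```python
import json

# Hypothetical lookup table that a UDF might need (not from the library).
lookup_table = {"eu": "https://eu.example.com", "us": "https://us.example.com"}

# Serialize to bytes. sort_keys keeps the output stable between runs,
# which helps if a retriever compares payloads to detect changes.
payload = json.dumps(lookup_table, sort_keys=True).encode("utf-8")

# A retriever could now return Response.broadcast_message(payload).

# A consumer of the side input would reverse the encoding.
decoded = json.loads(payload.decode("utf-8"))
```

Any encoding works as long as the producer and the consuming UDFs agree on it.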
broadcast_message (classmethod)
broadcast_message(value: bytes) -> R
Returns a SideInputResponse object with the given value and the no_broadcast flag set to False. This event will be broadcast.
Source code in pynumaflow/sideinput/_dtypes.py
no_broadcast_message (classmethod)
Returns a SideInputResponse object with the no_broadcast flag set to True. This event will not be broadcast.
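The two constructors can be summarized with a small stand-in class. This is a minimal sketch based only on the descriptions above, not the library's source: `ResponseSketch` is a hypothetical name, and the empty payload used for the no-broadcast case is an assumption.

```python
from dataclasses import dataclass


@dataclass
class ResponseSketch:
    """Hypothetical stand-in that mirrors the documented fields of Response."""

    value: bytes
    no_broadcast: bool

    @classmethod
    def broadcast_message(cls, value: bytes) -> "ResponseSketch":
        # The payload is kept and no_broadcast is False, so the event is broadcast.
        return cls(value=value, no_broadcast=False)

    @classmethod
    def no_broadcast_message(cls) -> "ResponseSketch":
        # Assumption: an empty payload is used when nothing is broadcast.
        return cls(value=b"", no_broadcast=True)


sent = ResponseSketch.broadcast_message(b"hello")
skipped = ResponseSketch.no_broadcast_message()
print(sent.no_broadcast, skipped.no_broadcast)  # prints: False True
```

Note the inverted sense of the flag: `no_broadcast=False` means the value is delivered.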
SideInput
SideInputServer
SideInputServer(
    side_input_instance: RetrieverCallable,
    sock_path=SIDE_INPUT_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    side_input_dir_path=SIDE_INPUT_DIR_PATH,
    server_info_file=SIDE_INPUT_SERVER_INFO_FILE_PATH,
)
Bases: NumaflowServer
Class for a new Side Input Server instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| side_input_instance | RetrieverCallable | The side input instance to be used for the Side Input UDF. | required |
| sock_path | | The UNIX socket path to be used for the server. | SIDE_INPUT_SOCK_PATH |
| max_message_size | | The maximum message size, in bytes, that the server can receive and send. | MAX_MESSAGE_SIZE |
| max_threads | | The maximum number of threads to be spawned. | NUM_THREADS_DEFAULT |
Example invocation:
import datetime

from pynumaflow.sideinput import Response, SideInputServer, SideInput


class ExampleSideInput(SideInput):
    def __init__(self):
        self.counter = 0

    def retrieve_handler(self) -> Response:
        time_now = datetime.datetime.now()
        # val is the value to be broadcast
        val = f"an example: {str(time_now)}"
        self.counter += 1
        # broadcast every other time
        if self.counter % 2 == 0:
            # no_broadcast_message() indicates that there is no broadcast
            return Response.no_broadcast_message()
        # broadcast_message() indicates that there is a broadcast
        return Response.broadcast_message(val.encode("utf-8"))


if __name__ == "__main__":
    grpc_server = SideInputServer(ExampleSideInput())
    grpc_server.start()
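The alternating behavior of the example handler can be checked without starting a server by calling `retrieve_handler` directly. The sketch below is an illustration under assumptions: `ExampleRetriever` is a hypothetical plain class that copies the example's logic without subclassing `SideInput`, and the fallback `Response` is a stand-in used only when `pynumaflow` is not installed.

```python
import datetime
from dataclasses import dataclass

try:
    from pynumaflow.sideinput import Response
except ImportError:

    @dataclass
    class Response:
        """Hypothetical fallback that mirrors the documented Response fields."""

        value: bytes
        no_broadcast: bool

        @classmethod
        def broadcast_message(cls, value):
            return cls(value=value, no_broadcast=False)

        @classmethod
        def no_broadcast_message(cls):
            # Assumption: empty payload when nothing is broadcast.
            return cls(value=b"", no_broadcast=True)


class ExampleRetriever:
    """Same decision logic as ExampleSideInput, kept standalone for testing."""

    def __init__(self):
        self.counter = 0

    def retrieve_handler(self):
        self.counter += 1
        # Skip the broadcast on every second call.
        if self.counter % 2 == 0:
            return Response.no_broadcast_message()
        val = f"an example: {datetime.datetime.now()}"
        return Response.broadcast_message(val.encode("utf-8"))


retriever = ExampleRetriever()
responses = [retriever.retrieve_handler() for _ in range(4)]
flags = [r.no_broadcast for r in responses]
print(flags)  # prints: [False, True, False, True]
```

Odd-numbered calls carry a payload and even-numbered calls are skipped, which matches the "broadcast every other time" comment in the example.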
Source code in pynumaflow/sideinput/server.py
start
Starts the synchronous gRPC server on the given UNIX socket with the given maximum number of threads.