
Sourcer

The Sourcer module provides classes and functions for implementing User Defined Sources that produce messages for Numaflow pipelines.

Classes

Message dataclass

Message(
    payload: bytes,
    offset: Offset,
    event_time: datetime,
    keys: Optional[list[str]] = None,
    headers: Optional[dict[str, str]] = None,
    user_metadata: Optional[UserMetadata] = None,
)

Basic datatype for passing data to the next vertex/vertices.

Parameters:

Name | Type | Description | Default
---- | ---- | ----------- | -------
payload | bytes | data in bytes | required
offset | Offset | the offset of the datum | required
event_time | datetime | event time of the message, usually extracted from the payload | required
keys | Optional[list[str]] | list of string keys for the vertex (optional) | None
headers | Optional[dict[str, str]] | dict of headers for the message (optional) | None
user_metadata | Optional[UserMetadata] | metadata for the message (optional) | None
Source code in pynumaflow/sourcer/_dtypes.py
def __init__(
    self,
    payload: bytes,
    offset: Offset,
    event_time: datetime,
    keys: Optional[list[str]] = None,
    headers: Optional[dict[str, str]] = None,
    user_metadata: Optional[UserMetadata] = None,
):
    """
    Creates a Message object to send value to a vertex.
    """
    self._payload = payload
    self._offset = offset
    self._event_time = event_time
    self._keys = keys or []
    self._headers = headers or {}
    self._user_metadata = user_metadata or UserMetadata()

user_metadata property

user_metadata: UserMetadata

Returns the user metadata of the message.

ReadRequest dataclass

ReadRequest(num_records: int, timeout_in_ms: int)

Class to define the request for reading a datum stream from the user defined source.

Parameters:

Name | Type | Description | Default
---- | ---- | ----------- | -------
num_records | int | the number of records to read | required
timeout_in_ms | int | the request timeout in milliseconds | required

Example:

from pynumaflow.sourcer import ReadRequest
read_request = ReadRequest(num_records=10, timeout_in_ms=1000)

Source code in pynumaflow/sourcer/_dtypes.py
def __init__(
    self,
    num_records: int,
    timeout_in_ms: int,
):
    if not isinstance(num_records, int):
        raise TypeError(f"Wrong data type: {type(num_records)} for ReadRequest.num_records")
    self._num_records = num_records
    if not isinstance(timeout_in_ms, int):
        raise TypeError(f"Wrong data type: {type(timeout_in_ms)} for ReadRequest.timeout_in_ms")
    self._timeout_in_ms = timeout_in_ms

num_records property

num_records: int

Returns the num_records of the request

timeout_in_ms property

timeout_in_ms: int

Returns the timeout_in_ms of the request.

PendingResponse dataclass

PendingResponse(count: int)

PendingResponse is the response for the pending request. It indicates the number of pending records at the user defined source. A negative count indicates that the pending information is not available.

Parameters:

Name | Type | Description | Default
---- | ---- | ----------- | -------
count | int | the number of pending records | required
Source code in pynumaflow/sourcer/_dtypes.py
def __init__(self, count: int):
    if not isinstance(count, int):
        raise TypeError(f"Wrong data type: {type(count)} for Pending.count")
    self._count = count

count property

count: int

Returns the count of pending records

AckRequest dataclass

AckRequest(offsets: list[Offset])

Class for defining the request for acknowledging datum. It takes a list of offsets that need to be acknowledged.

Parameters:

Name | Type | Description | Default
---- | ---- | ----------- | -------
offsets | list[Offset] | the offsets to be acknowledged | required

Example:

from pynumaflow.sourcer import AckRequest, Offset

offset_val = Offset(offset=b"123", partition_id=0)
ack_request = AckRequest(offsets=[offset_val, offset_val])

Source code in pynumaflow/sourcer/_dtypes.py
def __init__(self, offsets: list[Offset]):
    self._offsets = offsets

offsets property

offsets: list[Offset]

Returns the offsets to be acknowledged.

NackRequest dataclass

NackRequest(offsets: list[Offset])

Class for defining the request for negatively acknowledging an offset. It takes a list of offsets that need to be negatively acknowledged on the source.

Parameters:

Name | Type | Description | Default
---- | ---- | ----------- | -------
offsets | list[Offset] | the offsets to be negatively acknowledged | required

Example:

from pynumaflow.sourcer import NackRequest, Offset
offset_val = Offset(offset=b"123", partition_id=0)
nack_request = NackRequest(offsets=[offset_val, offset_val])

Source code in pynumaflow/sourcer/_dtypes.py
def __init__(self, offsets: list[Offset]):
    self._offsets = offsets

offsets property

offsets: list[Offset]

Returns the offsets to be negatively acknowledged.

Offset dataclass

Offset(offset: bytes, partition_id: int)

Parameters:

Name | Type | Description | Default
---- | ---- | ----------- | -------
offset | bytes | the offset of the datum | required
partition_id | int | indicates which partition of the source the datum belongs to | required
Source code in pynumaflow/sourcer/_dtypes.py
def __init__(self, offset: bytes, partition_id: int):
    self._offset = offset
    self._partition_id = partition_id

offset_with_default_partition_id classmethod

offset_with_default_partition_id(offset: bytes)

Returns an Offset object with the given offset and default partition id.

Source code in pynumaflow/sourcer/_dtypes.py
@classmethod
def offset_with_default_partition_id(cls, offset: bytes):
    """
    Returns an Offset object with the given offset and default partition id.
    """
    return Offset(offset=offset, partition_id=get_default_partitions()[0])

PartitionsResponse dataclass

PartitionsResponse(partitions: list[int])

PartitionsResponse is the response for the partitions request. It lists the partition ids available at the user defined source.

Parameters:

Name | Type | Description | Default
---- | ---- | ----------- | -------
partitions | list[int] | the list of partition ids | required
Source code in pynumaflow/sourcer/_dtypes.py
def __init__(self, partitions: list[int]):
    if not isinstance(partitions, list):
        raise TypeError(f"Wrong data type: {type(partitions)} for Partition.partitions")
    self._partitions = partitions

partitions property

partitions: list[int]

Returns the list of partitions

Sourcer

Provides an interface to write a Sourcer which will be exposed over a gRPC server.

read_handler abstractmethod async

read_handler(
    datum: ReadRequest, output: NonBlockingIterator
)

Implement this handler function, which implements the SourceReadCallable interface. read_handler is used to read data from the source and send it forward. For each read request, num_records messages are processed and read_idx is incremented to indicate that a message has been read; the same index is added to the ack set.

Source code in pynumaflow/sourcer/_dtypes.py
@abstractmethod
async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator):
    """
    Implement this handler function, which implements the SourceReadCallable interface.
    read_handler is used to read data from the source and send it forward. For each
    read request, num_records messages are processed and read_idx is incremented to
    indicate that a message has been read; the same index is added to the ack set.
    """
    pass

ack_handler abstractmethod async

ack_handler(ack_request: AckRequest)

The ack handler is used to acknowledge the offsets that have been read

Source code in pynumaflow/sourcer/_dtypes.py
@abstractmethod
async def ack_handler(self, ack_request: AckRequest):
    """
    The ack handler is used to acknowledge the offsets that have been read
    """
    pass

nack_handler abstractmethod async

nack_handler(nack_request: NackRequest)

The nack handler is used to negatively acknowledge the offsets on the source

Source code in pynumaflow/sourcer/_dtypes.py
@abstractmethod
async def nack_handler(self, nack_request: NackRequest):
    """
    The nack handler is used to negatively acknowledge the offsets on the source
    """
    pass

pending_handler abstractmethod async

pending_handler() -> PendingResponse

The pending handler is used to return the number of pending records at the user defined source.

Source code in pynumaflow/sourcer/_dtypes.py
@abstractmethod
async def pending_handler(self) -> PendingResponse:
    """
    The pending handler is used to return the number of pending records
    at the user defined source.
    """
    pass

partitions_handler abstractmethod async

partitions_handler() -> PartitionsResponse

The partitions handler is used to return the partitions of the user defined source.

Source code in pynumaflow/sourcer/_dtypes.py
@abstractmethod
async def partitions_handler(self) -> PartitionsResponse:
    """
    The partitions handler is used to return the partitions
    of the user defined source.
    """
    pass

UserMetadata dataclass

UserMetadata(_data: dict[str, dict[str, bytes]] = dict())

UserMetadata wraps the user-generated metadata groups per message. It is readable and writable by UDFs.

groups

groups() -> list[str]

Returns the list of group names for the user metadata.

Source code in pynumaflow/_metadata.py
def groups(self) -> list[str]:
    """
    Returns the list of group names for the user metadata.
    """
    return list(self._data.keys())

keys

keys(group: str) -> list[str]

Returns the list of keys for a given group.

Source code in pynumaflow/_metadata.py
def keys(self, group: str) -> list[str]:
    """
    Returns the list of keys for a given group.
    """
    keys = self._data.get(group) or {}
    return list(keys.keys())

value

value(group: str, key: str) -> Optional[bytes]

Returns the value for a given group and key. If the group or key does not exist, returns None.

Source code in pynumaflow/_metadata.py
def value(self, group: str, key: str) -> Optional[bytes]:
    """
    Returns the value for a given group and key.
    If the group or key does not exist, returns None.
    """
    value = self._data.get(group)
    if value is None:
        return None
    return value.get(key)

add_key

add_key(group: str, key: str, value: bytes)

Adds the value for a given group and key.

Source code in pynumaflow/_metadata.py
def add_key(self, group: str, key: str, value: bytes):
    """
    Adds the value for a given group and key.
    """
    self._data.setdefault(group, {})[key] = value

remove_key

remove_key(group: str, key: str) -> Optional[bytes]

Removes the key and its value for a given group and returns the value. If this key is the only key in the group, the group will be removed. Returns None if the group or key does not exist.

Source code in pynumaflow/_metadata.py
def remove_key(self, group: str, key: str) -> Optional[bytes]:
    """
    Removes the key and its value for a given group and returns the value.
    If this key is the only key in the group, the group will be removed.
    Returns None if the group or key does not exist.
    """
    group_data = self._data.pop(group, None)
    if group_data is None:
        return None
    value = group_data.pop(key, None)
    if group_data:
        self._data[group] = group_data
    return value

remove_group

remove_group(group: str) -> Optional[dict[str, bytes]]

Removes the group and all its keys and values and returns the data. Returns None if the group does not exist.

Source code in pynumaflow/_metadata.py
def remove_group(self, group: str) -> Optional[dict[str, bytes]]:
    """
    Removes the group and all its keys and values and returns the data.
    Returns None if the group does not exist.
    """
    return self._data.pop(group, None)

clear

clear()

Clears all the groups and all their keys and values.

Source code in pynumaflow/_metadata.py
def clear(self):
    """
    Clears all the groups and all their keys and values.
    """
    self._data.clear()

SourceAsyncServer

SourceAsyncServer(
    sourcer_instance: SourceCallable,
    sock_path=SOURCE_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SOURCE_SERVER_INFO_FILE_PATH,
)

Bases: NumaflowServer

Class for a new Async Source Server instance.

The server instance is returned.

Parameters:

Name | Type | Description | Default
---- | ---- | ----------- | -------
sourcer_instance | SourceCallable | The sourcer instance to be used for Source UDF | required
sock_path | | The UNIX socket path to be used for the server | SOURCE_SOCK_PATH
max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE
max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT
server_info_file | | The path to the server info file to be written | SOURCE_SERVER_INFO_FILE_PATH

Example invocation:

import uuid
from datetime import datetime
from pynumaflow.shared.asynciter import NonBlockingIterator
from pynumaflow.sourcer import (
    ReadRequest,
    Message,
    AckRequest,
    PendingResponse,
    Offset,
    PartitionsResponse,
    get_default_partitions,
    Sourcer,
    SourceAsyncServer,
    NackRequest,
)

class AsyncSource(Sourcer):
    # AsyncSource is a class for User Defined Source implementation.

    def __init__(self):
        # The offset idx till where the messages have been read
        self.read_idx: int = 0
        # Set to maintain a track of the offsets yet to be acknowledged
        self.to_ack_set: set[int] = set()
        # Set to maintain a track of the offsets that have been negatively acknowledged
        self.nacked: set[int] = set()

    async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator):
        '''
        read_handler is used to read the data from the source and send the data forward
        for each read request we process num_records and increment the read_idx to
        indicate that the message has been read and the same is added to the ack set
        '''
        if self.to_ack_set:
            return

        for x in range(datum.num_records):
            # If there are any nacked offsets, re-deliver them
            if self.nacked:
                idx = self.nacked.pop()
            else:
                idx = self.read_idx
                self.read_idx += 1
            headers = {"x-txn-id": str(uuid.uuid4())}
            await output.put(
                Message(
                    payload=str(self.read_idx).encode(),
                    offset=Offset.offset_with_default_partition_id(str(idx).encode()),
                    event_time=datetime.now(),
                    headers=headers,
                )
            )
            self.to_ack_set.add(idx)

    async def ack_handler(self, ack_request: AckRequest):
        '''
        The ack handler is used to acknowledge the offsets that have been read,
        and remove them from the to_ack_set
        '''
        for req in ack_request.offsets:
            offset = int(req.offset)
            self.to_ack_set.remove(offset)

    async def nack_handler(self, nack_request: NackRequest):
        '''
        Add the offsets that have been negatively acknowledged to the nacked set
        '''
        for req in nack_request.offsets:
            offset = int(req.offset)
            self.to_ack_set.remove(offset)
            self.nacked.add(offset)

    async def pending_handler(self) -> PendingResponse:
        '''
        The simple source always returns zero to indicate there is no pending record.
        '''
        return PendingResponse(count=0)

    async def partitions_handler(self) -> PartitionsResponse:
        '''
        The simple source always returns default partitions.
        '''
        return PartitionsResponse(partitions=get_default_partitions())


if __name__ == "__main__":
    ud_source = AsyncSource()
    grpc_server = SourceAsyncServer(ud_source)
    grpc_server.start()

Source code in pynumaflow/sourcer/async_server.py
def __init__(
    self,
    sourcer_instance: SourceCallable,
    sock_path=SOURCE_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=SOURCE_SERVER_INFO_FILE_PATH,
):
    """
    Create a new grpc Async Source Server instance.
    A new servicer instance is created and attached to the server.
    The server instance is returned.

    Args:
        sourcer_instance: The sourcer instance to be used for Source UDF
        sock_path: The UNIX socket path to be used for the server
        max_message_size: The max message size in bytes the server can receive and send
        max_threads: The max number of threads to be spawned;
                        defaults to 4 and max capped at 16

    Example invocation:
    ```py
    import uuid
    from datetime import datetime
    from pynumaflow.shared.asynciter import NonBlockingIterator
    from pynumaflow.sourcer import (
        ReadRequest,
        Message,
        AckRequest,
        PendingResponse,
        Offset,
        PartitionsResponse,
        get_default_partitions,
        Sourcer,
        SourceAsyncServer,
        NackRequest,
    )

    class AsyncSource(Sourcer):
        # AsyncSource is a class for User Defined Source implementation.

        def __init__(self):
            # The offset idx till where the messages have been read
            self.read_idx: int = 0
            # Set to maintain a track of the offsets yet to be acknowledged
            self.to_ack_set: set[int] = set()
            # Set to maintain a track of the offsets that have been negatively acknowledged
            self.nacked: set[int] = set()

        async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator):
            '''
            read_handler is used to read the data from the source and send the data forward
            for each read request we process num_records and increment the read_idx to
            indicate that the message has been read and the same is added to the ack set
            '''
            if self.to_ack_set:
                return

            for x in range(datum.num_records):
                # If there are any nacked offsets, re-deliver them
                if self.nacked:
                    idx = self.nacked.pop()
                else:
                    idx = self.read_idx
                    self.read_idx += 1
                headers = {"x-txn-id": str(uuid.uuid4())}
                await output.put(
                    Message(
                        payload=str(self.read_idx).encode(),
                        offset=Offset.offset_with_default_partition_id(str(idx).encode()),
                        event_time=datetime.now(),
                        headers=headers,
                    )
                )
                self.to_ack_set.add(idx)

        async def ack_handler(self, ack_request: AckRequest):
            '''
            The ack handler is used to acknowledge the offsets that have been read,
            and remove them from the to_ack_set
            '''
            for req in ack_request.offsets:
                offset = int(req.offset)
                self.to_ack_set.remove(offset)

        async def nack_handler(self, nack_request: NackRequest):
            '''
            Add the offsets that have been negatively acknowledged to the nacked set
            '''
            for req in nack_request.offsets:
                offset = int(req.offset)
                self.to_ack_set.remove(offset)
                self.nacked.add(offset)

        async def pending_handler(self) -> PendingResponse:
            '''
            The simple source always returns zero to indicate there is no pending record.
            '''
            return PendingResponse(count=0)

        async def partitions_handler(self) -> PartitionsResponse:
            '''
            The simple source always returns default partitions.
            '''
            return PartitionsResponse(partitions=get_default_partitions())


    if __name__ == "__main__":
        ud_source = AsyncSource()
        grpc_server = SourceAsyncServer(ud_source)
        grpc_server.start()
    ```
    """
    self.sock_path = f"unix://{sock_path}"
    self.max_threads = min(max_threads, MAX_NUM_THREADS)
    self.max_message_size = max_message_size
    self.server_info_file = server_info_file

    self.sourcer_instance = sourcer_instance

    self._server_options = [
        ("grpc.max_send_message_length", self.max_message_size),
        ("grpc.max_receive_message_length", self.max_message_size),
    ]

    self.servicer = AsyncSourceServicer(source_handler=sourcer_instance)

start

start()

Starter function for the Async server class; a separate caller is needed so that all the async coroutines can be started from a single context.

Source code in pynumaflow/sourcer/async_server.py
def start(self):
    """
    Starter function for the Async server class; a separate caller is needed
    so that all the async coroutines can be started from a single context.
    """
    aiorun.run(self.aexec(), use_uvloop=True)

aexec async

aexec()

Starts the Async gRPC server on the given UNIX socket with given max threads

Source code in pynumaflow/sourcer/async_server.py
async def aexec(self):
    """
    Starts the Async gRPC server on the given UNIX socket with given max threads
    """
    # As the server is async, we need to create a new server instance in the
    # same thread as the event loop so that all the async calls are made in the
    # same context
    # Create a new async server instance and add the servicer to it
    server = grpc.aio.server(options=self._server_options)
    server.add_insecure_port(self.sock_path)
    source_servicer = self.servicer
    source_pb2_grpc.add_SourceServicer_to_server(source_servicer, server)

    serv_info = ServerInfo.get_default_server_info()
    serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Sourcer]
    # Start the async server
    await start_async_server(
        server_async=server,
        sock_path=self.sock_path,
        max_threads=self.max_threads,
        cleanup_coroutines=list(),
        server_info_file=self.server_info_file,
        server_info=serv_info,
    )

get_default_partitions

get_default_partitions() -> list[int]

Returns the default partition ids.

Source code in pynumaflow/sourcer/_dtypes.py
def get_default_partitions() -> list[int]:
    """
    Returns the default partition ids.
    """
    return [DefaultPartitionId]