Sourcer
The Sourcer module provides classes and functions for implementing User Defined Sources that produce messages for Numaflow pipelines.
Classes
Message
dataclass
Message(
payload: bytes,
offset: Offset,
event_time: datetime,
keys: Optional[list[str]] = None,
headers: Optional[dict[str, str]] = None,
user_metadata: Optional[UserMetadata] = None,
)
Basic datatype for data passing to the next vertex/vertices.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| payload | bytes | data in bytes | required |
| offset | Offset | the offset of the datum. | required |
| event_time | datetime | event time of the message, usually extracted from the payload. | required |
| keys | Optional[list[str]] | list of string keys for the vertex (optional) | None |
| headers | Optional[dict[str, str]] | dict of headers for the message (optional) | None |
| user_metadata | Optional[UserMetadata] | metadata for the message (optional) | None |
Source code in pynumaflow/sourcer/_dtypes.py
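The field types above can be illustrated with a small standard-library sketch that prepares the keyword arguments for a Message from a decoded record. The helper `build_message_fields`, the record fields `ts` and `user`, and the header name `x-record-index` are hypothetical and not part of pynumaflow; the `offset` argument is left out because it needs an Offset instance.

```python
import json
from datetime import datetime, timezone


def build_message_fields(record: dict, index: int) -> dict:
    """Hypothetical helper: turn a decoded record into Message keyword arguments."""
    # Event time is extracted from the payload; "ts" is an assumed field
    # holding seconds since the Unix epoch.
    event_time = datetime.fromtimestamp(record["ts"], tz=timezone.utc)
    return {
        "payload": json.dumps(record).encode("utf-8"),  # payload must be bytes
        "event_time": event_time,
        "keys": [str(record["user"])],  # optional list of string keys
        "headers": {"x-record-index": str(index)},  # optional str -> str mapping
    }


fields = build_message_fields({"ts": 1700000000, "user": "alice"}, index=7)
```

The resulting dictionary could then be passed along with an offset, for example `Message(offset=..., **fields)`.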
ReadRequest
dataclass
Defines the request for reading a stream of datums from a user-defined source.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| num_records | int | the number of records to read. | required |
| timeout_in_ms | int | the request timeout in milliseconds. | required |
Example:
from pynumaflow.sourcer import ReadRequest
read_request = ReadRequest(num_records=10, timeout_in_ms=1000)
Source code in pynumaflow/sourcer/_dtypes.py
PendingResponse
dataclass
PendingResponse(count: int)
PendingResponse is the response for the pending request. It indicates the number of pending records at the user defined source. A negative count indicates that the pending information is not available.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| count | int | the number of pending records. | required |
Source code in pynumaflow/sourcer/_dtypes.py
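The negative-count convention can be sketched with a small helper that derives the value a pending handler might pass as `count`. The function `pending_count` and its two arguments are hypothetical; how a real source learns its latest and last-read offsets depends on the backing system.

```python
from typing import Optional


def pending_count(latest_offset: Optional[int], last_read_offset: Optional[int]) -> int:
    """Hypothetical helper: compute the count for a PendingResponse."""
    # If either position is unknown, report a negative count to signal that
    # pending information is not available.
    if latest_offset is None or last_read_offset is None:
        return -1
    # Never report a negative backlog when positions are known.
    return max(latest_offset - last_read_offset, 0)
```

A handler could then return `PendingResponse(count=pending_count(latest, last_read))`.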
AckRequest
dataclass
Defines the request for acknowledging datums. It takes a list of offsets that need to be acknowledged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| offsets | list[Offset] | the offsets to be acknowledged. | required |
Example:
from pynumaflow.sourcer import AckRequest, Offset
offset_val = Offset(offset=b"123", partition_id=0)
ack_request = AckRequest(offsets=[offset_val, offset_val])
Source code in pynumaflow/sourcer/_dtypes.py
NackRequest
dataclass
Class for defining the request for negatively acknowledging an offset. It takes a list of offsets that need to be negatively acknowledged on the source.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| offsets | list[Offset] | the offsets to be negatively acknowledged. | required |
Example:
from pynumaflow.sourcer import NackRequest, Offset
offset_val = Offset(offset=b"123", partition_id=0)
nack_request = NackRequest(offsets=[offset_val, offset_val])
Source code in pynumaflow/sourcer/_dtypes.py
Offset
dataclass
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| offset | bytes | the offset of the datum. | required |
| partition_id | int | partition_id indicates which partition of the source the datum belongs to. | required |
Source code in pynumaflow/sourcer/_dtypes.py
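Because `offset` is a bytes value, a source that tracks integer positions has to convert in both directions. The sketch below uses the same decimal-string encoding as the example invocation further down this page; the helper names `encode_offset` and `decode_offset` are hypothetical.

```python
def encode_offset(index: int) -> bytes:
    """Hypothetical helper: encode an integer position as offset bytes."""
    return str(index).encode("utf-8")


def decode_offset(raw: bytes) -> int:
    """Hypothetical helper: recover the integer position from offset bytes."""
    return int(raw.decode("utf-8"))


round_trip = decode_offset(encode_offset(123))
```

Any reversible encoding would work, as long as the ack and nack handlers decode offsets the same way the read handler encodes them.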
PartitionsResponse
dataclass
PartitionsResponse is the response for the partitions request. It carries the partitions served by the user-defined source, as in `PartitionsResponse(partitions=get_default_partitions())` from the example invocation on this page.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partitions | list[int] | the partitions at the user-defined source. | required |
Source code in pynumaflow/sourcer/_dtypes.py
Sourcer
Provides an interface for writing a Sourcer that is exposed over a gRPC server.
read_handler
abstractmethod
async
read_handler(
datum: ReadRequest, output: NonBlockingIterator
)
Implement this handler, which fulfils the SourceReadCallable interface. read_handler reads data from the source and sends it forward. For each read request, it processes num_records, increments read_idx to indicate that the message has been read, and adds the offset to the ack set.
Source code in pynumaflow/sourcer/_dtypes.py
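One way a read handler might honor both `num_records` and `timeout_in_ms` is sketched below using only the standard library. Here `asyncio.Queue` stands in for both the upstream source and the NonBlockingIterator output, and `read_batch` is a hypothetical function, not part of pynumaflow.

```python
import asyncio


async def read_batch(
    source: asyncio.Queue, output: asyncio.Queue, num_records: int, timeout_in_ms: int
) -> int:
    """Forward up to num_records items, stopping once the timeout has elapsed."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_in_ms / 1000
    sent = 0
    while sent < num_records:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            # Wait for the next record, but never past the request deadline.
            record = await asyncio.wait_for(source.get(), timeout=remaining)
        except asyncio.TimeoutError:
            break
        await output.put(record)
        sent += 1
    return sent


async def _demo() -> int:
    source: asyncio.Queue = asyncio.Queue()
    output: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        source.put_nowait(str(i).encode())
    # Ask for more records than are available so the timeout ends the read.
    return await read_batch(source, output, num_records=10, timeout_in_ms=50)


sent_count = asyncio.run(_demo())
```

In this sketch a request for 10 records returns after forwarding the 3 available ones, because the deadline expires while waiting for the fourth.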
ack_handler
abstractmethod
async
ack_handler(ack_request: AckRequest)
nack_handler
abstractmethod
async
nack_handler(nack_request: NackRequest)
The nack handler is used to negatively acknowledge offsets on the source.
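The interplay between reading, acknowledging, and negatively acknowledging can be sketched as plain bookkeeping, independent of pynumaflow. The class `OffsetTracker` and its method names are hypothetical; it mirrors the approach of the example invocation later on this page, where nacked offsets are re-delivered before new ones.

```python
class OffsetTracker:
    """Hypothetical bookkeeping for in-flight and nacked integer offsets."""

    def __init__(self) -> None:
        self.next_index = 0  # next never-delivered position
        self.in_flight: set[int] = set()  # delivered but not yet acked
        self.nacked: set[int] = set()  # waiting to be re-delivered

    def next_to_deliver(self) -> int:
        # Prefer re-delivering a nacked offset over reading a new one.
        if self.nacked:
            index = self.nacked.pop()
        else:
            index = self.next_index
            self.next_index += 1
        self.in_flight.add(index)
        return index

    def ack(self, index: int) -> None:
        self.in_flight.discard(index)

    def nack(self, index: int) -> None:
        self.in_flight.discard(index)
        self.nacked.add(index)


tracker = OffsetTracker()
first = tracker.next_to_deliver()
second = tracker.next_to_deliver()
tracker.ack(first)
tracker.nack(second)
redelivered = tracker.next_to_deliver()
```

Using `discard` rather than `remove` is a design choice in this sketch: it makes a duplicate ack or nack for the same offset harmless.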
pending_handler
abstractmethod
async
pending_handler() -> PendingResponse
The simple source always returns zero to indicate there is no pending record.
partitions_handler
abstractmethod
async
partitions_handler() -> PartitionsResponse
The simple source always returns the default partitions.
UserMetadata
dataclass
UserMetadata wraps the user-generated metadata groups per message. It is read-write to UDFs.
groups
keys
value
Returns the value for a given group and key. If the group or key does not exist, returns None.
Source code in pynumaflow/_metadata.py
add_key
remove_key
Removes the key and its value for a given group and returns the value. If this key is the only key in the group, the group will be removed. Returns None if the group or key does not exist.
Source code in pynumaflow/_metadata.py
remove_group
Removes the group and all its keys and values and returns the data. Returns None if the group does not exist.
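The semantics described for `value`, `remove_key`, and `remove_group` can be modelled with a nested dictionary. The class below is a stand-in written for illustration, not the UserMetadata implementation; the `(group, key)` argument order and the bytes value type are assumptions, since this page does not show the method signatures.

```python
from typing import Optional


class MetadataSketch:
    """Hypothetical stand-in that mimics the group/key semantics described above."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, bytes]] = {}

    def add_key(self, group: str, key: str, value: bytes) -> None:
        self._data.setdefault(group, {})[key] = value

    def value(self, group: str, key: str) -> Optional[bytes]:
        # None when either the group or the key is missing.
        return self._data.get(group, {}).get(key)

    def remove_key(self, group: str, key: str) -> Optional[bytes]:
        entries = self._data.get(group)
        if entries is None:
            return None
        removed = entries.pop(key, None)
        # Drop the group once its last key has been removed.
        if not entries:
            del self._data[group]
        return removed

    def remove_group(self, group: str) -> Optional[dict[str, bytes]]:
        return self._data.pop(group, None)


meta = MetadataSketch()
meta.add_key("audit", "reader", b"source-1")
removed_value = meta.remove_key("audit", "reader")
```

After the last key in `audit` is removed, the group itself no longer exists, so a later `remove_group("audit")` returns None.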
SourceAsyncServer
SourceAsyncServer(
sourcer_instance: SourceCallable,
sock_path=SOURCE_SOCK_PATH,
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=SOURCE_SERVER_INFO_FILE_PATH,
)
Bases: NumaflowServer
Class for a new Async Source Server instance.
The server instance is returned.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sourcer_instance | SourceCallable | The sourcer instance to be used for Source UDF | required |
| sock_path | | The UNIX socket path to be used for the server | SOURCE_SOCK_PATH |
| max_message_size | | The max message size in bytes the server can receive and send | MAX_MESSAGE_SIZE |
| max_threads | | The max number of threads to be spawned; defaults to 4 and max capped at 16 | NUM_THREADS_DEFAULT |
Example invocation:
import uuid
from datetime import datetime
from pynumaflow.shared.asynciter import NonBlockingIterator
from pynumaflow.sourcer import (
    ReadRequest,
    Message,
    AckRequest,
    PendingResponse,
    Offset,
    PartitionsResponse,
    get_default_partitions,
    Sourcer,
    SourceAsyncServer,
    NackRequest,
)


class AsyncSource(Sourcer):
    # AsyncSource is a class for User Defined Source implementation.
    def __init__(self):
        # The offset idx up to which messages have been read
        self.read_idx: int = 0
        # Set to keep track of the offsets yet to be acknowledged
        self.to_ack_set: set[int] = set()
        # Set to keep track of the offsets that have been negatively acknowledged
        self.nacked: set[int] = set()

    async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator):
        '''
        read_handler reads data from the source and sends it forward.
        For each read request we process num_records, increment read_idx to
        indicate that the message has been read, and add the offset to the ack set.
        '''
        # Do not read further while earlier offsets are still unacknowledged
        if self.to_ack_set:
            return
        for _ in range(datum.num_records):
            # If there are any nacked offsets, re-deliver them
            if self.nacked:
                idx = self.nacked.pop()
            else:
                idx = self.read_idx
                self.read_idx += 1
            headers = {"x-txn-id": str(uuid.uuid4())}
            await output.put(
                Message(
                    # Use idx so the payload matches the offset, including on re-delivery
                    payload=str(idx).encode(),
                    offset=Offset.offset_with_default_partition_id(str(idx).encode()),
                    event_time=datetime.now(),
                    headers=headers,
                )
            )
            self.to_ack_set.add(idx)

    async def ack_handler(self, ack_request: AckRequest):
        '''
        The ack handler is used to acknowledge the offsets that have been read, and
        remove them from the to_ack_set
        '''
        for req in ack_request.offsets:
            offset = int(req.offset)
            self.to_ack_set.remove(offset)

    async def nack_handler(self, nack_request: NackRequest):
        '''
        Add the offsets that have been negatively acknowledged to the nacked set
        '''
        for req in nack_request.offsets:
            offset = int(req.offset)
            self.to_ack_set.remove(offset)
            self.nacked.add(offset)

    async def pending_handler(self) -> PendingResponse:
        '''
        The simple source always returns zero to indicate there is no pending record.
        '''
        return PendingResponse(count=0)

    async def partitions_handler(self) -> PartitionsResponse:
        '''
        The simple source always returns default partitions.
        '''
        return PartitionsResponse(partitions=get_default_partitions())


if __name__ == "__main__":
    ud_source = AsyncSource()
    grpc_server = SourceAsyncServer(ud_source)
    grpc_server.start()
Source code in pynumaflow/sourcer/async_server.py
start
Starter function for the Async server class. A separate caller is needed so that all the async coroutines can be started from a single context.
aexec
async
Starts the Async gRPC server on the given UNIX socket with the given max threads.
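The split between a synchronous `start` and an asynchronous `aexec` follows a common pattern: one synchronous entry point owns the event loop, and every coroutine runs inside it. The sketch below shows that pattern with `asyncio.run`; it is not the library's implementation, and the names `aexec_sketch`, `start_sketch`, and `worker` are hypothetical.

```python
import asyncio


async def aexec_sketch() -> list[str]:
    """Hypothetical async entry point that runs several coroutines together."""
    finished: list[str] = []

    async def worker(name: str) -> None:
        await asyncio.sleep(0)  # yield control, as a real handler would on I/O
        finished.append(name)

    # All coroutines share the single event loop created by the caller.
    await asyncio.gather(worker("reader"), worker("acker"))
    return finished


def start_sketch() -> list[str]:
    """Synchronous caller: creates one event loop and runs everything in it."""
    return asyncio.run(aexec_sketch())


result = start_sketch()
```

Keeping the synchronous wrapper separate means user code can call it from an ordinary `if __name__ == "__main__":` block without managing an event loop itself.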