Accumulator¶
This module offers tools for accumulating and processing data while managing state. With it, you can:
- Accumulate data over time
- Maintain state across messages
- Process accumulated data (see the sketch below)
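At a high level, these capabilities come together by subclassing `Accumulator` and serving it with `AccumulatorAsyncServer`, both documented below. A minimal sketch of that shape (the class name `MyAccumulator` is illustrative only):

```python
from collections.abc import AsyncIterable

from pynumaflow.accumulator import Accumulator, AccumulatorAsyncServer, Datum, Message
from pynumaflow.shared.asynciter import NonBlockingIterator


class MyAccumulator(Accumulator):
    """Illustrative accumulator: state kept on the instance persists across messages."""

    async def handler(self, datums: AsyncIterable[Datum], output: NonBlockingIterator):
        async for datum in datums:
            # Accumulate, reorder, or enrich here, then emit downstream.
            await output.put(Message.from_datum(datum))


if __name__ == "__main__":
    AccumulatorAsyncServer(MyAccumulator).start()
```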
Classes¶
Message
dataclass
¶
```python
Message(
    value: bytes,
    keys: list[str] = None,
    tags: list[str] = None,
    watermark: datetime = None,
    event_time: datetime = None,
    headers: dict[str, str] = None,
    id: str = None,
)
```
Basic datatype for passing data to the next vertex/vertices.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `value` | `bytes` | data in bytes | *required* |
| `keys` | `list[str]` | keys for the vertex (optional) | `None` |
| `tags` | `list[str]` | tags used for conditional forwarding (optional) | `None` |
| `watermark` | `datetime` | watermark for this message (optional) | `None` |
| `event_time` | `datetime` | event time for this message (optional) | `None` |
| `headers` | `dict[str, str]` | headers for this message (optional) | `None` |
| `id` | `str` | message id (optional) | `None` |
Source code in pynumaflow/accumulator/_dtypes.py
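For illustration, a Message can be constructed directly from the parameters above; the payload, key, and tag values in this sketch are placeholders, not part of the library:

```python
from datetime import datetime, timezone

from pynumaflow.accumulator import Message

msg = Message(
    value=b"processed-payload",   # placeholder payload
    keys=["user-1"],              # placeholder key
    tags=["valid"],               # tags drive conditional forwarding
    event_time=datetime.now(timezone.utc),
)
print(msg.value, msg.keys, msg.tags, msg.event_time)
```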
value
property
¶
value: bytes
Returns the message payload value.
Returns:

| Type | Description |
|---|---|
| `bytes` | The message payload data as bytes. |
keys
property
¶
tags
property
¶
watermark
property
¶
watermark: datetime
Returns the watermark timestamp for this message.
Returns:

| Type | Description |
|---|---|
| `datetime` | The watermark timestamp, or None if not set. |
event_time
property
¶
event_time: datetime
Returns the event time for this message.
Returns:

| Type | Description |
|---|---|
| `datetime` | The event time timestamp, or None if not set. |
headers
property
¶
to_drop
classmethod
¶
Creates a Message instance that indicates the message should be dropped.
Returns:

| Type | Description |
|---|---|
| `M` | A Message instance with an empty value and the DROP tag, indicating the message should be dropped. |
Source code in pynumaflow/accumulator/_dtypes.py
from_datum
classmethod
¶
from_datum(datum: Datum)
Create a Message instance from a Datum object.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `datum` | `Datum` | The Datum object to convert | *required* |

Returns:

| Type | Description |
|---|---|
| `Message` | A new Message instance with data from the datum |
Source code in pynumaflow/accumulator/_dtypes.py
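As a sketch, both classmethods are typically used inside a handler: `from_datum` to forward an incoming Datum downstream, and `to_drop` to emit a drop marker instead. The class name and filtering logic below are illustrative, and the idea that the platform discards a record when the DROP message is emitted is an assumption drawn from the to_drop and Accumulator descriptions on this page:

```python
from collections.abc import AsyncIterable

from pynumaflow.accumulator import Accumulator, Datum, Message
from pynumaflow.shared.asynciter import NonBlockingIterator


class FilterAccumulator(Accumulator):
    """Illustrative: forward non-empty payloads, drop the rest."""

    async def handler(self, datums: AsyncIterable[Datum], output: NonBlockingIterator):
        async for datum in datums:
            if datum.value:
                # Copy the datum's data into a Message and emit it downstream.
                await output.put(Message.from_datum(datum))
            else:
                # Emit a drop marker instead of a regular message.
                await output.put(Message.to_drop())
```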
Datum
dataclass
¶
```python
Datum(
    keys: list[str],
    value: bytes,
    event_time: datetime,
    watermark: datetime,
    id_: str,
    headers: Optional[dict[str, str]] = None,
)
```
Class to define the important information for the event.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `keys` | `list[str]` | the keys of the event. | *required* |
| `value` | `bytes` | the payload of the event. | *required* |
| `event_time` | `datetime` | the event time of the event. | *required* |
| `watermark` | `datetime` | the watermark of the event. | *required* |
Example usage
```python
from pynumaflow.accumulator import Datum
from datetime import datetime, timezone

d = Datum(
    keys=["test_key"],
    value=b"test_mock_message",
    event_time=datetime.fromtimestamp(1662998400, timezone.utc),
    watermark=datetime.fromtimestamp(1662998460, timezone.utc),
    id_="test_id",  # placeholder id; required by the constructor signature above
    headers={"key1": "value1", "key2": "value2"},
)
```
Source code in pynumaflow/accumulator/_dtypes.py
keys
property
¶
value
property
¶
value: bytes
Returns the value of the event.
Returns:

| Type | Description |
|---|---|
| `bytes` | The payload data of the event as bytes. |
event_time
property
¶
event_time: datetime
Returns the event time of the event.
Returns:

| Type | Description |
|---|---|
| `datetime` | The timestamp when the event occurred. |
watermark
property
¶
watermark: datetime
Returns the watermark of the event.
Returns:

| Type | Description |
|---|---|
| `datetime` | The watermark timestamp indicating the progress of event time. |
headers
property
¶
IntervalWindow
dataclass
¶
KeyedWindow
dataclass
¶
Defines the window for an accumulator operation, which includes the interval window along with the slot.
Source code in pynumaflow/accumulator/_dtypes.py
start
property
¶
start: datetime
Returns the start point of the interval window.
Returns:

| Type | Description |
|---|---|
| `datetime` | The start timestamp of the interval window. |
end
property
¶
end: datetime
Returns the end point of the interval window.
Returns:

| Type | Description |
|---|---|
| `datetime` | The end timestamp of the interval window. |
slot
property
¶
slot: str
Returns the slot from the window.
Returns:

| Type | Description |
|---|---|
| `str` | The slot identifier for this window. |
window
property
¶
window: IntervalWindow
Returns the interval window.
Returns:

| Type | Description |
|---|---|
| `IntervalWindow` | The underlying interval window object. |
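A small sketch of reading these properties, assuming `kw` is an existing KeyedWindow instance obtained elsewhere (only the properties documented above are used):

```python
def describe_window(kw) -> str:
    # Illustrative helper, not part of the library.
    return (
        f"slot={kw.slot} "
        f"start={kw.start.isoformat()} end={kw.end.isoformat()} "
        f"interval=({kw.window.start}, {kw.window.end})"
    )
```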
Accumulator
¶
An Accumulator can read unordered data from the input stream and emit ordered data to the output stream. Once the watermark (WM) of the output stream progresses, the data in the WAL up to that WM is garbage collected. NOTE: a message can be silently dropped if need be, and it will be cleared from the WAL when the WM progresses.
handler
abstractmethod
async
¶
```python
handler(
    datums: AsyncIterable[Datum],
    output: NonBlockingIterator,
)
```
Implement this handler function to satisfy the AccumulatorStreamCallable interface.
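To make the contract concrete, here is a sketch of a stateful implementation: it keeps every payload it has seen on the instance and emits the running concatenation for each incoming datum. The class name and logic are illustrative, not part of the library:

```python
from collections.abc import AsyncIterable

from pynumaflow.accumulator import Accumulator, Datum, Message
from pynumaflow.shared.asynciter import NonBlockingIterator


class ConcatAccumulator(Accumulator):
    def __init__(self):
        self.seen: list[bytes] = []  # state carried across messages

    async def handler(self, datums: AsyncIterable[Datum], output: NonBlockingIterator):
        async for datum in datums:
            self.seen.append(datum.value)
            # Emit the accumulated payload, preserving the datum's keys and timestamps.
            await output.put(
                Message(
                    value=b",".join(self.seen),
                    keys=datum.keys,
                    event_time=datum.event_time,
                    watermark=datum.watermark,
                )
            )
```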
AccumulatorAsyncServer
¶
```python
AccumulatorAsyncServer(
    accumulator_instance: AccumulatorStreamCallable,
    init_args: tuple = (),
    init_kwargs: Optional[dict] = None,
    sock_path=ACCUMULATOR_SOCK_PATH,
    max_message_size=MAX_MESSAGE_SIZE,
    max_threads=NUM_THREADS_DEFAULT,
    server_info_file=ACCUMULATOR_SERVER_INFO_FILE_PATH,
)
```
Bases: NumaflowServer
Class for a new Accumulator Server instance. A new servicer instance is created and attached to the server. The server instance is returned.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `accumulator_instance` | `AccumulatorStreamCallable` | The accumulator instance to be used for the Accumulator UDF | *required* |
| `init_args` | `tuple` | The arguments to be passed to the accumulator_handler | `()` |
| `init_kwargs` | `Optional[dict]` | The keyword arguments to be passed to the accumulator_handler | `None` |
| `sock_path` | | The UNIX socket path to be used for the server | `ACCUMULATOR_SOCK_PATH` |
| `max_message_size` | | The max message size in bytes the server can receive and send | `MAX_MESSAGE_SIZE` |
| `max_threads` | | The max number of threads to be spawned | `NUM_THREADS_DEFAULT` |
| `server_info_file` | | The path to the server info file | `ACCUMULATOR_SERVER_INFO_FILE_PATH` |
Example invocation:
```python
from collections.abc import AsyncIterable
from datetime import datetime

from pynumaflow.accumulator import Accumulator, AccumulatorAsyncServer
from pynumaflow.accumulator import Message, Datum
from pynumaflow.shared.asynciter import NonBlockingIterator


class StreamSorter(Accumulator):
    def __init__(self):
        self.latest_wm = datetime.fromtimestamp(-1)
        self.sorted_buffer: list[Datum] = []

    async def handler(
        self,
        datums: AsyncIterable[Datum],
        output: NonBlockingIterator,
    ):
        async for datum in datums:
            # When the watermark advances, flush everything that is now safe to emit
            if datum.watermark and datum.watermark > self.latest_wm:
                self.latest_wm = datum.watermark
                await self.flush_buffer(output)
            self.insert_sorted(datum)

    def insert_sorted(self, datum: Datum):
        # Binary insert to keep the buffer sorted by event time
        left, right = 0, len(self.sorted_buffer)
        while left < right:
            mid = (left + right) // 2
            if self.sorted_buffer[mid].event_time > datum.event_time:
                right = mid
            else:
                left = mid + 1
        self.sorted_buffer.insert(left, datum)

    async def flush_buffer(self, output: NonBlockingIterator):
        i = 0
        for datum in self.sorted_buffer:
            if datum.event_time > self.latest_wm:
                break
            await output.put(Message.from_datum(datum))
            i += 1
        # Remove flushed items
        self.sorted_buffer = self.sorted_buffer[i:]


if __name__ == "__main__":
    grpc_server = AccumulatorAsyncServer(StreamSorter)
    grpc_server.start()
```
Source code in pynumaflow/accumulator/async_server.py
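Building on the StreamSorter example above, constructor arguments for the accumulator class can be supplied through init_args and init_kwargs. This sketch assumes the server forwards them to the class when creating instances, as the parameter descriptions suggest; BoundedSorter and its max_buffered parameter are hypothetical:

```python
class BoundedSorter(StreamSorter):
    """Hypothetical variant of StreamSorter that caps the in-memory buffer."""

    def __init__(self, max_buffered: int):
        super().__init__()
        self.max_buffered = max_buffered


if __name__ == "__main__":
    grpc_server = AccumulatorAsyncServer(BoundedSorter, init_args=(1000,))
    grpc_server.start()
```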
start
¶
Starter function for the async server class. A separate caller is needed so that all the async coroutines can be started from a single context.
Source code in pynumaflow/accumulator/async_server.py
aexec
async
¶
Starts the Async gRPC server on the given UNIX socket with the given max threads.
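A sketch of calling aexec from an existing event loop instead of start; it assumes aexec takes no arguments, mirroring start, and reuses the StreamSorter class from the example above:

```python
import asyncio


async def main():
    server = AccumulatorAsyncServer(StreamSorter)
    # Runs the gRPC server inside the caller's event loop.
    await server.aexec()


if __name__ == "__main__":
    asyncio.run(main())
```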