Accumulator namespace for stateful message accumulation.
Accumulators allow you to maintain state across messages and emit results as an async iterable stream. This is useful for:
Example

    import { accumulator } from '@numaproj/numaflow-js';

    const server = new accumulator.AsyncServer(async function* (datums) {
      const buffer: accumulator.Datum[] = [];

      // Collect every datum until the input stream ends
      for await (const datum of datums) {
        buffer.push(datum);
      }

      // Sort by event time and emit
      buffer.sort((a, b) => a.eventTime.getTime() - b.eventTime.getTime());
      for (const datum of buffer) {
        yield new accumulator.Message(
          datum.value,
          datum.id,
          datum.eventTime,
          datum.watermark,
          datum.headers
        );
      }
    });

    server.start();
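The example in this section holds every datum until the input stream ends and only then emits the sorted result. As a rough sketch of how state can instead be kept across messages and released incrementally, the generator below emits buffered items as soon as the watermark on the latest datum has passed their event time. It deliberately avoids the library: `SketchDatum` and `emitInEventTimeOrder` are hypothetical names invented for illustration, not part of `@numaproj/numaflow-js`, and the sketch assumes watermarks never move backwards and that no datum arrives with an event time at or before a watermark already seen.

```typescript
// Hypothetical stand-in for a datum; NOT the library's accumulator.Datum type.
interface SketchDatum {
  value: string;
  eventTime: Date;
  watermark: Date;
}

// Buffers out-of-order datums and releases them in event-time order once the
// watermark carried by the most recent datum has reached their event time.
async function* emitInEventTimeOrder(
  datums: AsyncIterable<SketchDatum>
): AsyncGenerator<SketchDatum> {
  let buffer: SketchDatum[] = [];

  for await (const datum of datums) {
    buffer.push(datum);
    buffer.sort((a, b) => a.eventTime.getTime() - b.eventTime.getTime());

    // Assumption: nothing older than the watermark will arrive later,
    // so everything at or before it can be emitted safely.
    const cutoff = datum.watermark.getTime();
    const ready = buffer.filter((d) => d.eventTime.getTime() <= cutoff);
    buffer = buffer.filter((d) => d.eventTime.getTime() > cutoff);

    for (const d of ready) {
      yield d;
    }
  }

  // Input ended: flush whatever is still buffered (already sorted).
  for (const d of buffer) {
    yield d;
  }
}
```

In a real accumulator the same loop body would presumably sit inside the async generator passed to `accumulator.AsyncServer`, yielding `accumulator.Message` instances as in the example in this section; that wiring is left out here because it depends on the library's actual types.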