Reduce Stream namespace for streaming aggregation results.
Similar to Reduce, but outputs results as an async iterable stream instead of an array. This is useful when:
import { reduceStream } from '@numaproj/numaflow-js';

const server = new reduceStream.AsyncServer(async function* (keys, datums, metadata) {
  let runningSum = 0;
  for await (const datum of datums) {
    runningSum += parseInt(datum.value.toString());
    // Emit running total
    yield new reduceStream.Message(
      Buffer.from(JSON.stringify({ keys, runningSum, window: metadata.intervalWindow })),
    );
  }
});

server.start();
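To make the difference between array output and streamed output concrete, here is a minimal, SDK-free sketch. It does not use `@numaproj/numaflow-js` at all: `SketchDatum`, `reduceToArray`, `reduceToStream`, `sampleDatums`, and `collect` are hypothetical names introduced only for illustration, and the datum shape is an assumption (any `value` with a `toString()` method).

```typescript
// Hypothetical stand-in for an incoming datum; the real SDK type may differ.
interface SketchDatum {
  value: { toString(): string };
}

// Array-style reduce: nothing is returned until every datum has been consumed.
async function reduceToArray(datums: AsyncIterable<SketchDatum>): Promise<number[]> {
  let sum = 0;
  for await (const datum of datums) {
    sum += parseInt(datum.value.toString(), 10);
  }
  return [sum];
}

// Stream-style reduce: a running total is yielded as each datum arrives.
async function* reduceToStream(datums: AsyncIterable<SketchDatum>): AsyncGenerator<number> {
  let sum = 0;
  for await (const datum of datums) {
    sum += parseInt(datum.value.toString(), 10);
    yield sum;
  }
}

// Hypothetical input source used only for this sketch: the values 1, 2, 3.
async function* sampleDatums(): AsyncGenerator<SketchDatum> {
  for (const n of [1, 2, 3]) {
    yield { value: String(n) };
  }
}

// Helper that drains any async iterable into an array, for inspection.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) {
    out.push(item);
  }
  return out;
}
```

With the sample input, `await collect(reduceToStream(sampleDatums()))` resolves to `[1, 3, 6]`, one value per datum, while `await reduceToArray(sampleDatums())` resolves to `[6]` only after the input is exhausted. A consumer of the streamed version can act on each intermediate value without waiting for the input to end.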
Example