Reduce namespace for aggregating messages by key over time windows.
Reduce operations aggregate multiple messages that share the same key within a time window. This is essential for:
import { reduce } from '@numaproj/numaflow-js';const server = new reduce.AsyncServer(async (keys, datums, metadata) => { let sum = 0; let count = 0; for await (const datum of datums) { const value = parseInt(datum.value.toString()); sum += value; count++; } return [new reduce.Message(Buffer.from(JSON.stringify({ keys, sum, count, window: metadata.intervalWindow })))];});server.start(); Copy
import { reduce } from '@numaproj/numaflow-js';const server = new reduce.AsyncServer(async (keys, datums, metadata) => { let sum = 0; let count = 0; for await (const datum of datums) { const value = parseInt(datum.value.toString()); sum += value; count++; } return [new reduce.Message(Buffer.from(JSON.stringify({ keys, sum, count, window: metadata.intervalWindow })))];});server.start();
Reduce namespace for aggregating messages by key over time windows.
Reduce operations aggregate multiple messages that share the same key within a time window. This is essential for:
Example