Session Reduce namespace for session-based aggregation.
Session reduce aggregates messages over session windows, where a session is a run of messages that is closed by a gap of inactivity.
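To make the idea of a gap-defined session concrete, here is a minimal standalone sketch that does not use the library. The helper `splitIntoSessions` and the parameter `gapMs` are hypothetical names introduced for illustration. The sketch also assumes that a gap exactly equal to `gapMs` keeps a message in the current session, which may differ from how the platform treats that boundary.

```typescript
// Hypothetical helper, not part of @numaproj/numaflow-js.
// Splits event timestamps (in milliseconds) into sessions: a new session
// starts whenever the gap since the previous event exceeds gapMs.
function splitIntoSessions(timestamps: number[], gapMs: number): number[][] {
  // Work on a sorted copy so the caller's array is left untouched.
  const sorted = [...timestamps].sort((a, b) => a - b);
  const sessions: number[][] = [];
  let current: number[] = [];
  let last: number | undefined;

  for (const ts of sorted) {
    if (last === undefined || ts - last > gapMs) {
      // First event, or the inactivity gap was exceeded: open a new session.
      current = [ts];
      sessions.push(current);
    } else {
      // Still within the gap: the event joins the current session.
      current.push(ts);
    }
    last = ts;
  }
  return sessions;
}

// With a 3000 ms gap, the 7500 ms pause between 2500 and 10000 splits
// the events into two sessions.
const exampleSessions = splitIntoSessions([0, 1000, 2500, 10000, 10400], 3000);
console.log(exampleSessions.length); // 2
```

In a real pipeline the platform tracks windows for you; this sketch only shows why the number and size of sessions depend on the data rather than on a fixed window length.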
Example

```typescript
import { sessionReduce } from '@numaproj/numaflow-js';

class MySessionReducer implements sessionReduce.SessionReducer {
  private state: number[] = [];

  async *sessionReduceFn(keys: string[], datums: AsyncIterableIterator<sessionReduce.Datum>) {
    for await (const datum of datums) {
      this.state.push(parseInt(datum.value.toString()));
    }
    yield new sessionReduce.Message(Buffer.from(JSON.stringify({ keys, values: this.state })));
  }

  async accumulatorFn(): Promise<Buffer> {
    return Buffer.from(JSON.stringify(this.state));
  }

  async mergeAccumulatorFn(accumulator: Buffer): Promise<void> {
    const other = JSON.parse(accumulator.toString());
    this.state.push(...other);
  }
}

const server = new sessionReduce.AsyncServer(new MySessionReducer());
server.start();
```
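The `accumulatorFn` and `mergeAccumulatorFn` pair can be exercised without a server. The sketch below mirrors their serialize-and-merge round trip with a hypothetical `SessionState` class, which is not a library type. It uses JSON strings in place of `Buffer` so that it has no dependencies. The order of the calls (one session's accumulator handed to another session) is an assumption about how a runtime would merge two sessions, not documented library behavior.

```typescript
// Hypothetical stand-in for one session's reducer state; not a library class.
class SessionState {
  private values: number[] = [];

  add(value: number): void {
    this.values.push(value);
  }

  // Mirrors accumulatorFn: serialize this session's state.
  accumulator(): string {
    return JSON.stringify(this.values);
  }

  // Mirrors mergeAccumulatorFn: fold another session's serialized state in.
  mergeAccumulator(accumulator: string): void {
    const other: number[] = JSON.parse(accumulator);
    this.values.push(...other);
  }

  // Returns a copy so callers cannot mutate the internal state.
  snapshot(): number[] {
    return [...this.values];
  }
}

const firstSession = new SessionState();
firstSession.add(1);
firstSession.add(2);

const secondSession = new SessionState();
secondSession.add(3);

// Assumed merge step: the second session's accumulator is handed to the first.
firstSession.mergeAccumulator(secondSession.accumulator());
console.log(firstSession.snapshot().length); // 3
```

Because the merge goes through a serialized form, whatever `accumulatorFn` writes must be something `mergeAccumulatorFn` can parse. Keeping the two methods symmetric is the main design constraint.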