@numaproj/numaflow-js - v0.0.0-alpha.4
    Preparing search index...

    Namespace sessionReduce

    Session Reduce namespace for session-based aggregation.

    Session reduce lets you aggregate messages into session windows: a session groups messages that arrive close together and is closed by a gap of inactivity. This is useful for:

    • User session analysis
    • Activity-based grouping
    • Event sequences with natural breaks

    Example:

    import { sessionReduce } from '@numaproj/numaflow-js';

    /**
     * Example session reducer that collects the integer payload of every
     * message it receives and emits the collected values as one JSON message.
     */
    class MySessionReducer implements sessionReduce.SessionReducer {
      /** Integer values collected so far by this reducer instance. */
      private state: number[] = [];

      /**
       * Consumes every datum, parses its payload as a base-10 integer and,
       * once the input is exhausted, yields a single message whose body is
       * the JSON object `{ keys, values }`.
       *
       * @param keys - Keys passed in by the caller; echoed in the output.
       * @param datums - Async stream of incoming datums.
       */
      async *sessionReduceFn(keys: string[], datums: AsyncIterableIterator<sessionReduce.Datum>) {
        for await (const datum of datums) {
          const parsed = Number.parseInt(datum.value.toString(), 10);
          // Skip non-numeric payloads: NaN would be written as `null` by
          // JSON.stringify and corrupt both the output and the accumulator.
          if (!Number.isNaN(parsed)) {
            this.state.push(parsed);
          }
        }
        yield new sessionReduce.Message(Buffer.from(JSON.stringify({
          keys,
          values: this.state
        })));
      }

      /**
       * Serializes the collected values as a JSON array.
       *
       * @returns Buffer holding the JSON-encoded state.
       */
      async accumulatorFn(): Promise<Buffer> {
        return Buffer.from(JSON.stringify(this.state));
      }

      /**
       * Appends the values from a JSON-encoded accumulator (the format
       * produced by `accumulatorFn`) to this reducer's state.
       *
       * @param accumulator - Buffer holding a JSON array of numbers.
       * @throws TypeError if the decoded payload is not an array of numbers.
       */
      async mergeAccumulatorFn(accumulator: Buffer): Promise<void> {
        const other: unknown = JSON.parse(accumulator.toString());
        // Validate before merging so a malformed payload cannot put
        // non-numeric entries into `state`.
        if (!Array.isArray(other) || !other.every((value) => typeof value === 'number')) {
          throw new TypeError('accumulator must be a JSON array of numbers');
        }
        this.state.push(...other);
      }
    }

    // Wrap a reducer instance in the session reduce server, then start it.
    const reducer = new MySessionReducer();
    const server = new sessionReduce.AsyncServer(reducer);
    server.start();

    Classes

    AsyncServer
    Message

    Interfaces

    Datum
    MessageOptions
    SessionReducer

    Type Aliases

    AccumulatorFnCallback
    MergeAccumulatorFnCallback
    SessionReduceFnCallback