Source namespace for custom data sources.
Sources are the entry points to a Numaflow pipeline that read data from external systems. Use cases include:
import { source } from '@numaproj/numaflow-js';class MySource implements source.Sourcer { private counter = 0; async *read(request: source.ReadRequest) { for (let i = 0; i < request.numRecords; i++) { const offset: source.Offset = { offset: Buffer.from(String(this.counter)), partitionId: 0 }; yield new source.Message( Buffer.from(`message-${this.counter}`), offset, new Date(), ['key'], {} ); this.counter++; } } async ack(offsets: source.Offset[]): Promise<void> { // Acknowledge processed messages } async nack(offsets: source.Offset[]): Promise<void> { // Handle failed messages } async pending(): Promise<number | null> { return null; // Unknown pending count } async partitions(): Promise<number[] | null> { return [0]; // Single partition }}const server = new source.AsyncServer(new MySource());server.start(); Copy
import { source } from '@numaproj/numaflow-js';class MySource implements source.Sourcer { private counter = 0; async *read(request: source.ReadRequest) { for (let i = 0; i < request.numRecords; i++) { const offset: source.Offset = { offset: Buffer.from(String(this.counter)), partitionId: 0 }; yield new source.Message( Buffer.from(`message-${this.counter}`), offset, new Date(), ['key'], {} ); this.counter++; } } async ack(offsets: source.Offset[]): Promise<void> { // Acknowledge processed messages } async nack(offsets: source.Offset[]): Promise<void> { // Handle failed messages } async pending(): Promise<number | null> { return null; // Unknown pending count } async partitions(): Promise<number[] | null> { return [0]; // Single partition }}const server = new source.AsyncServer(new MySource());server.start();
Source namespace for custom data sources.
Sources are the entry points to a Numaflow pipeline that read data from external systems. Use cases include:
Example