Numaflow-JS is an SDK for Numaflow that provides the interfaces in JavaScript/TypeScript to implement different types of data processing tasks that Numaflow supports.
Currently, these include:
This SDK is powered by Numaflow Rust SDK through napi.rs to interact with Numaflow.
Replace npm with your favourite package manager (pnpm, yarn, etc.) in the following command to install the SDK.
npm install @numaproj/numaflow-js
The examples are available in the examples directory. The examples presented provide a basic overview of how to implement different types of data processing tasks using Numaflow-JS. Each example focuses implementing and building one specific component of Numaflow.
Most of the examples follow a similar structure:
Dockerfile: Contains the instructions to build the Docker image for the example.Makefile: Contains helper commands to build the Docker imageREADME.md: Provides details on implementing the concerned type of component and instructions to run the specific example.<example-name>.ts: Contains the TypeScript implementation of the example.<example-pipeline>.yaml: Contains the pipeline configuration which uses the image built from the specific example.In the implementation part of all the examples presented, i.e. in <example-name>.ts, the pattern is mostly similar.
We need to instantiate and start an async server for the respective component being implemented.
Eg: Implementing UD sink component:
sink.AsyncServer.sinkFn with a signature satisfying AsyncServer constructor.start method of AsyncServer.stop method of AsyncServer.Currently, source and session-reduce components require implementing all methods of an interface and passing an instance
of the same to their respective async server constructors. Rest of the components only require implementing a function with a signature
satisfying the constructor of the async server.
Following are the different ways to implement a function with a signature satisfying the constructor of the async server:
const sinkFn = (message: Message) => {
console.log(message)
}
const server = new sink.AsyncServer(sinkFn)
Still works if defined as part of a class.
class Sink {
counter = 0
sinkFn = (message: Message) => {
this.counter++
console.log(this.counter, message)
}
}
let sinker = new Sink()
const server = new sink.AsyncServer(sinker.sinkFn)
Simple named functions work the same way.
function sinkFn(message: Message) {
console.log(message)
}
const server = new sink.AsyncServer(sinkFn)
Named functions defined as part of a class may need to be bound to the instance of the class.
class Sink {
counter = 0
sinkFn(message: Message) {
this.counter++
console.log(this.counter, message)
}
}
let sinker = new Sink()
const server = new sink.AsyncServer(sinker.sinkFn.bind(sinker))
If any of the examples are failing to build or if they need further clarification, please create an issue to fix the same.