Skip to main content
Version: 0.91.0

SDK Guide: Functions

Introduction

Pipeline elements such as data processors and data sinks are a great way to create reusable components that can be part of pipelines. However, creating a pipeline element is not always the best choice:

  • The behaviour of a data processor is bound to a specific input stream and
  • A data processor doesn't contain any user-defined configuration and
  • The intended action is fixed or known at build time and the data processor shouldn't be available in the pipeline editor.

To cover such use cases, we provide StreamPipes Functions. Functions are a great way to define custom processing logic based on previously connected data streams.

Functions can be registered in a similar way to pipeline elements, but define expected input streams at startup time. Functions are started once the corresponding extensions service starts and run until the service is stopped.

Writing a function

Work in Progress

Functions are currently in preview mode and are not yet recommended for production usage. APIs are subject to change in a future version.

To define a function, create a new extensions service using the Maven Archetypes or use an already existing service.

Skeleton

Functions can be defined by creating a new class which extends the StreamPipesFunction class.

The basic skeleton looks like this:

public class StreamPipesFunctionExample extends StreamPipesFunction {

@Override
public FunctionId getFunctionId() {
return FunctionId.from("my-function-id", 1);
}

@Override
public List<String> requiredStreamIds() {
return List.of("<id of the required stream>");
}

@Override
public void onServiceStarted(FunctionContext context) {
// called when the service is started
}

@Override
public void onEvent(Event event, String streamId) {
// called when an event arrives
}

@Override
public void onServiceStopped() {
// called when the service is stopped
}
}

The structure of a function class is easy to understand:

  • getFunctionId requires an identifier in form of a FunctionId, which defines the id itself along with a version number that can be freely chosen.
  • requiredStreamIds expects a list of references to data streams that are already available in StreamPipes. See below to learn how to find the id of a stream in StreamPipes.
  • onServiceStarted is called once the extensions service is started and can be used to initialize the function.
  • onEvent is called every time a new event arrives and provides a streamId as a reference to the corresponding stream, which is useful in case multiple data streams are received by the function.
  • onServiceStopped is called when the extensions service is stopped and can be used to perform any required cleanup.

Getting a stream ID

Functions require a reference to all data streams that should be retrieved by the function. Currently, the only way to get the ID of a function is by navigating to the Asset Management view in the StreamPipes UI. Create a new asset, click on Edit Asset and open Add Link in the Linked Resources panel. Choose Data Source as link type, select one of the available sources, copy the Resource ID and provide this ID in the requiredStreamIds method.

Function Context

The onServiceStarted method provides a function context which provides several convenience methods to work with functions:

  • getFunctionId returns the current function identifier
  • getConfig returns a reference to configuration options of the extensions service
  • getClient returns a reference to the StreamPipes client to interact with features from the REST API.
  • getStreams returns the data model of all data streams defined in the requiredStreamIds method.
  • getSchema returns the schema of a specific data stream by providing the streamId

Registering a function

Registering a function is easy and can be done in the Init class of the service. E.g., considering a service definition as illustrated below, simply call registerFunction and provide an instance of your function.


@Override
public SpServiceDefinition provideServiceDefinition() {
return SpServiceDefinitionBuilder.create("my-service-id",
"StreamPipes Function Example",
"",
8090)
.registerFunction(new MyExampleFunction())
.registerMessagingFormats(
new JsonDataFormatFactory())
.registerMessagingProtocols(
new SpNatsProtocolFactory())
.build();
}

Metrics & Monitoring

Similar to pipeline elements, function register at the StreamPipes core. Running functions can be seen in the pipeline view of the user interface under Functions, right below the list of available pipelines. Similar to pipelines, simple metrics, monitoring info and exceptions can be viewed in the Details section of each function.