SDK Guide: Functions
Introduction
Pipeline elements such as data processors and data sinks are a great way to create reusable components that can be part of pipelines. However, creating a pipeline element is not always the best choice, for example when:
- The behaviour of a data processor is bound to a specific input stream and
- A data processor doesn't contain any user-defined configuration and
- The intended action is fixed or known at build time and the data processor shouldn't be available in the pipeline editor.
To cover such use cases, we provide StreamPipes Functions. Functions are a great way to define custom processing logic based on previously connected data streams.
Functions can be registered in a similar way to pipeline elements, but define expected input streams at startup time. Functions are started once the corresponding extensions service starts and run until the service is stopped.
Writing a function
Functions are currently in preview mode and are not yet recommended for production usage. APIs are subject to change in a future version.
To define a function, create a new extensions service using the Maven Archetypes or use an already existing service.
Skeleton
Functions can be defined by creating a new class which extends the StreamPipesFunction class.
The basic skeleton looks like this:
import java.util.List;
// StreamPipes imports (StreamPipesFunction, FunctionId, FunctionContext, Event) are omitted here

public class StreamPipesFunctionExample extends StreamPipesFunction {

  @Override
  public FunctionId getFunctionId() {
    return FunctionId.from("my-function-id", 1);
  }

  @Override
  public List<String> requiredStreamIds() {
    return List.of("<id of the required stream>");
  }

  @Override
  public void onServiceStarted(FunctionContext context) {
    // called when the service is started
  }

  @Override
  public void onEvent(Event event, String streamId) {
    // called when an event arrives
  }

  @Override
  public void onServiceStopped() {
    // called when the service is stopped
  }
}
The structure of a function class is easy to understand:
- getFunctionId requires an identifier in the form of a FunctionId, which defines the id itself along with a version number that can be freely chosen.
- requiredStreamIds expects a list of references to data streams that are already available in StreamPipes. See below to learn how to find the id of a stream in StreamPipes.
- onServiceStarted is called once the extensions service is started and can be used to initialize the function.
- onEvent is called every time a new event arrives and provides a streamId as a reference to the corresponding stream, which is useful in case multiple data streams are received by the function.
- onServiceStopped is called when the extensions service is stopped and can be used to perform any required cleanup.
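To illustrate how the streamId argument helps when a function receives more than one stream, here is a minimal sketch of per-stream bookkeeping that could live inside onEvent. It is plain Java and deliberately does not extend StreamPipesFunction, so it compiles without StreamPipes on the classpath: the class name PerStreamEventCounter, the countFor helper, the two stream ID constants and the use of a Map as the event payload are all assumptions made for this example and not part of the SDK.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the logic of a function; not a StreamPipes class.
final class PerStreamEventCounter {

  // Placeholder IDs: replace with the Resource IDs copied from the StreamPipes UI.
  static final String TEMPERATURE_STREAM = "<id of the temperature stream>";
  static final String PRESSURE_STREAM = "<id of the pressure stream>";

  // Number of events seen so far, keyed by stream ID.
  private final Map<String, Long> counts = new HashMap<>();

  // Mirrors requiredStreamIds(): the streams this logic expects to receive.
  List<String> requiredStreamIds() {
    return List.of(TEMPERATURE_STREAM, PRESSURE_STREAM);
  }

  // Mirrors onEvent(event, streamId). The event is reduced to a Map here
  // (an assumption) so that the sketch stays self-contained.
  void onEvent(Map<String, Object> event, String streamId) {
    if (!requiredStreamIds().contains(streamId)) {
      return; // ignore events from streams that were not requested
    }
    counts.merge(streamId, 1L, Long::sum);
  }

  // Returns how many events have been counted for the given stream.
  long countFor(String streamId) {
    return counts.getOrDefault(streamId, 0L);
  }
}
```

In an actual function, the same bookkeeping would sit in the overridden onEvent method, and the collected totals could be logged or persisted in onServiceStopped.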
Getting a stream ID
Functions require a reference to every data stream that the function should receive.
Currently, the only way to get the ID of a stream is by navigating to the Asset Management view in the StreamPipes UI.
Create a new asset, click on Edit Asset and open Add Link in the Linked Resources panel.
Choose Data Source as link type, select one of the available sources, copy the Resource ID and provide this ID in the requiredStreamIds method.
Function Context
The onServiceStarted method receives a function context, which offers several convenience methods to work with functions:
- getFunctionId returns the current function identifier
- getConfig returns a reference to configuration options of the extensions service
- getClient returns a reference to the StreamPipes client to interact with features from the REST API.
- getStreams returns the data model of all data streams defined in the requiredStreamIds method.
- getSchema returns the schema of a specific data stream by providing the streamId
Registering a function
A function is registered in the Init class of the service.
For example, given a service definition as illustrated below, call registerFunction and provide an instance of your function.
@Override
public SpServiceDefinition provideServiceDefinition() {
  return SpServiceDefinitionBuilder.create("my-service-id",
          "StreamPipes Function Example",
          "",
          8090)
      .registerFunction(new MyExampleFunction())
      .registerMessagingFormats(
          new JsonDataFormatFactory())
      .registerMessagingProtocols(
          new SpNatsProtocolFactory())
      .build();
}
Metrics & Monitoring
Similar to pipeline elements, functions register at the StreamPipes core. Running functions are listed in the pipeline view of the user interface under Functions, right below the list of available pipelines. As with pipelines, simple metrics, monitoring info and exceptions can be viewed in the Details section of each function.