Version: 0.93.0

Migration Guide: New Service Discovery in 0.69.0

Introduction

As part of our roadmap towards a release 1.0, Apache StreamPipes 0.69.0 features a new service discovery approach along with performance improvements related to a new storage layer for pipeline element descriptions.

The new service discovery approach is better suited for cloud-native scenarios, as the hostname of a pipeline element is now decoupled from its description. As a result, StreamPipes now supports recovery of pipeline elements independent of their assigned host. In addition, the new approach simplifies development of StreamPipes, e.g., in cases where the core is running in Docker and pipeline elements are developed on a local machine. In this case, the IP address of the host machine is now auto-discovered, so providing environment variables should be unnecessary in most cases. The second large improvement is the replacement of RDF4J as the storage engine with a NoSQL database. This leads to much faster load times, which is especially noticeable at system startup.

While we are working hard towards a stable release 1.0, we decided to focus our efforts on the remaining features required for 1.0 and do not provide an auto-migration for some of the breaking changes. We therefore recommend reinstalling StreamPipes when updating to 0.69.0. We currently plan to have at most two more releases before releasing the first 1.x version of StreamPipes.

Installation

  • Before upgrading to 0.69.0, clean any existing installation (e.g., by running docker-compose down -v) and make sure that no volumes of StreamPipes are left.
  • Upgrade to the latest installer version (available at streampipes/installer).
  • Upon restart, make sure that the setup dialog with the new StreamPipes logo appears, and re-initialize the system.

SDK changes

Version 0.69.0 comes with a new SpServiceDefinitionBuilder, which simplifies the definition of a pipeline element service.

The SpServiceDefinitionBuilder requires an ID of your extensions service, an optional title and description, and a default port. It is best to provide 8090 as the default port, so that this is the standard port of all StreamPipes extensions services at deployment time in a containerized environment. The port can always be overridden by providing an SP_PORT environment variable.
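
The precedence described above (a default port from the service definition, replaced by SP_PORT when that variable is set) can be sketched as follows. This is only an illustration of the lookup order, not the actual StreamPipes implementation: the class PortResolver and its method resolvePort are hypothetical names, and falling back to the default on a malformed value is an assumption. Only the variable name SP_PORT and the default 8090 come from this guide.

```java
import java.util.Map;

/** Hypothetical helper illustrating the SP_PORT override; not part of the StreamPipes SDK. */
public final class PortResolver {

  private PortResolver() {
  }

  /**
   * Returns the port given by SP_PORT in the supplied environment,
   * or defaultPort if the variable is missing, blank, or not a number.
   */
  public static int resolvePort(Map<String, String> env, int defaultPort) {
    String value = env.get("SP_PORT");
    if (value == null || value.isBlank()) {
      return defaultPort;
    }
    try {
      return Integer.parseInt(value.trim());
    } catch (NumberFormatException e) {
      // Assumption: a malformed value falls back to the default port.
      return defaultPort;
    }
  }

  public static void main(String[] args) {
    // Uses the real process environment and the default recommended in this guide.
    System.out.println("Resolved port: " + resolvePort(System.getenv(), 8090));
  }
}
```

Passing the environment in as a map keeps the lookup easy to test without modifying the process environment.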

Init class

Modify the Init class of your pipeline element service as follows:

public class ExamplesInit extends StandaloneModelSubmitter {

  public static void main(String[] args) {
    new ExamplesInit().init();
  }

  @Override
  public SpServiceDefinition provideServiceDefinition() {
    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.examples.jvm",
            "StreamPipes Code Examples",
            "",
            8090)
        .registerMessagingProtocols(new SpKafkaProtocolFactory(), new SpJmsProtocolFactory())
        .registerMessagingFormats(new JsonDataFormatFactory())
        .registerPipelineElement(new MyPipelineElementController())
        .registerAdapter(new MyAdapter())
        .build();
  }
}

You can now easily define a StreamPipes extensions service that supports both custom adapters and pipeline elements. This is optional: apart from the version (e.g., 0.69.0-SNAPSHOT), no changes to your existing Maven dependencies are required. To build such a combined service, use the following Maven dependency:

<dependency>
  <groupId>org.apache.streampipes</groupId>
  <artifactId>streampipes-container-extensions</artifactId>
</dependency>

Configs

Prior to version 0.69.0, additional configs had to be provided in a separate Config class. This is now obsolete: configs can be provided directly within the service definition builder as follows:


@Override
public SpServiceDefinition provideServiceDefinition() {
  return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.examples.jvm",
          "StreamPipes Code Examples",
          "",
          8090)
      .registerPipelineElement(new MyPipelineElement())
      .registerAdapter(new MyAdapter())
      .addConfig("key", 1)
      .addConfig("my-string-config", "myvalue")
      .build();
}

Configs can be easily accessed from the EventProcessorRuntimeContext (or EventSinkRuntimeContext):

@Override
public void onInvocation(Parameters params,
                         SpOutputCollector spOutputCollector,
                         EventProcessorRuntimeContext context) {

  Integer myConfigValue = context.getConfigStore().getConfig().getInteger("key");
}

Service Discovery

An extensions service can be started by executing the Init class. StreamPipes will now automatically select the proper service IP address and register the service in Consul. You can inspect the selected IP address in the console:

16:41:58.342 SP [main] INFO  o.a.s.commons.networking.Networking - Using auto-discovered IP: 172.30.80.1
16:41:58.364 SP [main] INFO o.a.s.commons.networking.Networking - Using port from provided environment variable SP_PORT: 6025
16:41:58.367 SP [main] INFO o.a.s.c.init.DeclarersSingleton - Registering 0 configs in key/value store
16:41:58.400 SP [main] INFO o.a.s.s.consul.ConsulProvider - Checking if consul is available...
16:41:58.419 SP [main] INFO o.a.s.s.consul.ConsulProvider - Successfully connected to Consul

In some (rare) cases, a non-resolvable IP address will be selected. In this case, you can manually override the IP address by providing an SP_HOST environment variable. StreamPipes then uses the manually provided address, which is similar to the behaviour in versions prior to 0.69.0.
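
The fallback order described above (a manually provided SP_HOST wins, otherwise an auto-discovered address is used) can be sketched as follows. This is a simplified illustration under stated assumptions, not the StreamPipes networking code: HostResolver, resolveHost and localAddress are hypothetical names, and InetAddress.getLocalHost() merely stands in for whatever discovery strategy StreamPipes actually applies.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical helper illustrating the SP_HOST override; not part of the StreamPipes SDK. */
public final class HostResolver {

  private HostResolver() {
  }

  /**
   * Returns SP_HOST from the supplied environment if it is set and non-blank,
   * otherwise the address produced by the auto-discovery supplier.
   */
  public static String resolveHost(Map<String, String> env, Supplier<String> autoDiscovery) {
    String override = env.get("SP_HOST");
    if (override != null && !override.isBlank()) {
      return override.trim();
    }
    return autoDiscovery.get();
  }

  /** Stand-in for auto-discovery (assumption): the JVM's notion of the local host address. */
  static String localAddress() {
    try {
      return InetAddress.getLocalHost().getHostAddress();
    } catch (UnknownHostException e) {
      // Assumption: fall back to loopback if the local host cannot be resolved.
      return "127.0.0.1";
    }
  }

  public static void main(String[] args) {
    System.out.println("Resolved host: " + resolveHost(System.getenv(), HostResolver::localAddress));
  }
}
```

Running the sketch with SP_HOST set prints the provided value unchanged; without it, the address returned by the stand-in discovery is printed.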