Tutorial: Data Sources
In this tutorial, we will add a new data source consisting of a single data stream. The source will be provided as a standalone component (i.e., the description will be accessible through an integrated web server).
Objective
We are going to create a new data stream that is produced by a GPS sensor installed in a delivery vehicle. The sensor produces a continuous stream of events that contain the current timestamp, the current lat/lng position of the vehicle and the plate number of the vehicle. Events are published in a JSON format as follows:
{
  "timestamp" : 145838399,
  "latitude" : 37.04,
  "longitude" : 17.04,
  "plateNumber" : "KA-AB 123"
}
These events are published to a Kafka broker using the topic org.apache.streampipes.tutorial.vehicle.
In the following section, we show how to describe this stream in a form that allows you to import and use it in StreamPipes.
Project setup
Instead of creating a new project from scratch, we recommend using the Maven archetype (streampipes-archetype-extensions-jvm) to create a new project skeleton. Enter the following command in a command line of your choice (Apache Maven needs to be installed):
mvn archetype:generate \
-DarchetypeGroupId=org.apache.streampipes -DarchetypeArtifactId=streampipes-archetype-extensions-jvm \
-DarchetypeVersion=0.70.0 -DgroupId=my.groupId \
-DartifactId=my-source -DclassNamePrefix=MySource -DpackageName=mypackagename
You will see a project structure similar to the structure shown in the archetypes section.
Besides the basic project skeleton, the sample project also includes an example Dockerfile you can use to package your application into a Docker container.
Adding a data stream description
Now we will add a new data stream definition.
First, create a new class MyVehicleStream, which should look as follows:
package org.apache.streampipes.pe.example;

import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.sources.AbstractAdapterIncludedStream;

public class MyVehicleStream extends AbstractAdapterIncludedStream {

  @Override
  public SpDataStream declareModel() {
    return null;
  }

  @Override
  public void executeStream() {
  }
}
This class extends the class AbstractAdapterIncludedStream, which indicates that this source continuously produces data (configured in the executeStream() method).
In contrast, the class AbstractAlreadyExistingStream indicates that we only want to describe an already existing stream (e.g., a stream that already sends data to an existing Kafka broker).
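If your events already arrive at an existing broker, a minimal sketch could look like the following (the class name MyExistingVehicleStream is illustrative, and we assume that only declareModel() needs to be implemented in this case):

package org.apache.streampipes.pe.example;

import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.sources.AbstractAlreadyExistingStream;

public class MyExistingVehicleStream extends AbstractAlreadyExistingStream {

  @Override
  public SpDataStream declareModel() {
    // the same DataStreamBuilder-based definition as shown in the next section
    return null;
  }
}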
Next, we will add the definition of the data stream. Add the following code inside the declareModel method:
return DataStreamBuilder.create("org.apache.streampipes.tutorial.vehicle.position", "Vehicle Position", "An event stream " +
"that produces current vehicle positions")
This line creates a new instance of the SDK's DataStreamBuilder by providing three basic parameters:
The first parameter must be a unique identifier of your data stream.
The second and third parameters indicate a label and a description of your stream.
These values will later be used in the StreamPipes UI to display stream details in a human-readable manner.
Next, we will add the properties as stated above to the stream definition by adding the following lines:
.property(EpProperties.timestampProperty("timestamp"))
.property(EpProperties.stringEp(Labels.from("plate-number", "Plate Number", "Denotes the plate number of the vehicle"), "plateNumber", "http://my.company/plateNumber"))
.property(EpProperties.doubleEp(Labels.from("latitude", "Latitude", "Denotes the latitude value of the vehicle's position"), "latitude", Geo.lat))
.property(EpProperties.doubleEp(Labels.from("longitude", "Longitude", "Denotes the longitude value of the vehicle's position"), "longitude", Geo.lng))
These four event properties compose our event schema. An event property must, at least, provide the following attributes:
- Runtime Name. The runtime name indicates the key of the property at runtime, e.g., if our JSON message contains a structure such as {"plateNumber" : "KA-F 123"}, the runtime name must be plateNumber.
- Runtime Type. An event property must have a primitive type (we will later see how to model more complex properties such as lists and nested properties). The type must be an instance of XMLSchema primitives; however, the SDK provides convenience methods to set the property type.
- Domain Property. The domain property indicates the semantics of the event property. For instance, the latitude property is linked to the http://www.w3.org/2003/01/geo/wgs84_pos#lat property of the WGS84 vocabulary. The domain property should be a URI that is part of an existing or domain-specific vocabulary. The SDK provides convenience methods for popular vocabularies (e.g., Schema.org, Dolce or WGS84).
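To make the mapping explicit, here is the latitude property from the stream definition again, annotated to show which argument carries which attribute:

.property(EpProperties.doubleEp(
    Labels.from("latitude", "Latitude",
        "Denotes the latitude value of the vehicle's position"), // internal id, label and description shown in the UI
    "latitude",  // runtime name: the key of the property in the JSON event
    Geo.lat))    // domain property: the WGS84 latitude URI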
In order to complete the minimum required specification of an event stream, we need to provide information on the transport format and protocol of the data stream at runtime.
This can be achieved by extending the builder with the respective properties:
.format(Formats.jsonFormat())
.protocol(Protocols.kafka("localhost", 9094, "org.apache.streampipes.tutorial.vehicle"))
.build();
In this example, we defined that the data stream consists of events in a JSON format and that Kafka is used as a message broker to transmit events. The last build() method call triggers the construction of the data stream definition.
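Putting the individual snippets together, the complete declareModel method now looks as follows:

@Override
public SpDataStream declareModel() {
  return DataStreamBuilder.create("org.apache.streampipes.tutorial.vehicle.position", "Vehicle Position",
      "An event stream that produces current vehicle positions")
      .property(EpProperties.timestampProperty("timestamp"))
      .property(EpProperties.stringEp(Labels.from("plate-number", "Plate Number",
          "Denotes the plate number of the vehicle"), "plateNumber", "http://my.company/plateNumber"))
      .property(EpProperties.doubleEp(Labels.from("latitude", "Latitude",
          "Denotes the latitude value of the vehicle's position"), "latitude", Geo.lat))
      .property(EpProperties.doubleEp(Labels.from("longitude", "Longitude",
          "Denotes the longitude value of the vehicle's position"), "longitude", Geo.lng))
      .format(Formats.jsonFormat())
      .protocol(Protocols.kafka("localhost", 9094, "org.apache.streampipes.tutorial.vehicle"))
      .build();
}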
That's it! In the next section, we will add a simple data simulator so that the stream actually produces events.
Creating some dummy data
Let's assume our stream should produce some random values that are sent to StreamPipes. We'll add a very simple data simulator to the executeStream method as follows:
@Override
public void executeStream() {
  // Kafka producer that publishes the simulated events; adjust broker URL and topic as needed
  SpKafkaProducer producer = new SpKafkaProducer("localhost:9094", "my-topic", Collections.emptyList());
  Random random = new Random();
  Runnable runnable = () -> {
    for (;;) {
      // Build a JSON event that matches the schema defined in declareModel
      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("timestamp", System.currentTimeMillis());
      jsonObject.addProperty("plateNumber", "KA-SP 1");
      jsonObject.addProperty("latitude", random.nextDouble());
      jsonObject.addProperty("longitude", random.nextDouble());
      producer.publish(jsonObject.toString());
      try {
        // Publish one event per second
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  };
  new Thread(runnable).start();
}
Change the topic and the URL of your Kafka broker so that they match the values defined in the protocol section of your stream definition.
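The simulator relies on a few additional imports. A sketch of what they might look like (the exact package of SpKafkaProducer is an assumption based on common StreamPipes project layouts; verify it in your IDE):

import org.apache.streampipes.messaging.kafka.SpKafkaProducer; // assumed package path

import com.google.gson.JsonObject;

import java.util.Collections;
import java.util.Random;
import java.util.concurrent.TimeUnit;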
Registering the data stream
You need to register the stream in the service definition. Open the Init class and register the MyVehicleStream:
@Override
public SpServiceDefinition provideServiceDefinition() {
return SpServiceDefinitionBuilder.create("org.apache.streampipes",
"human-readable service name",
"human-readable service description", 8090)
.registerPipelineElement(new ExampleDataProcessor())
.registerPipelineElement(new ExampleDataSink())
.registerPipelineElement(new MyVehicleStream())
.registerMessagingFormats(
new JsonDataFormatFactory(),
new CborDataFormatFactory(),
new SmileDataFormatFactory(),
new FstDataFormatFactory())
.registerMessagingProtocols(
new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory(),
new SpMqttProtocolFactory())
.build();
}
You can remove the other two example classes if you want.
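If you do remove them, a trimmed service definition could look like this (a sketch that keeps only the JSON format and Kafka protocol used by our stream; register whatever formats and protocols your remaining elements actually need):

@Override
public SpServiceDefinition provideServiceDefinition() {
  return SpServiceDefinitionBuilder.create("org.apache.streampipes",
      "human-readable service name",
      "human-readable service description", 8090)
      .registerPipelineElement(new MyVehicleStream())
      .registerMessagingFormats(new JsonDataFormatFactory())
      .registerMessagingProtocols(new SpKafkaProtocolFactory())
      .build();
}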
Starting the service
Once you start the service, it registers itself in StreamPipes using its hostname. The hostname is auto-discovered and should work out of the box. In some cases, the detected hostname is not resolvable from within a container (where the core is running). In this case, provide an SP_HOST environment variable to override the auto-discovery.
Now we are ready to start our first container!
Configure your IDE to provide an environment variable called SP_DEBUG with value true when starting the project.
The service automatically registers itself in StreamPipes. To install the newly created element, open the StreamPipes UI and install the source via the Install Pipeline Elements section.
Read more
Congratulations! You've just created your first pipeline element for StreamPipes. There are many more things to explore, and data sources can be defined in much more detail. Also consider that, in some cases, you might want to create a configurable adapter, where users can configure a data source in the UI when connecting data.