SDK Guide: Event Model
Introduction
This guide explains the usage of the event model to manipulate runtime events for data processors and data sink.
Prerequisites
This guide assumes that you are already familiar with the basic setup of data processors.
Property Selectors
In most cases, fields that are subject to be transformed by pipeline elements are provided by the assigned MappingProperty
(see the guide on static properties).
Mapping properties return a PropertySelector
that identifies a field based on (i) the streamIndex and (ii) the runtime name of the field.
Let's assume we have an event with the following structure:
{
"timestamp" : 1234556,
"temperature" : 37.0,
"deviceId" : "sensor1",
"running" : true,
"location" : {"latitude" : 34.4, "longitude" : -47},
"lastValues" : [45, 22, 21]
}
In addition, we assume that a data processor exists (with one input node) that converts the temperature value (measured in degrees celsius) to a degree fahrenheit value.
In this case, a mapping property (selected by the pipeline developer in the StreamPipes UI) would link to the temperature
field of the event.
The mapping property value will be the PropertySelector
of the temperature value, which looks as follows:
s0::temperature
s0
identifies the stream (in this case, only one input streams exist, but as data processors might require more than one input stream, a stream identifier is required), while the appendix identifies the runtime name.
Note: If you add a new field to an input event, you don't need to provide the selector, you can just assign the runtime name as defined by the output strategy.
Reading Fields
You can get a field from an event by providing the corresponding selector:
@Override
public void onEvent(Event event, SpOutputCollector out) {
PrimitiveField temperatureField = event.getFieldBySelector(PROPERTY_SELECTOR).getAsPrimitive();
}
Similarly, if your mapping property links to a nested property, use
@Override
public void onEvent(Event event, SpOutputCollector out) {
NestedField nestedField = event.getFieldBySelector(PROPERTY_SELECTOR).getAsNested();
}
and for a list-based field:
@Override
public void onEvent(Event event, SpOutputCollector out) {
ListField listField = event.getFieldBySelector(PROPERTY_SELECTOR).getAsList();
}
Parsing Fields
Primitive Fields
A PrimitiveField
contains convenience methods to directly cast a field to the target datatype:
// parse the value as a float datatype
Float temperatureValue = event.getFieldBySelector(temperatureSelector).getAsPrimitive().getAsFloat();
// or do the same with a double datatype
Double temperatureValue = event.getFieldBySelector(temperatureSelector).getAsPrimitive().getAsDouble();
// extracting a string
String deviceId = event.getFieldBySelector(deviceIdSelector).getAsPrimitive().getAsString();
// this also works for extracting fields from nested fields:
Double latitude = event.getFieldBySelector(latitudeSelector).getAsPrimitive().getAsDouble();
// extracting boolean values
Boolean running = event.getFieldBySelector(runningSelector).getAsPrimitive().getAsBoolean();
In rare cases, you might want to receive a field directly based on the runtime name as follows:
Double temperature = event.getFieldByRuntimeName("temperature").getAsPrimitive().getAsDouble();
List Fields
Lists can also be retrieved by providing the corresponding selector and can automatically be parsed to a list of primitive datatypes:
List<Integer> lastValues = event.getFieldBySelector(lastValueSelector).getAsList().parseAsSimpleType(Integer.class);
(coming soon: parsing complex lists)
Adding/Updating Fields
Primitive fields can easily be added to an event by providing the runtime name and the object:
// add a primitive field with runtime name "city" and value "Karlsruhe"
event.addField("city", "Karlsruhe");
// remove the field "temperature" from the event
event.removeFieldBySelector(temperatureSelector);
// add a new field
event.addField("fahrenheit", 48);