This protocol implementation provides an abstraction layer for connecting to InfluxDB, a time series database.
As a time series database, InfluxDB stores data in collections called ‘measurements’. Each ‘measurement’ stores ‘points’, and each point consists of a timestamp, a value, and fields and tags that allow different groupings.
This version of the protocol supports the Flux query language for InfluxDB.
Below you can find the configuration parameters and examples of how to build a commissioning file for this protocol.
To read data from InfluxDB, define an endpoint with either a read or a subscribe property and provide a valid Flux query in the query field of the endpoint configuration. A subscribe endpoint works by polling: it defines an interval, and the query is executed each time that interval elapses. The result of the query is published in JSON format on the MQTT broker.
The Flux query supports value interpolation through placeholders of the form ‘@tag’.
For example, if the query contains the tags ‘@bucket’, ‘@measurement’ and ‘@startMeasurementTime’, a read request that provides values for ‘bucket’, ‘measurement’ and ‘startMeasurementTime’ generates a valid Flux query.
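The substitution can be pictured with a short Python sketch. It is not the protocol's actual implementation: the function name interpolate_query, the regular expression and the sample query are assumptions made purely for illustration.

```python
import re

# Hypothetical Flux query using '@tag' placeholders (illustrative only).
QUERY_TEMPLATE = (
    'from(bucket:"@bucket") '
    '|> range(start: @startMeasurementTime) '
    '|> filter(fn: (r) => r._measurement == "@measurement")'
)


def interpolate_query(template: str, values: dict) -> str:
    """Replace every '@name' placeholder with values['name'].

    Raises KeyError if a placeholder has no matching value, so an
    incomplete read request never yields a half-filled query.
    """
    def substitute(match):
        return str(values[match.group(1)])

    return re.sub(r"@(\w+)", substitute, template)


query = interpolate_query(
    QUERY_TEMPLATE,
    {
        "bucket": "turbine",
        "measurement": "rotary_encoder",
        "startMeasurementTime": "-1d",
    },
)
print(query)
```

Failing loudly on a missing value is a deliberate choice in this sketch; how the protocol itself treats a missing value is not stated here.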
Output Format on Read
When data is read from InfluxDB, the results are published to the /res topic of the Endpoint. When a subscription is configured, the results are published to the Endpoint's default topic.
The output in both cases will be provided as a JSON array representing the InfluxDB query result.
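A consumer of the result might decode it as sketched below in Python. The element layout is an assumption: the sample payload uses the usual Flux column names (_time, _value, _field, _measurement), but the exact shape published by the protocol is not specified in this section, and the helper name latest_value is hypothetical.

```python
import json

# Hypothetical payload as it might arrive on the /res topic. The column
# names follow Flux conventions and are an assumption, not a documented schema.
payload = b"""[
  {"_time": "2023-01-01T00:00:00Z", "_measurement": "rotary_encoder",
   "_field": "angle", "_value": 12.5},
  {"_time": "2023-01-01T00:00:06Z", "_measurement": "rotary_encoder",
   "_field": "angle", "_value": 13.0}
]"""


def latest_value(raw: bytes, field: str):
    """Return the value of the most recent row for `field`, or None."""
    rows = json.loads(raw)
    if not isinstance(rows, list):
        raise ValueError("expected a JSON array")
    matching = [row for row in rows if row.get("_field") == field]
    if not matching:
        return None
    # UTC timestamps in the same RFC 3339 format sort lexicographically.
    return max(matching, key=lambda row: row["_time"])["_value"]


print(latest_value(payload, "angle"))
```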
To write data to InfluxDB, an endpoint with a write property has to be defined. If a measurement property is set on the endpoint, it is used by default for all data points sent to the endpoint. This default can be overridden by providing a measurement property in the data message.
To write data, send an MQTT message to the /set topic of the Endpoint. The data message may include a measurement property.
Both tags and fields are supported. If the message contains no timestamp, InfluxDB assigns one.
It is also possible to write several data points per message. To do this, they just have to be sent as an array.
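The following Python sketch builds a single-point message and an array message for the /set topic. Only the measurement property name is confirmed by this page; the key names fields, tags and timestamp, the timestamp unit, the base topic and the helper make_point are assumptions for illustration.

```python
import json

# Hypothetical base topic of a write endpoint; the real topic depends on
# the service and endpoint names of your commissioning file.
ENDPOINT_TOPIC = "services/influxdbexample/turbineWrite"


def make_point(fields, tags=None, measurement=None, timestamp=None):
    """Build one data point; optional keys are omitted when not given."""
    point = {"fields": dict(fields)}       # assumed key name
    if tags:
        point["tags"] = dict(tags)         # assumed key name
    if measurement is not None:
        point["measurement"] = measurement  # overrides the endpoint default
    if timestamp is not None:
        point["timestamp"] = timestamp     # assumed key name and unit (ms)
    return point


# One point that overrides the measurement configured on the endpoint.
single = make_point(
    {"angle": 30.5}, tags={"site": "plant-1"}, measurement="rotary_encoder"
)

# Several points in one message are sent as an array.
batch = [
    make_point({"angle": 30.5}, timestamp=1672531200000),
    make_point({"angle": 31.0}),  # no timestamp: InfluxDB assigns one
]

set_topic = ENDPOINT_TOPIC + "/set"
print(set_topic, json.dumps(single))
print(set_topic, json.dumps(batch))
```

Publishing the serialized payload is left to whichever MQTT client you use.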
Note that, following the InfluxDB client design, writes are asynchronous: data is written to InfluxDB according to the value of the flushInterval connection property, which defaults to 10 seconds.
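The practical consequence is that a point accepted by the endpoint may not be queryable until the next flush. The toy model below illustrates this in Python. It is not the InfluxDB client: the class BufferedWriter is hypothetical, the millisecond unit is an assumption, and a real client may flush on other conditions as well.

```python
class BufferedWriter:
    """Toy model of interval-based flushing (not the InfluxDB client)."""

    def __init__(self, flush_interval_ms: int = 10_000):
        self.flush_interval_ms = flush_interval_ms
        self.pending = []   # accepted but not yet written
        self.written = []   # visible in the database
        self._last_flush_ms = 0

    def write(self, point, now_ms: int) -> None:
        """Accept a point; it stays buffered until the next flush."""
        self.pending.append(point)
        self.tick(now_ms)

    def tick(self, now_ms: int) -> None:
        """Flush the buffer once the interval has elapsed."""
        if now_ms - self._last_flush_ms >= self.flush_interval_ms:
            self.written.extend(self.pending)
            self.pending.clear()
            self._last_flush_ms = now_ms


writer = BufferedWriter(flush_interval_ms=5000)
writer.write({"angle": 30.5}, now_ms=1000)
print(len(writer.written))  # the point is still buffered
writer.tick(now_ms=5000)
print(len(writer.written))  # the interval elapsed, the point was flushed
```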
Output Format on Write
When data is written to an InfluxDB Endpoint, a message is published to the /res topic of the Endpoint. The output message is an object with two properties:
timestamp: the Unix timestamp, in milliseconds, at which the write was executed
value: set to true when the write was successful
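A subscriber to the /res topic could evaluate this message as in the Python sketch below. The sample message and the helper name parse_write_result are illustrative; the numbers are made up.

```python
import json
from datetime import datetime, timezone

# Example message shaped as described above; the values are made up.
message = '{"timestamp": 1672531200000, "value": true}'


def parse_write_result(raw: str):
    """Return (write time as an aware UTC datetime, success flag)."""
    result = json.loads(raw)
    # The timestamp is in milliseconds, so convert to seconds first.
    written_at = datetime.fromtimestamp(
        result["timestamp"] / 1000, tz=timezone.utc
    )
    return written_at, result["value"] is True


written_at, ok = parse_write_result(message)
print(written_at.isoformat(), ok)
```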
Sample Commissioning file
description: |
  Sample InfluxDB commissioning file

metadata:
  name: Cybus InfluxDB Example
  provider: cybus
  homepage: https://cybus.io
  version: 1.0.0

#------------------------------------------------------------------------------
# Parameters
#------------------------------------------------------------------------------
parameters:
  influxHost:
    type: string
    description: 'HTTP address of InfluxDB server'
    default: 'influxdbhost'

  influxPort:
    type: integer
    description: 'Influx Port'
    default: 8086

  influxScheme:
    type: 'string'
    description: 'Either use http or https for the server url'
    default: 'http'

#------------------------------------------------------------------------------
# Resources
#------------------------------------------------------------------------------
resources:
  #----------------------------------------------------------------------------
  # Connections
  #----------------------------------------------------------------------------
  influxdbConnection:
    type: Cybus::Connection
    properties:
      protocol: Influxdb
      connection:
        host: !ref influxHost
        token: '-an-influx-db-jwt-token-'
        port: !ref influxPort
        bucket: turbine
        scheme: !ref influxScheme
        flushInterval: 5000

  #----------------------------------------------------------------------------
  # Endpoints
  #----------------------------------------------------------------------------
  turbineWrite:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      write:
        measurement: 'turbine'

  rotaryEncoderWrite:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      write:
        measurement: 'rotary_encoder'

  rotary_encoder_angle:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      subscribe:
        interval: 6000
        query: >
          from(bucket:"turbine")
          |> range(start: -1d)
          |> filter(fn: (r) => r._measurement == "rotary_encoder")

  #----------------------------------------------------------------------------
  # Mappings
  #----------------------------------------------------------------------------
  # A mapping that overrides the measurement value with the name of the MQTT topic
  mappings:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: 'turbine/#'
          publish:
            endpoint: !ref turbineWrite
          rules:
            - transform:
                # Add the topic as measurement
                expression: '$merge([$,{"measurement": $context.topic}])'
        # A mapping that will pass data from the MQTT topic to the write endpoint
        # allowing overriding the measurement by providing it in the topic
        - subscribe:
            topic: 'encoder/#'
          publish:
            endpoint: !ref rotaryEncoderWrite