InfluxDB

This protocol implementation provides an abstraction layer for connecting to InfluxDB, a time series database.

As a time series database, InfluxDB stores data in collections called ‘measurements’. Each measurement stores ‘points’, which consist of a timestamp, a value, fields, and tags that allow different groupings.

This version of the protocol supports the Flux query language for InfluxDB.

The sections below describe the configuration parameters and show how to build a commissioning file for this protocol.

Connection Properties

Endpoint Properties

Reading Data

To read data from InfluxDB, define an endpoint with either a read or a subscribe property and provide a valid Flux query in the query field of the endpoint configuration. A subscribe endpoint works by polling: the query is executed repeatedly at the configured interval. The result of the query is published in JSON format on the MQTT broker.

The Flux query supports value interpolation through placeholders of the form ‘@tag’.

from(bucket:"@bucket") |> range(start: @startMeasurementTime) |> filter(fn: (r) => r._measurement == "@measurement")

In the previous example, a read request that provides values for ‘bucket’, ‘measurement’ and ‘startMeasurementTime’ will generate a valid Flux query.
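The substitution idea can be illustrated with a minimal Python sketch. This is not the protocol's actual implementation: the helper name interpolate_flux, the regular expression, and the error handling for missing values are assumptions made for illustration only.

```python
import re

# Hypothetical helper: shows the '@name' substitution idea only,
# not how the protocol implementation performs it internally.
PLACEHOLDER = re.compile(r"@([A-Za-z_][A-Za-z0-9_]*)")


def interpolate_flux(template: str, values: dict) -> str:
    """Replace every '@name' in the template with str(values[name])."""

    def substitute(match):
        name = match.group(1)
        if name not in values:
            # Assumption: a missing value is treated as an error here.
            raise KeyError(f"no value supplied for placeholder '@{name}'")
        return str(values[name])

    return PLACEHOLDER.sub(substitute, template)


template = (
    'from(bucket:"@bucket") |> range(start: @startMeasurementTime) '
    '|> filter(fn: (r) => r._measurement == "@measurement")'
)

query = interpolate_flux(
    template,
    {
        "bucket": "turbine",
        "startMeasurementTime": "-1d",
        "measurement": "rotary_encoder",
    },
)
print(query)
```

With these sample values, the resulting query is the same as the one used by the subscribe endpoint in the commissioning file example further down.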

Output Format on Read

When data is read from InfluxDB, the results are published to the /res topic of the Endpoint. When a subscription is configured, the results are published to the default topic of the Endpoint.

In both cases, the output is a JSON array representing the InfluxDB query result.

[
    {
        "result": "_result",
        "table": 0,
        "_start": "2021-02-14T09:29:24.514083303Z",
        "_stop": "2021-02-15T09:29:24.514083303Z",
        "_time": "2021-02-15T09:29:06.059Z",
        "_value": 19.7,
        "_field": "value",
        "_measurement": "temperature"
    },
    {
        "result": "_result",
        "table": 0,
        "_start": "2021-02-14T09:29:24.514083303Z",
        "_stop": "2021-02-15T09:29:24.514083303Z",
        "_time": "2021-02-15T09:29:06.059623817Z",
        "_value": 21.3,
        "_field": "value",
        "_measurement": "temperature"
    }
]
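
A consumer of this output might reduce the array to a simple time series. The following Python sketch assumes the payload arrives as standard JSON (double-quoted strings, no trailing commas). The function name extract_series is hypothetical, and the sample payload is an abbreviated copy of the result shown above.

```python
import json

# Abbreviated sample payload shaped like the query result above.
payload = """[
  {"result": "_result", "table": 0,
   "_time": "2021-02-15T09:29:06.059Z", "_value": 19.7,
   "_field": "value", "_measurement": "temperature"},
  {"result": "_result", "table": 0,
   "_time": "2021-02-15T09:29:06.059623817Z", "_value": 21.3,
   "_field": "value", "_measurement": "temperature"}
]"""


def extract_series(raw: str, measurement: str) -> list:
    """Return (time, value) pairs for all rows of one measurement."""
    rows = json.loads(raw)
    return [
        (row["_time"], row["_value"])
        for row in rows
        if row.get("_measurement") == measurement
    ]


series = extract_series(payload, "temperature")
for timestamp, value in series:
    print(timestamp, value)
```

The timestamps are kept as strings here because the result can carry nanosecond precision, which not every date parser accepts.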

Writing Data

To write data to InfluxDB, define an endpoint with a write property. If a measurement property is set on the endpoint, it is used by default for all data points sent to that endpoint. A measurement property in the data message overrides this default.

To write data, send an MQTT message to the /set topic of the Endpoint. The following example sets the measurement property in the data message:

{ "tags": { "rpm": "8000", "oil_temp": "250" }, "value": 91, "fields": { "engine_number": 1 }, "timestamp": 1813127450710, "measurement": "temperature" }

Both tags and fields are supported. If the timestamp is not present, it is assigned by InfluxDB.

It is also possible to write several data points per message by sending them as an array.
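
As a rough sketch of how a client might assemble such messages, the following Python code builds one array payload containing two data points. The helper make_point is hypothetical and the second data point is invented sample data. Publishing the resulting string to the /set topic is left to whichever MQTT client is in use.

```python
import json


def make_point(value, measurement=None, tags=None, fields=None, timestamp_ms=None):
    """Build one data point; optional keys are omitted when not given."""
    point = {"value": value}
    if measurement is not None:
        # Overrides the measurement configured on the endpoint.
        point["measurement"] = measurement
    if tags:
        point["tags"] = tags
    if fields:
        point["fields"] = fields
    if timestamp_ms is not None:
        # When left out, InfluxDB assigns the timestamp.
        point["timestamp"] = timestamp_ms
    return point


batch = [
    make_point(
        91,
        measurement="temperature",
        tags={"rpm": "8000", "oil_temp": "250"},
        fields={"engine_number": 1},
        timestamp_ms=1813127450710,
    ),
    # Invented second point that relies on the endpoint's default measurement.
    make_point(92, tags={"rpm": "8100"}),
]

# Several data points are sent as a single JSON array.
message = json.dumps(batch)
print(message)
```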

Note that, in keeping with the InfluxDB client design, writes are asynchronous: data is written to InfluxDB according to the flushInterval connection property, which defaults to 10 seconds.

Output Format on Write

When data is written to an InfluxDB Endpoint, a message is published to the /res topic of the Endpoint. The output message is an object with two properties:

  • timestamp: the Unix timestamp, in milliseconds, of when the write was executed

  • value: set to true when the write was successful
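
A subscriber to the /res topic could interpret this message along the following lines. This is only a sketch: the function name parse_write_result and the sample payload are hypothetical, and the code assumes that any value other than true means the write did not succeed.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime.fromtimestamp(0, tz=timezone.utc)


def parse_write_result(raw: str):
    """Return (success, time of write as UTC datetime) for a /res message."""
    msg = json.loads(raw)
    # Assumption: anything other than a literal true counts as not successful.
    success = msg.get("value") is True
    # The timestamp is given in milliseconds since the Unix epoch.
    executed_at = EPOCH + timedelta(milliseconds=msg["timestamp"])
    return success, executed_at


# Hypothetical sample message.
ok, when = parse_write_result('{"timestamp": 1613381346059, "value": true}')
print(ok, when.isoformat())
```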

Service Commissioning File Example

influxdb-example.yml
description: |
    Sample InfluxDB commissioning file

metadata:
    name: Cybus InfluxDB Example
    provider: cybus
    homepage: https://cybus.io
    version: 1.0.0

#------------------------------------------------------------------------------
# Parameters
#------------------------------------------------------------------------------

parameters:
    influxHost:
        type: string
        description: 'HTTP address of InfluxDB server'
        default: 'influxdbhost'

    influxPort:
        type: integer
        description: 'Influx Port'
        default: 8086

    influxScheme:
        type: 'string'
        description: 'Either use http or https for the server url'
        default: 'http'

#------------------------------------------------------------------------------
# Resources
#------------------------------------------------------------------------------
resources:
    #----------------------------------------------------------------------------
    # Connections
    #----------------------------------------------------------------------------

    influxdbConnection:
        type: Cybus::Connection
        properties:
            protocol: Influxdb
            connection:
                host: !ref influxHost
                token: '-an-influx-db-jwt-token-'
                port: !ref influxPort
                bucket: turbine
                scheme: !ref influxScheme
                flushInterval: 5000

    #----------------------------------------------------------------------------
    # Endpoints
    #----------------------------------------------------------------------------

    turbineWrite:
        type: Cybus::Endpoint
        properties:
            protocol: Influxdb
            connection: !ref influxdbConnection
            write:
                measurement: 'turbine'

    rotaryEncoderWrite:
        type: Cybus::Endpoint
        properties:
            protocol: Influxdb
            connection: !ref influxdbConnection
            write:
                measurement: 'rotary_encoder'

    rotary_encoder_angle:
        type: Cybus::Endpoint
        properties:
            protocol: Influxdb
            connection: !ref influxdbConnection
            subscribe:
                interval: 6000
                query: >
                    from(bucket:"turbine") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "rotary_encoder")

    #----------------------------------------------------------------------------
    # Mappings
    #----------------------------------------------------------------------------

    # A mapping that overrides the measurement value with the name of the MQTT topic
    mappings:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: 'turbine/#'
                  publish:
                      endpoint: !ref turbineWrite
                  rules:
                      - transform:
                            # Add the topic as measurement
                            expression: '$merge([$,{"measurement": $context.topic}])'
                # A mapping that will pass data from the MQTT topic to the write endpoint
                # allowing overriding the measurement by providing it in the topic
                - subscribe:
                      topic: 'encoder/#'
                  publish:
                      endpoint: !ref rotaryEncoderWrite


© Copyright 2024, Cybus GmbH