InfluxDB

This protocol implementation provides an abstraction layer for connecting to InfluxDB which is a time series database.

As a time series database InfluxDB store data in collections called ‘measurements’ and each ‘measurement’ stores ‘points’ which in turn are formed by a timestamp, a value, fields and tags to achieve different groupings.

This version of the protocol supports the Flux query language for InfluxDB.

Below you can see configuration parameters and examples on how to build a commissioning file for this protocol.

Connection Properties

scheme (string, enum)

Transport scheme to be used

This element must be one of the following enum values:

  • http

  • https

Default: "http"

Example: "https"

host (string, required)

The HTTP host of the InfluxDB server

Example: "connectware"

port (integer)

The port of the InfluxDB server

Default: 8086

Example: 8086

token (string)

An InfluxDB authentication token

Default: ""

Example: "an-influx-auth-token"

probeInterval (integer)

Time interval to check if connection is still there, in milliseconds

Default: 2000

Additional restrictions:

  • Minimum: 1000

timeout (integer)

Connection timeout, in milliseconds

Default: 10000

Example: 10000

Additional restrictions:

  • Minimum: 1000

transportOptions (object)

Additional connection options for InfluxDB

Default:

{}

org (string)

Organization for InfluxDB. An Organization is a workspace for a group of users, all objects stored in Influxdb belong to an organization

Default: "generic"

bucket (string)

InfluxDB bucket

Default: "measurement_data"

precision (string, enum)

Defines the precision to use for timestamps for InfluxDB

This element must be one of the following enum values:

  • ns

  • us

  • ms

  • s

Default: "ms"

maxRetryDelay (integer)

Maximum delay between write retries in milliseconds

Default: 180000

Example: 180000

minRetryDelay (integer)

Minimum delay between write retries in milliseconds

Default: 5000

Example: 5000

retryJitter (integer)

Random value of up to Retry Jitter is added when attempting the next retry

Default: 200

Example: 200

maxBufferLines (integer)

Maximum size of the buffer for items that could not be sent in a previous write

Default: 100000

Example: 100000

maxRetries (integer)

Maximum number of retries to attempt a write

Default: 3

Example: 3

exponentialBase (integer)

Exponential base of the Retry Jitter used when computing the next delay

Default: 5

Example: 5

batchSize (integer)

Maximum points/line to send in a single batch to InfluxDB

Default: 1000

Example: 1000

flushInterval (integer)

Maximum time in milliseconds to keep points in an unflushed batch

Default: 10000

Example: 10000

Additional restrictions:

  • Minimum: 1

connectionStrategy (object)

If a connection attempt fails, retries will be performed with increasing delay (waiting time) in between. The following parameters control how these delays behave.

Properties of the connectionStrategy object:

initialDelay (integer)

Delay (waiting time) of the first connection retry (in milliseconds). For subsequent retries, the delay will be increased according to the parameter incrementFactor which has a default value of 2.

Default: 1000

Additional restrictions:

  • Minimum: 1000

maxDelay (integer)

Maximum delay (waiting time) to wait until the next retry (in milliseconds). The delay (waiting time) for any subsequent connection retry will not be larger than this value. Must be strictly greater than initialDelay.

Default: 30000

incrementFactor (integer)

The factor used to increment initialDelay up to maxDelay. For example if initialDelay is set to 1000 and maxDelay to 5000 the values for the delay would be 1000, 2000, 4000, 5000.

Default: 2

Additional restrictions:

  • Minimum: 2

Endpoint Properties

query (string)

The InfluxDB Flux query used to read or subscribe to measurements. This property is only used for read or subscribe endpoints

Example: "from(bucket:\"my-bucket\") |> range(start: -1h) |> filter(fn: (r) => r._measurement == \"temperature\")"

interval (integer)

The amount of milliseconds between queries. If not provided defaults to 1000 milliseconds

cronExpression (string)

The Cron expression used to poll the endpoint. (For examples, see: https://github.com/node-cron/node-cron)

Examples: "1,2,4,5 * * * *", "1-5 * * * *", "*/2 * * * *", "* * * January,September Sunday"

measurement (string)

The name of the measurement on which to store data points when writing data. This property can be overriden by providing the measurement in the data of a message, but it has to be set in at least one of the two places.

Example: "exhaust_temperature"

measurementPrefix (string)

An optional prefix for the measurement name that it is prepend to it.

Example: "engine_"

fields (object)

An optional object made up of key value pairs. These constant fields are merged with the fields from the message.

tags (object)

An optional object made up of key value pairs. These constant tags are merged with the tags from the message.

Reading Data

To read data from InfluxDB, an endpoint has to be defined with either read or subscribe properties; a valid Flux query needs to be provided as part of the endpoint configuration using the field query. Subscribe works by defining a polling interval, hence the query will be executed on a regular basis. The result of the query is provided in JSON format on the MQTT broker.

The provided Flux query supports value interpolation by using tags of the type ‘@tag’.

from(bucket:”@bucket”) |> range(start: @startMeasurementTime) |> filter(fn: (r) => r._measurement == “@measurement”)

In the previous example requesting a read providing values for ‘bucket’, ‘measurement’ and ‘startMeasurementTime’ will generate a valid Flux query.

Output Format on Read

When data is read from InfluxDB results are published to the /res topic of the Endpoint. When a subscription is configured the results are published to the Endpoint default topic.

The output in both cases will be provided as a JSON array representing the InfluxDB query result.

[
  {
    "result": "_result",
    "table": 0,
    "_start": "2021-02-14T09:29:24.514083303Z",
    "_stop": "2021-02-15T09:29:24.514083303Z",
    "_time": "2021-02-15T09:29:06.059Z",
    "_value": 19.7,
    "_field": "value",
    "_measurement": "temperature",
  },
  {
    "result": "_result",
    "table": 0,
    "_start": "2021-02-14T09:29:24.514083303Z",
    "_stop": "2021-02-15T09:29:24.514083303Z",
    "_time": "2021-02-15T09:29:06.059623817Z",
    "_value": 21.3,
    "_field": "value",
    "_measurement": "temperature",
  },
]

Writing Data

To write data to InfluxDB, an endpoint with a write property has to be defined. If a measurement property is set, it is used by default for all data points sent to the endpoint. This property can also be overridden by providing a property measurement in the data message.

To write data you must send an MQTT message, to the /set topic of the Endpoint, like the following (with a measurement property set in the data message):

{
  "tags": {  "rpm": "8000", "oil_temp": "250" },
  "value": 91,
  "fields": { "engine_number": 1 },
  "timestamp": 1813127450710,
  "measurement": "temperature"
}

Both tags and fields are supported and if the timestamp is not present it is assigned by InfluxDB.

It is also possible to write several data points per message. To do this, they just have to be sent as an array.

Note that, respecting InfluxDB client design, the writes are asynchronous and data is written to influx based on the value of the InfluxDB connection property flushInterval which by default is 10 seconds.

Output Format on Write

When data is written to an InfluxDB Endpoint a message is published to the /res topic of the Endpoint. The output message is an object with two properties:

  • timestamp: is the unix timestamp, in milliseconds, of when the write was executed

  • value: is set to true when the write was successful

Sample Commissioning file

Download: influxdb-example.yml

  1description: |
  2  Sample InfluxDB commissioning file
  3
  4metadata:
  5  name: Cybus InfluxDB Example
  6  provider: cybus
  7  homepage: https://cybus.io
  8  version: 1.0.0
  9
 10#------------------------------------------------------------------------------
 11# Parameters
 12#------------------------------------------------------------------------------
 13
 14parameters:
 15  influxHost:
 16    type: string
 17    description: "HTTP address of InfluxDB server"
 18    default: "influxdbhost"
 19
 20  influxPort:
 21    type: integer
 22    description: "Influx Port"
 23    default: 8086
 24  
 25  influxScheme:
 26    type: "string"
 27    description: "Either use http or https for the server url"
 28    default: "http"
 29
 30#------------------------------------------------------------------------------
 31# Resources
 32#------------------------------------------------------------------------------
 33resources:
 34
 35  #----------------------------------------------------------------------------
 36  # Connections
 37  #----------------------------------------------------------------------------
 38
 39  influxdbConnection:
 40    type: Cybus::Connection
 41    properties:
 42      protocol: Influxdb
 43      connection:
 44        host: !ref influxHost
 45        token: "-an-influx-db-jwt-token-"
 46        port: !ref influxPort
 47        bucket: turbine
 48        scheme: !ref influxScheme
 49        flushInterval: 5000
 50
 51  #----------------------------------------------------------------------------
 52  # Endpoints
 53  #----------------------------------------------------------------------------
 54
 55  turbineWrite:
 56    type: Cybus::Endpoint
 57    properties:
 58      protocol: Influxdb
 59      connection: !ref influxdbConnection
 60      write:
 61        measurement: "turbine"
 62
 63  rotaryEncoderWrite:
 64    type: Cybus::Endpoint
 65    properties:
 66      protocol: Influxdb
 67      connection: !ref influxdbConnection
 68      write:
 69        measurement: "rotary_encoder"
 70
 71  rotary_encoder_angle:
 72    type: Cybus::Endpoint
 73    properties:
 74      protocol: Influxdb
 75      connection: !ref influxdbConnection
 76      subscribe:
 77        interval: 6000
 78        query: >
 79          from(bucket:"turbine") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "rotary_encoder")
 80
 81  #----------------------------------------------------------------------------
 82  # Mappings
 83  #----------------------------------------------------------------------------
 84
 85  # A mapping that overrides the measurement value with the name of the MQTT topic
 86  mappings:
 87    type: Cybus::Mapping
 88    properties:
 89      mappings:
 90        - subscribe:
 91            topic: 'turbine/#'
 92          publish:
 93            endpoint: !ref turbineWrite
 94          rules:
 95            - transform:
 96                # Add the topic as measurement
 97                expression: '$merge([$,{"measurement": $context.topic}])'
 98        # A mapping that will pass data from the MQTT topic to the write endpoint
 99        # allowing overriding the measurement by providing it in the topic
100        - subscribe:
101            topic: 'encoder/#'
102          publish:
103            endpoint: !ref rotaryEncoderWrite