InfluxDB

This protocol implementation provides an abstraction layer for connecting to InfluxDB, a time series database.

As a time series database, InfluxDB stores data in collections called ‘measurements’. Each measurement stores ‘points’, and each point consists of a timestamp, a value, fields, and tags that allow the data to be grouped in different ways.

This version of the protocol supports the Flux query language for InfluxDB.

Below are the configuration parameters and examples of how to build a commissioning file for this protocol.

Connection Properties

scheme (string, enum)

Transport scheme to be used

This element must be one of the following enum values:

  • http

  • https

Default: "http"

Example: "https"

host (string, required)

The HTTP host of the InfluxDB server

Example: "connectware"

port (integer)

The port of the InfluxDB server

Default: 8086

Example: 8086

token (string)

An InfluxDB authentication token

Default: ""

Example: "an-influx-auth-token"

probeInterval (integer)

Interval, in milliseconds, at which the connection is checked to confirm it is still alive

Default: 2000

Additional restrictions:

  • Minimum: 1000

timeout (integer)

Connection timeout, in milliseconds

Default: 10000

Example: 10000

Additional restrictions:

  • Minimum: 1000

transportOptions (object)

Additional connection options for InfluxDB

Default:

{}

org (string)

Organization for InfluxDB. An organization is a workspace for a group of users. All objects stored in InfluxDB belong to an organization.

Default: "generic"

bucket (string)

InfluxDB bucket

Default: "measurement_data"

precision (string, enum)

Defines the precision to use for timestamps for InfluxDB

This element must be one of the following enum values:

  • ns

  • us

  • ms

  • s

Default: "ms"
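The four precision units can be illustrated with a short Python sketch that converts a point in time to an integer timestamp in each unit. The helper name to_timestamp is hypothetical and not part of Connectware or InfluxDB; the sketch only shows how the units relate to each other and makes no claim about how the protocol handles timestamps internally.

```python
from datetime import datetime, timezone

# Number of timestamp units per second for each precision value.
UNITS_PER_SECOND = {"s": 1, "ms": 10**3, "us": 10**6, "ns": 10**9}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp(moment, precision="ms"):
    """Return the Unix timestamp of a timezone-aware datetime as an integer in the given precision."""
    delta = moment - EPOCH
    # Integer arithmetic avoids floating point rounding errors.
    micros = (delta.days * 86400 + delta.seconds) * 10**6 + delta.microseconds
    return micros * UNITS_PER_SECOND[precision] // 10**6


moment = datetime(2021, 2, 15, 9, 29, 6, 59000, tzinfo=timezone.utc)
for precision in ("ns", "us", "ms", "s"):
    print(precision, to_timestamp(moment, precision))
```

Python datetimes carry microsecond resolution at most, so the ns value produced here always ends in three zeros.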

maxRetryDelay (integer)

Maximum delay between write retries in milliseconds

Default: 180000

Example: 180000

minRetryDelay (integer)

Minimum delay between write retries in milliseconds

Default: 5000

Example: 5000

retryJitter (integer)

A random value of up to retryJitter is added to the delay before the next retry is attempted

Default: 200

Example: 200

maxBufferLines (integer)

Maximum number of lines kept in the buffer for items that could not be sent in a previous write

Default: 100000

Example: 100000

maxRetries (integer)

Maximum number of retries to attempt a write

Default: 3

Example: 3

exponentialBase (integer)

Base of the exponential backoff used when computing the next retry delay

Default: 5

Example: 5

batchSize (integer)

Maximum number of points (lines) to send to InfluxDB in a single batch

Default: 1000

Example: 1000

flushInterval (integer)

Maximum time in milliseconds to keep points in an unflushed batch

Default: 10000

Example: 10000

Additional restrictions:

  • Minimum: 1

connectionStrategy (object)

If a connection attempt fails, retries will be performed with increasing delay (waiting time) in between. The following parameters control how these delays behave.

Properties of the connectionStrategy object:

initialDelay (integer)

Delay (waiting time) of the first connection retry (in milliseconds). For subsequent retries, the delay will be increased according to the parameter incrementFactor which has a default value of 2.

Default: 1000

Additional restrictions:

  • Minimum: 1000

maxDelay (integer)

Maximum delay (waiting time) before the next retry, in milliseconds. No subsequent connection retry waits longer than this value. Must be strictly greater than initialDelay.

Default: 30000

incrementFactor (integer)

The factor used to increment initialDelay up to maxDelay. For example, if initialDelay is set to 1000 and maxDelay to 5000, the delays would be 1000, 2000, 4000, 5000.

Default: 2

Additional restrictions:

  • Minimum: 2
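The delay sequence described for connectionStrategy can be reproduced with a few lines of Python. This is a minimal sketch, assuming the delay is multiplied by incrementFactor after every failed attempt and capped at maxDelay, which matches the example values given above. The function name retry_delays and the attempts parameter are hypothetical and exist only for illustration.

```python
def retry_delays(initial_delay=1000, max_delay=30000, increment_factor=2, attempts=6):
    """Return the waiting times (in milliseconds) for the first `attempts` connection retries."""
    delays = []
    delay = initial_delay
    for _ in range(attempts):
        # Assumption: the delay never exceeds max_delay.
        delays.append(min(delay, max_delay))
        delay *= increment_factor
    return delays


# Values from the incrementFactor example: initialDelay 1000, maxDelay 5000.
print(retry_delays(1000, 5000, 2, attempts=5))  # [1000, 2000, 4000, 5000, 5000]

# Default values: initialDelay 1000, maxDelay 30000, incrementFactor 2.
print(retry_delays())  # [1000, 2000, 4000, 8000, 16000, 30000]
```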

Endpoint Properties

query (string)

The InfluxDB Flux query used to read or subscribe to measurements. This property is only used for read or subscribe endpoints.

Example: "from(bucket:\"my-bucket\") |> range(start: -1h) |> filter(fn: (r) => r._measurement == \"temperature\")"

interval (integer)

The number of milliseconds between queries. Defaults to 1000 if not provided.

cronExpression (string)

The Cron expression used to poll the endpoint. (For examples, see: https://github.com/node-cron/node-cron)

Examples: "1,2,4,5 * * * *", "1-5 * * * *", "*/2 * * * *", "* * * January,September Sunday"

measurement (string)

The name of the measurement in which to store data points when writing data. This property can be overridden by providing the measurement in the data of a message, but it has to be set in at least one of the two places.

Example: "exhaust_temperature"

measurementPrefix (string)

An optional prefix that is prepended to the measurement name.

Example: "engine_"

fields (object)

An optional object made up of key-value pairs. These constant fields are merged with the fields from the message.

tags (object)

An optional object made up of key-value pairs. These constant tags are merged with the tags from the message.
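How the endpoint properties measurement, measurementPrefix, fields and tags combine with an incoming message can be pictured with the following Python sketch. It is an illustration only, not the Connectware implementation. Two details are assumptions that the text above does not specify: that message values win when the same key appears in both places, and that the prefix is also applied to a measurement supplied by the message. The function name effective_point is hypothetical.

```python
def effective_point(endpoint, message):
    """Combine constant endpoint properties with the data of one message."""
    # The measurement must be set on the endpoint or in the message.
    measurement = message.get("measurement", endpoint.get("measurement"))
    if measurement is None:
        raise ValueError("measurement must be set on the endpoint or in the message")
    prefix = endpoint.get("measurementPrefix", "")
    return {
        # Assumption: the prefix applies regardless of where the name came from.
        "measurement": prefix + measurement,
        # Assumption: on a key conflict the message value overrides the constant.
        "fields": {**endpoint.get("fields", {}), **message.get("fields", {})},
        "tags": {**endpoint.get("tags", {}), **message.get("tags", {})},
    }


endpoint = {
    "measurement": "exhaust_temperature",
    "measurementPrefix": "engine_",
    "tags": {"site": "plant_a"},
}
message = {"tags": {"rpm": "8000"}, "fields": {"engine_number": 1}}
print(effective_point(endpoint, message))
```

The example uses disjoint keys and an endpoint-level measurement, so its result does not depend on either assumption.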

Reading Data

To read data from InfluxDB, define an endpoint with either a read or a subscribe property and provide a valid Flux query in its query field. A subscribe endpoint works by polling: the query is executed repeatedly at the configured interval. The result of the query is published in JSON format on the MQTT broker.

The Flux query supports value interpolation through placeholders of the form ‘@name’, for example:

from(bucket:"@bucket") |> range(start: @startMeasurementTime) |> filter(fn: (r) => r._measurement == "@measurement")

In the previous example, a read request that provides values for ‘bucket’, ‘measurement’ and ‘startMeasurementTime’ produces a valid Flux query.
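The effect of the interpolation can be sketched in Python. The substitution itself happens inside the protocol implementation; the function below is a hypothetical stand-in that only illustrates the expected result, assuming each ‘@name’ placeholder is replaced textually by the value supplied under that name.

```python
import re


def interpolate(query, values):
    """Replace every @name placeholder in the query with the matching value."""
    def substitute(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"no value provided for placeholder @{name}")
        return str(values[name])

    return re.sub(r"@([A-Za-z_][A-Za-z0-9_]*)", substitute, query)


template = (
    'from(bucket:"@bucket") |> range(start: @startMeasurementTime) '
    '|> filter(fn: (r) => r._measurement == "@measurement")'
)
query = interpolate(
    template,
    {"bucket": "turbine", "startMeasurementTime": "-1h", "measurement": "temperature"},
)
print(query)
# from(bucket:"turbine") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "temperature")
```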

Output Format on Read

When data is read from InfluxDB, the results are published to the /res topic of the Endpoint. When a subscription is configured, the results are published to the default topic of the Endpoint.

The output in both cases will be provided as a JSON array representing the InfluxDB query result.

[
  {
    "result": "_result",
    "table": 0,
    "_start": "2021-02-14T09:29:24.514083303Z",
    "_stop": "2021-02-15T09:29:24.514083303Z",
    "_time": "2021-02-15T09:29:06.059Z",
    "_value": 19.7,
    "_field": "value",
    "_measurement": "temperature"
  },
  {
    "result": "_result",
    "table": 0,
    "_start": "2021-02-14T09:29:24.514083303Z",
    "_stop": "2021-02-15T09:29:24.514083303Z",
    "_time": "2021-02-15T09:29:06.059623817Z",
    "_value": 21.3,
    "_field": "value",
    "_measurement": "temperature"
  }
]
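A consumer of the result topic can decode this array with any JSON parser. The following Python sketch extracts the time and value of every row; the payload is a shortened copy of the output above, and receiving it from the MQTT broker is left out.

```python
import json

# Shortened copy of the query result shown above.
payload = """
[
  {"result": "_result", "table": 0, "_time": "2021-02-15T09:29:06.059Z",
   "_value": 19.7, "_field": "value", "_measurement": "temperature"},
  {"result": "_result", "table": 0, "_time": "2021-02-15T09:29:06.059623817Z",
   "_value": 21.3, "_field": "value", "_measurement": "temperature"}
]
"""

rows = json.loads(payload)
# Keep only the columns of interest for each returned row.
series = [(row["_time"], row["_value"]) for row in rows]
print(series)
# [('2021-02-15T09:29:06.059Z', 19.7), ('2021-02-15T09:29:06.059623817Z', 21.3)]
```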

Writing Data

To write data to InfluxDB, an endpoint with a write property has to be defined. If a measurement property is set, it is used by default for all data points sent to the endpoint. This property can also be overridden by providing a measurement property in the data message.

To write data, send an MQTT message to the /set topic of the Endpoint. The following example sets the measurement property in the data message:

{
  "tags": {  "rpm": "8000", "oil_temp": "250" },
  "value": 91,
  "fields": { "engine_number": 1 },
  "timestamp": 1813127450710,
  "measurement": "temperature"
}

Both tags and fields are supported. If the timestamp is not present, it is assigned by InfluxDB.

It is also possible to write several data points per message. To do this, they just have to be sent as an array.
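As an illustration, the following Python sketch builds such an array payload from two data points. The helper make_point is hypothetical; only the property names (value, measurement, timestamp, tags, fields) come from the message format shown above. Publishing the payload to the /set topic is left to the MQTT client in use.

```python
import json


def make_point(value, measurement, timestamp=None, tags=None, fields=None):
    """Build one data point in the message format accepted by a write endpoint."""
    point = {"value": value, "measurement": measurement}
    # Optional properties are only included when they are provided.
    if timestamp is not None:
        point["timestamp"] = timestamp
    if tags:
        point["tags"] = tags
    if fields:
        point["fields"] = fields
    return point


points = [
    make_point(91, "temperature", timestamp=1813127450710,
               tags={"rpm": "8000", "oil_temp": "250"}, fields={"engine_number": 1}),
    # No timestamp here, so it would be assigned by InfluxDB.
    make_point(93, "temperature"),
]
payload = json.dumps(points)
print(payload)
```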

Note that, following the design of the InfluxDB client, writes are asynchronous: data is written to InfluxDB according to the connection property flushInterval, which defaults to 10 seconds.

Output Format on Write

When data is written to an InfluxDB Endpoint a message is published to the /res topic of the Endpoint. The output message is an object with two properties:

  • timestamp: the Unix timestamp, in milliseconds, at which the write was executed

  • value: true when the write was successful
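A client that listens on the /res topic can check the outcome of a write as sketched below in Python. The sample message is hypothetical and only follows the two properties listed above. What the message looks like when a write fails is not specified here, so the sketch treats anything other than a value of true as not successful.

```python
import json


def write_succeeded(raw_message):
    """Return True only if the result message reports a successful write."""
    result = json.loads(raw_message)
    return result.get("value") is True


# Hypothetical result message with the two documented properties.
sample = '{"timestamp": 1613381346059, "value": true}'
print(write_succeeded(sample))  # True
```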

Sample Commissioning file

Download: influxdb-example.yml

description: |
  Sample InfluxDB commissioning file

metadata:
  name: Cybus InfluxDB Example
  provider: cybus
  homepage: https://cybus.io
  version: 1.0.0

#------------------------------------------------------------------------------
# Parameters
#------------------------------------------------------------------------------

parameters:
  influxHost:
    type: string
    description: "HTTP address of InfluxDB server"
    default: "influxdbhost"

  influxPort:
    type: integer
    description: "Influx Port"
    default: 8086
  
  influxScheme:
    type: "string"
    description: "Either use http or https for the server url"
    default: "http"

#------------------------------------------------------------------------------
# Resources
#------------------------------------------------------------------------------
resources:

  #----------------------------------------------------------------------------
  # Connections
  #----------------------------------------------------------------------------

  influxdbConnection:
    type: Cybus::Connection
    properties:
      protocol: Influxdb
      connection:
        host: !ref influxHost
        token: "-an-influx-db-jwt-token-"
        port: !ref influxPort
        bucket: turbine
        scheme: !ref influxScheme
        flushInterval: 5000

  #----------------------------------------------------------------------------
  # Endpoints
  #----------------------------------------------------------------------------

  turbineWrite:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      write:
        measurement: "turbine"

  rotaryEncoderWrite:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      write:
        measurement: "rotary_encoder"

  rotary_encoder_angle:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      subscribe:
        interval: 6000
        query: >
          from(bucket:"turbine") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "rotary_encoder")

  #----------------------------------------------------------------------------
  # Mappings
  #----------------------------------------------------------------------------

  # A mapping that overrides the measurement value with the name of the MQTT topic
  mappings:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: 'turbine/#'
          publish:
            endpoint: !ref turbineWrite
          rules:
            - transform:
                # Add the topic as measurement
                expression: '$merge([$,{"measurement": $context.topic}])'
        # A mapping that will pass data from the MQTT topic to the write endpoint
        # allowing overriding the measurement by providing it in the topic
        - subscribe:
            topic: 'encoder/#'
          publish:
            endpoint: !ref rotaryEncoderWrite