# InfluxDB

This protocol implementation provides an abstraction layer for connecting to InfluxDB which is a time series database.

As a time series database InfluxDB store data in collections called ‘measurements’ and each ‘measurement’ stores ‘points’ which in turn are formed by a timestamp, a value, fields and tags to achieve different groupings.

This version of the protocol supports the Flux query language for InfluxDB.

Below you can see configuration parameters and examples on how to build a service commissioning file for this protocol.

[Connection Properties](https://docs.cybus.io/2-0-6/documentation/industry-protocol-details/influxdb/influxdbconnection)

[Endpoint Properties](https://docs.cybus.io/2-0-6/documentation/industry-protocol-details/influxdb/influxdbendpoint)

### Reading Data

To read data from InfluxDB, an endpoint has to be defined with either *read* or *subscribe* properties; a valid Flux query needs to be provided as part of the endpoint configuration using the field *query*. Subscribe works by defining a polling interval, hence the query will be executed on a regular basis. The result of the query is provided in JSON format on the MQTT broker.

The provided Flux query supports value interpolation by using tags of the type ‘@tag’.

{% code lineNumbers="true" %}

```
from(bucket:”@bucket”) |> range(start: @startMeasurementTime) |> filter(fn: (r) => r.\_measurement == “@measurement”)
```

{% endcode %}

In the previous example requesting a read providing values for ‘bucket’, ‘measurement’ and ‘startMeasurementTime’ will generate a valid Flux query.

### Output Format on Read

When data is read from InfluxDB results are published to the */res* topic of the Endpoint. When a subscription is configured the results are published to the Endpoint default topic.

The output in both cases will be provided as a JSON array representing the InfluxDB query result.

{% code lineNumbers="true" %}

```yaml
[
  {
    'result': '_result',
    'table': 0,
    '_start': '2021-02-14T09:29:24.514083303Z',
    '_stop': '2021-02-15T09:29:24.514083303Z',
    '_time': '2021-02-15T09:29:06.059Z',
    '_value': 19.7,
    '_field': 'value',
    '_measurement': 'temperature',
  },
  {
    'result': '_result',
    'table': 0,
    '_start': '2021-02-14T09:29:24.514083303Z',
    '_stop': '2021-02-15T09:29:24.514083303Z',
    '_time': '2021-02-15T09:29:06.059623817Z',
    '_value': 21.3,
    '_field': 'value',
    '_measurement': 'temperature',
  },
]
```

{% endcode %}

### Writing Data

To write data to InfluxDB, an endpoint with a *write* property has to be defined. If a *measurement* property is set, it is used by default for all data points sent to the endpoint. This property can also be overridden by providing a property *measurement* in the data message.

To write data you must send an MQTT message, to the */set* topic of the Endpoint, like the following (with a *measurement* property set in the data message):

{% code lineNumbers="true" %}

```yaml
{ 'tags': { 'rpm': '8000', 'oil_temp': '250' }, 'value': 91, 'fields': { 'engine_number': 1 }, 'timestamp': 1813127450710, 'measurement': 'temperature' }
```

{% endcode %}

Both tags and fields are supported and if the timestamp is not present it is assigned by InfluxDB.

It is also possible to write several data points per message. To do this, they just have to be sent as an array.

Note that, respecting InfluxDB client design, the writes are asynchronous and data is written to influx based on the value of the InfluxDB connection property *flushInterval* which by default is 10 seconds.

### Output Format on Write

When data is written to an InfluxDB Endpoint a message is published to the */res* topic of the Endpoint. The output message is an object with two properties:

* timestamp: is the unix timestamp, in milliseconds, of when the write was executed
* value: is set to true when the write was successful

## Service Commissioning File Example

{% file src="<https://639096190-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FfDpOJO2upcq5EpoSahvK%2Fuploads%2Fgit-blob-a851a9536ac3daa5d155846fa0269267d82013a6%2Finfluxdb-example.yml?alt=media>" %}

{% code title="influxdb-example.yml" lineNumbers="true" %}

```yaml
description: |
  Sample InfluxDB commissioning file

metadata:
  name: Cybus InfluxDB Example
  provider: cybus
  homepage: https://cybus.io
  version: 1.0.0

#------------------------------------------------------------------------------
# Parameters
#------------------------------------------------------------------------------

parameters:
  influxHost:
    type: string
    description: 'HTTP address of InfluxDB server'
    default: 'influxdbhost'

  influxPort:
    type: integer
    description: 'Influx Port'
    default: 8086

  influxScheme:
    type: 'string'
    description: 'Either use http or https for the server url'
    default: 'http'

#------------------------------------------------------------------------------
# Resources
#------------------------------------------------------------------------------
resources:
  #----------------------------------------------------------------------------
  # Connections
  #----------------------------------------------------------------------------

  influxdbConnection:
    type: Cybus::Connection
    properties:
      protocol: Influxdb
      connection:
        host: !ref influxHost
        token: '-an-influx-db-jwt-token-'
        port: !ref influxPort
        bucket: turbine
        scheme: !ref influxScheme
        flushInterval: 5000

  #----------------------------------------------------------------------------
  # Endpoints
  #----------------------------------------------------------------------------

  turbineWrite:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      write:
        measurement: 'turbine'

  rotaryEncoderWrite:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      write:
        measurement: 'rotary_encoder'

  rotary_encoder_angle:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      subscribe:
        interval: 6000
        query: >
          from(bucket:"turbine") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "rotary_encoder")

  #----------------------------------------------------------------------------
  # Mappings
  #----------------------------------------------------------------------------

  # A mapping that overrides the measurement value with the name of the MQTT topic
  mappings:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: 'turbine/#'
          publish:
            endpoint: !ref turbineWrite
          rules:
            - transform:
                # Add the topic as measurement
                expression: '$merge([$,{"measurement": $context.topic}])'
        # A mapping that will pass data from the MQTT topic to the write endpoint
        # allowing overriding the measurement by providing it in the topic
        - subscribe:
            topic: 'encoder/#'
          publish:
            endpoint: !ref rotaryEncoderWrite
```

{% endcode %}
