# InfluxDB

This protocol implementation provides an abstraction layer for connecting to InfluxDB which is a time series database.

As a time series database InfluxDB store data in collections called ‘measurements’ and each ‘measurement’ stores ‘points’ which in turn are formed by a timestamp, a value, fields and tags to achieve different groupings.

This version of the protocol supports the Flux query language for InfluxDB.

Below you can see configuration parameters and examples on how to build a commissioning file for this protocol.

[Connection Properties](https://docs.cybus.io/1-8-0/documentation/industry-protocol-details/influxdb/influxdbconnection)

[Endpoint Properties](https://docs.cybus.io/1-8-0/documentation/industry-protocol-details/influxdb/influxdbendpoint)

### Reading Data

To read data from InfluxDB, an endpoint has to be defined with either *read* or *subscribe* properties; a valid Flux query needs to be provided as part of the endpoint configuration using the field *query*. Subscribe works by defining a polling interval, hence the query will be executed on a regular basis. The result of the query is provided in JSON format on the MQTT broker.

The provided Flux query supports value interpolation by using tags of the type ‘@tag’.

{% code lineNumbers="true" %}

```
from(bucket:”@bucket”) |> range(start: @startMeasurementTime) |> filter(fn: (r) => r.\_measurement == “@measurement”)
```

{% endcode %}

In the previous example requesting a read providing values for ‘bucket’, ‘measurement’ and ‘startMeasurementTime’ will generate a valid Flux query.

### Output Format on Read

When data is read from InfluxDB results are published to the */res* topic of the Endpoint. When a subscription is configured the results are published to the Endpoint default topic.

The output in both cases will be provided as a JSON array representing the InfluxDB query result.

{% code lineNumbers="true" %}

```yaml
[
    {
        'result': '_result',
        'table': 0,
        '_start': '2021-02-14T09:29:24.514083303Z',
        '_stop': '2021-02-15T09:29:24.514083303Z',
        '_time': '2021-02-15T09:29:06.059Z',
        '_value': 19.7,
        '_field': 'value',
        '_measurement': 'temperature',
    },
    {
        'result': '_result',
        'table': 0,
        '_start': '2021-02-14T09:29:24.514083303Z',
        '_stop': '2021-02-15T09:29:24.514083303Z',
        '_time': '2021-02-15T09:29:06.059623817Z',
        '_value': 21.3,
        '_field': 'value',
        '_measurement': 'temperature',
    },
]
```

{% endcode %}

### Writing Data

To write data to InfluxDB, an endpoint with a *write* property has to be defined. If a *measurement* property is set, it is used by default for all data points sent to the endpoint. This property can also be overridden by providing a property *measurement* in the data message.

To write data you must send an MQTT message, to the */set* topic of the Endpoint, like the following (with a *measurement* property set in the data message):

{% code lineNumbers="true" %}

```yaml
{ 'tags': { 'rpm': '8000', 'oil_temp': '250' }, 'value': 91, 'fields': { 'engine_number': 1 }, 'timestamp': 1813127450710, 'measurement': 'temperature' }
```

{% endcode %}

Both tags and fields are supported and if the timestamp is not present it is assigned by InfluxDB.

It is also possible to write several data points per message. To do this, they just have to be sent as an array.

Note that, respecting InfluxDB client design, the writes are asynchronous and data is written to influx based on the value of the InfluxDB connection property *flushInterval* which by default is 10 seconds.

### Output Format on Write

When data is written to an InfluxDB Endpoint a message is published to the */res* topic of the Endpoint. The output message is an object with two properties:

* timestamp: is the unix timestamp, in milliseconds, of when the write was executed
* value: is set to true when the write was successful

## Sample Commissioning File

Download:

{% file src="<https://3008235441-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FshLLdPEhD7SHVDYSquU1%2Fuploads%2Fgit-blob-099c4cf656ca0d40ed75e495d379a4c12a5a7a5f%2Finfluxdb-example.yml?alt=media>" %}

{% code lineNumbers="true" %}

```yaml
description: |
    Sample InfluxDB commissioning file

metadata:
    name: Cybus InfluxDB Example
    provider: cybus
    homepage: https://cybus.io
    version: 1.0.0

#------------------------------------------------------------------------------
# Parameters
#------------------------------------------------------------------------------

parameters:
    influxHost:
        type: string
        description: 'HTTP address of InfluxDB server'
        default: 'influxdbhost'

    influxPort:
        type: integer
        description: 'Influx Port'
        default: 8086

    influxScheme:
        type: 'string'
        description: 'Either use http or https for the server url'
        default: 'http'

#------------------------------------------------------------------------------
# Resources
#------------------------------------------------------------------------------
resources:
    #----------------------------------------------------------------------------
    # Connections
    #----------------------------------------------------------------------------

    influxdbConnection:
        type: Cybus::Connection
        properties:
            protocol: Influxdb
            connection:
                host: !ref influxHost
                token: '-an-influx-db-jwt-token-'
                port: !ref influxPort
                bucket: turbine
                scheme: !ref influxScheme
                flushInterval: 5000

    #----------------------------------------------------------------------------
    # Endpoints
    #----------------------------------------------------------------------------

    turbineWrite:
        type: Cybus::Endpoint
        properties:
            protocol: Influxdb
            connection: !ref influxdbConnection
            write:
                measurement: 'turbine'

    rotaryEncoderWrite:
        type: Cybus::Endpoint
        properties:
            protocol: Influxdb
            connection: !ref influxdbConnection
            write:
                measurement: 'rotary_encoder'

    rotary_encoder_angle:
        type: Cybus::Endpoint
        properties:
            protocol: Influxdb
            connection: !ref influxdbConnection
            subscribe:
                interval: 6000
                query: >
                    from(bucket:"turbine") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "rotary_encoder")

    #----------------------------------------------------------------------------
    # Mappings
    #----------------------------------------------------------------------------

    # A mapping that overrides the measurement value with the name of the MQTT topic
    mappings:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: 'turbine/#'
                  publish:
                      endpoint: !ref turbineWrite
                  rules:
                      - transform:
                            # Add the topic as measurement
                            expression: '$merge([$,{"measurement": $context.topic}])'
                # A mapping that will pass data from the MQTT topic to the write endpoint
                # allowing overriding the measurement by providing it in the topic
                - subscribe:
                      topic: 'encoder/#'
                  publish:
                      endpoint: !ref rotaryEncoderWrite
```

{% endcode %}
