# InfluxDB

Connect Connectware to InfluxDB, a time series database designed for storing and querying time-stamped data. This integration lets you read from and write to your InfluxDB instance using familiar MQTT topics.

InfluxDB organizes data into measurements (similar to tables), where each measurement contains data points. Each point includes a timestamp, a value, fields, and tags that help you organize and query your data effectively.

This protocol implementation supports the Flux query language, giving you powerful querying capabilities for your time series data.

Use the configuration parameters and examples below to integrate InfluxDB with your Connectware services.

* [Connection Properties](https://docs.cybus.io/connectors/enterprise-connectors/influxdb/influxdbconnection)
* [Endpoint Properties](https://docs.cybus.io/connectors/enterprise-connectors/influxdb/influxdbendpoint)

### Reading Data

You can read data from InfluxDB in two ways:

* **One-time read**: Define an endpoint with a `read` property to fetch data on demand.
* **Continuous subscription**: Define an endpoint with a `subscribe` property to poll data at regular intervals.

Both methods require a valid Flux query in the endpoint's `query` field. A subscription also needs a polling `interval`; the query then runs automatically at that interval. Query results are delivered as JSON to your MQTT broker.

**Dynamic queries**: Use value interpolation to make a Flux query reusable. Mark each variable part of the query with an `@`-prefixed placeholder, such as `@bucket`.

{% code lineNumbers="true" %}

```
from(bucket:"@bucket") |> range(start: @startMeasurementTime) |> filter(fn: (r) => r._measurement == "@measurement")
```

{% endcode %}

In this example, when you request a read and provide values for `bucket`, `measurement`, and `startMeasurementTime`, the system automatically generates a complete Flux query by replacing the `@` placeholders with your values.
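
The substitution described above can be pictured with a short Python sketch. This is only an illustration of the idea, not the connector's implementation: the `render_query` helper is hypothetical, and the exact placeholder rules that Connectware applies (allowed characters, handling of missing values) are assumptions here.

```python
import re


def render_query(template: str, values: dict) -> str:
    """Replace each @name placeholder with the matching entry from values.

    Hypothetical helper for illustration; raises KeyError when a
    placeholder has no value.
    """
    def substitute(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"no value provided for placeholder @{name}")
        return str(values[name])

    # A placeholder is assumed to be '@' followed by an identifier.
    return re.sub(r"@([A-Za-z_][A-Za-z0-9_]*)", substitute, template)


template = (
    'from(bucket:"@bucket") |> range(start: @startMeasurementTime) '
    '|> filter(fn: (r) => r._measurement == "@measurement")'
)
query = render_query(
    template,
    {
        "bucket": "turbine",
        "startMeasurementTime": "-1d",
        "measurement": "rotary_encoder",
    },
)
print(query)
# from(bucket:"turbine") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "rotary_encoder")
```

With these sample values, the rendered query matches the fixed query used by the subscription in the commissioning file example.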

### Output Format on Read

Query results are automatically published to MQTT topics:

* **One-time reads**: Results appear on the endpoint's `/res` topic.
* **Subscriptions**: Results appear on the endpoint's default topic.

You will receive the data as a JSON array containing your InfluxDB query results.

{% code lineNumbers="true" %}

```yaml
[
  {
    'result': '_result',
    'table': 0,
    '_start': '2021-02-14T09:29:24.514083303Z',
    '_stop': '2021-02-15T09:29:24.514083303Z',
    '_time': '2021-02-15T09:29:06.059Z',
    '_value': 19.7,
    '_field': 'value',
    '_measurement': 'temperature',
  },
  {
    'result': '_result',
    'table': 0,
    '_start': '2021-02-14T09:29:24.514083303Z',
    '_stop': '2021-02-15T09:29:24.514083303Z',
    '_time': '2021-02-15T09:29:06.059623817Z',
    '_value': 21.3,
    '_field': 'value',
    '_measurement': 'temperature',
  },
]
```

{% endcode %}
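
A consumer of these results typically only needs a few columns. The following Python sketch, which assumes the payload arrives as strict JSON (double-quoted keys, no trailing commas) and uses a hypothetical `extract_series` helper, reduces the array to `(time, value)` pairs.

```python
import json

# Sample payload with the same rows as above, written as strict JSON.
payload = """
[
  {"result": "_result", "table": 0,
   "_start": "2021-02-14T09:29:24.514083303Z",
   "_stop": "2021-02-15T09:29:24.514083303Z",
   "_time": "2021-02-15T09:29:06.059Z",
   "_value": 19.7, "_field": "value", "_measurement": "temperature"},
  {"result": "_result", "table": 0,
   "_start": "2021-02-14T09:29:24.514083303Z",
   "_stop": "2021-02-15T09:29:24.514083303Z",
   "_time": "2021-02-15T09:29:06.059623817Z",
   "_value": 21.3, "_field": "value", "_measurement": "temperature"}
]
"""


def extract_series(raw: str) -> list:
    """Return (timestamp, value) pairs from a query result payload."""
    rows = json.loads(raw)
    return [(row["_time"], row["_value"]) for row in rows]


for timestamp, value in extract_series(payload):
    print(timestamp, value)
```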

### Writing Data

To write data to InfluxDB, create an endpoint with a `write` property.

**Setting the measurement name**: You can set a default `measurement` property in your endpoint configuration. This applies to all data points you send. If you need flexibility, override this default by including a `measurement` property in your individual data messages.

**How to write**: Send an MQTT message to your endpoint's `/set` topic. Here is an example that includes a measurement name in the message:

{% code lineNumbers="true" %}

```yaml
{ 'tags': { 'rpm': '8000', 'oil_temp': '250' }, 'value': 91, 'fields': { 'engine_number': 1 }, 'timestamp': 1813127450710, 'measurement': 'temperature' }
```

{% endcode %}

**What you can include**:

* **Tags and fields**: Both are fully supported for organizing your data.
* **Timestamp**: Optional. If you don't provide one, InfluxDB automatically assigns the current time.

**Writing multiple points**: Send an array of data points in a single message to write multiple values at once.
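
As a rough sketch of how a client might assemble such a message, the Python below builds two points and serializes them as one JSON array. The `make_point` helper is hypothetical; the key names (`value`, `measurement`, `tags`, `fields`, `timestamp`) are taken from the single-point example above. Publishing the resulting string to the endpoint's `/set` topic is left to whichever MQTT client you use.

```python
import json


def make_point(value, measurement=None, tags=None, fields=None, timestamp=None):
    """Build one data point, leaving out every optional key that is not set."""
    point = {"value": value}
    if measurement is not None:
        point["measurement"] = measurement  # overrides the endpoint default
    if tags:
        point["tags"] = tags
    if fields:
        point["fields"] = fields
    if timestamp is not None:
        point["timestamp"] = timestamp
    return point


batch = [
    make_point(
        91,
        measurement="temperature",
        tags={"rpm": "8000", "oil_temp": "250"},
        fields={"engine_number": 1},
        timestamp=1813127450710,
    ),
    # Minimal point: relies on the endpoint's default measurement and
    # lets InfluxDB assign the timestamp.
    make_point(92),
]

message = json.dumps(batch)
print(message)
```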

**Write behavior**: Writes are asynchronous to optimize performance. Data is buffered and written to InfluxDB at intervals defined by the `flushInterval` connection property (default: 10 seconds). This follows InfluxDB's recommended client design pattern.
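
To make the effect of buffering concrete, here is a deliberately simplified Python model. It is not how Connectware or the InfluxDB client is implemented: the `BufferedWriter` class is hypothetical, it checks the interval only when a point arrives (a real client would typically use a timer), and the clock is passed in explicitly so the behavior is easy to follow.

```python
class BufferedWriter:
    """Toy model: collect points and hand them over in batches."""

    def __init__(self, flush_interval_ms, send):
        self.flush_interval_ms = flush_interval_ms
        self.send = send  # callable that receives one batch (a list of points)
        self.buffer = []
        self.last_flush_ms = 0

    def write(self, point, now_ms):
        """Buffer a point; flush when the interval has elapsed."""
        self.buffer.append(point)
        if now_ms - self.last_flush_ms >= self.flush_interval_ms:
            self.send(list(self.buffer))
            self.buffer.clear()
            self.last_flush_ms = now_ms


batches = []
writer = BufferedWriter(flush_interval_ms=5000, send=batches.append)
writer.write({"value": 1}, now_ms=1000)  # buffered
writer.write({"value": 2}, now_ms=4000)  # buffered
writer.write({"value": 3}, now_ms=5000)  # interval elapsed: all three are sent
writer.write({"value": 4}, now_ms=6000)  # buffered until the next flush
print(batches)
# [[{'value': 1}, {'value': 2}, {'value': 3}]]
```

The practical consequence is that a point you write may not be visible to a query until the next flush has happened.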

### Output Format on Write

After writing data, you will receive a confirmation message on the endpoint's `/res` topic. This response contains:

* **timestamp**: Unix timestamp (in milliseconds) when the write was executed.
* **value**: Set to `true` when the write succeeds.
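
A client can use this confirmation to log when a write was executed. The Python sketch below assumes the response is a JSON object with exactly the two keys listed above; the `parse_write_response` helper and the sample payload are hypothetical.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_write_response(raw: str):
    """Return (succeeded, executed_at) from a write confirmation payload."""
    message = json.loads(raw)
    # The timestamp is given in milliseconds since the Unix epoch.
    executed_at = EPOCH + timedelta(milliseconds=message["timestamp"])
    return message["value"] is True, executed_at


ok, executed_at = parse_write_response('{"timestamp": 1613381346059, "value": true}')
print(ok, executed_at.isoformat())
# True 2021-02-15T09:29:06.059000+00:00
```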

## Service Commissioning File Example

This example demonstrates a complete InfluxDB integration with write endpoints and a subscription for reading data.

{% file src="<https://2355450750-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FGzXPesVecsUM1eHBfwea%2Fuploads%2Fgit-blob-a851a9536ac3daa5d155846fa0269267d82013a6%2Finfluxdb-example.yml?alt=media>" %}

{% code title="influxdb-example.yml" lineNumbers="true" %}

```yaml
description: |
  Sample InfluxDB commissioning file

metadata:
  name: Cybus InfluxDB Example
  provider: cybus
  homepage: https://cybus.io
  version: 1.0.0

#------------------------------------------------------------------------------
# Parameters
#------------------------------------------------------------------------------

parameters:
  influxHost:
    type: string
    description: 'HTTP address of InfluxDB server'
    default: 'influxdbhost'

  influxPort:
    type: integer
    description: 'Influx Port'
    default: 8086

  influxScheme:
    type: 'string'
    description: 'Either use http or https for the server url'
    default: 'http'

#------------------------------------------------------------------------------
# Resources
#------------------------------------------------------------------------------
resources:
  #----------------------------------------------------------------------------
  # Connections
  #----------------------------------------------------------------------------

  influxdbConnection:
    type: Cybus::Connection
    properties:
      protocol: Influxdb
      connection:
        host: !ref influxHost
        token: '-an-influx-db-jwt-token-'
        port: !ref influxPort
        bucket: turbine
        scheme: !ref influxScheme
        flushInterval: 5000

  #----------------------------------------------------------------------------
  # Endpoints
  #----------------------------------------------------------------------------

  turbineWrite:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      write:
        measurement: 'turbine'

  rotaryEncoderWrite:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      write:
        measurement: 'rotary_encoder'

  rotary_encoder_angle:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      subscribe:
        interval: 6000
        query: >
          from(bucket:"turbine") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "rotary_encoder")

  #----------------------------------------------------------------------------
  # Mappings
  #----------------------------------------------------------------------------

  # A mapping that overrides the measurement value with the name of the MQTT topic
  mappings:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: 'turbine/#'
          publish:
            endpoint: !ref turbineWrite
          rules:
            - transform:
                # Add the topic as measurement
                expression: '$merge([$,{"measurement": $context.topic}])'
        # A mapping that passes data from the MQTT topic to the write endpoint unchanged,
        # so the measurement can be overridden by including it in the message
        - subscribe:
            topic: 'encoder/#'
          publish:
            endpoint: !ref rotaryEncoderWrite
```

{% endcode %}
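
The transform in the first mapping, `$merge([$,{"measurement": $context.topic}])`, merges the incoming payload with a one-key object, where later keys win. For readers less familiar with this expression syntax, the same operation can be sketched in Python. The `add_topic_as_measurement` helper is hypothetical, and it assumes `$context.topic` resolves to the concrete topic the message arrived on.

```python
def add_topic_as_measurement(payload: dict, topic: str) -> dict:
    """Return a copy of payload whose 'measurement' is set to the topic."""
    # Later keys win, so an existing 'measurement' is replaced.
    return {**payload, "measurement": topic}


incoming = {"value": 91, "measurement": "temperature"}
outgoing = add_topic_as_measurement(incoming, "turbine/rpm")
print(outgoing)
# {'value': 91, 'measurement': 'turbine/rpm'}
```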
