LogoLogo
Contact Uscybus.io
Connectware 1.8.0
Connectware 1.8.0
  • Getting Started
    • Introduction
    • Installing Connectware
      • System Requirements
      • Acquiring your License Key
      • Installing Connectware on Docker
      • Installing Connectware on Kubernetes
    • Connectware Admin UI
    • Basic Components of Connectware
    • Connecting your First Machine
      • Your First Service Commissioning File
  • Documentation
    • Services
      • Service Commissioning Files
        • Structure of Service Commissioning Files
          • description
          • metadata
          • parameters
          • definitions
          • resources
            • Cybus::Connection
            • Cybus::Endpoint
            • Cybus:Mapping
            • Cybus::Container
              • Docker problem with network changes
            • Cybus::Link
            • Cybus::IngressRoute
            • Cybus::User
            • Cybus::Role
            • Cybus::Volume
            • Cybus::File
            • Cybus::Server
            • Cybus::Node
        • Sample Service Commissioning Files
          • Modbus
            • “Bearbeitungszentrum BAZ” - Single File
            • “Bearbeitungszentrum BAZ” - Multiple Files
            • “Bearbeitungszentrum BAZ” - Single File and Custom Topics
            • “Bearbeitungszentrum BAZ” - Agent Mode
          • Machine Condition Monitoring : OPC UA + InfluxDB + Grafana Dashboard
            • “Machine Condition Monitoring Example” - Single File
          • Machine Utilization Example (Multi file service composition) : Modbus TCP + InfluxDB + Grafana + MSS
            • “Machine Utilization Example” - Machine Connectivity
            • “Machine Utilization Example” - Dashboards with role based access permission
            • “Machine Utilization Example” - Push data to MSSQL Database
      • Services View
      • Setting Up and Configuring Services
        • Installing Services
        • Enabling Services
        • Updating Services
        • Disabling Services
        • Deleting Services
      • Service Details View
      • ServiceID
      • Inter-Service Referencing
      • Deviation
      • Service Logs
        • Logs of Individual Services
        • Logs of All Services
      • Rule Engine
        • Data Processing Rules
        • Rule Sandbox
      • API Definition
    • Resources
      • Servers
      • Containers
      • Volumes
      • Connections
      • Endpoints
      • Mappings
      • Nodes
      • API Definition
    • User Management
      • Users and Roles View
      • Users
      • Roles
      • Permissions
      • Password Policy Rules
      • Default Admin User
      • MQTT Users
      • Adding a MQTT Publish Prefix for Users
      • Multi-Factor Authentication
      • Long lived JSON Web Tokens
      • Access Permissions for Admin-UI
        • UI Access
        • Minimum Access Role Pages
      • API Definition
    • Client Registry
      • Implicit Flow
      • Explicit Flow
      • Granting Access
      • API Definition
    • Certificates
    • Monitoring
      • Data Explorer
      • Live Data
    • Workbench
      • Flows in Git Repositories
    • System Status
      • Info
      • Metrics
      • Status
      • Retrieving More System Information
      • System Health
      • API Definition
    • Backup and Restore
      • Volumes
      • User Database
    • Configuration
      • Environment Variables
      • LDAP Configuration
      • MFA Configuration
    • Agents
      • Agents View
      • Installing Agents
        • Installing Agents via Docker
        • Installing Agents via Docker Compose
        • Installing Agents via Kubernetes
        • Using Mutual TLS for Agents
      • Registering Agents in Connectware
      • Using Agents
      • Monitoring Agents
      • Troubleshooting Agents
    • Industry Protocol Details
      • ADS
        • AdsConnection
        • AdsEndpoint
      • BACnet
        • BacnetConnection
        • BacnetEndpoint
      • EtherNet/IP
        • EthernetIpConnection
        • EthernetIpEndpoint
      • Focas
        • FocasConnection
        • FocasEndpoint
      • Generic VRPC
        • GenericVrpcConnection
        • GenericVrpcEndpoint
      • Hottinger Baldwin Messtechnik (HBM)
        • HbmdaqConnection
        • HbmdaqEndpoint
      • Heidenhain DNC
        • HeidenhainConnection
        • HeidenhainEndpoint
      • HTTP/REST
        • HttpConnection
        • HttpEndpoint
      • HTTP Server
        • HttpServer
        • HttpNode
      • InfluxDB
        • InfluxdbConnection
        • InfluxdbEndpoint
      • Kafka
        • KafkaConnection
        • KafkaEndpoint
      • Modbus/TCP
        • ModbusConnection
        • ModbusEndpoint
      • MQTT
        • MqttConnection
        • MqttEndpoint
      • MSSQL
        • MssqlConnection
        • MssqlEndpoint
      • OPC DA
        • OpcdaConnection
        • OpcdaEndpoint
      • OPC UA
        • OPC UA Client
          • OpcuaConnection
          • OpcuaEndpoint
        • OPC UA Server
          • OpcuaServer
          • OpcuaNode
        • OPC UA Object Types
        • OPC UA Server References
          • OpcuaReferenceNode
          • OpcuaObjectNode
      • Siemens SIMATIC S7
        • S7Connection
        • S7Endpoint
      • Shdr
        • ShdrConnection
        • ShdrEndpoint
      • Sinumerik
        • SinumerikConnection
        • SinumerikEndpoint
      • Sopas
        • SopasConnection
        • SopasEndpoint
      • SQL
        • SqlConnection
        • SqlEndpoint
      • Werma WIN Ethernet
        • WermaConnection
        • WermaEndpoint
      • Systemstate
        • SystemstateConnection
        • SystemstateEndpoint
      • API Definition
    • Connectware Licensing
    • Changelog
      • General changes from 0.x to 1.0
        • Upgrading from 0.x to 1.0
    • Upgrade Guide
      • Upgrading from 1.x to 1.7.0
      • Upgrading from 1.x to 1.5.0
Powered by GitBook
LogoLogo

Cybus

  • Terms and Condition
  • Imprint
  • Data Privacy

© Copyright 2025, Cybus GmbH

On this page
  • Reading Data
  • Output Format on Read
  • Writing Data
  • Output Format on Write
  • Sample Commissioning File

Was this helpful?

  1. Documentation
  2. Industry Protocol Details

InfluxDB

PreviousHttpNodeNextInfluxdbConnection

Last updated 6 months ago

Was this helpful?

This protocol implementation provides an abstraction layer for connecting to InfluxDB which is a time series database.

As a time series database InfluxDB store data in collections called ‘measurements’ and each ‘measurement’ stores ‘points’ which in turn are formed by a timestamp, a value, fields and tags to achieve different groupings.

This version of the protocol supports the Flux query language for InfluxDB.

Below you can see configuration parameters and examples on how to build a commissioning file for this protocol.

Reading Data

To read data from InfluxDB, an endpoint has to be defined with either read or subscribe properties; a valid Flux query needs to be provided as part of the endpoint configuration using the field query. Subscribe works by defining a polling interval, hence the query will be executed on a regular basis. The result of the query is provided in JSON format on the MQTT broker.

The provided Flux query supports value interpolation by using tags of the type ‘@tag’.

from(bucket:”@bucket”) |> range(start: @startMeasurementTime) |> filter(fn: (r) => r.\_measurement == “@measurement”)

In the previous example requesting a read providing values for ‘bucket’, ‘measurement’ and ‘startMeasurementTime’ will generate a valid Flux query.

Output Format on Read

When data is read from InfluxDB results are published to the /res topic of the Endpoint. When a subscription is configured the results are published to the Endpoint default topic.

The output in both cases will be provided as a JSON array representing the InfluxDB query result.

[
    {
        'result': '_result',
        'table': 0,
        '_start': '2021-02-14T09:29:24.514083303Z',
        '_stop': '2021-02-15T09:29:24.514083303Z',
        '_time': '2021-02-15T09:29:06.059Z',
        '_value': 19.7,
        '_field': 'value',
        '_measurement': 'temperature',
    },
    {
        'result': '_result',
        'table': 0,
        '_start': '2021-02-14T09:29:24.514083303Z',
        '_stop': '2021-02-15T09:29:24.514083303Z',
        '_time': '2021-02-15T09:29:06.059623817Z',
        '_value': 21.3,
        '_field': 'value',
        '_measurement': 'temperature',
    },
]

Writing Data

To write data to InfluxDB, an endpoint with a write property has to be defined. If a measurement property is set, it is used by default for all data points sent to the endpoint. This property can also be overridden by providing a property measurement in the data message.

To write data you must send an MQTT message, to the /set topic of the Endpoint, like the following (with a measurement property set in the data message):

{ 'tags': { 'rpm': '8000', 'oil_temp': '250' }, 'value': 91, 'fields': { 'engine_number': 1 }, 'timestamp': 1813127450710, 'measurement': 'temperature' }

Both tags and fields are supported and if the timestamp is not present it is assigned by InfluxDB.

It is also possible to write several data points per message. To do this, they just have to be sent as an array.

Note that, respecting InfluxDB client design, the writes are asynchronous and data is written to influx based on the value of the InfluxDB connection property flushInterval which by default is 10 seconds.

Output Format on Write

When data is written to an InfluxDB Endpoint a message is published to the /res topic of the Endpoint. The output message is an object with two properties:

  • timestamp: is the unix timestamp, in milliseconds, of when the write was executed

  • value: is set to true when the write was successful

Sample Commissioning File

Download:

description: |
    Sample InfluxDB commissioning file

metadata:
    name: Cybus InfluxDB Example
    provider: cybus
    homepage: https://cybus.io
    version: 1.0.0

#------------------------------------------------------------------------------
# Parameters
#------------------------------------------------------------------------------

parameters:
    influxHost:
        type: string
        description: 'HTTP address of InfluxDB server'
        default: 'influxdbhost'

    influxPort:
        type: integer
        description: 'Influx Port'
        default: 8086

    influxScheme:
        type: 'string'
        description: 'Either use http or https for the server url'
        default: 'http'

#------------------------------------------------------------------------------
# Resources
#------------------------------------------------------------------------------
resources:
    #----------------------------------------------------------------------------
    # Connections
    #----------------------------------------------------------------------------

    influxdbConnection:
        type: Cybus::Connection
        properties:
            protocol: Influxdb
            connection:
                host: !ref influxHost
                token: '-an-influx-db-jwt-token-'
                port: !ref influxPort
                bucket: turbine
                scheme: !ref influxScheme
                flushInterval: 5000

    #----------------------------------------------------------------------------
    # Endpoints
    #----------------------------------------------------------------------------

    turbineWrite:
        type: Cybus::Endpoint
        properties:
            protocol: Influxdb
            connection: !ref influxdbConnection
            write:
                measurement: 'turbine'

    rotaryEncoderWrite:
        type: Cybus::Endpoint
        properties:
            protocol: Influxdb
            connection: !ref influxdbConnection
            write:
                measurement: 'rotary_encoder'

    rotary_encoder_angle:
        type: Cybus::Endpoint
        properties:
            protocol: Influxdb
            connection: !ref influxdbConnection
            subscribe:
                interval: 6000
                query: >
                    from(bucket:"turbine") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "rotary_encoder")

    #----------------------------------------------------------------------------
    # Mappings
    #----------------------------------------------------------------------------

    # A mapping that overrides the measurement value with the name of the MQTT topic
    mappings:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: 'turbine/#'
                  publish:
                      endpoint: !ref turbineWrite
                  rules:
                      - transform:
                            # Add the topic as measurement
                            expression: '$merge([$,{"measurement": $context.topic}])'
                # A mapping that will pass data from the MQTT topic to the write endpoint
                # allowing overriding the measurement by providing it in the topic
                - subscribe:
                      topic: 'encoder/#'
                  publish:
                      endpoint: !ref rotaryEncoderWrite
Connection Properties
Endpoint Properties
3KB
influxdb-example.yml