SQL

Overview

This protocol implementation is a wrapper around the excellent Node.js ORM (Object-Relational Mapping tool) Sequelize (https://sequelize.org/).

It provides read, write, and subscribe capabilities for common SQL databases.

Currently, only MariaDB and Postgres are supported.

Usage

In general, the implementation works by specifying a query or query template.

The following examples use a sample table called people that looks like this:

id  name   lastname  gender
--  -----  --------  ------
1   Alice  Miller    female
2   Bob    Jones     male

Connection

To connect to a database, we only need a URL when specifying the Cybus::Connection resource:

<schema>://<user>:<password>@<host>:<port>/<database>

For example, to connect to a MariaDB database use this:

'mariadb://johndoe:my-secret-pw@localhost:3306/test'

To connect to a Postgres database use this:

'postgres://johndoe:my-secret-pw@localhost/test'
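To make the URL layout concrete, the following minimal sketch splits a connection URL into the parts named in the pattern above. It uses Python's standard urllib.parse only to illustrate the structure; the helper name describe_connection_url is hypothetical, and the sketch makes no claim about how Connectware or Sequelize parse the URL internally.

```python
from urllib.parse import urlsplit


def describe_connection_url(url: str) -> dict:
    """Split <schema>://<user>:<password>@<host>:<port>/<database> into its parts."""
    parts = urlsplit(url)
    return {
        "schema": parts.scheme,
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,  # None when the URL does not specify a port
        "database": parts.path.lstrip("/"),
    }


mariadb = describe_connection_url("mariadb://johndoe:my-secret-pw@localhost:3306/test")
postgres = describe_connection_url("postgres://johndoe:my-secret-pw@localhost/test")
print(mariadb)
print(postgres)
```

Note that the Postgres example omits the port, so the parsed port is empty in that case.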

It is recommended to define these properties as parameters in the commissioning file and reference them in the connection settings using !ref, so that the actual value can be edited at deployment time. See Example Commissioning File below.

The connection properties are described in detail under Connection Properties below.

Reading Data

To read data from a database, define an endpoint with either read or subscribe properties. With subscribe, the query is executed periodically at a configured polling interval. With read, the query is executed each time an MQTT message is sent to the endpoint topic with the /req (request) suffix, and the result is published to the endpoint topic with the /res (result) suffix. In both cases, the query result is provided in JSON format on the MQTT broker.

Example endpoint definition:

sqlQuery1:
  type: Cybus::Endpoint
  properties:
    protocol: Sql
    connection: !ref sqlConnection
    subscribe:
      query: 'SELECT * FROM people'
      interval: 2000

This endpoint will execute the given query and return the data as MQTT messages like in the following example. If no rows are returned, you will receive an empty array ([]) as a value.

{
  "timestamp": 1231782312, // unix timestamp in ms
  "value": [
    {
      "id": 1,
      "name": "Alice",
      "lastname": "Miller",
      "gender": "female"
    },
    {
      "id": 2,
      "name": "Bob",
      "lastname": "Jones",
      "gender": "male"
    }
  ]
}

The SQL query can be defined as a template string containing placeholders in the WHERE clause. In the template, the dollar sign character $ followed by an identifier denotes such a placeholder [1]. The placeholders are replaced by the values from the payload of the JSON message received via MQTT. If no placeholders are defined, the query is executed as-is.

If the SQL query contains placeholder definitions, all their names must exist in the message payload, otherwise an error will be logged and the message will be ignored. The value of the placeholders must have the right data format matching the target schema of the database.
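The rule that every placeholder name must exist in the message payload can be checked on the sender side before publishing. The following is a minimal sketch, not the protocol's own implementation: the regular expression for identifiers and the helper names placeholder_names and missing_placeholders are assumptions made for illustration.

```python
import re
from typing import Any, Dict, List

# Assumption: a placeholder is "$" followed by a letter or underscore,
# then letters, digits or underscores. The real identifier rules may differ.
PLACEHOLDER = re.compile(r"\$([A-Za-z_][A-Za-z0-9_]*)")


def placeholder_names(query: str) -> List[str]:
    """Return the distinct placeholder names in order of first appearance."""
    names: List[str] = []
    for name in PLACEHOLDER.findall(query):
        if name not in names:
            names.append(name)
    return names


def missing_placeholders(query: str, payload: Dict[str, Any]) -> List[str]:
    """Return placeholder names that have no matching key in the payload."""
    return [name for name in placeholder_names(query) if name not in payload]


query = "SELECT * FROM people WHERE lastname = $lastname"
print(missing_placeholders(query, {"lastname": "Miller"}))  # nothing missing
print(missing_placeholders(query, {"name": "Alice"}))       # lastname is missing
```

A sender could refuse to publish whenever the returned list is not empty, instead of relying on the error logged by the endpoint.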

Example endpoint definition:

sqlQuery1:
  type: Cybus::Endpoint
  properties:
    protocol: Sql
    connection: !ref sqlConnection
    read:
      query: 'SELECT * FROM people WHERE lastname = $lastname'

Sending a message to the /req topic of this Endpoint with the following payload:

{
  "lastname": "Miller"
}

will return results filtered by the configured WHERE clause:

{
  "timestamp": 1231792312, // unix timestamp in ms
  "value": [
    {
      "id": 1,
      "name": "Alice",
      "lastname": "Miller",
      "gender": "female"
    }
  ]
}

Output Format on Read

When data is read from SQL, the results are published to the /res topic of the Endpoint. The output message is an object with two properties:

  • timestamp: is the unix timestamp, in milliseconds, of when the read was executed

  • value: is an array of results as returned by the SQL query
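A consumer of the /res topic might unpack such a message as in the following minimal sketch. It assumes the message arrives as a JSON string with exactly the two properties listed above; the function name parse_read_result is hypothetical, and the MQTT client code that would deliver the message is omitted.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple


def parse_read_result(message: str) -> Tuple[datetime, List[Dict[str, Any]]]:
    """Return the execution time (UTC) and the list of result rows."""
    data = json.loads(message)
    # The timestamp is given in milliseconds, datetime expects seconds.
    executed_at = datetime.fromtimestamp(data["timestamp"] / 1000, tz=timezone.utc)
    return executed_at, data["value"]


message = json.dumps({
    "timestamp": 1231792312,
    "value": [
        {"id": 1, "name": "Alice", "lastname": "Miller", "gender": "female"}
    ],
})

executed_at, rows = parse_read_result(message)
for row in rows:
    print(executed_at.isoformat(), row["name"], row["lastname"])
```

If the query returned no rows, the list of rows is empty and the loop simply does not run.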

Writing Data

To write data to the database, an endpoint with write properties has to be defined. This endpoint needs the definition of an SQL query with a query template containing variables. The variables will be filled by the values of the payload of the JSON message received via MQTT.

In the SQL query definition, the query syntax is used as a template string containing placeholders. In the template, the dollar sign character $ followed by an identifier denotes such a placeholder [1]. The placeholders are replaced by the values from the payload of the JSON message received via MQTT.

All specified template variables must exist in the payload and must have the right target format matching the schema of the database.

Example endpoint definition:

sqlQuery1:
  type: Cybus::Endpoint
  properties:
    protocol: Sql
    connection: !ref sqlConnection
    write:
      query: 'INSERT INTO people (name, lastname, gender) VALUES ($name, $lastname, $gender)'

When using bulk insert you need to specify the endpoint like this:

sqlQuery2:
  type: Cybus::Endpoint
  properties:
    protocol: Sql
    connection: !ref sqlConnection
    write:
      query: 'INSERT INTO people (name, lastname, gender) VALUES'
      queryValues: '($name, $lastname, $gender)'

To write data, you must send an MQTT message like the following to the /set topic of the Endpoint:

{
  "name": "Alice",
  "lastname": "Miller",
  "gender": "female"
}

Alternatively, for performance reasons, you can send multiple rows in a single message. This requires the queryValues parameter in the endpoint definition.

Important

When using this method of insertion, make sure all rows have the same number of columns.

The message payload is then an array of rows like this:

[
  {
    "name": "Alice",
    "lastname": "Miller",
    "gender": "female"
  },
  {
    "name": "John",
    "lastname": "Clark",
    "gender": "male"
  }
]
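Because all rows of a bulk message must have the same columns, a sender may want to check the payload before publishing it. The following is a minimal sketch under stated assumptions: the placeholder pattern and the function name check_bulk_payload are hypothetical, and the check only compares key names against the queryValues template, not against the database schema.

```python
import re
from typing import Any, Dict, List

# Assumption: same "$identifier" placeholder syntax as in the query templates.
PLACEHOLDER = re.compile(r"\$([A-Za-z_][A-Za-z0-9_]*)")


def check_bulk_payload(query_values: str, rows: List[Dict[str, Any]]) -> List[str]:
    """Return a list of problems found in the rows; an empty list means OK."""
    required = set(PLACEHOLDER.findall(query_values))
    problems: List[str] = []
    for index, row in enumerate(rows):
        missing = sorted(required - set(row))
        if missing:
            problems.append(f"row {index}: missing {', '.join(missing)}")
        if len(row) != len(rows[0]):
            problems.append(f"row {index}: column count differs from row 0")
    return problems


rows = [
    {"name": "Alice", "lastname": "Miller", "gender": "female"},
    {"name": "John", "lastname": "Clark", "gender": "male"},
]
print(check_bulk_payload("($name, $lastname, $gender)", rows))
```

For the two rows shown above, the list of problems is empty, so the message could be published to the /set topic as-is.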

Important

The SQL connection on the Connectware side does not perform any data validation against the database schema. The senders of the MQTT messages must ensure that the data is sent in the correct format.

Output Format on Write

When data is written to an SQL Endpoint, a message is published to the /res topic of the Endpoint. The output message is an object with two properties:

  • timestamp: is the unix timestamp, in milliseconds, of when the write was executed

  • value: is set to true when the write was successful

Connection Properties

url (string, required)

The URL of the database server

Example: "mariadb://user:pass@example.com:3306/dbname"

probeConnectionDelay (integer)

The number of milliseconds between connectivity checks of the connection

Default: 10000

logLevel (string, enum)

Configures the additional log level for the connection. If set to error (default), only errors are logged. If set to debug, additionally all SQL queries are logged.

This element must be one of the following enum values:

  • error

  • debug

Default: "error"

trustAllCertificates (boolean)

If true, all certificates will be accepted, regardless of whether they can be validated or not. Use this option if self-signed server certificates should be accepted, or if there are other reasons that prevent this client from validating the certificates.

Default: false

Examples: true, false

connectionStrategy (object)

If a connection attempt fails, retries will be performed with increasing delay (waiting time) in between. The following parameters control how these delays behave.

Properties of the connectionStrategy object:

initialDelay (integer)

Delay (waiting time) of the first connection retry (in milliseconds). For subsequent retries, the delay will be increased according to the parameter incrementFactor which has a default value of 2.

Default: 1000

Additional restrictions:

  • Minimum: 1000

maxDelay (integer)

Maximum delay (waiting time) to wait until the next retry (in milliseconds). The delay (waiting time) for any subsequent connection retry will not be larger than this value. Must be strictly greater than initialDelay.

Default: 30000

incrementFactor (integer)

The factor used to increment initialDelay up to maxDelay. For example, if initialDelay is set to 1000, maxDelay to 5000, and incrementFactor to 2, the delays would be 1000, 2000, 4000, 5000.

Default: 2

Additional restrictions:

  • Minimum: 2
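The interplay of initialDelay, maxDelay and incrementFactor can be sketched as a small generator. This is an illustration of the behavior described above, not the actual reconnect code; the function name retry_delays is hypothetical, and the sketch assumes that the delay stays at maxDelay once that value is reached.

```python
from itertools import islice
from typing import Iterator


def retry_delays(initial_delay: int = 1000,
                 max_delay: int = 30000,
                 increment_factor: int = 2) -> Iterator[int]:
    """Yield the waiting time in milliseconds before each connection retry."""
    delay = initial_delay
    while True:
        # Never wait longer than max_delay.
        yield min(delay, max_delay)
        delay = min(delay * increment_factor, max_delay)


# The example from the incrementFactor description: 1000, 2000, 4000, 5000
delays = list(islice(retry_delays(initial_delay=1000, max_delay=5000), 4))
print(delays)
```

With the default values (1000, 30000, 2), the same generator yields 1000, 2000, 4000, 8000, 16000 and then 30000 for every further retry.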

Endpoint Properties

query (string, required)

The SQL query used to read, write, or subscribe

Example: "INSERT INTO test (some_column) VALUES ($some_column)"

queryValues (string)

The SQL query values used to bulk insert

Example: "($id, $name, $age)"

interval (integer)

The number of milliseconds between queries. Only required when subscribing.

Default: 1000

cronExpression (string)

The Cron expression used to poll the endpoint. (For examples, see: https://github.com/node-cron/node-cron)

Examples: "1,2,4,5 * * * *", "1-5 * * * *", "*/2 * * * *", "* * * January,September Sunday"

Example Commissioning File

Download: sql-example.yml

---
description: >
  Sample SQL service commissioning file

metadata:
  name: Sample SQL service
  icon: https://www.cybus.io/wp-content/uploads/2017/10/for-whom1.svg
  provider: cybus
  homepage: https://www.cybus.io
  version: 1.0.0

parameters:
  sqlURL:
    type: string
    default: 'mariadb://root:my-secret-pw@localhost:3306/mysql'
  initialReconnectDelay:
    type: integer
    default: 1000
  maxReconnectDelay:
    type: integer
    default: 30000
  factorReconnectDelay:
    type: integer
    default: 2

resources:
  sqlConnection:
    type: Cybus::Connection
    properties:
      protocol: Sql
      connection:
        url: !ref sqlURL
        connectionStrategy:
          initialDelay: !ref initialReconnectDelay
          maxDelay: !ref maxReconnectDelay
          incrementFactor: !ref factorReconnectDelay

  sqlQuery1:
    type: Cybus::Endpoint
    properties:
      protocol: Sql
      connection: !ref sqlConnection
      subscribe:
        query: 'SELECT User FROM user'
        # Be very careful with this setting. A low value might overload
        # the database (the unit is milliseconds!).
        interval: 2000

  sqlQuery2:
    type: Cybus::Endpoint
    properties:
      protocol: Sql
      connection: !ref sqlConnection
      write:
        # Here we use the placeholder $someValue. The protocol driver will insert
        # the value from the input JSON message under the key someValue
        query: 'INSERT INTO test (id, some_column) VALUES ($id, $someValue)'

  mapping:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            endpoint: !ref sqlQuery1
          publish:
            topic: 'database-users'
        - subscribe:
            topic: 'database-insert-test'
          publish:
            endpoint: !ref sqlQuery2

Footnotes