Rules Objects

A rule performs transformations on data messages directly inside the Connectware data processing.

Rules are always specified as a list (array). The order is important, as rules are executed from top to bottom. Some rules might have requirements regarding the format of the input message. Some rules may or may not pass on data messages to the next rule or the output, e.g. when the COV (change of value) filter decides whether the input value needs to be passed on or not.

Rules are always configured as a named object with properties specific to the rule type.

The following rules are available today:

parse

This rule parses any non-JSON data from input bytes into a JSON object of the chosen type. The resulting JSON object can then be passed on to other rule operations.

Property   Type   Required
format     enum   Required

format

  • is required

  • type: enum. The value of this property must be one of the following:

    • json interprets the input bytes as a stringified JSON object which is parsed back into the JSON object

    • utf8 interprets the input bytes as a utf-8 encoded string and parses this into a string

    • latin1 interprets the input bytes as a latin-1 encoded string (ISO-8859-1) and parses this into a string

    • ascii interprets the input bytes as an ascii encoded string (for 7-bit ascii data only) and parses this into a string. Additionally, supports parsing integer primitives directly.

    • boolean interprets the first byte of the input message as a boolean value. If this first byte (see note below) is non-zero, true is passed on, otherwise false.

    • int8 interprets the input bytes as a signed 8-bit integer, using exactly 1 byte.

    • int16BE interprets the input bytes as a signed, big-endian 16-bit integer, using exactly 2 bytes.

    • int16LE interprets the input bytes as a signed, little-endian 16-bit integer, using exactly 2 bytes.

    • int32BE interprets the input bytes as a signed, big-endian 32-bit integer, using exactly 4 bytes.

    • int32LE interprets the input bytes as a signed, little-endian 32-bit integer, using exactly 4 bytes.

    • uint8 interprets the input bytes as an unsigned 8-bit integer, using exactly 1 byte.

    • uint16BE interprets the input bytes as an unsigned, big-endian 16-bit integer, using exactly 2 bytes.

    • uint16LE interprets the input bytes as an unsigned, little-endian 16-bit integer, using exactly 2 bytes.

    • uint32BE interprets the input bytes as an unsigned, big-endian 32-bit integer, using exactly 4 bytes.

    • uint32LE interprets the input bytes as an unsigned, little-endian 32-bit integer, using exactly 4 bytes.

    • floatBE interprets the input bytes as a big-endian IEEE 754 float, using exactly 4 bytes.

    • floatLE interprets the input bytes as a little-endian IEEE 754 float, using exactly 4 bytes.

    • doubleBE interprets the input bytes as a big-endian IEEE 754 double, using exactly 8 bytes.

    • doubleLE interprets the input bytes as a little-endian IEEE 754 double, using exactly 8 bytes.

    • toBase64 (also base64) parses a plain text input string into a base64 encoded output string

    • fromBase64 parses a base64 encoded input string into a plain text output string

For reference: Big-endian byte order (BE, sometimes also called Motorola or network byte order) refers to the storage order where the most significant byte of the value is stored at the smallest memory address, whereas little-endian (LE, sometimes also called Intel byte order) refers to the other way round, where the least-significant byte is stored at the smallest address. (See also: https://en.wikipedia.org/wiki/Endianness )

For more details about the parsing of input bytes into JSON values see also: https://nodejs.org/api/buffer.html

Note about the boolean parsing: If the input message represents an integer value that is larger than one byte (e.g. an int16 or int32 value), you cannot directly parse this into a boolean with this parse rule, because this rule considers only the first byte. Instead, for multi-byte integers you first have to apply a parse rule with the correct integer size, then apply a parse rule for parsing into boolean.
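
The byte-order and first-byte behaviour described above can be illustrated with a short Python sketch. It uses Python's standard struct module as a stand-in for the parse rule; the helper first_byte_as_bool is a hypothetical name introduced only for this illustration and is not part of Connectware.

```python
import struct

# Two bytes as they might arrive from a device: 0x01 0x00
payload = bytes([0x01, 0x00])

# ">" selects big-endian, "<" little-endian; "h" is a signed 16-bit integer.
int16_be = struct.unpack(">h", payload)[0]  # read as 0x0100
int16_le = struct.unpack("<h", payload)[0]  # read as 0x0001


def first_byte_as_bool(data: bytes) -> bool:
    """Hypothetical helper: mimic the documented boolean format,
    which inspects only the first byte of the input."""
    return data[0] != 0


# The big-endian int16 value 1 is stored as 0x00 0x01, so its first
# byte is zero although the integer itself is non-zero.
one_as_int16_be = struct.pack(">h", 1)
```

In this sketch the same two bytes yield 256 when read as big-endian and 1 when read as little-endian, and the big-endian int16 value 1 would be seen as false by a first-byte check, which is why the note above recommends parsing the integer first.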

Example:

description: >
    A parse rule usage example.

metadata:
    name: Parse

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: factory/machine/power
                  publish:
                      topic: factory/machine/state
                  rules:
                      - parse:
                            format: boolean

The service installed using the commissioning file above will collect the machine power status published on the MQTT topic factory/machine/power and publish it on the topic factory/machine/state, interpreting the input as a boolean.

transform

This rule is used to transform payloads into a new format or structure. Expressions are written in the JSONata language (see http://jsonata.org/). This is a direct transformation: for every input message, exactly one output message is generated.

Property     Type     Required
expression   string   Required

expression

The JSONata expression to apply to the given payload. See http://docs.jsonata.org for a full language reference.

Some simple examples:

  • $ - the input value (payload) in unchanged format, regardless of its type

  • $.value or just shorthand value - the property value of the input JSON object

  • ($.temp - 32) * 5 / 9 - the property temp, which denotes a temperature in degrees Fahrenheit, converted to degrees Celsius

  • $number($) * 0.1 - interpreting a non-JSON input value as a number and multiplying it by 0.1

The service installed with the commissioning file below will collect a machine temperature, given in Fahrenheit, published on the MQTT topic machine/temperature/fahrenheit. Then it will convert it to degrees Celsius and publish it to the topic machine/temperature/celsius.

description: >
  A temperature value transformation example: Fahrenheit to degrees Celsius.

metadata:
  name: Value transform

resources:
  mapping:
    type: 'Cybus::Mapping'
    properties:
      mappings:
        - subscribe:
            topic: "machine/temperature/fahrenheit"
          publish:
            topic: "machine/temperature/celsius"
          rules:
            - transform: 
                expression:
                  ( $number($) - 32 ) * 5 / 9
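
As a quick check of the arithmetic in the expression above, here is a minimal Python sketch. The function name fahrenheit_to_celsius is hypothetical, and float() is used as a rough stand-in for $number($) on a plain numeric string.

```python
def fahrenheit_to_celsius(payload: str) -> float:
    """Hypothetical helper mirroring ( $number($) - 32 ) * 5 / 9."""
    # float(payload) plays the role of $number($) for a numeric string.
    return (float(payload) - 32) * 5 / 9


boiling = fahrenheit_to_celsius("212")   # boiling point of water
freezing = fahrenheit_to_celsius("32")   # freezing point of water
```

With these inputs, the boiling point maps to 100.0 and the freezing point to 0.0 degrees Celsius.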

There are many functions in JSONata that can be used for all common data types. See http://docs.jsonata.org for a full language reference.

Apart from the payload object and the full set of JSONata functions and data types, the following additional functions and values are defined in the context of the Connectware rule engine:

$context

contains protocol specific context information. For example for MQTT, this contains the raw MQTT packet with QoS and Retain information and the topic this message has arrived at. For HTTP, this contains the full response object with statusCodes and all headers.

$source

contains all entries of the source part of this mapping

$target

contains all entries of the target part of this mapping

$context.vars

If you use wildcard topics, this gives you access to the field values at the wildcard positions. Example: if you have specified the topic factory/productionline1/+machineId/#data, you can access the fields $context.vars.machineId and $context.vars.data.
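
To make the idea of named wildcards more concrete, the following Python sketch extracts values from a topic according to a pattern with +name and #name placeholders. It is only a model under stated assumptions: the function extract_wildcard_vars is hypothetical, and representing the #data value as a slash-joined string is an assumption, not documented Connectware behaviour.

```python
from typing import Dict, Optional


def extract_wildcard_vars(pattern: str, topic: str) -> Optional[Dict[str, str]]:
    """Hypothetical helper: return the named wildcard values,
    or None if the topic does not match the pattern."""
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")
    values: Dict[str, str] = {}
    for index, level in enumerate(pattern_levels):
        if level.startswith("#"):
            # Multi-level wildcard: capture all remaining levels.
            # Assumption: the captured levels are joined with "/".
            if level[1:]:
                values[level[1:]] = "/".join(topic_levels[index:])
            return values
        if index >= len(topic_levels):
            return None
        if level.startswith("+"):
            # Single-level wildcard: capture exactly one level.
            if level[1:]:
                values[level[1:]] = topic_levels[index]
        elif level != topic_levels[index]:
            return None
    return values if len(topic_levels) == len(pattern_levels) else None


matched = extract_wildcard_vars(
    "factory/productionline1/+machineId/#data",
    "factory/productionline1/press4/sensors/temp",
)
```

Here matched contains the keys machineId and data, which correspond to what the rule engine exposes as $context.vars.machineId and $context.vars.data.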

A $context usage example in the transformation rule:

description: >
  This commissioning file is an example of using a $context in a
  transformation rule to reference a topic path specified with a wildcard.

metadata:
  name: Context transform

resources:
  mapping:
    type: 'Cybus::Mapping'
    properties:
      mappings:
        - subscribe:
            topic: "factory/+machineId/temperature"
          publish:
            topic: "factory/all/temperature"
          rules:
            - transform: 
                expression: |
                  {
                    "machine": $context.vars.machineId,
                    "temperature": $
                  }

$last([label])

refers to the last message that has been sent on this rule stream. This function allows you to aggregate consecutive messages.

label is optional. It can be used to refer to other stash points. If omitted, it defaults to the last message that has completed the full rule chain.

When writing expressions using $last() and object properties, keep in mind that in JSONata the dot operator (.) is a map operator, not (only) an object dereferencing operator (see JSONata reference: path operators). This can lead to unexpected results if the input message is not a plain value but rather an array of values.

If the expression using $last is not intended to be applied to each input value, the function $lookup (see JSONata reference: object functions) may be a better choice than the dot operator: $lookup($last(), "val") returns the val property of the object stored in $last.

For example, suppose the last value stored was { time: 1632908050235 } and the transform rule is used with the following expression:

{ 'previous': $last().time, 'time': $millis() }

If the input value is an array with value [1, 2, 3], the result will unexpectedly contain an array value:

{ previous: [1632908050235, 1632908050235, 1632908050235, sequence: true], time: 1632908154336 }

Instead, the dereferencing of the .time property can be done using $lookup, like so:

{
  "previous": $lookup($last(), "time"),
  "time": $millis()
}

This yields the expected result:

{ 'previous': 1632908050235, 'time': 1632908154336 }
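
The difference between the two expressions can be modelled in a few lines of Python. This is a deliberately simplified model of the mapping behaviour described above, not the JSONata implementation; the names apply_step, dot_time and lookup_time are hypothetical.

```python
last_message = {"time": 1632908050235}


def apply_step(context, step):
    """Simplified model of a path step: arrays are mapped item by item."""
    if isinstance(context, list):
        return [step(item) for item in context]
    return step(context)


def dot_time(payload):
    """Model of $last().time: each step is applied per input item."""
    objects = apply_step(payload, lambda _item: last_message)
    return apply_step(objects, lambda obj: obj["time"])


def lookup_time():
    """Model of $lookup($last(), "time"): one plain property access."""
    return last_message["time"]
```

In this model, dot_time([1, 2, 3]) returns a list with three copies of the timestamp, whereas lookup_time() returns the single value regardless of the shape of the input.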

In addition to the default JSONata functions, the following special functions are available:

$encodeBitmask(array)

Takes an array of boolean values and converts it into a bitmask. Returns a number.

Sample:

[ true, false, false, true, false ]
   2^0   2^1    2^2    2^3   2^4    = 9  ( 00001001 )

Attention: first value in the array will be the right-most bit

$decodeBitmask(number, array)

Takes a number and an array of strings as arguments. Returns an object whose keys are taken from the array and whose values are booleans that result from applying the bitmask of the key position to the number.

Sample:

$decodeBitmask(9, ["ok", "alarm", "door", "powerOn", "warning"])
         00001001 -> 1      0         0        1         0

{ "ok": true, "alarm": false, "door": false, "powerOn": true, "warning": false }

$bitmask(number, index)

Returns a boolean value that indicates whether the bit at position <index> in <number> is set.

Sample:

$bitmask(9, 3) -> true     $bitmask(9, 2) -> false
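
The three bitmask functions can be mirrored in Python to check the samples above. This is a sketch of the documented semantics only; the Python function names are hypothetical counterparts of $encodeBitmask, $decodeBitmask and $bitmask.

```python
from typing import Dict, List


def encode_bitmask(flags: List[bool]) -> int:
    """The first array element becomes the right-most (least significant) bit."""
    return sum(1 << position for position, flag in enumerate(flags) if flag)


def bitmask(number: int, index: int) -> bool:
    """True if the bit at the zero-based position index is set in number."""
    return bool((number >> index) & 1)


def decode_bitmask(number: int, names: List[str]) -> Dict[str, bool]:
    """Map each name to the bit at its array position."""
    return {name: bitmask(number, position) for position, name in enumerate(names)}


encoded = encode_bitmask([True, False, False, True, False])
decoded = decode_bitmask(encoded, ["ok", "alarm", "door", "powerOn", "warning"])
```

Note that the bit index is zero-based in this sketch, which matches the sample where position 3 of the number 9 is set and position 2 is not.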

filter

The filter rule breaks or forwards the message based on the result of an expression. The expression is also written in the JSONata language. This rule does not modify the message in any way. It breaks the message flow if the expression evaluates to a false value (false, 0 or '').

Property     Type     Required
expression   string   Required

expression

Must be a JSONata expression that evaluates to a boolean, but can use the full JSONata function set; see the expression property of the transform rule above.

Example:

description: >
    A filter rule usage example.

metadata:
    name: Filter

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: factory/machine/temperature
                  publish:
                      topic: 'factory/warning/overheat'
                  rules:
                      - filter:
                            expression: $number($) > 95

The service installed using the commissioning file above will publish on the overheat warning topic factory/warning/overheat if the machine temperature in the factory exceeds the value of 95 degrees.

Messages with a value of 95 or below will be filtered out.

setContextVars

This rule will store values as context variables. This can be used to pass data from one rule step to a later one. Some target protocols (like MQTT and HTTP) also support using those context variables in the output topics or output properties. This can be used to implement MQTT topics depending on the actual data, or dynamic HTTP headers. It is also useful when working with named wildcards.

The rule must be configured with a vars property that lists each identifier. For each identifier, a JSONata expression is specified that has the same context variables available as the transform and filter rules.

Property                Type      Required
vars                    Object    Required
setMissingAsUndefined   boolean   Optional, default: false

vars

An object of key: expression properties where key is any identifier to be stored as a context variable, and expression is the JSONata expression whose evaluated value is assigned to the context variable key.

setMissingAsUndefined (default: false)

Chooses the behaviour if a configured context variable cannot be set from the input message values. If true, any missing context variables from the vars object are set to the string undefined, and the rule engine and output publish continue as normal. If false, the input message is discarded and an error message is logged, like so: Failed processing setContextVars, because: setContextVars rule should set value "contextKey" from property "someKey" but it is missing in the message: { "wrongKey": "123" }.
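
The two behaviours of setMissingAsUndefined can be sketched in Python as follows. This is a model under assumptions: the function set_context_vars is hypothetical, JSONata expressions are represented by plain Python callables, and a return value of None from a callable stands for a value that cannot be resolved.

```python
from typing import Any, Callable, Dict, Optional


def set_context_vars(
    message: Dict[str, Any],
    var_expressions: Dict[str, Callable[[Dict[str, Any]], Any]],
    set_missing_as_undefined: bool = False,
) -> Optional[Dict[str, Any]]:
    """Return the context variables, or None if the message is discarded."""
    context_vars: Dict[str, Any] = {}
    for key, expression in var_expressions.items():
        value = expression(message)
        if value is None:
            if not set_missing_as_undefined:
                # Documented default: discard the message
                # (the real rule additionally logs an error).
                return None
            # Documented alternative: store the string "undefined".
            value = "undefined"
        context_vars[key] = value
    return context_vars


expressions = {"contextKey": lambda msg: msg.get("someKey")}
discarded = set_context_vars({"wrongKey": "123"}, expressions)
tolerated = set_context_vars({"wrongKey": "123"}, expressions, True)
```

In this sketch the first call discards the message, while the second continues with contextKey set to the string undefined.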

Example

The following example takes the productionLine and machineId wildcards from the source subscription and creates a concatenated string to be stored in the context variable identifier. This variable can be used in the target publish topic as another wildcard name.

description: >
  A setContextVars rule usage example.

metadata:
  name: setContextVars

resources:
  mapping:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: factory/+productionLine/+machineId/temperature
          publish:
            topic: factory/temperatures
          rules:
            - setContextVars:
                vars:
                  identifier: |
                    $join([
                      $context.vars.productionLine,
                      $context.vars.machineId
                    ], '-')
            - transform:
                expression: |
                  {
                    "id": $context.vars.identifier,
                    "temperature": $
                  }

cov

Simple Change-of-Value (COV) filter that only forwards data when it has changed compared to the previous message. The filter does not distinguish different input topics and works on all messages of a rule chain and output topic as if they belonged to the same source.

Supports only looking at one specified key (e.g. to avoid confusion by timestamps) and deadbands (to filter noise). Expects JSON input format. Usually outputs fewer messages than it receives.

Property       Type                         Required
key            string or array of strings   Required
deadband       number                       Optional
deadbandMode   enum                         Optional
label          string                       Optional

key

The name of the JSON property to filter. Currently, this filter only supports a single key, either as a top-level property of the JSON object (when given as a string) or as a property inside some nested object in the JSON object (when given as an array of strings that specify the property path). You can chain multiple cov rule definitions to look at multiple keys.

Example for filtering any message { "temperature": 12.34 } for changing value of the temperature property:

description: >
    A COV on change rule example.

metadata:
    name: COV on change

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: 'machine/sensor/raw'
                  publish:
                      topic: 'machine/sensor/changed'
                  rules:
                      - cov:
                            key: temperature

If a list of keys is specified, they are treated as a deep path to a JSON object property. If the path cannot be resolved because one item of the path does not exist in a message, no error is thrown, but that input message is ignored. The COV filter will accept and forward only those messages in which the path can be resolved.

Example for filtering a message such as { "values": [ { "temperature": 85.3 } ] } according to the temperature property:

description: >
    A COV on change rule with deep path key example.

metadata:
    name: COV on change with deep path

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: 'machine/sensor/raw'
                  publish:
                      topic: 'machine/sensor/changed'
                  rules:
                      - cov:
                            key:
                                - values
                                - 0
                                - temperature

(Note: Integer numbers such as the index 0 above can be written either as plain integer values, or within quotes '0' so that they are interpreted as strings. Both forms are allowed.)

deadband

For numeric data, you can specify a deadband that must be exceeded before a change is detected. If this parameter is omitted, the filter compares for exact matches.

deadbandMode

Valid values are relative and absolute. In relative mode, the deadband parameter is treated as a percentage: if the parameter is 10, the next value is only released if it deviates by more than 10% from the previous value. The default is absolute.

label

The COV filter looks up the stash to compare the current value with the last one. By default, the COV filter compares the current input value with the last value that was let through by this filter. You can let the filter compare the value to another stored message by specifying a label: either a label you have specified yourself in another stash, or one of the labels input, lastOutput or lastInput, each being set automatically at the beginning and the end of the rule engine chain. If custom labels are used, be aware that you need to store them explicitly with a stash rule; otherwise the filter will let everything through.
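
The interplay of deadband and deadbandMode can be sketched in Python. This is a minimal model of the behaviour described above, not the Connectware implementation: the class CovFilter is hypothetical, it compares against the last value it let through, and it assumes that the very first value is always forwarded.

```python
from typing import Optional


class CovFilter:
    """Hypothetical change-of-value filter for a single numeric value."""

    def __init__(self, deadband: Optional[float] = None, mode: str = "absolute"):
        self.deadband = deadband
        self.mode = mode
        self.last_forwarded: Optional[float] = None

    def should_forward(self, value: float) -> bool:
        previous = self.last_forwarded
        if previous is None:
            changed = True  # assumption: the first value always passes
        elif self.deadband is None:
            changed = value != previous  # exact comparison without deadband
        elif self.mode == "relative":
            # deadband is a percentage of the previously forwarded value
            changed = abs(value - previous) > abs(previous) * self.deadband / 100
        else:
            changed = abs(value - previous) > self.deadband
        if changed:
            self.last_forwarded = value
        return changed


relative = CovFilter(deadband=10, mode="relative")
decisions = [relative.should_forward(v) for v in [100, 105, 111, 120, 125]]
```

With a relative deadband of 10, the value 105 is suppressed because it is within 10% of 100, 111 is forwarded, 120 is within 10% of 111 and is suppressed, and 125 is forwarded again.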

The following example uses a relative deadband so that the COV rule only forwards a temperature change of more than 10%:

description: >
    A COV on percentage change rule example.

metadata:
    name: COV on percentage change

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: 'machine/sensor/raw'
                  publish:
                      topic: 'machine/sensor/changed'
                  rules:
                      - cov:
                            key: temperature
                            deadband: 10
                            deadbandMode: relative

burst

Simple burst-mode that allows aggregation of many messages into a payload array. This operates on a time interval or on a fixed size. It collects all messages that arrive, joins them into an array (with their respective timestamps) and forwards them as a single message. Interval and size constraints can be combined. If none of them is specified, the rule has no effect.

Property   Type      Required
interval   integer   Optional
maxSize    integer   Optional
label      string    Optional

interval

Time interval in milliseconds to send out the combined message after its first message was received.

(Note: If no message at all has been received, no output message will be sent either. This option does not implement a regular time-based output message for the case when no input data arrives.)

maxSize

Maximum number of messages in the burst aggregation. The aggregated message will be forwarded once this number of messages has arrived.

If both maxSize and interval are specified, the combined message will be sent according to whichever criterion is fulfilled first. This typically means that an output message triggered by the interval criterion contains fewer array elements than an output message triggered by the maxSize criterion.

label

The burst rule stores the current burst aggregation in the stash using a label. If more than one aggregation is used, different labels must be assigned by the user. The default is burst.
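
How the interval and maxSize criteria combine can be sketched with a small Python buffer. This is a model under assumptions, not the Connectware implementation: the class BurstBuffer and the element shape with timestamp and value keys are hypothetical, the clock is passed in explicitly instead of using a real timer, and at least one of the two constraints is assumed to be set.

```python
from typing import Any, Dict, List, Optional


class BurstBuffer:
    """Hypothetical burst aggregation with an explicit millisecond clock."""

    def __init__(self, interval_ms: Optional[int] = None, max_size: Optional[int] = None):
        self.interval_ms = interval_ms
        self.max_size = max_size
        self.items: List[Dict[str, Any]] = []
        self.first_arrival_ms: Optional[int] = None

    def _flush(self) -> List[Dict[str, Any]]:
        batch = self.items
        self.items = []
        self.first_arrival_ms = None
        return batch

    def add(self, value: Any, now_ms: int) -> Optional[List[Dict[str, Any]]]:
        """Store a message; return the batch if maxSize is reached."""
        if not self.items:
            # The interval is measured from the first message of a batch.
            self.first_arrival_ms = now_ms
        self.items.append({"timestamp": now_ms, "value": value})
        if self.max_size is not None and len(self.items) >= self.max_size:
            return self._flush()
        return None

    def tick(self, now_ms: int) -> Optional[List[Dict[str, Any]]]:
        """Called by a timer; return the batch if the interval has elapsed."""
        if self.interval_ms is None or not self.items:
            return None  # nothing buffered means nothing is sent
        if now_ms - self.first_arrival_ms >= self.interval_ms:
            return self._flush()
        return None
```

A usage sketch: with BurstBuffer(interval_ms=1000, max_size=3), the third add call returns a batch of three elements, while a single buffered message is only released by a later tick once 1000 milliseconds have passed since it arrived.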

The example service commissioning file below will collect every message published on the MQTT topic factory/machine/temperature/single in a buffer. Once the number of buffered messages reaches the configured maxSize, or the configured interval has elapsed, an array of all the collected values is published to factory/machine/temperature/combined.

description: >
    A burst rule example.

metadata:
    name: Burst

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: 'factory/machine/temperature/single'
                  publish:
                      topic: 'factory/machine/temperature/combined'
                  rules:
                      - burst:
                            interval: 300000
                            maxSize: 100

stash

This rule can be used to stash intermediate states of messages for later reference. These can be referred to by a filter or transform rule. This may, for example, be useful when the cov and burst rules are used in combination.

Property   Type     Required
label      string   Optional

label

This label is used to refer to this stash point. It needs to be unique only within one rules statement; the same label can be used in multiple rules statements without problems. The default is stash.

Because the stash rule only stores intermediate states for later reference, it cannot be used on its own. Instead, another rule must provide values to it, as in the following example:

description: >
    A stash usage example

metadata:
    name: Stash

resources:
    mapping:
        type: 'Cybus::Mapping'
        properties:
            mappings:
                - subscribe:
                      topic: 'factory/machine/raw/temperature'
                  publish:
                      topic: 'factory/machine/enhanced/temperature'
                  rules:
                      - transform:
                            expression: |
                                {
                                  "previous": $last("cache").current,
                                  "current": current
                                }
                      - stash:
                            label: cache

The service installed using the commissioning file above will collect the current temperature published on the MQTT topic factory/machine/raw/temperature, and will publish it, enhanced with the value of the previous measurement, to the topic factory/machine/enhanced/temperature.
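
The interaction of the transform and stash rules in this example can be modelled in Python. This is only a sketch: the class StashChain is hypothetical, the input is assumed to be an object with a current property, and None stands in for a value that is missing because nothing has been stashed yet.

```python
from typing import Any, Dict, Optional


class StashChain:
    """Hypothetical model of a transform rule followed by a stash rule."""

    def __init__(self) -> None:
        self.stash: Dict[str, Any] = {}

    def last(self, label: str) -> Optional[Dict[str, Any]]:
        """Counterpart of $last(label): the message stored under the label."""
        return self.stash.get(label)

    def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
        cached = self.last("cache")
        # transform: combine the stashed value with the new input
        output = {
            "previous": cached["current"] if cached else None,
            "current": message["current"],
        }
        # stash: store the transformed message under the label "cache"
        self.stash["cache"] = output
        return output


chain = StashChain()
first = chain.process({"current": 20})
second = chain.process({"current": 21})
```

The second output carries the first measurement as its previous value, because the stash rule runs after the transform rule and stores the transformed message.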

collect

This rule can be used to collect values from multiple endpoints configured in the subscribe property of a Cybus::Mapping and publish them together in a single object. The rule will function as a Last Value Cache, so that the output object contains a key for each of the subscribed endpoints/topics, potentially using its respective label property if it was set.

The last message on each of the subscribe entries will be sent to the output. At start-up, the output object starts empty, and keys will be added as they arrive, one after the other.

This rule does not take any properties. Due to YAML format rules, you must write the rule together with the expression for an empty object {}, like so:

description: >
    A collect rule usage example.

metadata:
    name: Collect

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: factory/+machine/+sensor
                      label: '{machine}.{sensor}'
                  publish:
                      topic: 'factory/data'
                  rules:
                      - collect: {}

The service installed using the commissioning file above will collect the data published on all MQTT topics matching factory/+machine/+sensor and publish it on the topic factory/data, using the label machine.sensor.

The example uses a dynamic label, which means the label is constructed from the values of the actual topic to which the data was published.
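
The last value cache with a dynamic label can be sketched in Python as follows. This is a model under assumptions, not the Connectware implementation: the class CollectCache is hypothetical, and Python's str.format is used as a stand-in for the way the label template is filled with the wildcard values.

```python
from typing import Any, Dict


class CollectCache:
    """Hypothetical last value cache keyed by a dynamic label."""

    def __init__(self, label_template: str) -> None:
        self.label_template = label_template
        self.state: Dict[str, Any] = {}

    def update(self, wildcard_values: Dict[str, str], payload: Any) -> Dict[str, Any]:
        # Build the key from the wildcard values of the actual topic.
        label = self.label_template.format(**wildcard_values)
        self.state[label] = payload
        # Each incoming message produces the full object collected so far.
        return dict(self.state)


cache = CollectCache("{machine}.{sensor}")
out1 = cache.update({"machine": "press1", "sensor": "temperature"}, 71.5)
out2 = cache.update({"machine": "press1", "sensor": "pressure"}, 3.2)
out3 = cache.update({"machine": "press1", "sensor": "temperature"}, 72.0)
```

The output object starts with a single key and grows as further topics report data; a new value for an already known label replaces the cached one.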

© Copyright 2024, Cybus GmbH