Data Processing Rules

For data transformation, manipulation, and processing with the Rule Engine, various data processing rules are available. You can define these rules within the resources section of your service commissioning files.

For more information on the resources, see Resources. You can apply rules in the endpoint resources, mapping resources, and node resources.

The following data processing rules are available:

burst

This burst-mode mechanism consolidates multiple messages into a single payload array. It triggers on either a time interval or a fixed number of messages. The system accumulates incoming messages and combines them into an array that includes their respective timestamps.

Once the specified condition is met (time elapsed or message count reached), it transmits the entire collection as a single message. You can set both interval and size limits for more precise control. If neither condition is specified, the rule becomes inactive and messages pass through unaffected.

Property    Type       Required
interval    integer    Optional
maxSize     integer    Optional
label       string     Optional

interval

This parameter defines the time window, measured in milliseconds, during which incoming messages are collected before being sent as a combined output. The countdown starts when the first message in the current batch is received.

This feature is designed to aggregate existing messages, not to generate output at regular intervals regardless of input. If no messages are received during the specified interval, no output will be generated. It does not create placeholder messages in the absence of input data.

maxSize

This parameter sets the upper limit on the number of messages that can be combined into a single burst. Once this threshold is reached, the collected messages are packaged and forwarded as one combined message.

If both maxSize and interval are defined, the system will trigger the forwarding of the aggregated message based on whichever condition is met first. As a result, messages sent due to the interval condition may contain fewer elements than those sent due to reaching maxSize. This dual-criteria approach allows for flexible message aggregation based on either time elapsed or quantity of messages received.

label

This parameter allows you to assign a unique identifier to the current burst aggregation in the stash. When using multiple aggregations, you must assign distinct labels for each burst aggregation. If not specified, the default label is burst.
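
As a sketch of how label and maxSize might be combined, the following mapping batches messages purely by count and names the aggregation explicitly. The publish topic and the label value temperatureBatch are illustrative assumptions, not taken from a shipped example.

```yaml
resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: 'factory/machine/temperature/single'
                  publish:
                      topic: 'factory/machine/temperature/batched'
                  rules:
                      - burst:
                            # No interval is set, so only the message count triggers forwarding.
                            maxSize: 50
                            # Explicit label instead of the default "burst".
                            label: temperatureBatch
```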

Example: burst.yml

This example demonstrates a service configuration that aggregates messages from the MQTT topic factory/machine/temperature/single.

The system collects incoming messages in a buffer, which is then published as an array to factory/machine/temperature/combined when either the number of collected messages reaches the predefined maxSize or the time since the last buffer publication equals or exceeds the set interval.

This approach enables efficient batching of individual temperature readings into combined data sets, which can be beneficial for reducing network traffic or processing data in larger chunks.

description: >
    A burst rule example.

metadata:
    name: Burst

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: 'factory/machine/temperature/single'
                  publish:
                      topic: 'factory/machine/temperature/combined'
                  rules:
                      - burst:
                            interval: 300000
                            maxSize: 100

collect

This rule is designed to aggregate data from multiple endpoints specified in the subscribe property of a Cybus::Mapping. It functions as a Last Value Cache, creating a single object that contains the most recent data from each subscribed endpoint or topic. Each data point in the output object is identified by a key, which can be the endpoint itself or a custom label if one has been defined.

This rule initializes with an empty output object and progressively adds keys as data arrives from the various subscribed sources. It doesn't require any additional properties, but due to YAML formatting requirements, it must be written with an empty object expression {}.

Example: collect.yml

This example demonstrates how you can use the collect rule to gather data from all MQTT topics matching the pattern factory/+machine/+sensor. The collected data is then published to factory/data using dynamically constructed labels in the format machine.sensor.

This approach allows for flexible data aggregation and republishing, adapting to the specific structure of the incoming topic names.

description: >
    A collect rule usage example.

metadata:
    name: Collect

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: factory/+machine/+sensor
                      label: '{machine}.{sensor}'
                  publish:
                      topic: 'factory/data'
                  rules:
                      - collect: {}

cov

The change-of-value (COV) filter only transmits data when it identifies a difference from the preceding message. This filter treats all messages in a rule chain and output topic as if they originate from a single source, without differentiating between input topics. It can be configured to monitor a single specified key, which helps prevent interference from variables like timestamps.

The COV filter implements deadbands to reduce noise and insignificant fluctuations in the data. It requires input in JSON format and typically produces fewer output messages compared to the number of inputs it processes, as it only forwards when changes are detected. This approach helps streamline data flow by eliminating redundant or insignificant updates, focusing on meaningful changes in the monitored values.

Property        Type                         Required
key             string or array of string    Required
deadband        number                       Optional
deadbandMode    enum                         Optional
label           string                       Optional

key

The name of the JSON property to filter. Currently, this filter only supports a single key, given either as a string (a top-level property of the JSON object) or as an array of strings that specifies the property path (a property inside a nested object). You can chain multiple cov rule definitions to look at multiple keys.
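
Chaining could look like the following rules fragment, shown here only as a sketch. The pressure property is a hypothetical second key. Because rules in a chain run one after another, a message presumably has to pass the first filter before the second one sees it; treat that reading as an assumption to verify against your own data.

```yaml
rules:
    # First filter: looks only at the top-level "temperature" property.
    - cov:
          key: temperature
    # Second filter: looks only at the top-level "pressure" property (hypothetical).
    - cov:
          key: pressure
```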

Example: cov-on-change.yml

This example demonstrates how to apply a filter to messages in the format { "temperature": 12.34 }. The filter reacts to changes in the value of the temperature property. It will only pass on messages where the temperature reading is different from the previous one to highlight changes in temperature data.

description: >
    A COV on change rule example.

metadata:
    name: COV on change

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: 'machine/sensor/raw'
                  publish:
                      topic: 'machine/sensor/changed'
                  rules:
                      - cov:
                            key: temperature

Example: cov-on-change-deep-key.yml

You can use a deep path to access nested properties within a JSON object for change-of-value (COV) filtering. The example demonstrates how to filter based on changes in the temperature property nested within an array. The filter can process messages with complex structures, such as { "values": [ { "temperature": 85.3 } ] }, by specifying a list of keys that form a path to the desired property.

When using this approach, the filter will only accept and forward messages where the entire path can be resolved. If any part of the path is missing in a message, that message is silently ignored without throwing an error.

You can specify array indices in the path as plain integers (e.g., 0) or as quoted strings (e.g., '0').

description: >
    A COV on change rule with deep path key example.

metadata:
    name: COV on change with deep path

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: 'machine/sensor/raw'
                  publish:
                      topic: 'machine/sensor/changed'
                  rules:
                      - cov:
                            key:
                                - values
                                - 0
                                - temperature

deadband

When dealing with numerical data, you can define a deadband threshold. A change is only detected when the difference between the new value and the previous value exceeds this threshold. If you do not specify this parameter, the filter detects any change.

deadbandMode

This parameter allows you to set the mode of the deadband to relative or absolute:

  • In relative mode, the deadband is interpreted as a percentage. For example, if set to 10, a new value will only be passed through if it differs by more than 10% from the last recorded value.

  • In absolute mode (which is the default setting), the deadband represents a fixed numeric difference.
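
The absolute mode described above could be configured as in this rules fragment. The threshold 0.5 is an arbitrary illustrative value. Following the description, with a last recorded value of 20.0, a reading of 20.3 would be suppressed and a reading of 20.6 would be forwarded.

```yaml
rules:
    - cov:
          key: temperature
          # Fixed numeric difference; 0.5 is an arbitrary illustrative threshold.
          deadband: 0.5
          # "absolute" is the default, written out here for clarity.
          deadbandMode: absolute
```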

Example: cov-on-percentage-change.yml

This example demonstrates the use of a deadband in a change-of-value (COV) rule. In this case, the rule is configured to detect and respond to temperature changes that exceed 10% of the previous value. This approach helps filter out small fluctuations while capturing significant temperature changes.

description: >
    A COV on percentage change rule example.

metadata:
    name: COV on percentage change

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: 'machine/sensor/raw'
                  publish:
                      topic: 'machine/sensor/changed'
                  rules:
                      - cov:
                            key: temperature
                            deadband: 10
                            deadbandMode: relative

label

The change-of-value (COV) filter uses a stash system to compare current and previous values. By default, it compares the incoming value to the last value that passed through this filter. However, you can change this behavior by specifying a label:

  • Custom label: You can define your own label in a separate stash rule.

  • Built-in labels:

    • input: The initial value at the start of the rule chain.

    • lastOutput: The final value that passed through the entire rule chain.

    • lastInput: The last input value.

When using custom labels, you must explicitly store them using a stash rule. Without proper storage, the filter will allow all values to pass because it will have no reference point to compare against.
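
As a hedged sketch, the fragment below selects the built-in lastInput label, assuming the built-in names are written literally as the label value. With this reference point each reading is compared with the one directly before it, so a slow drift in steps smaller than the deadband would presumably never be forwarded.

```yaml
rules:
    - cov:
          key: temperature
          deadband: 1
          # Compare against the previous input instead of the last forwarded value.
          label: lastInput
```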

filter

The filter rule determines whether a message is forwarded or discarded based on a JSONata expression. It does not modify the message content but stops the message flow if the expression evaluates to a falsy value, such as false, 0, or an empty string ''.

Property      Type      Required
expression    string    Required

expression

The expression must be a JSONata expression that evaluates to a boolean, and it can utilize the full range of JSONata functions. For more details, see expression.

Example: filter.yml

The service installed using the sample service commissioning file publishes a message to the overheat warning topic factory/warning/overheat if the machine temperature exceeds 95 degrees. Temperature readings at or below this threshold are filtered out and not forwarded.

description: >
    A filter rule usage example.

metadata:
    name: Filter

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: factory/machine/temperature
                  publish:
                      topic: 'factory/warning/overheat'
                  rules:
                      - filter:
                            expression: $number($) > 95
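
The sample above filters a bare numeric payload. For JSON object payloads such as { "temperature": 97.2 }, a variant could reference the property by name instead. The payload shape is an assumption made for illustration.

```yaml
rules:
    - filter:
          # Forward only messages whose "temperature" property exceeds 95.
          expression: temperature > 95
```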

parse

This rule converts non-JSON input data into a JSON object based on the specified format. The resulting JSON object can then be passed on to other rule operations.

Property    Type    Required
format      enum    Required

format

  • is required

  • type: enum

The value of this property must be equal to one of the following:

Value         Description
json          Interprets the input bytes as a stringified JSON object which is parsed back into the JSON object
utf8          Interprets the input bytes as a utf-8 encoded string and parses this into a string
latin1        Interprets the input bytes as a latin-1 encoded string (ISO-8859-1) and parses this into a string
ascii         Interprets the input bytes as an ascii encoded string (for 7-bit ascii data only) and parses this into a string. Additionally, supports parsing integer primitives directly.
boolean       Interprets the first byte of the input message as a boolean value. If this first byte is non-zero, true is passed on, otherwise false.
int8          Interprets the input bytes as a signed 8-bit integer, using exactly 1 byte.
int16BE       Interprets the input bytes as a signed, big-endian 16-bit integer, using exactly 2 bytes.
int16LE       Interprets the input bytes as a signed, little-endian 16-bit integer, using exactly 2 bytes.
int32BE       Interprets the input bytes as a signed, big-endian 32-bit integer, using exactly 4 bytes.
int32LE       Interprets the input bytes as a signed, little-endian 32-bit integer, using exactly 4 bytes.
uint8         Interprets the input bytes as an unsigned 8-bit integer, using exactly 1 byte.
uint16BE      Interprets the input bytes as an unsigned, big-endian 16-bit integer, using exactly 2 bytes.
uint16LE      Interprets the input bytes as an unsigned, little-endian 16-bit integer, using exactly 2 bytes.
uint32BE      Interprets the input bytes as an unsigned, big-endian 32-bit integer, using exactly 4 bytes.
uint32LE      Interprets the input bytes as an unsigned, little-endian 32-bit integer, using exactly 4 bytes.
floatBE       Interprets the input bytes as a big-endian IEEE 754 float, using exactly 4 bytes.
floatLE       Interprets the input bytes as a little-endian IEEE 754 float, using exactly 4 bytes.
doubleBE      Interprets the input bytes as a big-endian IEEE 754 double, using exactly 8 bytes.
doubleLE      Interprets the input bytes as a little-endian IEEE 754 double, using exactly 8 bytes.
toBase64      Also base64. Parses a plain text input string into a base64 encoded output string
fromBase64    Parses a base64 encoded input string into a plain text output string

Byte Order

Big-endian (BE), also known as Motorola or network byte order, stores the most significant byte at the smallest memory address. Little-endian (LE), also referred to as Intel byte order, stores the least significant byte first. For more information on byte order, see https://en.wikipedia.org/wiki/Endianness.

Parsing Boolean Values

When parsing boolean values, only the first byte of the input message is used. For multi-byte integers (e.g., int16 or int32), first apply the appropriate integer parse rule, then convert to boolean.

For more details on parsing input bytes into JSON, see the Node.js documentation.
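
The two-step approach for multi-byte integers described above might be sketched as the following rules fragment. The choice of int16BE is an assumption about the source data, and the comparison with zero is one possible way to derive the boolean.

```yaml
rules:
    # Read the two input bytes as a signed, big-endian 16-bit integer.
    - parse:
          format: int16BE
    # Any non-zero integer becomes true, zero becomes false.
    - transform:
          expression: $ != 0
```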

Example: parse.yml

The service installed with the sample service commissioning file collects the machine's power status from the MQTT topic factory/machine/power and publishes it to the topic factory/machine/state. The input data is interpreted as a boolean value.

description: >
    A parse rule usage example.

metadata:
    name: Parse

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: factory/machine/power
                  publish:
                      topic: factory/machine/state
                  rules:
                      - parse:
                            format: boolean

setContextVars

This rule stores values as context variables that can be passed from one rule step to another. These variables can be used by target protocols such as MQTT and HTTP in output topics or properties, allowing dynamic implementation of MQTT topics or HTTP headers. This is also useful when working with named wildcards.

To configure the rule, you must specify a vars property that includes each identifier and its corresponding JSONata expression. The same context variables used in the transform and filter rules are available for use here as well.

Property                 Type       Required
vars                     Object     Required
setMissingAsUndefined    boolean    Optional, default: false

vars

This object contains key: expression properties.

  • key: An identifier for storing a context variable.

  • expression: A JSONata expression whose evaluated result is assigned to the context variable key.

setMissingAsUndefined

Default value: false

Determines the system's response when a context variable from the vars object cannot be set using input message values:

  • If set to true: Missing context variables are set to the string undefined, allowing Rule Engine and output publishing to proceed normally

  • If set to false: The input message is discarded and an error message is logged. For example: Failed processing setContextVars, because: setContextVars rule should set value "contextKey" from property "someKey" but it is missing in the message: { "wrongKey": "123" }.
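
A minimal sketch of the tolerant setting follows. The names contextKey and someKey mirror the log message above and are purely hypothetical, as are the output property names in the transform step.

```yaml
rules:
    - setContextVars:
          vars:
              # Hypothetical key/property pair, mirroring the log message above.
              contextKey: $.someKey
          # Keep processing even when "someKey" is absent from the message.
          setMissingAsUndefined: true
    - transform:
          expression: |
              {
                "key": $context.vars.contextKey,
                "payload": $
              }
```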

Example: setcontextvars.yml

This example demonstrates how to create a concatenated string that is stored in the context variable identifier, using the productionLine and machineId wildcards from the source subscription. In this sample, the transform rule reads the variable as $context.vars.identifier; you can also use identifier as a wildcard name in the target publish topic.

description: >
    A setContextVars rule usage example.

metadata:
    name: setContextVars

resources:
    mapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: factory/+productionLine/+machineId/temperature
                  publish:
                      topic: factory/temperatures
                  rules:
                      - setContextVars:
                            vars:
                                identifier: |
                                    $join([
                                      $context.vars.productionLine,
                                      $context.vars.machineId
                                    ], '-')
                      - transform:
                            expression: |
                                {
                                  "id": $context.vars.identifier,
                                  "temperature": $
                                }

stash

This rule allows you to store intermediate message states for future use. These stored states can be accessed by filter or transform rules later in the processing chain. This functionality is particularly useful when combining different rules, such as change-of-value (COV) and burst rules. By stashing intermediate results, you can build more complex data processing workflows in which rules reference or operate on data processed at an earlier stage.

Property    Type      Required
label       string    Optional

label

This parameter assigns a unique identifier to a specific stash point within a single rules statement. The same label can be reused across different rules statements without conflict. If not specified, the default label is stash.

Example: stash.yml

The stash rule in this example is designed to work in conjunction with other rules, as its primary function is to store intermediate states for later use. It cannot operate independently.

This service configuration demonstrates the practical application of the stash rule. It monitors the MQTT topic machine/raw/temperature for current temperature readings. The system then enhances this data by combining it with the previous measurement, which has been stored using the stash rule. The resulting data, containing both the current and previous temperature values, is then published to the machine/enhanced/temperature topic.

This approach allows for more context-rich temperature reporting by providing a comparison point with each new reading.

description: >
    A stash usage example

metadata:
    name: Stash

resources:
    mapping:
        type: 'Cybus::Mapping'
        properties:
            mappings:
                - subscribe:
                      topic: 'machine/raw/temperature'
                  publish:
                      topic: 'machine/enhanced/temperature'
                  rules:
                      - transform:
                            expression: |
                                {
                                  "previous": $last("cache").current,
                                  "current": current
                                }
                      - stash:
                            label: cache

transform

This rule converts payloads into a different format or structure using JSONata expressions (http://jsonata.org/). Each input message generates one output message.

Property      Type      Required
expression    string    Required

expression

The JSONata expression that you can apply to the given payload. See http://docs.jsonata.org for a full language reference.

Expression examples:

  • $ - Represents the payload in its original format, regardless of type.

  • $.value or simply value - Accesses the value property of the input JSON object.

  • ($.temp - 32) * 5 / 9 - Converts the temp property from Fahrenheit to Celsius.

  • $number($) * 0.1 - Interprets a non-JSON input value as a number and multiplies it by 0.1.
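
The third expression above can also build a new output object, as in this sketch of a rules fragment. The input shape { "temp": 98.6 } and the output property name celsius are assumptions for illustration.

```yaml
rules:
    - transform:
          # Input is assumed to look like { "temp": 98.6 } (Fahrenheit).
          expression: |
              {
                "celsius": ($.temp - 32) * 5 / 9
              }
```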

Example: value-transform.yml

The service installed with the sample service commissioning file collects a machine temperature given in Fahrenheit and published on the MQTT topic machine/temperature/fahrenheit. It will then convert it to degrees Celsius and publish it to the machine/temperature/celsius topic.

description: >
    A temperature value transformation example: Fahrenheit to degree Celsius.

metadata:
    name: Value transform

resources:
    mapping:
        type: 'Cybus::Mapping'
        properties:
            mappings:
                - subscribe:
                      topic: 'machine/temperature/fahrenheit'
                  publish:
                      topic: 'machine/temperature/celsius'
                  rules:
                      - transform:
                            expression: ( $number($) - 32 ) * 5 / 9

Additional Transformation Functions

In addition to the payload object and the full range of JSONata functions and data types, the Connectware Rule Engine provides additional functions and values. These include protocol-specific context information, source and target mapping data, and access to wildcard field values.

$context

This variable contains protocol-specific context information. For example, in MQTT, it includes the raw MQTT packet, QoS, Retain information, and the message topic. For HTTP, it provides the full response object, including status codes and headers.

  • $source: Contains all entries from the source part of the mapping.

  • $target: Contains all entries from the target part of the mapping.

  • $context.vars: When using wildcard topics, this provides access to the values at wildcard positions, such as machine ID or data fields in dynamic topics. For example, if you define a topic like factory/productionline1/+machineId/#data, you can access the specific values at the wildcard positions, such as $context.vars.machineId and $context.vars.data.

Example: context-transform.yml

This example demonstrates the practical application of the $context variable in a data transformation scenario. It shows how context-specific information can be integrated into the rule to modify or enhance the data being processed.

description: >
    This commissioning file is an example of using a $context in a
    transformation rule to reference a topic path specified with a wildcard.

metadata:
    name: Context transform

resources:
    mapping:
        type: 'Cybus::Mapping'
        properties:
            mappings:
                - subscribe:
                      topic: 'factory/+machineId/temperature'
                  publish:
                      topic: 'factory/all/temperature'
                  rules:
                      - transform:
                            expression: |
                                {
                                  "machine": $context.vars.machineId,
                                  "temperature": $
                                }

$last([label])

This function references the last message sent in the rule stream, allowing you to aggregate successive messages. The optional label can point to other stash points and defaults to the last message processed in the rule chain if not specified.

When using $last() with object properties, note that in JSONata the dot operator (.) functions as a map operator and not just as an object operator. This can cause unexpected behavior if the input message is an array rather than a single value. For more information, see JSONata reference: path operators.

If you do not want the $last expression to apply to each input value, using the $lookup function may be more suitable than using the dot operator. For example, $lookup($last(), "val") retrieves the val property from the last stored object. For more information, see JSONata reference: object functions.

Example

The previous stored value was { time: 1632908050235 } and you apply the following transformation rule:

{ 'previous': $last().time, 'time': $millis() }

If the input value is an array with value [1, 2, 3], the result will unexpectedly contain an array value:

{ previous: [1632908050235, 1632908050235, 1632908050235, sequence: true], time: 1632908154336 }

To avoid this, use $lookup to correctly reference the .time property:

{
  "previous": $lookup($last(), "time"),
  "time": $millis()
}

This results in the expected output:

{ 'previous': 1632908050235, 'time': 1632908154336 }

$encodeBitmask(array)

This function converts an array of boolean values into a bitmask and returns the result as a number.

Note: The first value in the array is the rightmost bit.

[ true, false, false, true, false ]
   2^0   2^1    2^2    2^3   2^4    = 9  ( 00001001 )
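
Inside a mapping, the function might be used as in the following sketch. The input is assumed to be a JSON object that carries all five boolean flags under these hypothetical property names; a missing property would drop out of the array and shift the remaining bits.

```yaml
rules:
    - transform:
          # Input is assumed to be a JSON object with boolean status flags.
          # "ok" ends up in the rightmost bit (2^0), "warning" in 2^4.
          expression: |
              $encodeBitmask([ok, alarm, door, powerOn, warning])
```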

$decodeBitmask(number, array)

This function accepts a number and an array of strings as input. It returns an object where the keys are taken from the array, and the corresponding values are booleans derived by applying the bitmask of the key's position to the number.

$decodeBitmask(9, ["ok", "alarm", "door", "powerOn", "warning"])
         00001001 -> 1      0         0        1         0

{ "ok": true, "alarm": false, "door": false, "powerOn": true, "warning": false }
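
Combined with a parse rule, the decoding above might be applied to a raw status byte as sketched here. That the device sends a single unsigned byte is an assumption; for an input byte of 0x09 the result would be the object shown above.

```yaml
rules:
    # Read a single status byte as an unsigned integer.
    - parse:
          format: uint8
    # Expand the bits into named boolean flags.
    - transform:
          expression: |
              $decodeBitmask($, ["ok", "alarm", "door", "powerOn", "warning"])
```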

$bitmask(number, index)

This function returns a boolean value depending on whether the bit at position <index> in <number> is set.

$bitmask(9, 3) -> true     $bitmask(9, 2) -> false
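
Because the function returns a boolean, it could also serve as a filter condition. The fragment below is a sketch that assumes a single-byte status payload.

```yaml
rules:
    - parse:
          format: uint8
    # Forward the message only when bit 3 ("powerOn" in the example above) is set.
    - filter:
          expression: $bitmask($, 3)
```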

© Copyright 2024, Cybus GmbH