Data Processing Rules
Last updated
© Copyright 2024, Cybus GmbH
The Rule Engine provides a set of data processing rules for transforming, manipulating, and processing data. You define these rules in the resources section of your service commissioning files.
For more information on the resources, see Resources. You can apply rules in the endpoint resources, mapping resources, and node resources.
The following data processing rules are available:
burst
The burst rule consolidates multiple messages into a single payload array. It is triggered by a time interval, a fixed number of messages, or both. The system accumulates incoming messages and combines them into an array that includes their respective timestamps.
Once the specified condition is met (time elapsed or message count reached), it transmits the entire collection as a single message. You can set both interval and size limits for more precise control. If neither condition is specified, the rule becomes inactive and messages pass through unaffected.
interval: Defines the time window, in milliseconds, during which incoming messages are collected before being sent as one combined message. The countdown starts when the first message of the current batch is received.
The interval only aggregates messages that actually arrive. It does not generate output at regular intervals regardless of input. If no messages are received during the interval, no output is generated, and no placeholder messages are created.
maxSize: Sets the maximum number of messages combined into a single burst. Once this threshold is reached, the collected messages are forwarded as one combined message.
If both maxSize and interval are defined, the aggregated message is forwarded as soon as either condition is met. As a result, messages sent because the interval elapsed may contain fewer elements than those sent because maxSize was reached. This lets you aggregate messages by elapsed time or by message count, whichever comes first.
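The flush logic described above can be modelled in a few lines. The following Python sketch is a toy model, not Connectware's implementation. `BurstModel`, `push`, and `tick` are hypothetical names, and time is passed in explicitly instead of coming from a real timer. Each buffered entry is shown as a `(timestamp, value)` tuple because the exact shape of the emitted array is not specified here.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple


@dataclass
class BurstModel:
    """Toy model of the burst rule's flush logic (illustration only)."""

    interval_ms: Optional[int] = None  # window measured from the first buffered message
    max_size: Optional[int] = None     # flush once this many messages are buffered
    _batch: List[Tuple[int, Any]] = field(default_factory=list, init=False, repr=False)
    _first_at: Optional[int] = field(default=None, init=False, repr=False)

    def _flush(self) -> List[Any]:
        batch, self._batch, self._first_at = self._batch, [], None
        return [batch] if batch else []

    def tick(self, now_ms: int) -> List[Any]:
        """Emit the batch if the interval has elapsed; an empty buffer emits nothing."""
        if (self.interval_ms is not None and self._first_at is not None
                and now_ms - self._first_at >= self.interval_ms):
            return self._flush()
        return []

    def push(self, value: Any, now_ms: int) -> List[Any]:
        """Add one message and return whatever messages are emitted as a result."""
        if self.interval_ms is None and self.max_size is None:
            return [value]  # rule inactive: the message passes through unchanged
        out = self.tick(now_ms)  # a real timer would already have fired by now
        if self._first_at is None:
            self._first_at = now_ms
        self._batch.append((now_ms, value))
        if self.max_size is not None and len(self._batch) >= self.max_size:
            out += self._flush()
        return out
```

Calling `tick` from a scheduler would play the role of the interval timer. An empty buffer never emits anything, which matches the statement that no placeholder messages are created.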
label: Assigns a unique identifier to this burst aggregation in the stash. When using multiple burst aggregations, you must assign a distinct label to each. If not specified, the default label is burst.
Example: burst.yml
This example demonstrates a service configuration that aggregates messages from the MQTT topic factory/machine/temperature/single.
The system collects incoming messages in a buffer and publishes them as an array to factory/machine/temperature/combined. Publication happens when either the number of collected messages reaches maxSize or the time since the last buffer publication equals or exceeds interval.
This approach enables efficient batching of individual temperature readings into combined data sets, which can be beneficial for reducing network traffic or processing data in larger chunks.
collect
This rule aggregates data from multiple endpoints specified in the subscribe property of a Cybus::Mapping. It functions as a Last Value Cache, creating a single object that contains the most recent data from each subscribed endpoint or topic. Each data point in the output object is identified by a key, which is either the endpoint itself or a custom label, if one has been defined.
The rule starts with an empty output object and adds keys as data arrives from the subscribed sources. It takes no properties, but because YAML requires a value, it must be written with an empty object expression {}.
Example: collect.yml
This example demonstrates how you can use the collect rule to gather data from all MQTT topics matching the pattern factory/+machine/+sensor. The collected data is then published to factory/data using dynamically constructed labels in the format machine.sensor.
This approach allows for flexible data aggregation and republishing, adapting to the specific structure of the incoming topic names.
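The following sketch is a rough illustration of the Last Value Cache behaviour. It keeps the latest value per key and returns the whole object after each update. It is a hypothetical Python model: `CollectModel` is not a Connectware API. The keys stand in for endpoint names or labels such as `machine.sensor`. The sketch also assumes that the full object is emitted on every incoming message.

```python
from typing import Any, Dict


class CollectModel:
    """Toy last-value cache mirroring the collect rule's output shape (illustration only)."""

    def __init__(self) -> None:
        self._latest: Dict[str, Any] = {}  # starts empty; keys appear as sources report

    def on_message(self, key: str, value: Any) -> Dict[str, Any]:
        # key stands for the endpoint/topic or its custom label
        self._latest[key] = value
        return dict(self._latest)  # a copy of the current state of all sources
```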
cov
The change-of-value (COV) filter only forwards a message when it detects a difference from the preceding message. It treats all messages in a rule chain and output topic as if they originate from a single source, without differentiating between input topics. It monitors a single specified key, so fields that change with every message, such as timestamps, do not trigger output.
The COV filter supports deadbands to suppress noise and insignificant fluctuations. It requires JSON input. Because it only forwards a message when a change is detected, it typically produces fewer output messages than it receives. This keeps the data flow focused on meaningful changes in the monitored value.
key: The name of the JSON property to filter on. The filter supports a single key, given either as a string (a top-level property of the JSON object) or as an array of strings that specifies the path to a property inside a nested object. To monitor multiple keys, chain multiple cov rules.
Example: cov-on-change.yml
This example demonstrates how to apply a filter to messages in the format { "temperature": 12.34 }. The filter reacts to changes in the value of the temperature property and only passes on messages whose temperature differs from the previous reading.
Example: cov-on-change-deep-key.yml
You can use a deep path to access nested properties within a JSON object for COV filtering. The example demonstrates how to filter on changes in a temperature property nested inside an array. If you specify a list of keys that form a path to the desired property, the filter can process messages with complex structures, such as { "values": [ { "temperature": 85.3 } ] }.
When using this approach, the filter will only accept and forward messages where the entire path can be resolved. If any part of the path is missing in a message, that message is silently ignored without throwing an error.
You can specify array indices in the path as plain integers (e.g., 0) or as quoted strings (e.g., '0').
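The following Python sketch makes the path rules concrete by resolving a key path as described above:

- A plain string addresses a top-level property.
- A list walks into nested objects and arrays.
- Array indices may be given as `0` or `'0'`.
- An unresolvable path yields no match instead of an error.

`resolve_path` is a hypothetical helper, not part of Connectware.

```python
from typing import Any, Sequence, Tuple, Union

PathKey = Union[str, int]


def resolve_path(message: Any, path: Union[str, Sequence[PathKey]]) -> Tuple[bool, Any]:
    """Follow a cov-style key path; return (False, None) if any step is missing."""
    keys = [path] if isinstance(path, str) else list(path)
    current = message
    for key in keys:
        if isinstance(current, list):
            try:
                index = int(key)  # accepts 0 as well as '0'
            except (TypeError, ValueError):
                return False, None
            if not 0 <= index < len(current):
                return False, None
            current = current[index]
        elif isinstance(current, dict) and str(key) in current:
            current = current[str(key)]  # JSON object keys are strings
        else:
            return False, None  # path cannot be resolved: message is ignored
    return True, current
```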
deadband: For numeric data, you can define a deadband threshold. A change is only detected when the new value differs from the previous value by more than this threshold. If you do not specify this parameter, the filter detects any change.
deadbandMode: Sets the deadband mode to relative or absolute:
In relative mode, the deadband is interpreted as a percentage. For example, if set to 10, a new value is only passed through if it differs by more than 10% from the last recorded value.
In absolute mode (the default), the deadband is a fixed numeric difference.
Example: cov-on-percentage-change.yml
This example demonstrates the use of a deadband in a change-of-value (COV) rule. In this case, the rule is configured to detect and respond to temperature changes that exceed 10% of the previous value. This approach helps filter out small fluctuations while capturing significant temperature changes.
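The deadband comparison can be sketched in Python as follows. `CovModel` is a hypothetical illustration. It compares each value with the last value that passed the filter, which is the default behaviour described in this section. It also lets the first value through, which is an assumption. The threshold is exclusive, matching "differs by more than".

```python
from typing import Optional


class CovModel:
    """Toy change-of-value check with an optional deadband (illustration only)."""

    def __init__(self, deadband: Optional[float] = None, mode: str = "absolute") -> None:
        if mode not in ("absolute", "relative"):
            raise ValueError("mode must be 'absolute' or 'relative'")
        self.deadband = deadband
        self.mode = mode
        self.last: Optional[float] = None  # last value that passed the filter

    def accept(self, value: float) -> bool:
        if self.last is None:
            passed = True  # assumption: the first value always passes
        elif self.deadband is None:
            passed = value != self.last  # no deadband: any change counts
        else:
            diff = abs(value - self.last)
            # relative: percentage of the last passed value; absolute: fixed difference
            limit = (abs(self.last) * self.deadband / 100
                     if self.mode == "relative" else self.deadband)
            passed = diff > limit  # must exceed the threshold, not merely reach it
        if passed:
            self.last = value
        return passed
```

Because the reference value only updates when a message passes, a slow drift accumulates until it exceeds the deadband. Small steps therefore cannot slip through one at a time.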
label: The COV filter uses the stash to compare current and previous values. By default, it compares the incoming value to the last value that passed through this filter. You can change this behavior by specifying a label:
Custom label: Define your own label in a separate stash rule.
Built-in labels:
input: The initial value at the start of the rule chain.
lastOutput: The final value that passed through the entire rule chain.
lastInput: The last input value.
When using custom labels, you must explicitly store them using a stash rule. Without proper storage, the filter will allow all values to pass because it will have no reference point to compare against.
filter
The filter rule forwards or discards a message based on a JSONata expression. It does not modify the message content. It stops the message if the expression evaluates to a falsy value, such as false, 0, or an empty string ''.
expression: A JSONata expression that evaluates to a boolean. It can use the full range of JSONata functions. For more details, see expression.
Example: filter.yml
The service installed with the sample service commissioning file publishes a message to the overheat warning topic /factory/warning/overheat if the machine temperature exceeds 95 degrees. Temperature readings at or below this threshold are filtered out and not forwarded.
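The forwarding decision can be sketched in Python, with a lambda standing in for the JSONata expression. `run_filter` is a hypothetical helper. Treating `None` like an undefined JSONata result that stops the message is an assumption. The handling of `False`, `0`, and `''` follows the list above.

```python
from typing import Any, Callable, Iterable, List


def run_filter(messages: Iterable[Any], expression: Callable[[Any], Any]) -> List[Any]:
    """Forward messages unchanged unless the expression result is falsy (sketch)."""
    forwarded = []
    for message in messages:
        result = expression(message)
        # false, 0 and '' stop the message (per the text); None stands in for an
        # undefined JSONata result, which is an assumption of this sketch.
        if result is None or result is False or result == 0 or result == "":
            continue
        forwarded.append(message)  # the payload itself is not modified
    return forwarded
```

For the overheat example, `lambda m: m["temperature"] > 95` plays the role of a JSONata expression such as `temperature > 95`.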
parse
This rule converts non-JSON input data into a JSON value based on the specified format. The result can then be passed on to subsequent rules.
format: Required, type enum. The value of this property must be one of the values in the Value | Description table at the end of this page.
Big-endian (BE), also known as Motorola or network byte order, stores the most significant byte at the smallest memory address. Little-endian (LE), also referred to as Intel byte order, stores the least significant byte first. For more information on byte order, see https://en.wikipedia.org/wiki/Endianness.
When parsing boolean values, only the first byte of the input message is used. For multi-byte integers (e.g., int16 or int32), first apply the appropriate integer parse rule, then convert to boolean.
For more details on parsing input bytes into JSON, see the Node.js documentation.
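If you want to check byte layouts offline, the sketch below approximates several `format` values with Python's `struct`, `json`, and `base64` modules. It is not how Connectware parses data, and it differs in these respects:

- Numeric formats are assumed to read from offset 0. Extra bytes are ignored, and too few bytes raise `struct.error`.
- Python's `ascii` codec raises an error on bytes above 127.
- The ascii integer-primitive behaviour is not modelled.

`parse_payload` is a hypothetical name.

```python
import base64
import json
import struct
from typing import Any

# Numeric parse formats mapped to Python struct codes ('>' big-endian, '<' little-endian).
_NUMERIC = {
    "int8": ">b", "uint8": ">B",
    "int16BE": ">h", "int16LE": "<h", "uint16BE": ">H", "uint16LE": "<H",
    "int32BE": ">i", "int32LE": "<i", "uint32BE": ">I", "uint32LE": "<I",
    "floatBE": ">f", "floatLE": "<f", "doubleBE": ">d", "doubleLE": "<d",
}


def parse_payload(data: bytes, fmt: str) -> Any:
    """Rough Python equivalent of the parse formats (illustration only)."""
    if fmt in _NUMERIC:
        # Reads the fixed-size value from offset 0; raises struct.error if too short.
        return struct.unpack_from(_NUMERIC[fmt], data)[0]
    if fmt == "boolean":
        return data[0] != 0  # only the first byte matters
    if fmt == "json":
        return json.loads(data.decode("utf-8"))
    if fmt in ("utf8", "latin1", "ascii"):
        codec = {"utf8": "utf-8", "latin1": "latin-1", "ascii": "ascii"}[fmt]
        return data.decode(codec)
    if fmt in ("toBase64", "base64"):
        return base64.b64encode(data).decode("ascii")
    if fmt == "fromBase64":
        return base64.b64decode(data).decode("utf-8")
    raise ValueError(f"unsupported format: {fmt}")
```

The same two bytes `01 00` read as 256 with int16BE but as 1 with int16LE. This shows why choosing the correct byte order matters.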
Example: parse.yml
The service installed with the sample service commissioning file collects the machine's power status from the MQTT topic factory/machine/power and publishes it to the topic factory/machine/state. The input data is interpreted as a boolean value.
setContextVars
This rule stores values as context variables that are passed from one rule step to the next. Target protocols such as MQTT and HTTP can use these variables in output topics or properties, which enables dynamic MQTT topics or HTTP headers. This is also useful when working with named wildcards.
To configure the rule, specify a vars property that contains each identifier and its corresponding JSONata expression. The same context variables that are available in the transform and filter rules can be used here as well.
vars: An object of key: expression properties.
key: An identifier for storing a context variable.
expression: A JSONata expression whose evaluated result is assigned to the context variable key.
lenient: Default value: false. Determines how the rule responds when a context variable from the vars object cannot be set from the input message:
If set to true: Missing context variables are set to the string undefined, and the Rule Engine and output publishing proceed normally.
If set to false: The input message is discarded and an error message is logged. For example: Failed processing setContextVars, because: setContextVars rule should set value "contextKey" from property "someKey" but it is missing in the message: { "wrongKey": "123" }.
Example: setcontextvars.yml
This example demonstrates how to create a concatenated string, stored in the context variable identifier, from the productionLine and machineId wildcards of the source subscription. You can then use the resulting identifier variable as a wildcard name in the target publish topic.
stash
This rule stores intermediate message states for later use. Filter or transform rules later in the processing chain can access these stored states. This is particularly useful when combining different rules, such as change-of-value (COV) and burst rules. By stashing intermediate results, you can build more complex processing workflows in which rules reference or operate on previously processed data at different stages of message handling.
label: Assigns a unique identifier to a specific stash point within a single rules statement. The same label can be reused in different rules statements without conflict. If not specified, the default label is stash.
Example: stash.yml
The stash rule in this example works in conjunction with other rules, because its only function is to store intermediate states for later use. It cannot operate on its own.
This service configuration demonstrates the practical application of the stash rule. It monitors the MQTT topic factory/machine/raw/temperature for current temperature readings. Each reading is combined with the previous measurement, which has been stored using the stash rule. The resulting data, containing both the current and previous temperature values, is published to the factory/machine/enhanced/temperature topic.
This approach allows for more context-rich temperature reporting by providing a comparison point with each new reading.
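The current-plus-previous pattern from this example can be modelled in Python. `StashModel`, `enhance`, and the `current`/`previous` output keys are hypothetical. The sketch assumes that the previous value is read before the current reading is stashed. It also assumes that the stash step passes the message on unchanged. The default label `stash` is taken from the text above.

```python
from typing import Any, Dict


class StashModel:
    """Toy model of named stash points that persist between messages (illustration only)."""

    def __init__(self) -> None:
        self._points: Dict[str, Any] = {}

    def last(self, label: str = "stash") -> Any:
        # Roughly what $last(label) reads: the value stored by an earlier message.
        return self._points.get(label)

    def stash(self, value: Any, label: str = "stash") -> Any:
        self._points[label] = value
        return value  # the message continues down the chain unchanged


def enhance(model: StashModel, reading: float) -> Dict[str, Any]:
    # Assumed ordering: read the previous value first, then stash the current one.
    previous = model.last()
    model.stash(reading)
    return {"current": reading, "previous": previous}
```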
transform
This rule converts payloads into a different format or structure using JSONata expressions (http://jsonata.org/). Each input message generates one output message.
expression: The JSONata expression to apply to the payload. See http://docs.jsonata.org for a full language reference.
Expression examples:
$ - Represents the payload in its original format, regardless of type.
$.value or simply value - Accesses the value property of the input JSON object.
($.temp - 32) * 5 / 9 - Converts the temp property from Fahrenheit to Celsius.
$number($) * 0.1 - Interprets a non-JSON input value as a number and multiplies it by 0.1.
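The two numeric expressions above translate directly into Python. This can be handy for checking expected values before deploying a rule. The functions below are illustrative counterparts only, and their names are hypothetical. The Rule Engine evaluates the JSONata expressions itself.

```python
def fahrenheit_to_celsius(payload: dict) -> float:
    """Counterpart of the JSONata expression ($.temp - 32) * 5 / 9."""
    return (payload["temp"] - 32) * 5 / 9


def scale_raw(payload: bytes) -> float:
    """Counterpart of $number($) * 0.1 for a non-JSON numeric payload such as b"235"."""
    return float(payload.decode("ascii")) * 0.1
```

For example, a reading of 212 °F converts to 100 °C.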
Example: value-transform.yml
The service installed with the sample service commissioning file collects a machine temperature in Fahrenheit from the MQTT topic machine/temperature/fahrenheit. It converts the value to degrees Celsius and publishes it to the machine/temperature/celsius topic.
In addition to the payload object and the full range of JSONata functions and data types, the Connectware Rule Engine provides its own functions and values. These include protocol-specific context information, source and target mapping data, and access to wildcard values.
$context: Contains protocol-specific context information. For example, for MQTT it includes the raw MQTT packet, QoS, retain flag, and message topic. For HTTP, it provides the full response object, including status codes and headers.
$source: Contains all entries from the source part of the mapping.
$target: Contains all entries from the target part of the mapping.
$context.vars: When using wildcard topics, this provides access to the values at wildcard positions, such as a machine ID or data fields in dynamic topics. For example, if you define a topic like factory/productionline1/+machineId/#data, you can access the values at the wildcard positions as $context.vars.machineId and $context.vars.data.
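The following Python sketch shows how values at named wildcard positions can be extracted from a concrete topic. `match_named_wildcards` is hypothetical. In particular, returning the levels matched by `#data` as one slash-joined string is an assumption, because the exact representation in `$context.vars` is not specified here.

```python
from typing import Dict, Optional


def match_named_wildcards(pattern: str, topic: str) -> Optional[Dict[str, str]]:
    """Return the values captured by +name and #name wildcards, or None if no match."""
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")
    captured: Dict[str, str] = {}
    for i, level in enumerate(pattern_levels):
        if level.startswith("#"):
            # Multi-level wildcard: assumed here to capture the remaining levels
            # as one slash-joined string (at least one level is required).
            if i >= len(topic_levels):
                return None
            captured[level[1:]] = "/".join(topic_levels[i:])
            return captured
        if i >= len(topic_levels):
            return None  # topic is shorter than the pattern
        if level.startswith("+"):
            captured[level[1:]] = topic_levels[i]  # single-level wildcard
        elif level != topic_levels[i]:
            return None  # literal level does not match
    # Without '#', the topic must have exactly as many levels as the pattern.
    return captured if len(topic_levels) == len(pattern_levels) else None
```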
Example: context-transform.yml
This example demonstrates the $context variable in a data transformation scenario. It shows how context-specific information can be integrated into the rule to modify or enhance the data being processed.
$last([label])
This function references the last message sent in the rule stream, allowing you to aggregate successive messages. The optional label can point to other stash points. If it is not specified, the function defaults to the last message processed in the rule chain.
When using $last() with object properties, note that in JSONata the dot operator . functions as a map operator, not just as an object accessor. This can cause unexpected behavior if the input message is an array rather than a single value. For more information, see JSONata reference: path operators.
If you do not want the $last expression to be applied to each input value, the $lookup function may be more suitable than the dot operator. For example, $lookup($last(), "val") retrieves the val property from the last stored object. For more information, see JSONata reference: object functions.
Example
The previously stored value was { time: 1632908050235 } and you apply the following transformation rule:
If the input value is an array with value [1, 2, 3], the result will unexpectedly contain an array value:
To avoid this, use $lookup to correctly reference the time property:
This results in the expected output:
$encodeBitmask(array)
This function converts an array of boolean values into a bitmask and returns the result as a number.
Note: The first value in the array is the rightmost bit.
$decodeBitmask(number, array)
This function accepts a number and an array of strings as input. It returns an object where the keys are taken from the array, and the corresponding values are booleans derived by applying the bitmask of the key's position to the number.
$bitmask(number, index)
This function returns a boolean value depending on whether the bit at position <index> in <number> is set.
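Python equivalents of the three bitmask functions make the bit ordering explicit. They are illustrative sketches with hypothetical names, not the Rule Engine's implementation. For `$decodeBitmask` and `$bitmask`, the sketch assumes that position 0 is the rightmost bit, consistent with the note on `$encodeBitmask`.

```python
from typing import Dict, Sequence


def encode_bitmask(flags: Sequence[bool]) -> int:
    """Analogue of $encodeBitmask: flags[0] becomes the rightmost bit."""
    result = 0
    for position, flag in enumerate(flags):
        if flag:
            result |= 1 << position
    return result


def decode_bitmask(number: int, names: Sequence[str]) -> Dict[str, bool]:
    """Analogue of $decodeBitmask, assuming names[i] maps to bit i."""
    return {name: bool((number >> i) & 1) for i, name in enumerate(names)}


def bitmask(number: int, index: int) -> bool:
    """Analogue of $bitmask, assuming a zero-based index counted from the right."""
    return bool((number >> index) & 1)
```

For example, `[True, False, True]` encodes to 5 (binary 101).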
Property reference
The following tables list the type and requirement status of each rule's properties.

burst properties:
Property | Type | Required |
---|---|---|
interval | integer | Optional |
maxSize | integer | Optional |
label | string | Optional |

cov properties:
Property | Type | Required |
---|---|---|
key | string or array of string | Required |
deadband | number | Optional |
deadbandMode | enum | Optional |
label | string | Optional |

filter properties:
Property | Type | Required |
---|---|---|
expression | string | Required |

parse properties:
Property | Type | Required |
---|---|---|
format | enum | Required |

parse format values:
Value | Description |
---|---|
json | Interprets the input bytes as a stringified JSON object, which is parsed back into the JSON object |
utf8 | Interprets the input bytes as a UTF-8 encoded string and parses this into a string |
latin1 | Interprets the input bytes as a Latin-1 encoded string (ISO-8859-1) and parses this into a string |
ascii | Interprets the input bytes as an ASCII encoded string (for 7-bit ASCII data only) and parses this into a string. Additionally, supports parsing integer primitives directly. |
boolean | Interprets the first byte of the input message as a boolean value. If this first byte is non-zero, true is passed on, otherwise false. |
int8 | Interprets the input bytes as a signed 8-bit integer, using exactly 1 byte. |
int16BE | Interprets the input bytes as a signed, big-endian 16-bit integer, using exactly 2 bytes. |
int16LE | Interprets the input bytes as a signed, little-endian 16-bit integer, using exactly 2 bytes. |
int32BE | Interprets the input bytes as a signed, big-endian 32-bit integer, using exactly 4 bytes. |
int32LE | Interprets the input bytes as a signed, little-endian 32-bit integer, using exactly 4 bytes. |
uint8 | Interprets the input bytes as an unsigned 8-bit integer, using exactly 1 byte. |
uint16BE | Interprets the input bytes as an unsigned, big-endian 16-bit integer, using exactly 2 bytes. |
uint16LE | Interprets the input bytes as an unsigned, little-endian 16-bit integer, using exactly 2 bytes. |
uint32BE | Interprets the input bytes as an unsigned, big-endian 32-bit integer, using exactly 4 bytes. |
uint32LE | Interprets the input bytes as an unsigned, little-endian 32-bit integer, using exactly 4 bytes. |
floatBE | Interprets the input bytes as a big-endian IEEE 754 float, using exactly 4 bytes. |
floatLE | Interprets the input bytes as a little-endian IEEE 754 float, using exactly 4 bytes. |
doubleBE | Interprets the input bytes as a big-endian IEEE 754 double, using exactly 8 bytes. |
doubleLE | Interprets the input bytes as a little-endian IEEE 754 double, using exactly 8 bytes. |
toBase64 | Also base64. Parses a plain text input string into a base64 encoded output string. |
fromBase64 | Parses a base64 encoded input string into a plain text output string. |

setContextVars properties:
Property | Type | Required |
---|---|---|
vars | Object | Required |
lenient | boolean | Optional, default: false |

stash properties:
Property | Type | Required |
---|---|---|
label | string | Optional |

transform properties:
Property | Type | Required |
---|---|---|
expression | string | Required |