# Cybus::Mapping

The **Cybus::Mapping** resource describes how data is mapped from one MQTT topic or an array of topics ([subscribe](#subscribe)) to another MQTT topic ([publish](#publish)), with the possibility of adding [rules](#rules) to perform powerful transformations of the data directly within Connectware.

Mappings by default occur between topics of the internal broker, but are not limited to this. Mappings may also be established between different MQTT brokers.

## Mapping Properties

| Property                          | Type       | Required     | Default             |
| --------------------------------- | ---------- | ------------ | ------------------- |
| [mappings](#mappings)             | `object[]` | **Required** |                     |
| [targetState](#targetstate)       | `enum`     | Optional     | `"enabled"`         |
| [agentName](#agentname)           | `string`   | Optional     | `"protocol-mapper"` |
| [inputBuffering](#inputbuffering) | `object`   | Optional     |                     |

### mappings

* **Required**
* Type: `object[]`

All items must be of type `object` with the following properties:

| Property                | Type   | Required     |
| ----------------------- | ------ | ------------ |
| [subscribe](#subscribe) | object | **Required** |
| [publish](#publish)     | object | **Required** |
| [rules](#rules)         | array  | Optional     |

#### subscribe

The `subscribe` part of the `Cybus::Mapping` can subscribe either to a single endpoint or topic, or to an array of endpoints or topics. When subscribing to an array, the [collect](https://docs.cybus.io/2-0-6/documentation/rule-engine/data-processing-rules#collect) rule is particularly useful in the `rules` property of this mapping entry.

* **Required**
* Type: `object` or `object[]`

| Property   | Type    | Required                 |
| ---------- | ------- | ------------------------ |
| endpoint   | string  | One of endpoint or topic |
| topic      | string  | One of endpoint or topic |
| qos        | integer | Optional                 |
| connection | string  | Optional                 |
| label      | string  | Optional                 |

**endpoint**

Reference to a [Cybus::Endpoint](https://docs.cybus.io/2-0-6/documentation/services/service-commissioning-files/resources/cybus-endpoint) resource.

* Type: `string`

**topic**

An explicit topic name.

{% hint style="warning" %}
This topic will not automatically be prefixed.
{% endhint %}

* Type: `string`

**qos**

The quality of service for MQTT messages.

* Type: `integer`
* Allowed values:
  * `0`
  * `1`
  * `2`
* Default: `0`

**connection**

Reference to a [Cybus::Connection](https://docs.cybus.io/2-0-6/documentation/services/service-commissioning-files/resources/cybus-connection) resource with protocol `Mqtt` only. When set, the mapping subscribes via that MQTT connection instead of the internal broker.

* Type: `string`

{% hint style="info" %}
Only MQTT protocol connections are supported here. Using any other protocol will fail.
{% endhint %}

**Example**

{% code lineNumbers="true" %}

```yaml
resources:
  externalMqtt:
    type: Cybus::Connection
    properties:
      protocol: Mqtt
      connection:
        host: broker.example.org
        port: 1883

  mapFromExternal:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: 'shopfloor/in/#topic'
            connection: !ref externalMqtt
          publish:
            topic: 'internal/out/$topic' # published on internal broker
```

{% endcode %}

**label**

An optional label used as the key of the output object built when the [collect](https://docs.cybus.io/2-0-6/documentation/rule-engine/data-processing-rules#collect) rule is used in combination with an array of subscriptions. If no label is provided, the topic is used instead.

Labels can be dynamically constructed when [wildcards](#wildcards) are used in the topics. To construct dynamic labels, the name of a wildcard enclosed in curly braces must be used to specify that the final value of the label will be replaced with the value of the wildcard.

When using dynamic labels, the label **must** be enclosed in double quotes.

Example:

{% code lineNumbers="true" %}

```yaml
testMapping:
  type: Cybus::Mapping
  properties:
    mappings:
      - subscribe:
          - topic: factory/+machineName/+sensorName
            label: 'Machine: {machineName} Sensor: {sensorName}'
        publish:
          topic: 'factory/out'
        rules:
          - collect: {}
```

{% endcode %}

In this case, a message arriving at **/factory/Robot001/Temperature** will yield a label **Machine: Robot001 Sensor: Temperature**, and a message arriving at **/factory/Robot001/Current** will yield a label **Machine: Robot001 Sensor: Current**.

The wildcards used in the topic can be placed in any order in the label template, as the matching to wildcards is done by name. For example, another option for the previous example could be a label configured as **“Sensor: {sensorName} Machine: {machineName}”**, which will yield a label **“Sensor: Temperature Machine: Robot001”**.

Dynamic labels make it easy to write mappings that use the [collect](https://docs.cybus.io/2-0-6/documentation/rule-engine/data-processing-rules#collect) rule in a very concise manner.

* Type: `string`

#### publish

* **Required**
* Type: `object`

| Property   | Type    | Required                 |
| ---------- | ------- | ------------------------ |
| endpoint   | string  | One of endpoint or topic |
| topic      | string  | One of endpoint or topic |
| qos        | integer | Optional                 |
| retain     | boolean | Optional                 |
| connection | string  | Optional                 |

**endpoint**

Reference to a [Cybus::Endpoint](https://docs.cybus.io/2-0-6/documentation/services/service-commissioning-files/resources/cybus-endpoint) resource.

* Type: `string`

**topic**

An explicit topic name.

{% hint style="warning" %}
This topic will not automatically be prefixed.
{% endhint %}

* Type: `string`

**qos**

The quality of service for MQTT messages.

* Type: `integer`
* Allowed values:
  * `0`
  * `1`
  * `2`
* Default: `0`

Note: This particular `qos` setting is applied **only for this particular** subscribe (or publish) MQTT connection of this particular mapping, i.e., from the broker to the protocol-mapper instance of this mapping or the other way around. If your intention is to set up an end-to-end data connection with `qos` level 1 or 2, every single MQTT connection involved must have this specific `qos` level set.

**retain**

Whether the last message should be retained (last-value-cached) on the internal or external broker (default: false).

* Type: `boolean`
* Default: `false`

**connection**

Reference to a [Cybus::Connection](https://docs.cybus.io/2-0-6/documentation/services/service-commissioning-files/resources/cybus-connection) resource with protocol `Mqtt` only. When set, the mapping publishes via that MQTT connection instead of the internal broker.

* Type: `string`

{% hint style="info" %}
Only MQTT protocol connections are supported here. Using any other protocol will fail.
{% endhint %}

**Example**

{% code lineNumbers="true" %}

```yaml
resources:
  externalMqttOut:
    type: Cybus::Connection
    properties:
      protocol: Mqtt
      connection:
        host: mqtt.acme.cloud
        port: 1883
        # username: myUser
        # password: myPass

  mapToExternal:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: 'internal/in/#topic' # read from internal broker
          publish:
            topic: 'cloud/$topic'
            connection: !ref externalMqttOut
            qos: 1
            retain: false
```

{% endcode %}

#### rules

* Optional
* Type: `array` of [Rule Engine](https://docs.cybus.io/2-0-6/documentation/services/rule-engine)

This modifies your payload/topic before publishing it.

### targetState

* **Optional**
* Type: `enum`
* Default: `"enabled"`
* The value of this property **must** be one of:
  * `enabled`
  * `disabled`

### agentName

* **Optional**
* Type: `string`
* Default: `"protocol-mapper"`

If this property is set to any (non-default) value, it is used as the agent name on which this particular Mapping resource should run. This is useful for load sharing if Mapping instances with a lot of processing rules need to be distributed across a larger number of Agent instances. However, the specified agent name must match exactly the actual agent name; otherwise, an error is thrown upon enabling this resource.

Also, keep in mind that each Mapping resource subscribes to the central MQTT broker for the subscribe side and publishes the results to the central MQTT broker. Running this on a distributed agent causes an extra two MQTT message transmissions for each mapping.

### inputBuffering

Each mapping’s subscription receives input data. This input data at the subscription can optionally be managed through an individual input buffer (also called *input queue*) to establish fine-grained control for high data rate behavior. By default, this input buffering is disabled, and instead, all input data is handled on the global event queue, which works fine as long as there is no risk of out-of-memory exceptions due to unexpected slow data processing or forwarding.

When enabling the individual input buffer, the buffer properties determine the behavior in situations when the input buffer is filling up. The buffer is filling up when the message arrival rate is higher than the processing or forwarding (publishing) data rate. If this situation happens for longer durations, the input buffer will reach its configured capacity limits, and arriving messages will be dropped to prevent an uncontrollable out-of-memory exception. This is a fundamental and unavoidable property of distributed systems due to finite resources. However, the actual behavior of the input buffer can be adapted to the application scenario by setting the properties in the `inputBuffering` section (optional).

Supported properties are (all optional):

* `enabled` (type: boolean, default: `false`): Enable or disable input buffering.
* `maxInputBufferSize` (type: integer, default: `5000`): Maximum number of input messages that are queued in the input buffer. Exceeding messages will be discarded. Adjust this to a higher value if you are handling bursty traffic.
* `maxConcurrentMessages` (type: integer, default: `2`): Maximum number of concurrently processed messages as long as the input buffer queue is non-empty.
* `waitingTimeOnEmptyQueue` (type: integer, default: `10`): Waiting time in milliseconds after the input buffer queue runs empty and before checking again for newly queued input messages. Regardless of this value, on a non-empty input buffer queue, all messages will be processed without waiting time in between until the queue is empty again.

## Mapping Concept

It is possible to map data from one topic to another, from endpoints to topics, from topics to endpoints, or from endpoints to endpoints directly.

An example is shown in the following code snippet, where the topic `test/topic/from` is mapped to the other topic `test/topic/to`, and data from the endpoint `testEndpoint1` is mapped to the topic `test/endpoint/to/topic`.

{% code lineNumbers="true" %}

```yaml
testMapping:
  type: Cybus::Mapping
  properties:
    mappings:
      - subscribe:
          topic: test/topic/from
        publish:
          topic: test/topic/to
      - subscribe:
          endpoint: !ref testEndpoint1
        publish:
          topic: 'test/endpoint/to/topic'
```

{% endcode %}

It is also possible to map data from several topics and endpoints to a single topic.

When an array of subscribe objects is provided in a mapping, the data received in the subscribed topics is published to the target publish topic.

{% code lineNumbers="true" %}

```yaml
testMapping:
  type: Cybus::Mapping
  properties:
    mappings:
      - subscribe:
          - topic: test/topic/from
            label: test_topic_subs
          - endpoint: !ref testEndpoint1
            label: test_endpoint_1
          - endpoint: no/labels/are/ok/too
        publish:
          topic: 'test/endpoint/to/topic'
      - subscribe:
          endpoint: regular/subscribe
        publish:
          topic: output/topic
```

{% endcode %}

Additionally, if the `collect` rule is used, all the values from the subscribed topics and endpoints are stored in a Last Value Cache and published to the target topic as an object using the labels, or the topics if no labels were provided, as keys for the object.

{% code lineNumbers="true" %}

```yaml
testMapping:
  type: Cybus::Mapping
  properties:
    mappings:
      - subscribe:
          - topic: test/topic/from
            label: test_topic_subs
          - endpoint: !ref testEndpoint1
            label: test_endpoint_1
          - endpoint: no/labels/are/ok/too
        publish:
          topic: 'test/endpoint/to/topic'
        rules:
          - collect: {}
```

{% endcode %}

## Wildcards

The `Cybus:Mapping` resource supports wildcards in both subscribe and publish mode. In most use cases, it is desirable to have the ability to reference the actual topic value at the wildcard position either in the target topic or in the payload. Both are possible. To use this, use a named wildcard `+` and add a variable name after it. The content of the wildcard variable is then made available in the `$context.vars` object in the Transform, Filter, and SetContextVars rules of a [Rule Engine](https://docs.cybus.io/2-0-6/documentation/services/rule-engine). The following code example shows some combinations.

This example will swap the last two topic components:

{% code lineNumbers="true" %}

```yaml
wildcardMapping:
  type: Cybus::Mapping
  properties:
    mappings:
      - subscribe:
          topic: source/+a/+b
        publish:
          topic: target/$b/$a
```

{% endcode %}

This example will store some value from the message payload using the [setContextVars](https://docs.cybus.io/2-0-6/documentation/rule-engine/data-processing-rules#setcontextvars) rule, then use that value as the output topic. The expected input message must be a JSON object with at least the property `value`, e.g., `{"value": "abc"}`, and it might contain further values that are forwarded unchanged in this example.

{% code lineNumbers="true" %}

```yaml
wildcardMapping:
  type: Cybus::Mapping
  properties:
    mappings:
      - subscribe:
          topic: source/+x
        rules:
          - setContextVars:
              vars:
                fromPayload: 'value'
        publish:
          topic: target/$x/$fromPayload
```

{% endcode %}

You may even map entire sub-topic trees using the `#` wildcard, as shown here:

{% code lineNumbers="true" %}

```yaml
wildcardMapping:
  type: Cybus::Mapping
  properties:
    mappings:
      - subscribe:
          topic: in/#topic
        publish:
          topic: out/$topic
```

{% endcode %}

See below some examples of how messages arriving on `in` are mapped to `out`:

* `in/hall2` -> `out/hall2`
* `in/hall2/machine1` -> `out/hall2/machine1`

{% hint style="info" %}
Since in MQTT the named `#` wildcard includes the parent level while matching, a message published directly to `in` would be mapped to `out` in this case.
{% endhint %}

### Duplicated Messages Due to Wildcards

Duplicated messages are typically observed either in the [Data Explorer](https://docs.cybus.io/2-0-6/documentation/monitoring/data-explorer) or in target applications that receive the same message twice. This arises in configurations where mappings use wildcards (`#` or `+`). If you encounter duplicated messages, check for the use of wildcards first.

For example, consider the following mapping:

{% code lineNumbers="true" %}

```yaml
mapping:
  type: Cybus::Mapping
  properties:
    mappings:
      - subscribe:
          - topic: test/from/topic1
          - topic: test/from/#
        publish:
          topic: test/to
```

{% endcode %}

The overlapping subscriptions do not need to be defined within the same mapping. Whenever an MQTT client creates overlapping subscriptions, duplication will occur. Since the protocol-mapper uses the same MQTT client by default for all resources across services, overlapping can also occur across multiple services. This means that using wildcards can be risky, as future subscriptions may unintentionally overlap.

The MQTT broker implementation used in Connectware (VerneMQ) delivers messages from overlapping subscriptions twice to each subscription. Since the MQTT Specification (3.1.1) does not define the exact behavior for this scenario, it is not necessarily considered a bug. Other brokers, such as Mosquitto, behave differently and do not deliver duplicated messages in the same situation.

* To prevent duplicates, create a dedicated MQTT connection for wildcard subscriptions as shown below:

{% code lineNumbers="true" %}

```yaml
resources:
  mqttConnection:
    type: Cybus::Connection
    properties:
      protocol: Mqtt
      connection:
        host: !ref Cybus::MqttHost
        port: !ref Cybus::MqttPort
        username: !ref Cybus::MqttUser
        password: !ref Cybus::MqttPassword

  mapping:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            - topic: services/+serviceId/connections/+connection
              connection: !ref mqttConnection
          publish:
            topic: !sub '${Cybus::ServiceId}/connections/state'

  defaultRole:
    type: Cybus::Role
    properties:
      permissions:
        - resource: services/#
          operation: read
          context: mqtt
```

{% endcode %}

All other mappings can then be used as usual without explicitly defining a connection and will not be affected by message duplication.

{% hint style="info" %}
You may need to extend the default user permissions (`defaultRole`) with the topics that should be subscribed to. By default, access is restricted to the service’s MQTT root.
{% endhint %}

## Examples

Simple mapping from endpoint to custom topic:

{% code lineNumbers="true" %}

```yaml
myMapping:
  type: Cybus::Mapping
  properties:
    mappings:
      - subscribe:
          endpoint: !ref myEndpoint
        publish:
          topic: !sub '${Cybus::MqttRoot}/topic-in-my-container'
```

{% endcode %}

Relaying messages from internal broker to external one:

{% code lineNumbers="true" %}

```yaml
myMapping:
  type: Cybus::Mapping
  properties:
    mappings:
      - subscribe:
          topic: 'internal-scope/#topic'
        publish:
          topic: external-scope/$topic
          connection: !ref myMqttConnection
```

{% endcode %}
