.. _user/services/structure/resources/mapping:

**************
Cybus::Mapping
**************

The *mapping* resource describes how data is mapped from one MQTT topic or an array of topics (`subscribe`_) to another MQTT topic (`publish`_), with the possibility of adding `rules`_ to transform the data right inside the Connectware.

Mappings by default happen between topics of the internal broker but are not limited to it. Mappings may also be established between different MQTT brokers.

Properties
==========

================= ============ ============ =====================
Property          Type         Required     Default
================= ============ ============ =====================
`mappings`_       ``object[]`` **Required**
`targetState`_    ``enum``     Optional     ``"enabled"``
`agentName`_      ``string``   Optional     ``"protocol-mapper"``
`inputBuffering`_ ``object``   Optional
================= ============ ============ =====================

mappings
--------

- is **required**
- type: ``object[]``
- All items must be of the type: ``object`` with the following properties:

============= ====== =================================
Property      Type   Required
============= ====== =================================
`subscribe`_  object Required
`publish`_    object Required
`rules`_      array  Optional
============= ====== =================================

.. _user/services/structure/resources/mapping/subscribe:

subscribe
---------

The ``subscribe`` part of the `Mapping` can subscribe either to a single endpoint/topic or to an array of endpoints or topics. When subscribing to an array of endpoints/topics, the :ref:`user/services/structure/resources/rules/collect` rule is particularly useful in the `rules` property of this mapping entry.

- type: ``object`` or ``object[]``

=================================== ======= ==========================================================================
Property                            Type    Required
=================================== ======= ==========================================================================
:ref:`mapping/subscribe/endpoint`   string  One of :ref:`mapping/subscribe/endpoint` or :ref:`mapping/subscribe/topic`
:ref:`mapping/subscribe/topic`      string  One of :ref:`mapping/subscribe/endpoint` or :ref:`mapping/subscribe/topic`
:ref:`mapping/subscribe/qos`        integer Optional
:ref:`mapping/subscribe/connection` string  Optional
:ref:`mapping/subscribe/label`      string  Optional
=================================== ======= ==========================================================================

.. _mapping/subscribe/endpoint:

endpoint
~~~~~~~~

Reference to an :ref:`user/services/structure/resources/endpoint` resource

- type: ``string``

.. _mapping/subscribe/topic:

topic
~~~~~

An explicit topic name. NOTE: This topic will not automatically be prefixed!

- type: ``string``

.. _mapping/subscribe/qos:

qos
~~~

The quality of service for MQTT messages.

- type: ``integer``
- allowed values: ``0``, ``1``, ``2``
- default: ``0``

.. _mapping/subscribe/connection:

connection
~~~~~~~~~~

Reference to an :ref:`user/services/structure/resources/connection` resource

- type: ``string``

.. _mapping/subscribe/label:

label
~~~~~

An optional label used as the key of the output object built when the :ref:`user/services/structure/resources/rules/collect` rule is used in combination with an array of subscriptions. If no label is provided, the topic is used instead.

Labels can be dynamically constructed when `wildcards`_ are used in the topics. To construct a dynamic label, put the name of a wildcard in curly braces inside the label; in the final label, this placeholder is replaced with the value of the wildcard. When using dynamic labels, the label must be enclosed in double quotes.

Example:

.. code-block:: yaml
   :linenos:

   testMapping:
     type: Cybus::Mapping
     properties:
       mappings:
         - subscribe:
             - topic: factory/+machineName/+sensorName
               label: "Machine: {machineName} Sensor: {sensorName}"
           publish:
             topic: 'factory/out'
           rules:
             - collect: {}

In this case a message arriving at **factory/Robot001/Temperature** will yield the label **Machine: Robot001 Sensor: Temperature**, and a message arriving at **factory/Robot001/Current** will yield the label **Machine: Robot001 Sensor: Current**.

The wildcards used in the topic can be placed in any order in the label template, because they are matched by name. Another option for the previous example would be a label configured as **"Sensor: {sensorName} Machine: {machineName}"**, which will yield the label **"Sensor: Temperature Machine: Robot001"**.

Dynamic labels make it easy to write brief Mappings that use the :ref:`user/services/structure/resources/rules/collect` rule.

- type: ``string``

publish
-------

- type: ``object``

================================= ======= ======================================================================
Property                          Type    Required
================================= ======= ======================================================================
:ref:`mapping/publish/endpoint`   string  One of :ref:`mapping/publish/endpoint` or :ref:`mapping/publish/topic`
:ref:`mapping/publish/topic`      string  One of :ref:`mapping/publish/endpoint` or :ref:`mapping/publish/topic`
:ref:`mapping/publish/qos`        integer Optional
:ref:`mapping/publish/retain`     boolean Optional
:ref:`mapping/publish/connection` string  Optional
================================= ======= ======================================================================

.. _mapping/publish/endpoint:

endpoint
~~~~~~~~

Reference to an :ref:`user/services/structure/resources/endpoint` resource

- type: ``string``

.. _mapping/publish/topic:

topic
~~~~~

An explicit topic name. NOTE: This topic will not automatically be prefixed!

- type: ``string``

.. _mapping/publish/qos:

qos
~~~

The quality of service for MQTT messages.

- type: ``integer``
- allowed values: ``0``, ``1``, ``2``
- default: ``0``

Note: This `qos` setting is applied only to this particular subscribe (or publish) MQTT connection of this particular mapping, i.e. from the broker to the protocol-mapper instance of this mapping or the other way round. If your intention is to set up an end-to-end data connection with `qos` level 1 or 2, every single MQTT connection involved must have this specific `qos` level set!

.. _mapping/publish/retain:

retain
~~~~~~

Whether the last message should be retained (last-value-cached) on the internal or external broker.

- type: ``boolean``
- default: ``false``

.. _mapping/publish/connection:

connection
~~~~~~~~~~

Reference to an :ref:`user/services/structure/resources/connection` resource

- type: ``string``

rules
-----

- is optional
- type: ``array`` of :ref:`user/services/structure/resources/rules`

Rules modify your payload or topic before the message is published.

targetState
-----------

- is optional
- type: ``enum``
- default: ``"enabled"``
- The value of this property **must** be equal to one of the below

  - ``enabled``
  - ``disabled``

agentName
---------

- is optional
- type: ``string``
- default: ``"protocol-mapper"``

If this property is set to any (non-default) value, it is used as the name of the Agent on which this particular Mapping resource should run. This is useful for load sharing if Mapping instances with a lot of processing rules need to be distributed over a larger number of Agent instances. However, the specified agent name must exactly match the actual agent name, otherwise an error is thrown when this resource is enabled.
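
As a minimal sketch of the ``agentName`` property described above, the following Mapping is assigned to a separate agent. The resource name ``distributedMapping``, the agent name ``my-remote-agent`` and the topics are hypothetical and chosen only for illustration; the agent name must be replaced by the exact name of an agent that exists in your installation.

.. code-block:: yaml
   :linenos:

   distributedMapping:                 # hypothetical resource name
     type: Cybus::Mapping
     properties:
       agentName: my-remote-agent      # assumption: must exactly match an existing agent name
       mappings:
         - subscribe:
             topic: raw/+line/measurements
           publish:
             topic: processed/$line/measurements
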

Also keep in mind that each Mapping resource subscribes to the central MQTT broker on the subscribe side and publishes the results back to the central MQTT broker, so running a mapping on a distributed agent causes two extra MQTT message transmissions for each mapping.

.. _user/services/structure/resources/mapping/inputBuffering:

inputBuffering
--------------

Each mapping's subscription receives input data. This input data can optionally be managed through an individual input buffer (also called *input queue*) to establish fine-grained control over the behaviour at high data rates. By default, input buffering is disabled and all input data is handled on the global event queue instead, which works fine as long as there is no risk of out-of-memory exceptions due to unexpectedly slow data processing or forwarding.

When the individual input buffer is enabled, the buffer properties determine the behaviour in situations when the input buffer is filling up. The buffer fills up when the message arrival rate is larger than the processing rate or the forwarding (publishing) rate, in other words, when messages arrive faster than they can be processed or forwarded (published). If this situation persists for a longer time, the input buffer will reach its configured capacity limit and arriving messages will be dropped, so that the system will not run into an uncontrollable out-of-memory exception.

This is a fundamental and unavoidable property of distributed systems due to their finite resources. However, the behaviour of the input buffer can be adapted to the actual application scenario by setting the properties in the optional ``inputBuffering`` section.

Supported properties are (all optional):

- ``enabled`` (type: `boolean`, default: ``false``) Enable or disable input buffering.
- ``maxInputBufferSize`` (type: `integer`, default: ``5000``) Maximum number of input messages that are queued in the input buffer. Messages exceeding this limit will be discarded. Adjust this to a higher value if you are handling bursty traffic.
- ``maxConcurrentMessages`` (type: `integer`, default: ``2``) Maximum number of concurrently processed messages as long as the input buffer queue is non-empty.
- ``waitingTimeOnEmptyQueue`` (type: `integer`, default: ``10``) Waiting time in milliseconds after the input buffer queue ran empty and before checking again for newly queued input messages. Regardless of this value, while the input buffer queue is non-empty all messages are processed without any waiting time in between, until the queue is empty again.

Mapping Concept
===============

It is possible to map data from one topic to another, from endpoints to topics, from topics to endpoints or from endpoints to endpoints directly. A sample is shown in the following code snippet, where the topic ``test/topic/from`` is mapped to the other topic ``test/topic/to``, and data from the endpoint ``testEndpoint1`` is mapped to the topic ``test/endpoint/to/topic``.

.. code-block:: yaml
   :linenos:

   testMapping:
     type: Cybus::Mapping
     properties:
       mappings:
         - subscribe:
             topic: test/topic/from
           publish:
             topic: test/topic/to
         - subscribe:
             endpoint: !ref testEndpoint1
           publish:
             topic: 'test/endpoint/to/topic'

It is also possible to map data from several topics and endpoints to a single topic. When an array of subscribe objects is provided in a mapping, the data received on the subscribed topics is published to the target publish topic.

.. code-block:: yaml
   :linenos:

   testMapping:
     type: Cybus::Mapping
     properties:
       mappings:
         - subscribe:
             - topic: test/topic/from
               label: test_topic_subs
             - endpoint: !ref testEndpoint1
               label: test_endpoint_1
             - endpoint: no/labels/are/ok/too
           publish:
             topic: 'test/endpoint/to/topic'
         - subscribe:
             endpoint: regular/subscribe
           publish:
             topic: output/topic

Additionally, if the ``collect`` rule is used, all the values from the subscribed topics and endpoints are stored in a Last Value Cache and published to the target topic as an object. The keys of this object are the labels, or the topics where no labels were provided.

.. code-block:: yaml
   :linenos:

   testMapping:
     type: Cybus::Mapping
     properties:
       mappings:
         - subscribe:
             - topic: test/topic/from
               label: test_topic_subs
             - endpoint: !ref testEndpoint1
               label: test_endpoint_1
             - endpoint: no/labels/are/ok/too
           publish:
             topic: 'test/endpoint/to/topic'
           rules:
             - collect: {}

Wildcards
=========

The *mapping* resource supports wildcards in both subscribe and publish mode. In most use cases, it is desirable to be able to reference the actual topic value at the wildcard position either in the target topic or in the payload. Both are possible.

To do this, use a named wildcard: the ``+`` character followed by a variable name. The content of the wildcard variable is then made available in the ``$context.vars`` object in the Transform, Filter and SetContextVars rules of a :ref:`user/services/structure/resources/rules`.

The following code samples show some combinations. This example will swap the last two topic components:

.. code-block:: yaml
   :linenos:

   wildcardMapping:
     type: Cybus::Mapping
     properties:
       mappings:
         - subscribe:
             topic: source/+a/+b
           publish:
             topic: target/$b/$a

This example will store a value from the message payload using the :ref:`user/services/structure/resources/rules/setcontextvars` rule, then use that value in the output topic. The expected input message must be a JSON object with at least the property `value`, e.g. ``{"value": "abc"}``, and it might contain further values that are forwarded unchanged in this example.

.. code-block:: yaml
   :linenos:

   wildcardMapping:
     type: Cybus::Mapping
     properties:
       mappings:
         - subscribe:
             topic: source/+x
           rules:
             - setContextVars:
                 vars:
                   fromPayload: 'value'
           publish:
             topic: target/$x/$fromPayload

You may even map entire sub-topic trees using the ``#`` wildcard, as shown here:

.. code-block:: yaml
   :linenos:

   wildcardMapping:
     type: Cybus::Mapping
     properties:
       mappings:
         - subscribe:
             topic: in/#topic
           publish:
             topic: out/$topic

See below some examples of how messages arriving on ``in`` are mapped to ``out``:

- ``in/hall2`` -> ``out/hall2``
- ``in/hall2/machine1`` -> ``out/hall2/machine1``

.. note::

   Since in MQTT the named '#' wildcard includes the parent level while matching, a message published directly to ``in`` would simply be mapped to ``out`` in this case.

Examples
========

*simple mapping from endpoint to custom topic*

.. code-block:: yaml
   :linenos:

   myMapping:
     type: Cybus::Mapping
     properties:
       mappings:
         - subscribe:
             endpoint: !ref myEndpoint
           publish:
             topic: !sub '${Cybus::MqttRoot}/topic-in-my-container'

*relaying messages from internal broker to external one*

.. code-block:: yaml
   :linenos:

   myMapping:
     type: Cybus::Mapping
     properties:
       mappings:
         - subscribe:
             topic: 'internal-scope/#topic'
           publish:
             topic: external-scope/$topic
             connection: !ref myMqttConnection
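
*sketch: mapping with input buffering, QoS and retained output*

The following is a minimal sketch, not taken from a tested deployment, that combines the ``inputBuffering`` properties described above with the ``qos`` and ``retain`` settings of a mapping entry. The resource name ``bufferedMapping``, the topic names and all numeric values are assumptions chosen for illustration and should be tuned to the actual data rates.

.. code-block:: yaml
   :linenos:

   bufferedMapping:                      # hypothetical resource name
     type: Cybus::Mapping
     properties:
       inputBuffering:
         enabled: true                   # default is false
         maxInputBufferSize: 10000       # illustrative: raised from the default 5000 for bursty input
         maxConcurrentMessages: 2        # same as the default
         waitingTimeOnEmptyQueue: 10     # milliseconds, same as the default
       mappings:
         - subscribe:
             topic: sensors/+machine/vibration   # hypothetical topic
             qos: 1
           publish:
             topic: buffered/$machine/vibration  # hypothetical topic
             qos: 1
             retain: true                # keep the last value on the broker

Setting ``qos`` on both the subscribe and the publish side reflects the note in the ``qos`` section: the level applies per MQTT connection, so every connection involved needs it for an end-to-end guarantee.
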