# MQTT

Connectware provides native support for MQTT (Message Queuing Telemetry Transport) protocol connections, enabling integration with external MQTT brokers.

This guide covers the configuration of MQTT connection resources and endpoints for external broker integration. For information about topic mapping on the internal Connectware MQTT broker, see [Cybus::Mapping](https://docs.cybus.io/documentation/services/service-commissioning-files/resources/cybus-mapping).

## Configuration

To communicate with an MQTT broker, start by defining a connection resource with the protocol type `Mqtt`. At minimum, specify the broker's hostname and port. For all available connection parameters, see [Connection Properties](https://docs.cybus.io/connectors/enterprise-connectors/mqtt/mqttconnection).

Next, create endpoint resources to send or receive data:

* **To receive data**: Define an endpoint with the `subscribe` property.
* **To send data**: Define an endpoint with the `write` property.

Each endpoint supports either `subscribe` or `write`, but not both. For all available endpoint parameters, see [Endpoint Properties](https://docs.cybus.io/connectors/enterprise-connectors/mqtt/mqttendpoint).
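As a minimal sketch of the send direction, the fragment below defines a write endpoint. It assumes that `write` accepts a `topic` key in the same way `subscribe` does, and that a connection resource named `mqttConnection` already exists in the same file. The resource name `setpointEndpoint` and the topic `factory/line1/setpoint` are hypothetical; check the Endpoint Properties reference for the exact schema.

```yaml
resources:
  # Hypothetical write endpoint; name and topic are illustrative only
  setpointEndpoint:
    type: Cybus::Endpoint
    properties:
      protocol: Mqtt
      connection: !ref mqttConnection # assumes this connection is defined elsewhere
      write:
        topic: factory/line1/setpoint # assumed to mirror the `subscribe.topic` key
```

To both send and receive on the same external topic, define two separate endpoints, one with `subscribe` and one with `write`.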

### Output Format on Write

When you write to an MQTT endpoint, no result is output to the `/res` endpoint.

### Output Format on Read

When you read data from MQTT via a subscription endpoint, messages arriving at the target MQTT server are passed through unmodified.

### Input Format on Write

When you write data to MQTT via a write endpoint, no special format is required. Data is published to the target MQTT server unmodified.

### Duplicates on Overlapping Subscriptions

When you use multiple overlapping topic subscriptions with wildcards, you may encounter duplicate messages.

For example, if you have one subscription (such as a [Cybus::Mapping](https://docs.cybus.io/documentation/services/service-commissioning-files/resources/cybus-mapping)) to topic `factory/machine1/temperature` and another to `factory/#`, a message published to `factory/machine1/temperature` matches both filters. Each connected target (mapping or endpoint) then receives that message twice instead of once.

This behavior is caused by the internal MQTT broker and cannot be avoided, as it is allowed by the MQTT 3.1 specification.
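The overlap described above could arise from a mapping like the following sketch. The resource name `overlappingMapping` and the publish topics are hypothetical and only serve to illustrate two subscriptions whose topic filters both match `factory/machine1/temperature`.

```yaml
resources:
  # Hypothetical mapping with two overlapping subscriptions
  overlappingMapping:
    type: Cybus::Mapping
    properties:
      mappings:
        # Exact topic subscription
        - subscribe:
            topic: factory/machine1/temperature
          publish:
            topic: dashboards/machine1/temperature # illustrative target
        # Wildcard subscription that also matches the topic above
        - subscribe:
            topic: factory/#
          publish:
            topic: archive/factory # illustrative target
```

Where duplicates are a problem, one option is to narrow the wildcard filter so that it no longer overlaps with the exact subscription, for example by subscribing to more specific subtrees.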

## Persistent Sessions and Message Buffering

When using persistent sessions by setting the MQTT connection property `clean` to `false`, the broker buffers messages for disconnected clients and delivers them immediately after reconnection. If Connectware endpoints are still being reconstructed at that time, messages may arrive before all subscriptions are ready, which can result in message loss.

To prevent this, Connectware provides a configurable message buffer for MQTT connections. Incoming messages are temporarily retained after the connection is established and released once endpoints are ready.

This is particularly useful when:

* Defining custom connections in [Cybus::Mapping](https://docs.cybus.io/documentation/services/service-commissioning-files/resources/cybus-mapping) resources to distribute load across the broker.
* Using MQTT connections with multiple QoS 1 or QoS 2 endpoints.

### Configuring Message Buffering

Configure message buffering behavior using the following properties in your MQTT connection. For the complete list of MQTT connection properties, see [MQTT Connection Properties](https://docs.cybus.io/connectors/enterprise-connectors/mqtt/mqttconnection).

* **`messageBufferWindowMs`** - Duration in milliseconds during which messages are buffered after the connection is established. The default is 10,000 ms and the maximum is 30,000 ms. Choose a value that fits your throughput and endpoint reconstruction time. The window extends automatically while new endpoints are being created to avoid premature delivery.
* **`maxBufferedMessages`** - Maximum number of messages that can be retained in the buffer. The default is 20,000 messages and the maximum is 300,000 messages. Once the configured limit is reached, all currently buffered messages are delivered immediately to prevent excessive memory usage.

{% hint style="warning" %}
In high throughput scenarios or after prolonged agent downtime, the broker may accumulate a large number of buffered messages. If you are using persistent sessions extensively, consider increasing the value of `maxBufferedMessages` and ensuring that there is sufficient memory allocated, or implement autoscaling to handle peak loads during reconnection.
{% endhint %}
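The following fragment is a sketch of a connection that uses a persistent session together with a tuned message buffer. It assumes that `clean`, `messageBufferWindowMs`, and `maxBufferedMessages` sit under the `connection` block alongside `host` and `port`; the values shown are illustrative and stay within the documented maximums.

```yaml
resources:
  mqttConnection:
    type: Cybus::Connection
    properties:
      protocol: Mqtt
      connection:
        host: !ref mqttHost
        port: 8883
        scheme: mqtts
        clean: false # persistent session: broker buffers messages while disconnected
        messageBufferWindowMs: 15000 # illustrative; default 10000, maximum 30000
        maxBufferedMessages: 50000 # illustrative; default 20000, maximum 300000
```

A longer window gives endpoints more time to be reconstructed, while a larger buffer limit increases memory usage during reconnection.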

## Service Commissioning File Example

The following example shows how to configure a basic MQTT connection and endpoint that subscribes to the topic `factory/line1/temperature`.

{% file src="<https://2355450750-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FGzXPesVecsUM1eHBfwea%2Fuploads%2Fgit-blob-238dfc43eabb57dca64360c2e7235d1784db7557%2Fmqtt-example.yml?alt=media>" %}

{% code title="mqtt-example.yml" lineNumbers="true" %}

```yaml
---
description: MQTT Connection Test

metadata:
  name: MQTT Connection Test

parameters:
  mqttHost:
    type: string
    description: IP address or hostname of the MQTT broker

  mqttUsername:
    type: string
    description: Username for MQTT authentication

  mqttPassword:
    type: string
    description: Password for MQTT authentication

resources:
  mqttConnection:
    type: Cybus::Connection
    properties:
      protocol: Mqtt
      connection:
        host: !ref mqttHost
        port: 8883
        scheme: mqtts
        username: !ref mqttUsername
        password: !ref mqttPassword
        trustAllCertificates: false # choose true to allow self-signed certificates

  temperatureEndpoint:
    type: Cybus::Endpoint
    properties:
      protocol: Mqtt
      connection: !ref mqttConnection
      subscribe:
        topic: factory/line1/temperature
```

{% endcode %}


