# Cybus::Endpoint

A **Cybus::Endpoint** resource represents the address of a single data endpoint in the context of a specific protocol. Each endpoint is always mapped to exactly one internal MQTT topic.

The MQTT topic can either be:

* Explicitly defined using the [topic](#topic) property, or
* Automatically generated (see note at [topic](#topic)).

In most services, endpoint topics are further mapped via a [Cybus::Mapping](https://docs.cybus.io/2-0-6/documentation/services/service-commissioning-files/resources/cybus-mapping) resource to application-specific topics, which can then be consumed by dashboards, applications, or other services.

The actual protocol-specific data address is configured under the [subscribe, read, or write](#subscribe-read-write) properties. These properties are one level deeper in the YAML structure (beneath endpoint `subscribe`/`read`/`write`) and must not be confused with properties defined directly on the endpoint resource itself.
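The two levels can be illustrated with a minimal sketch that reuses the OPC UA node from the examples at the end of this page. The resource name `currentServerTime` and the topic value are hypothetical, and `opcuaConnection` is assumed to be a Cybus::Connection resource defined elsewhere in the same service commissioning file.

```yaml
currentServerTime:
  type: Cybus::Endpoint
  properties:
    # Properties of the endpoint resource itself
    protocol: Opcua
    connection: !ref opcuaConnection
    topic: server/current-time   # hypothetical topic name
    # Protocol-specific data address, one level deeper
    subscribe:
      nodeId: i=2258
```

Here `protocol`, `connection`, and `topic` belong to the endpoint, while `nodeId` belongs to the `subscribe` operation.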

## Operation Results

Endpoints of type `read` or `write` generate an additional topic with a `/res` suffix (“result”) where the results of the operation are sent, loosely following the JSON-RPC 2.0 Specification.

* A `read` endpoint named `myEndpoint` will listen to requests on the MQTT topic `myEndpoint/req`, and publish the result as a message to the MQTT topic `myEndpoint/res`.
* A `write` endpoint named `myEndpoint` similarly will listen to requests on the MQTT topic `myEndpoint/set`, and publish the result as a message to the MQTT topic `myEndpoint/res`.
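As a sketch, the following pair of endpoints would expose one read and one write operation. The resource names, the node ID `ns=2;s=Setpoint`, and the `opcuaConnection` reference are assumptions for illustration only; which operations a protocol supports should be checked in its endpoint properties.

```yaml
# Hypothetical read endpoint: listens for requests on its endpoint topic
# with the /req suffix and publishes results with the /res suffix.
readServerTime:
  type: Cybus::Endpoint
  properties:
    protocol: Opcua
    connection: !ref opcuaConnection
    read:
      nodeId: i=2258

# Hypothetical write endpoint: listens for requests on its endpoint topic
# with the /set suffix and publishes results with the /res suffix.
writeSetpoint:
  type: Cybus::Endpoint
  properties:
    protocol: Opcua
    connection: !ref opcuaConnection
    write:
      nodeId: 'ns=2;s=Setpoint'   # assumed node ID, not from this page
```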

The data message on the result topic will have the following format in the successful case:

{% code lineNumbers="true" %}

```json
{ "id": 29194, "timestamp": 1629351968526, "result": { "value": 0 } }
```

{% endcode %}

{% hint style="info" %}
The structure and content of the `result` field will vary based on the protocol being used and the type of endpoint configured.
{% endhint %}

In this result message, `id` is the request identifier that was sent in the original request, `timestamp` is the Unix timestamp (in milliseconds) of when the result was received, and `result` is a JSON object with the actual result, whose content depends on the specific protocol implementation.

If there was an error, the resulting JSON object does not contain a `result` property but an `error` property instead. The content of the `error` property also depends on the specific protocol implementation. Often it is a string containing an explanatory error message. In the error case, the data message on the result topic therefore has the following format:

{% code lineNumbers="true" %}

```json
{ "id": 29194, "timestamp": 1629351968526, "error": "Wrong input values" }
```

{% endcode %}

For HTTP and OPC UA endpoints, a structured error format can be enabled by setting `publishError: true`. This flag should be set to `true` when the endpoint is used within a [FlowSync](https://docs.cybus.io/2-0-6/documentation/services/flowsync) flow. The `error` property is then an object containing an error code and a message:

```json
{
  "id": 29194,
  "timestamp": 1629351968526,
  "error": {
    "code": "<protocol dependent error code>",
    "message": "Wrong input values"
  }
}
```

## Polling Interval and Subscribe

When an endpoint should subscribe to a data point on the device, define it with the `subscribe` operation. Some protocols support such a subscription directly (e.g., OPC UA), whereas others only support regular polling of the data point from the Connectware side. Depending on the available choices, the actual behavior is selected through the properties in the [subscribe / read / write](#subscribe-read-write) section.

If the endpoint is set to `polling`, there is the choice between specifying an `interval` or a `time schedule expression` for polling from the Connectware side.

* An `interval` specifies the waiting time between subsequent polls. The exact time between two polls is not guaranteed, only that the configured interval is matched on average. For example, if the protocol needs longer for one poll, the next interval is shortened. For a configured interval of 1000 milliseconds, actual intervals typically range from 950 to 1050 milliseconds, but this strongly depends on the protocol and device behavior.
* A `time schedule expression` is specified in the `cronExpression` syntax (see <https://github.com/node-cron/node-cron>), for example `"0 * * * *"` for “every hour at minute 00”, such as 00:00h, 01:00h, 02:00h, and so on. In this case, there is no guarantee on the exact time when data is received, but one poll is triggered for each match of the time expression. You can therefore rely on receiving 24 polling results per day if “once per hour” has been specified in the `cronExpression`.

For any subscribe endpoint in the protocols where polling is available, you can either specify an `interval`, or a `cronExpression` (which takes precedence over the `interval` property), or neither, in which case `interval` will be used with its default value.
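The two polling options can be sketched as follows, based on the BACnet example at the end of this page. The resource names are hypothetical, and the sketch assumes that the protocol in use offers both `interval` and `cronExpression`; check the endpoint properties of the respective protocol before relying on it.

```yaml
# Polled with a fixed interval: one poll roughly every 1000 milliseconds
polledByInterval:
  type: Cybus::Endpoint
  properties:
    protocol: Bacnet
    connection: !ref bacnetConnection
    subscribe:
      objectType: analog-input
      objectInstance: 2796204
      interval: 1000

# Polled on a schedule: one poll per hour at minute 00.
# If both were given, cronExpression would take precedence over interval.
polledBySchedule:
  type: Cybus::Endpoint
  properties:
    protocol: Bacnet
    connection: !ref bacnetConnection
    subscribe:
      objectType: analog-input
      objectInstance: 2796204
      cronExpression: '0 * * * *'
```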

## Endpoint Properties

| Property                           | Type       | Required     |
| ---------------------------------- | ---------- | ------------ |
| [protocol](#protocol)              | `enum`     | **Required** |
| [connection](#connection)          | `string`   | **Required** |
| [subscribe](#subscribe-read-write) | `object`   | Optional\*   |
| [read](#subscribe-read-write)      | `object`   | Optional\*   |
| [write](#subscribe-read-write)     | `object`   | Optional\*   |
| [rules](#rules)                    | `object[]` | Optional     |
| [qos](#qos)                        | `integer`  | Optional     |
| [retain](#retain)                  | `boolean`  | Optional     |
| [targetState](#targetstate)        | `enum`     | Optional     |
| [topic](#topic)                    | `string`   | Optional     |
| [agentName](#agentname)            | `string`   | Obsolete     |
| [buffering](#buffering)            | `object`   | Optional     |
| [inputBuffering](#inputbuffering)  | `object`   | Optional     |
| [publishError](#publisherror)      | `boolean`  | Optional     |

\* One out of [subscribe, read, and write](#subscribe-read-write) is **required**.

### protocol

Identifies the protocol for which a connection should be established.

* **Required**
* Type: `enum`

The value of this property **must** be equal to one of the following:

* `Ads`
* `Bacnet`
* `EthernetIp`
* `Focas`
* `Hbmdaq`
* `Heidenhain`
* `Http`
* `Influxdb`
* `Kafka`
* `Modbus`
* `Mqtt`
* `Mssql`
* `Opcda`
* `Opcua`
* `S7`
* `Shdr`
* `Sinumerik`
* `Sopas`
* `Sql`
* `Systemstate`
* `Werma`

### connection

Reference to a [Cybus::Connection](https://docs.cybus.io/2-0-6/documentation/services/service-commissioning-files/resources/cybus-connection) resource.

* **Required**
* Type: `string`

### subscribe / read / write

* One of these is **required**
* Type: `object`

  Depending on the [protocol](#protocol) type, this property needs the following parameters (properties) which specify the actual data address in the respective protocol:

  * `Ads` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/ads#endpoint-properties)
  * `Bacnet` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/bacnet#endpoint-properties)
  * `EthernetIp` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/ethernet-ip#endpoint-properties)
  * `Focas` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/focas#endpoint-properties)
  * `Hbmdaq` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/hottinger-baldwin-messtechnik-hbm#endpoint-properties)
  * `Heidenhain` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/heidenhain-dnc#endpoint-properties)
  * `Http` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/http-rest#endpoint-properties)
  * `Influxdb` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/influxdb#endpoint-properties)
  * `Kafka` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/kafka#endpoint-properties)
  * `Modbus` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/modbus-tcp#endpoint-properties)
  * `Mqtt` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/mqtt#endpoint-properties)
  * `Mssql` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/mssql#endpoint-properties)
  * `Opcda` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/opc-da#endpoint-properties)
  * `Opcua` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/opc-ua/opc-ua-client#endpoint-properties)
  * `S7` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/siemens-simatic-s7#endpoint-properties)
  * `Shdr` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/shdr#endpoint-properties)
  * `Sinumerik` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/sinumerik#endpoint-properties)
  * `Sopas` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/sopas#endpoint-properties)
  * `Sql` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/sql#endpoint-properties)
  * `Systemstate` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/systemstate#endpoint-properties)
  * `Werma` [Endpoint Properties](https://docs.cybus.io/2-0-6/industry-protocol-details/werma-win-ethernet#endpoint-properties)

{% hint style="warning" %}
Strictly speaking, the protocol’s properties mentioned here are not properties of the endpoint but of the [subscribe / read / write](#subscribe-read-write) property of the endpoint. In other words, these properties must appear one level deeper in the YAML file: not directly below `endpoint` but below `subscribe`/`read`/`write`, which in turn is below `endpoint`. These two levels must not be confused.
{% endhint %}

### rules

* **Optional**
* Type: `array` of [Rule Engine](https://docs.cybus.io/2-0-6/documentation/services/rule-engine)

You may specify rules here that are applied to the payload before it is sent to the internal broker for the first time.

{% hint style="warning" %}
These rules transform the raw data as received from the protocol. They therefore affect all further steps in the processing chain.
{% endhint %}
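A minimal sketch of an endpoint with one rule could look like this. The `transform` rule with a JSONata `expression` is taken from the Rule Engine documentation rather than from this page, so treat the exact syntax as an assumption and verify it there. The resource name, the scaling factor, and the presence of a numeric `value` field in the payload are hypothetical as well.

```yaml
scaledAnalogInput:
  type: Cybus::Endpoint
  properties:
    protocol: Bacnet
    connection: !ref bacnetConnection
    subscribe:
      objectType: analog-input
      objectInstance: 2796204
      interval: 1000
    rules:
      # Assumed rule syntax: scale the raw value before the message is
      # published to the internal broker for the first time
      - transform:
          expression: '{ "value": value * 0.1 }'
```

Because the rule runs at the endpoint, every mapping or consumer further down the chain only sees the scaled value.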

### qos

MQTT Quality of Service (QoS) for the internal messaging from the endpoint to the internal MQTT broker.

If this endpoint runs on an agent, setting this to `1` instead of the default `0` will activate the simple buffering of MQTT client implementations.

* **Optional**
* Type: `integer`, must be one of `0`, `1`, `2`
* Default: `0`

{% hint style="warning" %}
QoS level 2 is most likely not useful in an industrial context and is not recommended here.
{% endhint %}

### retain

Whether the last message should be retained (last-value-cached) on the internal MQTT broker.

If this endpoint runs on an agent, setting this to `true` instead of the default `false` might be useful in certain applications to have some value on the topic if the agent disconnects. However, in other applications this might not make sense.

* **Optional**
* Type: `boolean`, must be one of `true`, `false`
* Default: `false`

### targetState

The state this resource should be in after startup.

* **Optional**
* Type: `enum`, must be one of `enabled`, `disabled`
* Default: `enabled`
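The optional `qos`, `retain`, and `targetState` properties sit directly on the endpoint, next to `protocol` and `connection`. The following sketch sets all three to non-default values; the resource name is hypothetical and the chosen values only illustrate the syntax, not a recommendation.

```yaml
serverTimeOnAgent:
  type: Cybus::Endpoint
  properties:
    protocol: Opcua
    connection: !ref opcuaConnection
    qos: 1                  # default is 0
    retain: true            # default is false; keeps the last value on the topic
    targetState: disabled   # default is enabled
    subscribe:
      nodeId: i=2258
```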

### topic

Explicit topic name to which this endpoint address should be mapped.

{% hint style="warning" %}
The provided topic name is prefixed with the value of the [Cybus::MqttRoot](https://docs.cybus.io/2-0-6/documentation/services/parameters#global-pre-defined-parameters) global parameter. This global parameter by default has the value `services/<serviceId>` where `<serviceId>` is replaced with the actual ServiceID of the current service. Hence, in the default case the full endpoint topic will expand to:
{% endhint %}

`services/<serviceId>/<topic>`

See the explanation at [Cybus::MqttRoot](https://docs.cybus.io/2-0-6/documentation/services/parameters#global-pre-defined-parameters) if alternative topic structures are needed.

Providing a custom topic and avoiding an additional mapping resource improves overall performance as the message has to travel one hop less. Endpoints with custom topics can still be mapped using a regular mapping (see [Cybus::Mapping](https://docs.cybus.io/2-0-6/documentation/services/service-commissioning-files/resources/cybus-mapping)).

* **Optional**
* Type: `string`

### agentName

**Obsolete**. This value is no longer being used. The agentName of the referenced connection is always used if this connection and endpoint are being used on an agent instance, separate from Connectware.

### buffering

The `buffering` section optionally enables output data buffering on `write` endpoints. With this feature, values to be written are buffered while the connection to the device is lost, in order to avoid data loss.

The buffering mechanism kicks in when a device disconnection is detected and will start buffering any incoming values. After the connection is reestablished, buffered messages will be written to the machine (“flushed”).

The flushing of the buffer is implemented to handle subsequent disconnections during flushing correctly. In such a case, newly incoming values will be buffered, too. Once the connection is reestablished, the flushing will continue where it left off.

By default, this feature is switched off. To enable it, the property `enabled` must be set to `true`, and most likely additional properties should be set according to the expected behavior in the actual application scenario. The supported properties of buffering are:

* `enabled` (default: `false`): Whether buffering should be enabled or not when the connection to the source device is lost.
* `keepOrder` (default: `true`): Whether to keep the order of messages when going into redelivery mode after an endpoint comes back online.
* `burstInterval` (default: `100`): Time in milliseconds to wait between each re-publishing of buffered messages after connection is re-established.
* `burstSize` (default: `1`): The number of messages to send in one burst when flushing the buffer upon re-connection.
* `bufferMaxSize` (default: `100000`): The max number of messages to be buffered. Older messages are deleted when this limit is reached.
* `bufferMaxAge` (default: `86400`, one day): The number of seconds the buffered data will be kept. If messages have been buffered for longer than this number of seconds, they will be discarded.

{% hint style="warning" %}
It is important to keep these properties balanced to avoid unwanted behavior. For example, if you combine a large `bufferMaxSize` with a long `burstInterval` and a small `burstSize`, flushing the buffer could take a very long time and, depending on `bufferMaxAge`, messages could expire before they are sent.

The values should be configured based on the target device capabilities.
{% endhint %}

The `keepOrder` property, which is switched on by default, will keep the order of arriving messages when a flush of the buffer is in progress. This will delay newly arriving messages until all the buffered messages have been sent.

For example, if the buffer holds the values `1`, `2`, `3`, `4`, flushing starts after a reconnection, and the values `5`, `6` are received in the meantime, then the machine receives the values in exactly that order: `1`, `2`, `3`, `4`, `5`, `6`. If this property were set to `false` in the same scenario, the order of arrival of the new values would be unspecified and the result would be an interleaved sequence, for example: `1`, `5`, `2`, `3`, `6`, `4`.
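The effect of these properties on the flush duration can be estimated with simple arithmetic. The sketch below assumes a completely full buffer and ignores the time each write itself takes, so the results are lower bounds. The resource name and the chosen `burstSize` are illustrative only.

```yaml
bufferedWrite:
  type: Cybus::Endpoint
  properties:
    protocol: Mqtt
    connection: !ref mqttConnection
    buffering:
      enabled: true
      bufferMaxSize: 100000   # default, number of messages
      burstSize: 10           # default is 1
      burstInterval: 100      # default, in milliseconds
      bufferMaxAge: 86400     # default, in seconds
    write:
      topic: test/write
# Estimated time to flush a full buffer:
#   bufferMaxSize / burstSize * burstInterval
#   = 100000 / 10 * 100 ms = 1,000,000 ms (about 17 minutes)
# With the default burstSize of 1, the same buffer would need
#   100000 / 1 * 100 ms = 10,000,000 ms (about 2 hours 47 minutes)
```

Both estimates stay well below the `bufferMaxAge` of 86400 seconds, so in this sketch no message would expire during the flush.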

### inputBuffering

The input data to each endpoint can optionally be managed through an individual input buffer (also called `input queue`) to establish fine-grained control over high data rate behavior. By default, this input buffering is disabled and all input data is handled on the global event queue, which works fine as long as there is no risk of out-of-memory exceptions due to unexpectedly slow data processing or forwarding.

When the individual input buffer is enabled, its properties determine the behavior when the buffer fills up. The buffer fills up when messages arrive faster than they can be processed or forwarded (published). If this situation persists, the input buffer reaches its configured capacity limit and newly arriving messages are dropped, so that the system does not run into an uncontrollable out-of-memory exception. This is a fundamental and unavoidable property of distributed systems with finite resources. The behavior of the input buffer can be adapted to the application scenario by setting the properties in the optional `inputBuffering` section.

Supported properties are (all optional):

* `enabled` (type: `boolean`, default: `false`): Enable or disable input buffering.
* `maxInputBufferSize` (type: `integer`, default: `5000`): Maximum number of input messages that are queued in the input buffer. Messages exceeding this limit are discarded. Set this to a higher value if you are handling bursty traffic.
* `maxConcurrentMessages` (type: `integer`, default: `2`): Maximum number of concurrently processed messages as long as the input buffer queue is non-empty.
* `waitingTimeOnEmptyQueue` (type: `integer`, default: `10`): Waiting time in milliseconds after the input buffer queue has run empty and before checking again for newly queued input messages. Regardless of this value, while the input buffer queue is non-empty, all messages are processed without waiting time in between until the queue is empty again.
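A minimal sketch of an endpoint with input buffering enabled is shown below. The resource name and the raised `maxInputBufferSize` are hypothetical; suitable values depend on the data rate and the available memory.

```yaml
bufferedInputEndpoint:
  type: Cybus::Endpoint
  properties:
    protocol: Opcua
    connection: !ref opcuaConnection
    inputBuffering:
      enabled: true
      maxInputBufferSize: 20000     # default is 5000; raised for bursty traffic
      maxConcurrentMessages: 2      # default
      waitingTimeOnEmptyQueue: 10   # default, in milliseconds
    subscribe:
      nodeId: i=2258
```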

### publishError

Controls whether error information is published in a structured format when an operation fails. When `publishError` is set to `true`, errors will be published as a structured object containing both an error code and an error message:

```json
{
...
    "error": {
        "code": 500,
        "message": "Example error message"
    }
...
}
```

* **Optional**
* Type: `boolean`, must be one of `true`, `false`
* Default: `false`
* Compatible with HTTP and OPC UA protocols

Without this property, errors may still be published but only as a simple text string, like: `"error": "An error occurred"`
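As a sketch, structured errors would be switched on for an OPC UA read endpoint like this. The resource name is hypothetical.

```yaml
readWithStructuredErrors:
  type: Cybus::Endpoint
  properties:
    protocol: Opcua
    connection: !ref opcuaConnection
    publishError: true   # publish errors as an object with code and message
    read:
      nodeId: i=2258
```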

## Examples

### BACnet

{% code lineNumbers="true" %}

```yaml
bacnetSubscribe:
  type: Cybus::Endpoint
  properties:
    protocol: Bacnet
    connection: !ref bacnetConnection
    subscribe:
      objectType: analog-input
      objectInstance: 2796204
      interval: 1000
# This subscribes to a BACnet analog input with object instance 2796204
```

{% endcode %}

### OPC UA

{% code lineNumbers="true" %}

```yaml
opcuaSubscribeToCurrentServerTime:
  type: Cybus::Endpoint
  properties:
    protocol: Opcua
    connection: !ref opcuaConnection
    subscribe:
      nodeId: i=2258
# This subscribes to the OPC UA server node that publishes the current time
```

{% endcode %}

### MQTT with Write Buffering Enabled

{% code lineNumbers="true" %}

```yaml
writeEndpoint:
  type: Cybus::Endpoint
  properties:
    protocol: Mqtt
    connection: !ref mqttConnection
    buffering:
      enabled: true
      keepOrder: true
      burstInterval: 10
      burstSize: 100
      bufferMaxSize: 20000
      bufferMaxAge: 5000
    write:
      topic: test/write
# This configures a write endpoint which will buffer up to 20000 messages
# if the connection is lost and will publish 100 messages every 10 milliseconds
# when the connection is reestablished. New incoming messages will be published
# only when the originally buffered items were all published
```

{% endcode %}
