# Kafka

Apache Kafka is a software framework for stream-processing of large amounts of data. It is an open-source software platform developed by the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds.

This protocol implementation allows to send the data from the Connectware to an Apache Kafka broker for message processing or to subscribe to a particular topic to receive data.

Currently, the protocol supports authentification by means of SASL and compression of produced messages by using GZIP.

## Message assembly

The endpoints in this protocol are available for the `write` and `subscribe` operations only. To transmit messages to the Kafka broker, a JSON message with the payload as `value` needs to be sent to the respective endpoint. The message may look as follows when producing messages to the broker:

{% code lineNumbers="true" %}

```yaml
{
    'id': 123321,
    'topic': 'some_test_topic',
    'acks': 1,
    'timeout': 3000,
    'compression': 'None',
    'value':
        [
            { 'key': 'sample-key 1', 'value': 'sample message #1', 'partition': 0, 'headers': { 'foo': 'bar' }, 'timestamp': 1633425301000 },
            { 'key': 'sample-key 2', 'value': 'sample message #2', 'partition': 1, 'headers': { 'foo': 'bar' }, 'timestamp': 1633425301001 },
        ],
}
```

{% endcode %}

Which upon successful delivery, the following message will be generated on the `/res` topic:

{% code lineNumbers="true" %}

```yaml
{
    'id': 123321,
    'timestamp': 1633425302878,
    'result':
        {
            'value':
                [
                    { 'topicName': 'some_test_topic', 'partition': 0, 'errorCode': 0, 'baseOffset': '0', 'logAppendTime': '-1', 'logStartOffset': '0' },
                    { 'topicName': 'some_test_topic', 'partition': 1, 'errorCode': 0, 'baseOffset': '0', 'logAppendTime': '-1', 'logStartOffset': '0' },
                ],
        },
}
```

{% endcode %}

And when delivery is unsuccessful, you will get the following message explaining what happened:

{% code lineNumbers="true" %}

```yaml
{ 'id': 123321, 'timestamp': 1633425495764, 'error': { 'code': -1, 'message': 'Connection error: connect ECONNREFUSED 192.168.0.1:9092' } }
```

{% endcode %}

Upon subscription you will get messages like this:

{% code lineNumbers="true" %}

```yaml
{
    'timestamp': 1649046618574,
    'value': { 'topic': 'some_test_topic', 'partition': 0, 'key': 'totally optional msg key', 'headers': {}, 'message': 'test 123 test msg body' },
}
```

{% endcode %}

## Commissioning File Specifics

The Endpoints for the Kafka connections may only be of the type `write` or `subscribe`.

Special care must be taken when configuring the connection properties since connections can only take all write endpoints or all subscribe endpoints. But not both at the same time.

To choose the type of connection you are planning to use, you will need to configure the property `clientType` on the connection resource.

Further configuration options are listed below:

[Connection Properties](https://docs.cybus.io/1-11-0/documentation/industry-protocol-details/kafka/kafkaconnection)

[Endpoint Properties](https://docs.cybus.io/1-11-0/documentation/industry-protocol-details/kafka/kafkaendpoint)

## Service Commissioning File Example

{% file src="<https://3526354945-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FrCPVzPlizXFgeCsIL7jH%2Fuploads%2Fgit-blob-16579f4f48192140658efaf04a33b7475d17af4a%2Fkafka-example-producer.yml?alt=media>" %}

{% code lineNumbers="true" %}

```yaml
---
# ----------------------------------------------------------------------------#
# Commissioning File
# ----------------------------------------------------------------------------#
# Copyright: Cybus GmbH (2020)
# Contact: support@cybus.io
# ----------------------------------------------------------------------------#
# Source Interface Definition - Kafka broker
# ----------------------------------------------------------------------------#

description: >
    Sample commissioning file for communicating with Kafka broker (producer)

metadata:
    name: Apache Kafka Connectivity
    icon: https://www.cybus.io/wp-content/uploads/2017/10/for-whom1.svg
    provider: cybus
    homepage: https://www.cybus.io
    version: 0.0.1

parameters:
    param_brokers:
        description: Hosts or IP addresses of the Apache Kafka broker
        type: string
        default: 192.168.0.1:9092

resources:
    kafkaConnection:
        type: Cybus::Connection
        properties:
            protocol: Kafka
            connection:
                brokers: [!ref param_brokers]

    kafkaWriteExample:
        type: Cybus::Endpoint
        properties:
            protocol: Kafka
            connection: !ref kafkaConnection
            write:
                topic: some_test_topic
```

{% endcode %}

{% file src="<https://3526354945-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FrCPVzPlizXFgeCsIL7jH%2Fuploads%2Fgit-blob-c453ea9854b46c9ed940126e0821ab9cc6a50920%2Fkafka-example-consumer.yml?alt=media>" %}

{% code title="kafka-example-consumer.yml" lineNumbers="true" %}

```yaml
---
# ----------------------------------------------------------------------------#
# Commissioning File
# ----------------------------------------------------------------------------#
# Copyright: Cybus GmbH (2020)
# Contact: support@cybus.io
# ----------------------------------------------------------------------------#
# Source Interface Definition - Kafka broker
# ----------------------------------------------------------------------------#

description: >
    Sample commissioning file for communicating with Kafka broker (consumer)

metadata:
    name: Apache Kafka Connectivity
    icon: https://www.cybus.io/wp-content/uploads/2017/10/for-whom1.svg
    provider: cybus
    homepage: https://www.cybus.io
    version: 0.0.1

parameters:
    param_brokers:
        description: Hosts or IP addresses of the Apache Kafka broker
        type: string
        default: 192.168.0.1:9092

resources:
    kafkaConnection:
        type: Cybus::Connection
        properties:
            protocol: Kafka
            connection:
                clientType: 'consumer'
                brokers: [!ref param_brokers]

    kafkaSubscribeExample:
        type: Cybus::Endpoint
        properties:
            protocol: Kafka
            connection: !ref kafkaConnection
            subscribe:
                fromBeginning: false
                topic: some_test_topic
```

{% endcode %}
