Kafka

Overview

Apache Kafka is a software framework for stream-processing of large amounts of data. It is an open-source software platform developed by the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds.

This protocol implementation allows you to send data from Connectware to an Apache Kafka broker for message processing, or to subscribe to a particular topic to receive data.

Currently, the protocol supports authentication by means of SASL and GZIP compression of produced messages.

Message assembly

The endpoints in this protocol are available for the write and subscribe operations only. To transmit messages to the Kafka broker, a JSON message with the payload as value needs to be sent to the respective endpoint. The message may look as follows when producing messages to the broker:

{
  "id": 123321,
  "topic": "some_test_topic",
  "acks": 1,
  "timeout": 3000,
  "compression": "None",
  "value": [
    {
      "key": "sample-key 1",
      "value": "sample message #1",
      "partition": 0,
      "headers": {
        "foo": "bar"
      },
      "timestamp": 1633425301000
    },
    {
      "key": "sample-key 2",
      "value": "sample message #2",
      "partition": 1,
      "headers": {
        "foo": "bar"
      },
      "timestamp": 1633425301001
    }
  ]
}
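As an illustration, a message of this shape could be assembled as follows. This is a minimal sketch, not part of Connectware: the helper names make_record and build_produce_request are hypothetical, and the default values simply mirror the endpoint property defaults documented on this page. Whether any of the fields may be omitted from the message is not stated here, so the sketch always emits all of them.

```python
import json

# Allowed values taken from the endpoint property descriptions on this page.
ALLOWED_ACKS = {-1, 0, 1}
ALLOWED_COMPRESSION = {"None", "GZIP"}


def make_record(key, value, partition, headers, timestamp_ms):
    """Build one entry of the "value" array, mirroring the sample message."""
    return {
        "key": key,
        "value": value,
        "partition": partition,
        "headers": dict(headers),
        "timestamp": timestamp_ms,
    }


def build_produce_request(request_id, topic, records,
                          acks=-1, timeout=30000, compression="None"):
    """Assemble the JSON-serializable message sent to a write endpoint."""
    if acks not in ALLOWED_ACKS:
        raise ValueError(f"acks must be one of {sorted(ALLOWED_ACKS)}")
    if compression not in ALLOWED_COMPRESSION:
        raise ValueError(f"compression must be one of {sorted(ALLOWED_COMPRESSION)}")
    return {
        "id": request_id,
        "topic": topic,
        "acks": acks,
        "timeout": timeout,
        "compression": compression,
        "value": list(records),
    }


if __name__ == "__main__":
    request = build_produce_request(
        123321,
        "some_test_topic",
        [make_record("sample-key 1", "sample message #1", 0,
                     {"foo": "bar"}, 1633425301000)],
        acks=1,
        timeout=3000,
    )
    print(json.dumps(request, indent=2))
```

The resulting dictionary can be serialized with json.dumps and published to the write endpoint with whatever client you already use.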

Upon successful delivery, the following message is generated on the /res topic:

{
  "id": 123321,
  "timestamp": 1633425302878,
  "result": {
    "value": [
      {
        "topicName": "some_test_topic",
        "partition": 0,
        "errorCode": 0,
        "baseOffset": "0",
        "logAppendTime": "-1",
        "logStartOffset": "0"
      },
      {
        "topicName": "some_test_topic",
        "partition": 1,
        "errorCode": 0,
        "baseOffset": "0",
        "logAppendTime": "-1",
        "logStartOffset": "0"
      }
    ]
  }
}

When delivery fails, you will instead receive the following message explaining what happened:

{
  "id": 123321,
  "timestamp": 1633425495764,
  "error": {
    "code": -1,
    "message": "Connection error: connect ECONNREFUSED 192.168.0.1:9092"
  }
}
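A consumer of these response messages has to distinguish the two shapes shown above. The following is a minimal sketch under stated assumptions: the function name summarize_response is hypothetical, and treating errorCode 0 as success is inferred from the successful sample rather than stated on this page.

```python
import json


def summarize_response(raw):
    """Reduce a response message to a small, uniform summary dictionary."""
    msg = json.loads(raw)

    # Failure shape: a top-level "error" object with "code" and "message".
    if "error" in msg:
        err = msg["error"]
        return {
            "id": msg.get("id"),
            "ok": False,
            "detail": f'{err["code"]}: {err["message"]}',
        }

    # Success shape: one entry per topic partition under result.value.
    partitions = msg["result"]["value"]
    # Assumption: a non-zero errorCode marks a failed partition.
    failed = [p for p in partitions if p["errorCode"] != 0]
    return {
        "id": msg.get("id"),
        "ok": not failed,
        # baseOffset arrives as a string in the sample, so convert it.
        "offsets": {
            (p["topicName"], p["partition"]): int(p["baseOffset"])
            for p in partitions
        },
    }
```

The id field echoes the id of the request, so it can be used to match a response to the message that triggered it.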

Upon subscription you will get messages like this:

{
  "timestamp": 1649046618574,
  "value": {
    "topic": "some_test_topic",
    "partition": 0,
    "key": "totally optional msg key",
    "headers": {
    },
    "message": "test 123 test msg body"
  }
}
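Received messages can be unpacked into a typed structure for further processing. This is a sketch only: the ConsumedMessage class and parse_consumed function are hypothetical names, the timestamp is assumed to be milliseconds since the Unix epoch (the sample values are consistent with that), and the key is treated as optional as the sample suggests.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional


@dataclass
class ConsumedMessage:
    topic: str
    partition: int
    key: Optional[str]
    headers: Dict[str, str]
    body: str
    timestamp: datetime


def parse_consumed(raw):
    """Parse one message received from a subscribe endpoint."""
    msg = json.loads(raw)
    value = msg["value"]
    return ConsumedMessage(
        topic=value["topic"],
        partition=value["partition"],
        key=value.get("key"),              # assumed optional
        headers=value.get("headers", {}),
        body=value["message"],
        # Assumption: "timestamp" is in milliseconds since the Unix epoch.
        timestamp=datetime.fromtimestamp(msg["timestamp"] / 1000, tz=timezone.utc),
    )
```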

Commissioning File Specifics

The Endpoints for the Kafka connections may only be of the type write or subscribe.

Take special care when configuring the connection properties: a connection can serve either write endpoints only or subscribe endpoints only, never both at the same time.

To choose the type of connection you are planning to use, you will need to configure the property clientType on the connection resource.
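The constraint above can be checked before deploying a commissioning file. The sketch below is not a Connectware feature: check_endpoints is a hypothetical helper operating on plain dictionaries, and it assumes that a producer connection pairs with write endpoints and a consumer connection with subscribe endpoints.

```python
# Assumed pairing between the connection's clientType and the endpoint operation.
EXPECTED_OPERATION = {"producer": "write", "consumer": "subscribe"}


def check_endpoints(connection, endpoints):
    """Return a list of problems; an empty list means the setup is consistent.

    connection: dict of connection properties (may contain "clientType").
    endpoints:  dict mapping endpoint name -> dict of endpoint properties,
                where the operation appears as a "write" or "subscribe" key.
    """
    client_type = connection.get("clientType", "producer")  # documented default
    if client_type not in EXPECTED_OPERATION:
        raise ValueError(f"unknown clientType: {client_type!r}")

    expected = EXPECTED_OPERATION[client_type]
    problems = []
    for name, props in endpoints.items():
        operations = [op for op in ("write", "subscribe") if op in props]
        if operations != [expected]:
            problems.append(
                f"{name}: expected only '{expected}' for a {client_type} "
                f"connection, found {operations or 'no operation'}"
            )
    return problems
```

For example, a consumer connection combined with an endpoint that defines write would yield one entry in the returned list.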

Further configuration options are listed below:

Connection Properties

brokers (array, required)

The host names of the Kafka brokers

The object is an array with all elements of the type string.

Examples: ["kafka1:9092","kafka2:9092"], ["kafka1:9092","kafka2:9092","kafka3:9092"]

clientType (string, enum)

Configure the connection to be a producer or a consumer. Defaults to producer.

This element must be one of the following enum values:

  • producer

  • consumer

Default: "producer"

groupId (string)

Only for consumer clients: The group id for the Kafka consumer. It defaults to the id of the Connection resource.

clientId (string)

The client id used when connecting to the broker

Default: "protocol-mapper-kafka"

Example: "connectware_13462"

connectionTimeout (integer)

Time in milliseconds to wait for a successful connection

Default: 1000

requestTimeout (integer)

Time in milliseconds to wait for a successful request

Default: 30000

trustAllCertificates (boolean)

If true, all broker certificates will be accepted, regardless of whether they can be validated or not. Use this option if self-signed server certificates should be accepted, or if there are other reasons which prevent this client from validating the certificates.

Default: false

Examples: true, false

mutualAuthentication (boolean)

If true, a full certificate chain including client certificate is expected to connect properly with validated certificates.

Default: false

Examples: true, false

caCert (string)

The root CA certificate as Base64 encoded PEM file content

clientCert (string)

The device certificate as Base64 encoded PEM CRT file content

clientPrivateKey (string)

The device private key as Base64 encoded PEM CRT file content

sasl (object)

SASL stands for Simple Authentication and Security Layer. With this property you can configure the authentication scheme to be used when contacting the Kafka broker.

Properties of the sasl object:

mechanism (string, enum, required)

The mechanism used to carry out authentication

This element must be one of the following enum values:

  • plain

  • scram-sha-256

  • scram-sha-512

username (string, required)

The username used in the authentication procedure

password (string, required)

The password used in the authentication procedure
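To illustrate how the connection properties fit together, the following sketch assembles a connection properties dictionary with an optional sasl object. The function build_connection is a hypothetical helper, not part of Connectware; it only enforces the required fields and enum values listed above.

```python
SASL_MECHANISMS = ("plain", "scram-sha-256", "scram-sha-512")
CLIENT_TYPES = ("producer", "consumer")


def build_connection(brokers, client_type="producer", sasl=None):
    """Build a dict of connection properties.

    brokers: non-empty list of "host:port" strings (required).
    sasl:    optional dict with "mechanism", "username" and "password".
    """
    if not brokers or not all(isinstance(b, str) for b in brokers):
        raise ValueError("brokers must be a non-empty list of strings")
    if client_type not in CLIENT_TYPES:
        raise ValueError(f"clientType must be one of {CLIENT_TYPES}")

    connection = {"brokers": list(brokers), "clientType": client_type}

    if sasl is not None:
        # All three sasl properties are marked as required.
        missing = [k for k in ("mechanism", "username", "password") if k not in sasl]
        if missing:
            raise ValueError(f"sasl is missing required properties: {missing}")
        if sasl["mechanism"] not in SASL_MECHANISMS:
            raise ValueError(f"mechanism must be one of {SASL_MECHANISMS}")
        connection["sasl"] = dict(sasl)

    return connection
```

The returned dictionary corresponds to the connection section of a Cybus::Connection resource and could be dumped to YAML with a library of your choice.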

Endpoint Properties

topic (string, required)

Any valid topic name addressing a single data-point

Example: "topic_name"

acks (number, enum)

Controls the number of required acknowledgements. -1 = all in-sync replicas must acknowledge (default); 0 = no acknowledgements; 1 = only wait for the leader to acknowledge

This element must be one of the following enum values:

  • -1

  • 0

  • 1

Default: -1

timeout (number)

The time to await a response from the broker in milliseconds

Default: 30000

compression (string, enum)

The compression codec used to compress the messages

This element must be one of the following enum values:

  • None

  • GZIP

Default: "None"

fromBeginning (boolean)

Only for subscribe: Where to let the consumer start consuming messages. If true, start from the beginning of the topic. If false, start at the end.

Default: false

Sample Commissioning File

Download producer example: kafka-example-producer.yml

---
# ----------------------------------------------------------------------------#
# Commissioning File
# ----------------------------------------------------------------------------#
# Copyright: Cybus GmbH (2020)
# Contact: support@cybus.io
# ----------------------------------------------------------------------------#
# Source Interface Definition - Kafka broker
# ----------------------------------------------------------------------------#

description: >
  Sample commissioning file for communicating with Kafka broker (producer)

metadata:
  name: Apache Kafka Connectivity
  icon: https://www.cybus.io/wp-content/uploads/2017/10/for-whom1.svg
  provider: cybus
  homepage: https://www.cybus.io
  version: 0.0.1

parameters:
  param_brokers:
    description: Hosts or IP addresses of the Apache Kafka broker
    type: string
    default: 192.168.0.1:9092

resources:
  kafkaConnection:
    type: Cybus::Connection
    properties:
      protocol: Kafka
      connection:
        brokers: [
          !ref param_brokers
        ]

  kafkaWriteExample:
    type: Cybus::Endpoint
    properties:
      protocol: Kafka
      connection: !ref kafkaConnection
      write:
        topic: some_test_topic

Download consumer example: kafka-example-consumer.yml

---
# ----------------------------------------------------------------------------#
# Commissioning File
# ----------------------------------------------------------------------------#
# Copyright: Cybus GmbH (2020)
# Contact: support@cybus.io
# ----------------------------------------------------------------------------#
# Source Interface Definition - Kafka broker
# ----------------------------------------------------------------------------#

description: >
  Sample commissioning file for communicating with Kafka broker (consumer)

metadata:
  name: Apache Kafka Connectivity
  icon: https://www.cybus.io/wp-content/uploads/2017/10/for-whom1.svg
  provider: cybus
  homepage: https://www.cybus.io
  version: 0.0.1

parameters:
  param_brokers:
    description: Hosts or IP addresses of the Apache Kafka broker
    type: string
    default: 192.168.0.1:9092

resources:
  kafkaConnection:
    type: Cybus::Connection
    properties:
      protocol: Kafka
      connection:
        clientType: 'consumer'
        brokers: [
          !ref param_brokers
        ]

  kafkaSubscribeExample:
    type: Cybus::Endpoint
    properties:
      protocol: Kafka
      connection: !ref kafkaConnection
      subscribe:
        fromBeginning: false
        topic: some_test_topic