Setting Up Shared Subscriptions

Learn about setting up shared subscriptions in Connectware.

In Connectware, you can use shared subscriptions with mappings and endpoints. Below, we provide three service commissioning file examples that demonstrate different use cases for implementing shared subscriptions.

Syntax of Shared Subscriptions

The syntax for shared subscriptions follows this pattern:

$share/group_name/topic

| Parameter  | Description | Required |
| ---------- | ----------- | -------- |
| $share     | A fixed prefix that designates a shared subscription. | Yes |
| group_name | An identifier for the group of clients that will share the message load. All clients using the same group name will be part of the same message distribution group. | Yes |
| topic      | The actual topic pattern to subscribe to. Can include wildcards and multi-level topics according to standard MQTT topic rules. | Yes |

Example

$share/processing_group/sensors/data

In this example:

  • All clients subscribing with this pattern will be part of the processing_group

  • Messages published to sensors/data will be distributed among these clients

  • Each message will be delivered to exactly one client in the group, preventing duplicate processing
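
To make the three parts of the syntax concrete, here is a minimal Python sketch that splits a shared subscription string into its group name and topic. The helper name parse_shared_subscription and the SharedSubscription type are hypothetical and only serve as an illustration; they are not part of Connectware.

```python
from typing import NamedTuple


class SharedSubscription(NamedTuple):
    group_name: str
    topic: str


def parse_shared_subscription(subscription: str) -> SharedSubscription:
    """Split '$share/group_name/topic' into its group name and topic filter."""
    prefix, sep, rest = subscription.partition("/")
    if prefix != "$share" or not sep:
        raise ValueError(f"not a shared subscription: {subscription!r}")
    # Everything after the group name belongs to the topic, including further slashes.
    group_name, sep, topic = rest.partition("/")
    if not group_name or not sep or not topic:
        raise ValueError(f"group name and topic are both required: {subscription!r}")
    return SharedSubscription(group_name, topic)


parsed = parse_shared_subscription("$share/processing_group/sensors/data")
print(parsed.group_name)  # processing_group
print(parsed.topic)       # sensors/data
```

Only the first two segments are treated specially, so multi-level topics and wildcards in the topic part are passed through unchanged.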

Shared Subscriptions Examples

Basic Example with One Agent

This example lets you verify that shared subscriptions work correctly: messages published to the input topic are forwarded to the mapping subscribed to $share/group1/input.

  1. Install an agent. See Installing Agents.

  2. Register the agent in Connectware. See Registering Agents in Connectware.

  3. Apply the service to the agent. See Installing Services.

  4. Enable the services. See Enabling Services.

  5. Publish a message to the input topic.

Result: The agent publishes the message to the output topic, wrapped in a msg property.

description: |
  Simple Mapping with Shared Subscription

metadata:
  name: Simple Shared Subscription

resources:
  sharedSubscriptionMapping:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: $share/group1/input
          publish:
            topic: output
          rules:
            - transform:
                expression: |
                  {
                    "msg": $
                  }
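
To see what arrives on the output topic, the transform rule above can be mimicked in Python. This is only a sketch of the rule's effect, assuming the incoming payload is valid JSON; the function name wrap_message is hypothetical and the code does not reproduce how Connectware evaluates the expression.

```python
import json


def wrap_message(raw_payload: bytes) -> bytes:
    """Mimic the rule {"msg": $}: nest the incoming JSON value under "msg"."""
    incoming = json.loads(raw_payload)
    return json.dumps({"msg": incoming}).encode("utf-8")


print(wrap_message(b'{"temperature": 21.5}'))
# b'{"msg": {"temperature": 21.5}}'
```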

Advanced Example with Two Agents (Load Balancing)

This example allows you to test the load-balancing capability of shared subscriptions using two agents.

  1. Install two agents named agent01 and agent02. See Installing Agents.

  2. Register the agents in Connectware. See Registering Agents in Connectware.

  3. Apply the same service to both agents, choosing a different agentName for each (rename one of the services to tell them apart). See Installing Services.

  4. Enable the services. See Enabling Services.

  5. Publish a message to the input topic.

Result: For each message, either agent01 or agent02 publishes to the output topic, never both. The agent property added to the payload shows which agent handled the message. This balances the load between the agents.

description: |
  Shared Subscription Test

metadata:
  name: Shared Subscription Test

parameters:
  agentName:
    description: Choose Agent
    type: string
    enum: ['agent01', 'agent02']

resources:
  sharedSubscriptionMapping:
    type: Cybus::Mapping
    properties:
      agentName: !ref agentName
      mappings:
        - subscribe:
            topic: $share/group1/input
          publish:
            topic: output
          rules:
            - transform:
                expression: !sub |
                  (
                    $merge([$, {"agent": "${agentName}"}]);
                  )
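
The expected behavior of this setup can be simulated in a few lines of Python. The sketch below hands each message to exactly one agent and adds the agent name, which is what the $merge expression does to the payload. The round-robin order is an assumption made for illustration; the source does not state which distribution strategy the broker uses, and the function name distribute is hypothetical.

```python
from itertools import cycle


def distribute(messages, agents):
    """Hand each message to exactly one agent and tag it with that agent's name."""
    next_agent = cycle(agents)  # round-robin is an assumption, not documented broker behavior
    results = []
    for message in messages:
        agent = next(next_agent)
        # Python equivalent of $merge([$, {"agent": "<agentName>"}])
        results.append({**message, "agent": agent})
    return results


published = distribute([{"id": 1}, {"id": 2}, {"id": 3}], ["agent01", "agent02"])
for payload in published:
    print(payload)
# {'id': 1, 'agent': 'agent01'}
# {'id': 2, 'agent': 'agent02'}
# {'id': 3, 'agent': 'agent01'}
```

Whatever the actual strategy, the property to check in a real test is the same: every published message shows up on the output topic once, carrying the name of a single agent.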

Advanced Example with Multiple Agents (Wildcard Mappings)

In this example, a shared subscription is deployed across multiple agents. It sets up a qa group in which the agents subscribe to the services/# topic wildcard, allowing them to handle messages on all subtopics. This setup is particularly useful for load testing, where tens of thousands of messages per second are processed from sources like OPC UA. The agents then publish these messages to the northbound/opcua/$nodes topic.

The #nodes and $nodes placeholders form a named wildcard that maps the topic tree under services (e.g., services/a/bc, services/x/y/z/...) to the northbound path (e.g., northbound/opcua/a/bc).
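
The named wildcard mapping can be illustrated with a small Python sketch. It captures everything matched by #nodes and substitutes it for $nodes in the publish topic. The function map_named_wildcard is hypothetical, handles only a single trailing named wildcard, and expects the subscribe pattern without the $share/qa/ prefix; it is a simplification, not Connectware's implementation.

```python
def map_named_wildcard(topic, subscribe_pattern, publish_pattern):
    """Return the publish topic for `topic`, or None if it does not match."""
    # "services/#nodes" -> prefix "services/", wildcard name "nodes"
    prefix, _, name = subscribe_pattern.partition("#")
    if not topic.startswith(prefix):
        return None
    captured = topic[len(prefix):]
    # Replace the "$nodes" placeholder with the captured subtopic path.
    return publish_pattern.replace(f"${name}", captured)


print(map_named_wildcard("services/a/bc", "services/#nodes", "northbound/opcua/$nodes"))
# northbound/opcua/a/bc
print(map_named_wildcard("services/x/y/z", "services/#nodes", "northbound/opcua/$nodes"))
# northbound/opcua/x/y/z
```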

The dedicated connection specified in the subscribe property ensures that each agent creates a new MQTT client instead of sharing a single MqttConnection.

---
description: |
  Wildcard Mapping with Shared Subscription

metadata:
  name: Wildcard Mapping Shared Subscription
  provider: cybus
  homepage: https://www.cybus.io
  version: 1.0.0

parameters:
  agentName:
    type: string
    default: protocol-mapper

  mqttBroker:
    type: string
    default: broker.example.com

  mqttUser:
    type: string
    default: admin

  mqttPassword:
    type: string
    default: admin

resources:
  mqttConnection:
    type: Cybus::Connection
    properties:
      agentName: !ref agentName
      protocol: Mqtt
      targetState: connected
      connection:
        host: !ref mqttBroker
        username: !ref mqttUser
        password: !ref mqttPassword

  wildcardMappingSharedSubscription:
    type: Cybus::Mapping
    properties:
      agentName: !ref agentName
      mappings:
        - subscribe:
            topic: $share/qa/services/#nodes
            connection: !ref mqttConnection
          publish:
            topic: northbound/opcua/$nodes
