Setting Up Shared Subscriptions

Learn about setting up shared subscriptions in Connectware.

In Connectware, you can use shared subscriptions with mappings and endpoints. Below, we provide three service commissioning file examples that demonstrate different use cases for implementing shared subscriptions.

Syntax of Shared Subscriptions

The syntax for shared subscriptions follows this pattern:

$share/group_name/topic
Parameter
Description
Required

$share

A fixed prefix that designates a shared subscription.

Yes

group_name

An identifier for the group of clients that will share the message load. All clients using the same group name will be part of the same message distribution group.

Yes

topic

The actual topic pattern to subscribe to. Can include wildcards and multi-level topics according to standard MQTT topic rules.

Yes

Example

$share/processing_group/sensors/data

In this example:

  • All clients subscribing with this pattern will be part of the processing_group

  • Messages published to sensors/data will be distributed among these clients

  • Each message will be delivered to exactly one client in the group, preventing duplicate processing

Shared Subscriptions Examples

Basic Example with One Agent

This example allows you to test if shared subscriptions work correctly by forwarding messages published to the input topic to the mapping subscribed to $share/group1/input.

  1. Install an agent. See Installing Agents.

  2. Register the agent in Connectware. See Registering Agents in Connectware.

  3. Apply the service to the agent. See Installing Services.

  4. Enable the services. See Enabling Services.

  5. Publish a message to the input topic.

Result: The agent will publish to the topic output.

description: |
    Simple Mapping with Shared Subscription

metadata:
    name: Simple Shared Subscription

resources:
    sharedSubscriptionMapping:
        type: Cybus::Mapping
        properties:
            mappings:
                - subscribe:
                      topic: $share/group1/input
                  publish:
                      topic: output
                  rules:
                      - transform:
                            expression: |
                                {
                                  "msg": $
                                }

Advanced Example with Two Agents (Load Balancing)

This example allows you to test the load-balancing capability of shared subscriptions using two agents.

  1. Install two agents named agent01 and agent02. See Installing Agents.

  2. Register the agents in Connectware. See Registering Agents.

  3. Apply the same service to both agents (rename one for differentiation). See Installing Services.

  4. Enable the services. See Enabling Services.

  5. Publish a message to the input topic.

Result: Either agent01 or agent02 will publish to the output topic, but only one will act at a time. This ensures load balancing between the agents.

description: |
    Shared Subscription Test

metadata:
    name: Shared Subscription Test

parameters:
    agentName:
        description: Choose Agent
        type: string
        enum: ['agent01', 'agent02']

resources:
    sharedSubscriptionMapping:
        type: Cybus::Mapping
        properties:
            agentName: !ref agentName
            mappings:
                - subscribe:
                      topic: $share/group1/input
                  publish:
                      topic: output
                  rules:
                      - transform:
                            expression: !sub |
                                (
                                  $merge([$, {"agent": "${agentName}"}]);
                                )

Advanced Example with Multiple Agents (Wildcard Mappings)

In this example, a shared subscription is deployed across multiple agents. This example sets up a qa group where agents subscribe to the service/# topic wildcard, allowing them to handle messages on all subtopics. This setup is particularly useful for load testing, where tens of thousands of messages per second are processed from sources like OPC UA. The agents then publish these messages to the northbound/opcua/$nodes topic.

The wildcards #nodes and $nodes represent a named wildcard that maps the topic tree under services (e.g., service/a/bc, service/x/y/z/...) to the northbound path (e.g., northbound/opcua/a/bc).

The dedicated connection specified in the subscribe property ensures that each agent creates a new MQTT client instead of sharing a single MqttConnection.

---
description: |
    Wildcard Mapping with Shared Subscription

metadata:
    name: Wildcard Mapping Shared Subscription
    provider: cybus
    homepage: https://www.cybus.io
    version: 1.0.0

parameters:
    agentName:
        type: string
        default: protocol-mapper

    mqttBroker:
        type: string
        default: broker.example.com

    mqttUser:
        type: string
        default: admin

    mqttPassword:
        type: string
        default: admin

resources:
    mqttConnection:
        type: Cybus::Connection
        properties:
            agentName: !ref agentName
            protocol: Mqtt
            targetState: connected
            connection:
                host: !ref mqttBroker
                username: !ref mqttUser
                password: !ref mqttPassword

    wildcardMappingSharedSubscription:
        type: Cybus::Mapping
        properties:
            agentName: !ref agentName
            mappings:
                - subscribe:
                      topic: $share/qa/services/#nodes
                      connection: !ref mqttConnection
                  publish:
                      topic: northbound/opcua/$nodes

Last updated

Logo

© Copyright 2024, Cybus GmbH