Generic VRPC

The Generic VRPC protocol is an interface for managing a custom implementation of any arbitrary protocol, where VRPC (Variadic Remote Procedure Calls via MQTT, http://vrpc.io ) is used as the configuration and life cycle management interface from the Connectware to the custom implementation.

The data interface for the actual process data, on the other hand, needs to be implemented by the custom code itself. This can be any sort of data format and transport protocol as required by the concrete application, for example JSON over MQTT, but could be anything. The Generic VRPC protocol just serves as an integration layer of the custom implementation into the configuration and life cycle layer of the Connectware.

Using VRPC offers the following benefits:

  • The custom implementation can be written in any programming language for which VRPC bindings exist, i.e. Javascript/Node.js, Python, or C++

  • VRPC wraps the state machine function calls which are needed when implementing the details of a protocol life cycle

  • The parameter values for the custom protocol can be configured directly inside Connectware’s service commissioning files (see Structure of Commissioning Files) and are transparently passed through to the custom implementation, hence eliminating any need for separate parameter management solutions (e.g. separate config files).

The interface from the Connectware to the custom implementation relies on the following requirements:

  1. The custom code runs a VrpcAdapter instance that registers at the Connectware’s MQTT broker

  2. The custom code implements the protocol state machine as a class that implements these seven methods: connect, disconnect, isConnected, subscribe, unsubscribe, read, write

There are no further requirements on the custom code except these seven methods.

Service Commissioning File Example

This part of the commissioning file describes one connection and one endpoint in a custom protocol implementation.

The custom implementation runs an instance of VrpcAdapter which registered itself at the Connectware’s MQTT broker using the vrpc domain mycustom.vrpc and the agent name mycustom.implementation. The protocol implementation itself is written in a class named MyProtocol. These three arguments are the only parts needed for configuring the GenericVrpc protocol.

device1:
    type: Cybus::Connection
    properties:
        protocol: GenericVrpc
        targetState: connected
        connection:
            vrpc:
                domain: mycustom.vrpc
                agent: mycustom.implementation
                className: MyProtocol
            args:
                someParameter: SomeValue
                someOtherParameter: 'here://1.2.3.4:456/this/that'
                anotherParameter: 123

processData1:
    type: Cybus::Endpoint
    properties:
        protocol: GenericVrpc
        connection: !ref device1
        subscribe:
            someEndpointParameter: AnotherValue
            someNumber: 2000

All values below args of the connection are passed on to the MyProtocol class transparently, specifically to the constructor of each MyProtocol instance. In this example, three values of various types are shown.

All values below subscribe of the endpoint are passed on to the MyProtocol::subscribe method call. In this example, two values are shown. The same sort of additional parameters can be added at the read and write sections of endpoints.

Example Protocol implementation

This example code shows the seven methods as mentioned above in a Node.js/Javascript implementation of the MyProtocol class, which is derived from Node.js’s standard EventEmitter class. The EventEmitter base class is used for notification of successful connect/disconnect but most importantly for notification of newly arriving data to which the Connectware should be subscribed.

const EventEmitter = require('events')

/** Implements the "protocol" interface which is expected from the
*  connectware when creating a "Connection" with "Endpoints" to receive data.  */
class MyProtocol extends EventEmitter {
  /** Constructor. Receives a JSON object with options that have been specified
  *  in the commissioning file of the connectware. */
  constructor (options) {
      super()
      this._deviceData = options
      this._isConnected = false
  }

  /** Called when the "Connection" over this "protocol" should actually connect. */
  async connect () {
      do_something_to_connect_with(this._deviceData)
      // ...
      this._isConnected = true
      this.emit('connected')  // notifies that we are connected
  }

  /** Called when the "Connection" over this "protocol" should disconnect. */
  async disconnect () {
      do_something_to_disconnect()
      // ...
      this._isConnected = false
      this.emit('disconnected')  // notifies that we are disconnected
  }

  /** Returns true if the actual connection has been established. */
  async isConnected () {
      return this._isConnected
  }

  /** Called when an "Endpoint" over this "Connection" should be subscribed.
  *
  * The address argument contains a JSON object with the options that have been
  * specified in the commissioning file of the connectware.
  *
  * The id argument is the eventName to which the new data should be emitted,
  * see example below. */
  async subscribe (address, id) {
      do_something_to_subscribe(this._device, address)
      store_id_address_mapping(id, address)
      // ...
      setInterval(() => this.emit(id, { value: someValue } )) // as an example
  }

  /** Called when the "Endpoint" over this "Connection" should be unsubscribed. */
  async unsubscribe (id) {
      const address = get_id_address_mapping(id)
      do_something_to_unsubscribe(address)
      // ...
  }

  /** Called when one "Endpoint"'s data should be read. */
  async read (address) {
      throw new Error('read() not implemented here so far')
  }

  /** Called when data should be written to one "Endpoint". */
  async write (address, data) {
      throw new Error('write() not implemented here so far')
  }
}

This class needs to be registered at a local VrpcAdapter instance. In a Node.js/Javascript application this is done in the top-level index.js file as follows:

const { VrpcAdapter, VrpcAgent } = require('vrpc')
const MyProtocol = require('./src/MyProtocol')

VrpcAdapter.register(MyProtocol)

const vrpcAgent = new VrpcAgent({
  domain: `mycustom.vrpc`,
  agent: `mycustom.implementation`,
  username: `my_mqtt_username`,
  password: `my_mqtt_password`,
  broker: `mqtt://1.2.3.4:1833`
})
vrpcAgent.serve()

Connection Properties

Endpoint Properties

Last updated

Logo

© Copyright 2024, Cybus GmbH