Generic VRPC

Overview

The Generic VRPC protocol is an interface for integrating a custom implementation of an arbitrary protocol. VRPC (Variadic Remote Procedure Calls via MQTT, http://vrpc.io) serves as the configuration and life cycle management interface between the Connectware and the custom implementation.

The data interface for the actual process data, on the other hand, must be implemented by the custom code itself. Any data format and transport protocol required by the concrete application can be used, for example JSON over MQTT. The Generic VRPC protocol only integrates the custom implementation into the configuration and life cycle layer of the Connectware.

Using VRPC offers the following benefits:

  • The custom implementation can be written in any programming language for which VRPC bindings exist, i.e. JavaScript/Node.js, Python, or C++

  • VRPC wraps the state machine function calls which are needed when implementing the details of a protocol life cycle

  • The parameter values for the custom protocol can be configured directly inside Connectware’s service commissioning files (see Structure of Commissioning Files) and are transparently passed through to the custom implementation, hence eliminating any need for separate parameter management solutions (e.g. separate config files).

The interface from the Connectware to the custom implementation relies on the following requirements:

  1. The custom code runs a VrpcAdapter instance that registers at the Connectware’s MQTT broker

  2. The custom code implements the protocol state machine as a class that implements these seven methods: connect, disconnect, isConnected, subscribe, unsubscribe, read, write

There are no further requirements on the custom code except these seven methods.
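
The requirement on the seven methods can be checked before a class is registered. The following is a minimal sketch: the helper name missingProtocolMethods and the Incomplete class are hypothetical and are not part of VRPC or the Connectware.

```javascript
// Names of the seven methods listed in the requirements above
const REQUIRED_METHODS = [
  'connect', 'disconnect', 'isConnected',
  'subscribe', 'unsubscribe', 'read', 'write'
]

// Hypothetical helper: returns the required method names that are
// missing on the prototype of the given class
function missingProtocolMethods (ProtocolClass) {
  return REQUIRED_METHODS.filter(
    name => typeof ProtocolClass.prototype[name] !== 'function'
  )
}

// Hypothetical class that implements only two of the seven methods
class Incomplete {
  async connect () {}
  async disconnect () {}
}

// Logs the five method names that Incomplete does not implement
console.log(missingProtocolMethods(Incomplete))
```

An empty result means the class offers all seven methods; it does not say anything about whether they behave correctly.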

Example Configuration

This part of the commissioning file describes one connection and one endpoint in a custom protocol implementation.

The custom implementation runs an instance of VrpcAdapter which registers itself at the Connectware’s MQTT broker using the VRPC domain mycustom.vrpc and the agent name mycustom.implementation. The protocol implementation itself is written in a class named MyProtocol. These three values (domain, agent, and className) are the only ones needed for configuring the GenericVrpc protocol.

device1:
  type: Cybus::Connection
  properties:
    protocol: GenericVrpc
    targetState: connected
    connection:
      vrpc:
        domain: mycustom.vrpc
        agent: mycustom.implementation
        className: MyProtocol
      args:
        someParameter: SomeValue
        someOtherParameter: "here://1.2.3.4:456/this/that"
        anotherParameter: 123

processData1:
  type: Cybus::Endpoint
  properties:
    protocol: GenericVrpc
    connection: !ref device1
    subscribe:
      someEndpointParameter: AnotherValue
      someNumber: 2000

All values below args of the connection are passed on to the MyProtocol class transparently, specifically to the constructor of each MyProtocol instance. In this example, three values of various types are shown.

All values below subscribe of the endpoint are passed on to the MyProtocol::subscribe method call. In this example, two values are shown. The same sort of additional parameters can be added at the read and write sections of endpoints.
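
To make the pass-through concrete, the following sketch shows how the values from the commissioning file above would arrive in the custom class. It is a stand-in that only records its arguments; the id 'processData1-id' is a hypothetical placeholder, since the actual id is chosen by the caller.

```javascript
// Stand-in for the custom class: it only records what it receives
class MyProtocol {
  constructor (options) {
    this.options = options         // values below "args" of the connection
    this.subscriptions = new Map() // id -> values below "subscribe" of the endpoint
  }

  async subscribe (address, id) {
    this.subscriptions.set(id, address)
  }
}

// Values taken from the "args" section of device1
const protocol = new MyProtocol({
  someParameter: 'SomeValue',
  someOtherParameter: 'here://1.2.3.4:456/this/that',
  anotherParameter: 123
})

// Values taken from the "subscribe" section of processData1;
// 'processData1-id' is a hypothetical id used for illustration only
protocol.subscribe(
  { someEndpointParameter: 'AnotherValue', someNumber: 2000 },
  'processData1-id'
)

console.log(protocol.options.anotherParameter) // 123
console.log(protocol.subscriptions.get('processData1-id').someNumber) // 2000
```

Note that the value types from the commissioning file are preserved: the number 123 arrives as a number, not as a string.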

Example Protocol implementation

This example code shows the seven methods mentioned above in a Node.js/JavaScript implementation of the MyProtocol class, which is derived from Node.js’s standard EventEmitter class. The EventEmitter base class is used to signal a successful connect or disconnect and, most importantly, to deliver newly arriving data for the endpoints to which the Connectware has subscribed.

const EventEmitter = require('events')

/** Implements the "protocol" interface which is expected from the
*  connectware when creating a "Connection" with "Endpoints" to receive data.  */
class MyProtocol extends EventEmitter {
  /** Constructor. Receives a JSON object with options that have been specified
  *  in the commissioning file of the connectware. */
  constructor (options) {
      super()
      this._deviceData = options
      this._isConnected = false
  }

  /** Called when the "Connection" over this "protocol" should actually connect. */
  async connect () {
      do_something_to_connect_with(this._deviceData)
      // ...
      this._isConnected = true
      this.emit('connected')  // notifies that we are connected
  }

  /** Called when the "Connection" over this "protocol" should disconnect. */
  async disconnect () {
      do_something_to_disconnect()
      // ...
      this._isConnected = false
      this.emit('disconnected')  // notifies that we are disconnected
  }

  /** Returns true if the actual connection has been established. */
  async isConnected () {
      return this._isConnected
  }

  /** Called when an "Endpoint" over this "Connection" should be subscribed.
  *
  * The address argument contains a JSON object with the options that have been
  * specified in the commissioning file of the connectware.
  *
  * The id argument is the eventName to which the new data should be emitted,
  * see example below. */
  async subscribe (address, id) {
      do_something_to_subscribe(this._deviceData, address)
      store_id_address_mapping(id, address)
      // ...
      // as an example: emit a value under the given id once per second
      setInterval(() => this.emit(id, { value: someValue }), 1000)
  }

  /** Called when the "Endpoint" over this "Connection" should be unsubscribed. */
  async unsubscribe (id) {
      const address = get_id_address_mapping(id)
      do_something_to_unsubscribe(address)
      // ...
  }

  /** Called when one "Endpoint"'s data should be read. */
  async read (address) {
      throw new Error('read() not implemented here so far')
  }

  /** Called when data should be written to one "Endpoint". */
  async write (address, data) {
      throw new Error('write() not implemented here so far')
  }
}

// Export the class so that index.js can load it via require()
module.exports = MyProtocol
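
How emitted data reaches a listener can be tried out without any device. The following self-contained sketch simulates the subscribe/unsubscribe pair with a timer. The class name TickerProtocol, the interval field of the address, and the counter payload are assumptions for illustration; the listener merely stands in for the Connectware side.

```javascript
const EventEmitter = require('events')

// Hypothetical protocol class that emits a counter value periodically
class TickerProtocol extends EventEmitter {
  constructor () {
    super()
    this._timers = new Map() // id -> timer handle, needed for unsubscribe
  }

  // Emits { value: <counter> } under the event name `id`
  // every `address.interval` milliseconds
  async subscribe (address, id) {
    let counter = 0
    const timer = setInterval(
      () => this.emit(id, { value: counter++ }),
      address.interval
    )
    this._timers.set(id, timer)
  }

  // Stops the timer that belongs to `id`
  async unsubscribe (id) {
    clearInterval(this._timers.get(id))
    this._timers.delete(id)
  }
}

const ticker = new TickerProtocol()

// The listener receives the first value and then unsubscribes again
ticker.once('endpoint-1', async (data) => {
  console.log('received', data)
  await ticker.unsubscribe('endpoint-1')
})

ticker.subscribe({ interval: 10 }, 'endpoint-1')
```

Keeping the timer handle per id is what allows unsubscribe to stop exactly the emission that the matching subscribe call started.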

This class needs to be registered at a local VrpcAdapter instance. In a Node.js/JavaScript application this is done in the top-level index.js file as follows:

const { VrpcAdapter, VrpcAgent } = require('vrpc')
const MyProtocol = require('./src/MyProtocol')

VrpcAdapter.register(MyProtocol)

const vrpcAgent = new VrpcAgent({
  domain: `mycustom.vrpc`,
  agent: `mycustom.implementation`,
  username: `my_mqtt_username`,
  password: `my_mqtt_password`,
  broker: `mqtt://1.2.3.4:1883`
})
vrpcAgent.serve()

Connection Properties

vrpc (object)

Properties of the vrpc object:

agent (string)

domain (string)

className (string)

args (object)

All parameters below args are passed through to the constructor of the custom protocol class.

connectionStrategy (object)

If a connection attempt fails, retries will be performed with increasing delay (waiting time) in between. The following parameters control how these delays behave.

Properties of the connectionStrategy object:

initialDelay (integer)

Delay (waiting time) of the first connection retry (in milliseconds). For subsequent retries, the delay will be increased according to the parameter incrementFactor which has a default value of 2.

Default: 1000

Additional restrictions:

  • Minimum: 1000

maxDelay (integer)

Maximum delay (waiting time) to wait until the next retry (in milliseconds). The delay (waiting time) for any subsequent connection retry will not be larger than this value. Must be strictly greater than initialDelay.

Default: 30000

incrementFactor (integer)

The factor by which the delay is multiplied after each failed attempt, starting at initialDelay and capped at maxDelay. For example, if initialDelay is set to 1000, maxDelay to 5000, and incrementFactor to 2, the delays would be 1000, 2000, 4000, 5000.

Default: 2

Additional restrictions:

  • Minimum: 2
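
The resulting delay sequence can be reproduced with a short sketch. The function name retryDelays is hypothetical, and the code only mirrors the behavior described above; it is not the Connectware's actual implementation.

```javascript
// Hypothetical helper: lists the retry delays (in milliseconds) from
// initialDelay up to and including the first delay capped at maxDelay
function retryDelays ({ initialDelay = 1000, maxDelay = 30000, incrementFactor = 2 } = {}) {
  const delays = []
  let delay = initialDelay
  while (delay < maxDelay) {
    delays.push(delay)
    delay *= incrementFactor // grow the delay for the next retry
  }
  delays.push(maxDelay) // all further retries wait maxDelay
  return delays
}

// Example from the incrementFactor description
console.log(retryDelays({ initialDelay: 1000, maxDelay: 5000, incrementFactor: 2 }))
// [ 1000, 2000, 4000, 5000 ]

// Default values
console.log(retryDelays())
// [ 1000, 2000, 4000, 8000, 16000, 30000 ]
```

With the default values, the delay reaches the 30000 ms cap at the sixth retry and stays there.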

For each of the read(), write(), and subscribe() functions, further parameters can be added, which will be passed through to the generic VRPC agent client.


Endpoint Properties