Integrate with MQTT
Octopus provides two out-of-the-box ways to integrate with MQTT:
- Most Octopus adaptors, such as the BLE adaptor, can synchronize the device status via an MQTT broker. See below for the adaptors that support the MQTT extension.
- If the device natively supports MQTT, the MQTT adaptor is the first choice.
This post mainly describes the first way. To learn more about the MQTT adaptor, see its documentation. If neither out-of-the-box way meets your needs, you can follow the CONTRIBUTING guide to contribute your idea or develop a new adaptor.
Note: at present, the MQTT extension only supports the write (publish) operation of the templated topic.
Although the latest version of MQTT is v5.0, Octopus does not support it for the time being, mainly because the underlying client library does not support it yet (paho.mqtt.golang/issues#347).
Integrating with MQTT to expose the status of a device not only gives the device the ability to use MQTT, but also expands its usage scenarios, such as equipment interaction and equipment monitoring.
MQTT
MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
Although MQTT's name contains "MQ", it is not a protocol for defining a message queue. The "MQ" refers to the MQSeries product from IBM and has nothing to do with "Message Queue". MQTT is a lightweight binary protocol, and due to its minimal packet overhead, it excels at transferring data over the wire in comparison to protocols like HTTP. MQTT provides publish/subscribe communication similar to a message queue, and it also offers features that enrich communication scenarios, such as QoS, Last Will and Testament, and retained messages. To learn more about MQTT, the MQTT Essentials series of articles is highly recommended.
Convention
MQTT uses subject-based filtering of messages. Every message contains a topic (subject) that the broker can use to determine whether a subscribing client gets the message or not.
In MQTT, the topic is a hierarchically structured string that can be used to filter and route messages. The protocol is payload-agnostic, which means the publisher can send binary data, text data, or even full-fledged XML or JSON. Designing the topic tree and the payload schema is therefore an important part of any MQTT deployment.
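To make the hierarchical filtering concrete, here is a minimal sketch of how a topic filter can be matched against a topic name, following the standard MQTT wildcard rules (`+` for one level, `#` for the remaining levels). The function name `topic_matches` is hypothetical and is not part of Octopus or of any MQTT library; real brokers implement this internally.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` is selected by `topic_filter`.

    Illustrative only: `topic_matches` is a hypothetical helper,
    not an Octopus or paho API.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    # Topics starting with '$' are not matched by a leading wildcard.
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False

    for index, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level; it matches the parent
            # level and everything below it.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False

    # Without a trailing '#', both sides must have the same depth.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    print(topic_matches("cattle.io/octopus/+/device/#",
                        "cattle.io/octopus/default/device/case1"))
```

A subscriber using the filter `cattle.io/octopus/+/device/#` would, under these rules, receive messages for every device in every namespace.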
Octopus recommends that you construct topic names according to the MQTT topic best practices from MQTT Essentials, and marshal the payload data as JSON.
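As a small illustration of the JSON recommendation, the sketch below serializes a device status into a compact, deterministic JSON payload. The helper name `encode_status` and the property names `temperature` and `online` are made up for this example; the real payload schema depends on the adaptor and the device.

```python
import json


def encode_status(properties: dict) -> bytes:
    """Marshal device properties as compact UTF-8 JSON bytes.

    `encode_status` is a hypothetical helper used for illustration only.
    Keys are sorted so that the same status always yields the same bytes.
    """
    text = json.dumps(properties, separators=(",", ":"), sort_keys=True)
    return text.encode("utf-8")


if __name__ == "__main__":
    # Hypothetical device properties.
    print(encode_status({"temperature": 21.5, "online": True}))
```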
Configuration
Octopus reorganizes the client parameters of github.com/eclipse/paho.mqtt.golang into a set of configuration options.
The current official adaptors, such as BLE, Modbus and OPC-UA, support the MQTT protocol extension using the same configuration (refer to the following spec.template.spec.extension.mqtt).
Specification
The MQTT options specification is valid in all MQTT extension adaptors. The options are used for connecting to the MQTT broker, guiding the connection, indicating which topic to publish/subscribe to, and encoding the payload data.
MQTTOptions
Parameter | Description | Schema | Required |
---|---|---|---|
client | Specifies the client settings. | MQTTClientOptions | true |
message | Specifies the message settings. | MQTTMessageOptions | true |
MQTTClientOptions
Parameter | Description | Schema | Required |
---|---|---|---|
server | Specifies the server URI of the MQTT broker, in the format schema://host:port. The schema is one of "ws", "wss", "tcp", "unix", "ssl", "tls" or "tcps". | string | true |
protocolVersion | Specifies the MQTT protocol version that the cluster uses to connect to the broker. Valid values are 3 (MQTT 3.1) and 4 (MQTT 3.1.1). The default value is 0, which means MQTT v3.1.1 is preferred. | *uint | false |
basicAuth | Specifies the username and password that the client uses to connect to the MQTT broker. | *MQTTClientBasicAuth | false |
tlsConfig | Specifies the TLS configuration that the client uses to connect to the MQTT broker. | *MQTTClientTLS | false |
cleanSession | Specifies the "clean session" flag in the connect message. When set, the MQTT broker should not deliver messages it saved for this client. Defaults to true. | *bool | false |
store | Specifies the store that provides message persistence when the QoS level is 1 or 2. The default store is Memory. | *MQTTClientStore | false |
resumeSubs | Enables resuming of stored (un)subscribe messages when connecting, but not when reconnecting. This is only valid if cleanSession is false. The default value is false. | *bool | false |
connectTimeout | Specifies the amount of time that the client tries to open a connection to the MQTT broker before timing out with an error. A duration of 0 never times out. The default value is 30s. | *metav1.Duration | false |
keepAlive | Specifies the amount of time that the client should wait before sending a PING request to the broker. The default value is 10s. | *metav1.Duration | false |
pingTimeout | Specifies the amount of time that the client should wait after sending a PING request to the broker before deciding that the connection has been lost. The default value is 10s. | *metav1.Duration | false |
order | Specifies whether message routing guarantees order within each QoS level. The default value is true. | *bool | false |
writeTimeout | Specifies the amount of time that the client waits for a message to be published before getting a timeout error. Defaults to 30s. | *metav1.Duration | false |
waitTimeout | Specifies the amount of time that the client waits after subscribing to or publishing a message before timing out. A duration of 0 never times out. | *metav1.Duration | false |
disconnectQuiesce | Specifies the quiesce time when the client disconnects. Defaults to 5s. | *metav1.Duration | false |
autoReconnect | Specifies whether to use the automatic reconnection logic. Defaults to true. | *bool | false |
maxReconnectInterval | Specifies the maximum amount of time that the client should wait before reconnecting to the broker. Defaults to 10m. | *metav1.Duration | false |
messageChannelDepth | Specifies the size of the internal queue that holds messages while the client is temporarily offline. Defaults to 100. | *uint | false |
httpHeaders | Specifies the additional HTTP headers that the client sends in the WebSocket opening handshake. | map[string][]string | false |
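The `server` format described in the table can be checked before applying a configuration. The following is a rough sketch rather than Octopus's own validation: the function name `check_server_uri` is hypothetical, and the rule that every scheme except `unix` needs a host is an assumption made for this example.

```python
from urllib.parse import urlsplit

# Schemes listed for the `server` parameter in the table above.
ALLOWED_SCHEMES = {"ws", "wss", "tcp", "unix", "ssl", "tls", "tcps"}


def check_server_uri(uri: str) -> str:
    """Return the scheme of `uri`, or raise ValueError if it looks invalid.

    Hypothetical helper for illustration; not an Octopus API.
    """
    parts = urlsplit(uri)
    if parts.scheme not in ALLOWED_SCHEMES:
        raise ValueError(f"unsupported scheme {parts.scheme!r} in {uri!r}")
    # Assumption: only unix sockets may omit the host part.
    if parts.scheme != "unix" and not parts.hostname:
        raise ValueError(f"missing host in {uri!r}")
    return parts.scheme


if __name__ == "__main__":
    # Hypothetical broker address.
    print(check_server_uri("tcp://broker.example.com:1883"))
```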
MQTTClientBasicAuth
Parameter | Description | Schema | Required |
---|---|---|---|
username | Specifies the username for basic authentication. | string | false |
usernameRef | Specifies the relationship of DeviceLink's references to refer to the value as the username. | *edgev1alpha1.DeviceLinkReferenceRelationship | false |
password | Specifies the password for basic authentication. | string | false |
passwordRef | Specifies the relationship of DeviceLink's references to refer to the value as the password. | *edgev1alpha1.DeviceLinkReferenceRelationship | false |
MQTTClientTLS
Parameter | Description | Schema | Required |
---|---|---|---|
caFilePEM | The PEM format content of the CA certificate, which is used to validate the server certificate. | string | false |
caFilePEMRef | Specifies the relationship of DeviceLink's references to refer to the value as the CA file PEM content. | *edgev1alpha1.DeviceLinkReferenceRelationship | false |
certFilePEM | The PEM format content of the certificate, which is used to authenticate the client to the server. | string | false |
certFilePEMRef | Specifies the relationship of DeviceLink's references to refer to the value as the client certificate file PEM content. | *edgev1alpha1.DeviceLinkReferenceRelationship | false |
keyFilePEM | The PEM format content of the key, which is used to authenticate the client to the server. | string | false |
keyFilePEMRef | Specifies the relationship of DeviceLink's references to refer to the value as the client key file PEM content. | *edgev1alpha1.DeviceLinkReferenceRelationship | false |
serverName | Indicates the name of the server, refer to http://tools.ietf.org/html/rfc4366#section-3.1 | string | false |
insecureSkipVerify | Skips validation of the server certificate. The default value is false. | bool | false |
MQTTClientStore
Parameter | Description | Schema | Required |
---|---|---|---|
type | Specifies the type of storage. Options are Memory and File. The default value is Memory. | string | false |
direcotryPrefix | Specifies the directory prefix of the storage when using the File store. The default value is /var/run/octopus/mqtt. | string | false |
MQTTMessageOptions
Parameter | Description | Schema | Required |
---|---|---|---|
topic | Specifies the topic. | string | true |
will | Specifies the will message. | *MQTTWillMessage | false |
qos | Specifies the QoS of the message. The default value is 1. | *MQTTMessageQoSLevel | false |
retained | Specifies whether the last published message is retained. The default value is true. | *bool | false |
path | Specifies the path for rendering the :path keyword of topic. | string | false |
operator | Specifies the operator for rendering the :operator keyword of topic. | *MQTTMessageTopicOperator | false |
MQTTWillMessage
Parameter | Description | Schema | Required |
---|---|---|---|
topic | Specifies the topic of the will message. If not set, $will is appended to the topic name specified in the parent field. | string | false |
content | Specifies the content of will message. The serialized form of the content is a Base64 encoded string, representing the arbitrary (possibly non-string) content value here. | string | true |
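Because the serialized `content` of the will message is a Base64 string, arbitrary bytes have to be encoded before they are placed in the configuration. The sketch below shows one way to do that with the Python standard library; the helper names and the sample content `offline` are made up for illustration.

```python
import base64


def encode_will_content(raw: bytes) -> str:
    """Encode raw will-message bytes as the Base64 string used in `content`.

    Hypothetical helper; Octopus itself only sees the resulting string.
    """
    return base64.b64encode(raw).decode("ascii")


def decode_will_content(content: str) -> bytes:
    """Recover the raw bytes from a Base64 `content` value."""
    return base64.b64decode(content)


if __name__ == "__main__":
    # Hypothetical will content.
    print(encode_will_content(b"offline"))
```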
MQTTMessageQoSLevel
Parameter | Description | Schema |
---|---|---|
0 | Send at most once. | byte |
1 | Send at least once. | byte |
2 | Send exactly once. | byte |
MQTTMessageTopicOperator
Parameter | Description | Schema | Required |
---|---|---|---|
read | Specifies the operator for rendering the :operator keyword of topic during subscribing. | string | false |
write | Specifies the operator for rendering the :operator keyword of topic during publishing. | string | false |
YAML
The MQTT options specification is valid in all MQTT extension adaptors. The options are used for connecting to the MQTT broker, guiding the connection, indicating which topic to publish/subscribe to, encoding the payload data, and so on.
Fields marked REQUIRED must be filled in.
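The required fields from the specification tables (`client`, `client.server`, `message`, `message.topic`, and `content` when a will message is set) can be checked mechanically once the options are loaded into a dictionary. This is a minimal sketch under that assumption; `missing_required` is a hypothetical helper and does not reflect how Octopus validates a DeviceLink.

```python
def missing_required(options: dict) -> list:
    """Return dotted paths of required MQTT option fields that are absent.

    Illustrative only: mirrors the Required columns of the tables above,
    assuming the options were parsed into nested dictionaries.
    """
    missing = []

    client = options.get("client")
    if client is None:
        missing.append("client")
    elif not client.get("server"):
        missing.append("client.server")

    message = options.get("message")
    if message is None:
        missing.append("message")
    else:
        if not message.get("topic"):
            missing.append("message.topic")
        will = message.get("will")
        # `content` is only required when a will message is configured.
        if will is not None and not will.get("content"):
            missing.append("message.will.content")

    return missing


if __name__ == "__main__":
    # Hypothetical options: the server is set, but the topic is missing.
    print(missing_required({"client": {"server": "tcp://127.0.0.1:1883"},
                            "message": {}}))
```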
Templated Topic
Octopus provides a templated topic to adapt to different MQTT publish and subscribe scenarios. There are five keywords supported in a templated topic:
- :namespace, replaced with the DeviceLink's Namespace.
- :name, replaced with the DeviceLink's Name.
- :uid, replaced with the DeviceLink's UID.
- :path, replaced with the custom path.
- :operator, replaced based on the operation (read - subscribe, write - publish).

It is worth noting that read operations are not supported in the MQTT extension, but work well in the MQTT adaptor.
A templated topic has two features:
- It tolerates extra separators: path: "a///b///c" is treated as path: "a/b/c".
- It automatically ignores keywords without content.
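The keyword substitution and the two features above can be sketched in a few lines. This is not the Octopus implementation, only an approximation of the described behaviour; the function name `render_topic` and its keyword arguments are hypothetical.

```python
import re

# The five supported keywords; "namespace" is listed before "name" so that
# ":namespace" is never read as ":name" followed by "space".
_KEYWORD = re.compile(r":(namespace|name|uid|path|operator)\b")


def render_topic(template, namespace="", name="", uid="", path="", operator=""):
    """Render a templated topic (illustrative sketch, not Octopus code)."""
    values = {
        "namespace": namespace,
        "name": name,
        "uid": uid,
        "path": path,
        "operator": operator,
    }
    # Replace each keyword with its value; keywords without content
    # become empty strings.
    rendered = _KEYWORD.sub(lambda match: values[match.group(1)], template)
    # Dropping empty levels both ignores empty keywords and collapses
    # extra separators such as "a///b///c".
    return "/".join(level for level in rendered.split("/") if level)


if __name__ == "__main__":
    print(render_topic("cattle.io/octopus/:operator/device/:namespace/:name",
                       namespace="default", name="case3", operator="set"))
```

Calling the function with an empty `operator` drops that level entirely, which is how a topic rendered for publishing can differ from the one rendered for subscribing.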
Usage Cases
Given topic cattle.io/octopus/:namespace/device/:name, when the DeviceLink is named as default/case1:

    apiVersion: edge.cattle.io/v1alpha1
    kind: DeviceLink
    metadata:
      namespace: default
      name: case1
      uid: fcd1eb1b-ea42-4cb9-afb0-0ec2d0830583
    spec:
      ...
      template:
        ...
        spec:
          extension:
            mqtt:
              ...
              message:
                topic: "cattle.io/octopus/:namespace/device/:name"

- Publish Topic: cattle.io/octopus/default/device/case1
- Subscribe Topic: cattle.io/octopus/default/device/case1

Given topic cattle.io/octopus/device/:uid, when the DeviceLink is named as default/case2:

    apiVersion: edge.cattle.io/v1alpha1
    kind: DeviceLink
    metadata:
      namespace: default
      name: case2
      uid: 41478d1e-c3f8-46e3-a3b5-ba251f285277
    spec:
      ...
      template:
        ...
        spec:
          extension:
            mqtt:
              ...
              message:
                topic: "cattle.io/octopus/device/:uid"

- Publish Topic: cattle.io/octopus/device/41478d1e-c3f8-46e3-a3b5-ba251f285277
- Subscribe Topic: cattle.io/octopus/device/41478d1e-c3f8-46e3-a3b5-ba251f285277

The UID is the unique identifier that Kubernetes assigns to the resource, and it carries little meaning for outside readers. Therefore, using this keyword is not recommended in general cases.

Given topic cattle.io/octopus/:operator/device/:namespace/:name, when the DeviceLink is named as default/case3:

    apiVersion: edge.cattle.io/v1alpha1
    kind: DeviceLink
    metadata:
      namespace: default
      name: case3
      uid: 835aea2e-5f80-4d14-88f5-40c4bda41aa3
    spec:
      ...
      template:
        ...
        spec:
          extension:
            mqtt:
              ...
              message:
                topic: "cattle.io/octopus/:operator/device/:namespace/:name"
                operator:
                  write: "set"

- Publish Topic: cattle.io/octopus/set/device/default/case3
- Subscribe Topic: cattle.io/octopus/device/default/case3

Given topic cattle.io/octopus/:operator/device/:path/:uid, when the DeviceLink is named as default/case4:

    apiVersion: edge.cattle.io/v1alpha1
    kind: DeviceLink
    metadata:
      namespace: default
      name: case4
      uid: 014997f5-1f12-498b-8631-d2f22920e20a
    spec:
      ...
      template:
        ...
        spec:
          extension:
            mqtt:
              ...
              message:
                topic: "cattle.io/octopus/:operator/device/:path/:uid"
                operator:
                  read: "status"
                path: "region/ap"

- Publish Topic: cattle.io/octopus/device/region/ap/014997f5-1f12-498b-8631-d2f22920e20a
- Subscribe Topic: cattle.io/octopus/status/device/region/ap/014997f5-1f12-498b-8631-d2f22920e20a