mqtt
Policy Information Point for subscribing to MQTT topics.
This Policy Information Point subscribes to MQTT topics and returns messages from MQTT brokers as a reactive stream of attribute values.
Attribute Invocation
The mqtt.messages attribute accepts up to two optional parameters, a QoS level and a broker name:
| Policy syntax | Meaning |
|---|---|
| topic.<mqtt.messages> | Subscribe with QoS 0, using the default broker. |
| topic.<mqtt.messages(qos)> | Subscribe with the given QoS level, using the default broker. |
| topic.<mqtt.messages(qos, "staging")> | Subscribe with the given QoS, selecting the broker named "staging" from the brokerConfig array. |
A policy selects a broker only by name or uses the default. A policy may not supply an inline broker configuration object.
The topic can be a single topic string or an array of topic strings.
Quality of Service Levels
MQTT QoS levels determine message delivery guarantees:
- QoS 0: At most once - fire and forget, no acknowledgment
- QoS 1: At least once - acknowledged delivery, possible duplicates
- QoS 2: Exactly once - assured delivery, no duplicates
Broker Configuration
Configure the PIP through the variables.mqtt PDP variable in pdp.json.
Top-level variables.mqtt settings:
- brokerConfig: A single broker configuration object, or an array of named broker configuration objects for multi-broker setups
- allowInsecureTransport: Allow broker credentials over plaintext MQTT when set to true (defaults to false). Prefer tls: true for credentialed broker connections.
- defaultBrokerConfigName: The name of the broker to use when no broker is specified in the policy (defaults to "default")
- defaultResponse: Response when no messages arrive before the timeout, either "undefined" or "error" (defaults to "undefined")
- defaultResponseTimeout: Timeout in milliseconds before emitting the default response (defaults to 2000)
- emitAtRetry: Emit value on reconnection, either "true" or "false" (defaults to "false")
- maxPayloadSize: Maximum incoming message payload in bytes (defaults to 1048576). A larger message fails closed to an error value rather than being decoded.
- maxTopicFilters: Maximum number of topic filters accepted per subscription (defaults to 32).
- maxTopicFilterBytes: Maximum total UTF-8 bytes across all topic filters accepted per subscription (defaults to 8192).
Each broker configuration object contains:
- name: Broker identifier used for broker selection and secrets matching (see below)
- brokerAddress: Hostname or IP address of the MQTT broker
- brokerPort: Port number of the MQTT broker
- clientId: Unique identifier for the MQTT client connection
- allowInsecureTransport: Broker-level override for plaintext credential transport
Example pdp.json with two named brokers:
{
  "variables": {
    "mqtt": {
      "defaultBrokerConfigName": "production",
      "maxTopicFilters": 32,
      "maxTopicFilterBytes": 8192,
      "brokerConfig": [
        {
          "name": "production",
          "brokerAddress": "mqtt.example.com",
          "brokerPort": 1883,
          "clientId": "sapl-prod"
        },
        {
          "name": "staging",
          "brokerAddress": "mqtt-staging.example.com",
          "brokerPort": 1883,
          "clientId": "sapl-staging"
        }
      ]
    }
  }
}
Broker Selection
When the policy does not specify a broker (e.g. topic.<mqtt.messages>):
- The PDP reads defaultBrokerConfigName from variables.mqtt. If not set, the default name is "default".
- The brokerConfig array is searched for a broker whose name matches.
- If brokerConfig is a single object (not an array), it is used directly without name matching.
When the policy specifies a broker name (e.g. topic.<mqtt.messages(1, "staging")>):
- The brokerConfig array is searched for a broker whose name matches "staging".
- If no match is found, an error is returned.
A policy may only select a broker by name or use the default. Supplying an inline broker configuration object from a policy is rejected with an error value.
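The selection rules above can be sketched in a few lines of Python. This is an illustrative model only, not the PIP's actual implementation: the function name select_broker is hypothetical, Python exceptions stand in for the error values the PIP returns, and treating a single brokerConfig object as a one-element list when a name is given is an assumption the text does not confirm.

```python
from typing import Any, Optional


def select_broker(mqtt_vars: dict[str, Any], broker_name: Optional[Any] = None) -> dict[str, Any]:
    """Return the broker configuration object a subscription would use."""
    broker_config = mqtt_vars["brokerConfig"]

    if broker_name is None:
        if isinstance(broker_config, dict):
            # A single object is used directly, without name matching.
            return broker_config
        # Fall back to the configured default name, or "default" if unset.
        broker_name = mqtt_vars.get("defaultBrokerConfigName", "default")
    elif not isinstance(broker_name, str):
        # Inline broker configuration objects from a policy are rejected.
        raise ValueError("a policy may only select a broker by name")

    # Assumption: a single object is searched like a one-element array.
    candidates = broker_config if isinstance(broker_config, list) else [broker_config]
    for broker in candidates:
        if broker.get("name") == broker_name:
            return broker
    raise LookupError(f"no broker named {broker_name!r} in brokerConfig")
```

With the two-broker example from this page, select_broker(variables["mqtt"]) would yield the "production" entry and select_broker(variables["mqtt"], "staging") the "staging" entry.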
Secrets Configuration
Broker credentials are sourced exclusively from the secrets section in pdp.json.
They are never read from broker configuration objects or policy parameters.
When credentials are configured, broker connections require tls: true by default.
Plaintext credential transport is rejected unless variables.mqtt.allowInsecureTransport
or the selected broker’s allowInsecureTransport is explicitly set to true.
The broker name field is the join key between the broker configuration and the
secrets. For a broker with "name": "staging", the PDP looks up
secrets.mqtt.staging for that broker’s credentials.
Credential resolution order:
- Per-broker secrets: If the resolved broker config has a name field, look for secrets.mqtt.<name> (e.g. secrets.mqtt.production).
- Flat secrets: If no per-broker match, check whether secrets.mqtt directly contains username/password fields.
- Anonymous: If no secrets are found at all, connect with empty credentials.
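The resolution order can be modeled as a small lookup function. The sketch below is a hedged illustration, not the PIP's code: resolve_credentials is a hypothetical name, the secrets argument mirrors the secrets section of pdp.json, and returning a pair of empty strings for the anonymous case is an assumption about how "empty credentials" is represented.

```python
from typing import Any


def resolve_credentials(secrets: dict[str, Any], broker: dict[str, Any]) -> tuple[str, str]:
    """Return (username, password) for a resolved broker configuration."""
    mqtt_secrets = secrets.get("mqtt") or {}

    # 1. Per-broker secrets: secrets.mqtt.<name>
    name = broker.get("name")
    per_broker = mqtt_secrets.get(name) if name is not None else None
    if isinstance(per_broker, dict):
        return per_broker.get("username", ""), per_broker.get("password", "")

    # 2. Flat secrets: username/password directly under secrets.mqtt
    if "username" in mqtt_secrets or "password" in mqtt_secrets:
        return mqtt_secrets.get("username", ""), mqtt_secrets.get("password", "")

    # 3. Anonymous: no secrets found, connect with empty credentials
    return "", ""
```

The broker name acts as the join key: only a broker whose name has a matching key under secrets.mqtt receives per-broker credentials, and every other broker falls through to the flat or anonymous case.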
Multi-broker secrets example:
{
  "secrets": {
    "mqtt": {
      "production": { "username": "prod-user", "password": "prod-secret" },
      "staging": { "username": "staging-user", "password": "staging-secret" }
    }
  }
}
Single-broker or flat secrets example (used when no per-broker key matches):
{
  "secrets": {
    "mqtt": { "username": "sapl-user", "password": "secure-password" }
  }
}
Complete pdp.json Example
{
  "variables": {
    "mqtt": {
      "defaultBrokerConfigName": "production",
      "maxTopicFilters": 32,
      "maxTopicFilterBytes": 8192,
      "brokerConfig": [
        { "name": "production", "brokerAddress": "mqtt.example.com", "brokerPort": 1883, "clientId": "sapl-prod" },
        { "name": "staging", "brokerAddress": "mqtt-staging.example.com", "brokerPort": 1883, "clientId": "sapl-staging" }
      ]
    }
  },
  "secrets": {
    "mqtt": {
      "production": { "username": "prod-user", "password": "prod-secret" },
      "staging": { "username": "staging-user", "password": "staging-secret" }
    }
  }
}
With this configuration:
- "sensors/#".<mqtt.messages> connects to mqtt.example.com as prod-user (default broker is "production").
- "sensors/#".<mqtt.messages(1, "staging")> connects to mqtt-staging.example.com as staging-user.
Message Format
Received messages are automatically converted based on their MQTT payload format:
- Messages with content type application/json are parsed as JSON values
- UTF-8 encoded text messages are returned as text values
- Binary payloads are rejected with an error value; publish text, JSON, or an explicitly encoded string instead
Topic Wildcards
The PIP supports MQTT topic wildcards for flexible subscriptions:
- +: Single-level wildcard (matches one topic level)
- #: Multi-level wildcard (matches zero or more topic levels, must be last)
Examples:
- sensors/+/temperature matches sensors/room1/temperature and sensors/room2/temperature
- building/# matches building/floor1/room1 and building/floor2/room3/sensor5
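To make the wildcard semantics concrete, here is a minimal Python sketch of level-by-level matching. In practice the broker evaluates topic filters, so this is only a model of the rules above: topic_matches is a hypothetical helper, and it deliberately ignores the MQTT special case for topics beginning with $.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if a concrete topic matches an MQTT topic filter."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level; it matches
            # the parent level itself and any number of levels below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" matches any one level.
            return False

    # Without a trailing "#", both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

For instance, topic_matches("sensors/+/temperature", "sensors/room1/temperature") holds, while the same filter does not match sensors/room1/humidity or a topic with an extra level.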
Reconnection Behavior
The PIP automatically handles broker reconnection in case of connection loss. When reconnection occurs, the PIP re-subscribes to all active topics and continues emitting messages.
messages
Subscribes to MQTT topics and emits messages as they arrive. Uses QoS level 0 (at most once) by default.
Accepts a single topic string or an array of topic strings. MQTT wildcards work in topic filters.
Topic arrays are bounded by variables.mqtt.maxTopicFilters. The total
UTF-8 bytes across all topic filters are bounded by
variables.mqtt.maxTopicFilterBytes. If either limit is exceeded, the PIP
returns an error value before opening a broker subscription.
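The two limits can be pictured as a pre-subscription check. The following Python sketch is an illustration under stated assumptions rather than the PIP's implementation: check_topic_filters is a hypothetical name, the defaults mirror the documented values of 32 and 8192, both limits are assumed to be inclusive, and a ValueError stands in for the error value the PIP returns.

```python
from typing import Union


def check_topic_filters(
    topics: Union[str, list[str]],
    max_filters: int = 32,
    max_bytes: int = 8192,
) -> list[str]:
    """Validate topic filters against both limits before subscribing."""
    # A single topic string is treated as a one-element list.
    filters = [topics] if isinstance(topics, str) else list(topics)

    if len(filters) > max_filters:
        raise ValueError(f"{len(filters)} topic filters exceed maxTopicFilters={max_filters}")

    # The byte limit counts UTF-8 bytes, not characters.
    total_bytes = sum(len(f.encode("utf-8")) for f in filters)
    if total_bytes > max_bytes:
        raise ValueError(f"{total_bytes} bytes exceed maxTopicFilterBytes={max_bytes}")

    return filters
```

Counting encoded bytes matters for non-ASCII topic names: a filter of 5000 two-byte characters is 10000 bytes and would exceed the default byte limit even though it is a single filter.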
Example with single topic:
policy "single_temperature_sensor"
permit
"home/livingroom/temperature".<mqtt.messages>.celsius > 22.0;
Example with multiple topics:
policy "multiple_sensors"
permit
var topics = ["sensors/temperature", "sensors/humidity"];
topics.<mqtt.messages> != undefined;
Example with single-level wildcard:
policy "all_room_temperatures"
permit
"building/+/temperature".<mqtt.messages>.value > 25.0;
Example with multi-level wildcard:
policy "all_building_sensors"
permit
"building/#".<mqtt.messages>.alert == true;
messages
Subscribes to MQTT topics on an operator-configured broker selected by name.
The brokerName parameter is the name of a broker from the operator-configured
brokerConfig. A policy may only select a broker by name or use the default. A
policy may not supply an inline broker configuration object; doing so yields an
error value.
Accepts a single topic string or an array of topic strings. Topic arrays are
bounded by variables.mqtt.maxTopicFilters. The total UTF-8 bytes across all
topic filters are bounded by variables.mqtt.maxTopicFilterBytes. If either
limit is exceeded, the PIP returns an error value before opening a broker
subscription.
Example referencing a broker by name:
policy "staging_environment_monitoring"
permit
var topics = ["sensors/data", "actuators/status"];
topics.<mqtt.messages(1, "staging")>.operational == true;
messages
Subscribes to MQTT topics with a specified Quality of Service level.
QoS levels and their trade-offs:
- QoS 0: At most once - fastest but may lose messages
- QoS 1: At least once - acknowledged delivery, may receive duplicates
- QoS 2: Exactly once - slowest but guaranteed
Accepts a single topic string or an array of topic strings. Topic arrays are
bounded by variables.mqtt.maxTopicFilters. The total UTF-8 bytes across all
topic filters are bounded by variables.mqtt.maxTopicFilterBytes. If either
limit is exceeded, the PIP returns an error value before opening a broker
subscription.
Example with QoS 1 for reliable monitoring:
policy "critical_alarm_monitoring"
permit
"alarms/critical".<mqtt.messages(1)>.severity == "HIGH";
Example with QoS 2 for command processing:
policy "device_command_processing"
permit
var commandTopics = ["device/shutdown", "device/restart"];
commandTopics.<mqtt.messages(2)>.confirmed == true;
Example with QoS 0 for high-frequency sensor data:
policy "sensor_stream"
permit
"sensors/motion/#".<mqtt.messages(0)> != undefined;