mqtt
Policy Information Point for subscribing to MQTT topics.
This Policy Information Point subscribes to MQTT topics and returns messages from MQTT brokers as a reactive stream of attribute values.
Attribute Invocation
The mqtt.messages attribute accepts up to three parameters:
| Policy syntax | Meaning |
|---|---|
topic.<mqtt.messages> |
Subscribe with QoS 0, using the default broker. |
topic.<mqtt.messages(qos)> |
Subscribe with the given QoS level, using the default broker. |
topic.<mqtt.messages(qos, "staging")> |
Subscribe with the given QoS, selecting the broker named "staging" from the brokerConfig array. |
A policy selects a broker only by name or uses the default. A policy may not supply an inline broker configuration object.
The topic can be a single topic string or an array of topic strings.
Quality of Service Levels
MQTT QoS levels determine message delivery guarantees:
- QoS 0: At most once - fire and forget, no acknowledgment
- QoS 1: At least once - acknowledged delivery, possible duplicates
- QoS 2: Exactly once - assured delivery, no duplicates
Broker Configuration
Configure the PIP through the mqttPipConfig SAPL environment variable in pdp.json.
Top-level settings:
brokerConfig: A single broker configuration object, or an array of named broker configuration objects for multi-broker setupsdefaultBrokerConfigName: Thenameof the broker to use when no broker is specified in the policy (defaults to"default")defaultResponse: Response when no messages arrive before timeout –"undefined"or"error"(defaults to"undefined")defaultResponseTimeout: Timeout in milliseconds before emitting the default response (defaults to 2000)emitAtRetry: Emit value on reconnection –"true"or"false"(defaults to"false")maxPayloadSize: Maximum incoming message payload in bytes (defaults to 1048576). A larger message fails closed to an error value rather than being decoded.
Each broker configuration object contains:
name: Broker identifier used for broker selection and secrets matching (see below)brokerAddress: Hostname or IP address of the MQTT brokerbrokerPort: Port number of the MQTT brokerclientId: Unique identifier for the MQTT client connection
Example pdp.json with two named brokers:
{
"variables": {
"mqttPipConfig": {
"defaultBrokerConfigName": "production",
"brokerConfig": [
{
"name": "production",
"brokerAddress": "mqtt.example.com",
"brokerPort": 1883,
"clientId": "sapl-prod"
},
{
"name": "staging",
"brokerAddress": "mqtt-staging.example.com",
"brokerPort": 1883,
"clientId": "sapl-staging"
}
]
}
}
}
Broker Selection
When the policy does not specify a broker (e.g. topic.<mqtt.messages>):
- The PDP reads
defaultBrokerConfigNamefrommqttPipConfig. If not set, the default name is"default". - The
brokerConfigarray is searched for a broker whosenamematches. - If
brokerConfigis a single object (not an array), it is used directly without name matching.
When the policy specifies a broker name (e.g. topic.<mqtt.messages(1, "staging")>):
- The
brokerConfigarray is searched for a broker whosenamematches"staging". - If no match is found, an error is returned.
A policy may only select a broker by name or use the default. Supplying an inline broker configuration object from a policy is rejected with an error value.
Secrets Configuration
Broker credentials are sourced exclusively from the secrets section in pdp.json.
They are never read from broker configuration objects or policy parameters.
The broker name field is the join key between the broker configuration and the
secrets. For a broker with "name": "staging", the PDP looks up
secrets.mqtt.staging for that broker’s credentials.
Credential resolution order:
- Per-broker secrets: If the resolved broker config has a
namefield, look forsecrets.mqtt.<name>(e.g.secrets.mqtt.production). - Flat secrets: If no per-broker match, check whether
secrets.mqttdirectly containsusername/passwordfields. - Anonymous: If no secrets are found at all, connect with empty credentials.
Multi-broker secrets example:
{
"secrets": {
"mqtt": {
"production": { "username": "prod-user", "password": "prod-secret" },
"staging": { "username": "staging-user", "password": "staging-secret" }
}
}
}
Single-broker or flat secrets example (used when no per-broker key matches):
{
"secrets": {
"mqtt": { "username": "sapl-user", "password": "secure-password" }
}
}
Complete pdp.json Example
{
"variables": {
"mqttPipConfig": {
"defaultBrokerConfigName": "production",
"brokerConfig": [
{ "name": "production", "brokerAddress": "mqtt.example.com", "brokerPort": 1883, "clientId": "sapl-prod" },
{ "name": "staging", "brokerAddress": "mqtt-staging.example.com", "brokerPort": 1883, "clientId": "sapl-staging" }
]
}
},
"secrets": {
"mqtt": {
"production": { "username": "prod-user", "password": "prod-secret" },
"staging": { "username": "staging-user", "password": "staging-secret" }
}
}
}
With this configuration:
"sensors/#".<mqtt.messages>connects tomqtt.example.comasprod-user(default broker is"production")."sensors/#".<mqtt.messages(1, "staging")>connects tomqtt-staging.example.comasstaging-user.
Message Format
Received messages are automatically converted based on their MQTT payload format:
- Messages with content type
application/jsonare parsed as JSON values - UTF-8 encoded text messages are returned as text values
- Binary payloads are returned as arrays of byte values (as integers)
Topic Wildcards
The PIP supports MQTT topic wildcards for flexible subscriptions:
+- Single-level wildcard (matches one topic level)#- Multi-level wildcard (matches zero or more topic levels, must be last)
Examples:
sensors/+/temperaturematchessensors/room1/temperatureandsensors/room2/temperaturebuilding/#matchesbuilding/floor1/room1andbuilding/floor2/room3/sensor5
Reconnection Behavior
The PIP automatically handles broker reconnection in case of connection loss. When reconnection occurs, the PIP re-subscribes to all active topics and continues emitting messages.
messages
Subscribes to MQTT topics and emits messages as they arrive. Uses QoS level 0 (at most once) by default.
Accepts a single topic string or an array of topic strings. MQTT wildcards work in topic filters.
Example with single topic:
policy "single_temperature_sensor"
permit
"home/livingroom/temperature".<mqtt.messages>.celsius > 22.0;
Example with multiple topics:
policy "multiple_sensors"
permit
var topics = ["sensors/temperature", "sensors/humidity"];
topics.<mqtt.messages> != undefined;
Example with single-level wildcard:
policy "all_room_temperatures"
permit
"building/+/temperature".<mqtt.messages>.value > 25.0;
Example with multi-level wildcard:
policy "all_building_sensors"
permit
"building/#".<mqtt.messages>.alert == true;
messages
Subscribes to MQTT topics on an operator-configured broker selected by name.
The mqttPipConfig parameter is the name of a broker from the operator-configured
brokerConfig. A policy may only select a broker by name or use the default. A
policy may not supply an inline broker configuration object; doing so yields an
error value.
Example referencing a broker by name:
policy "staging_environment_monitoring"
permit
var topics = ["sensors/data", "actuators/status"];
topics.<mqtt.messages(1, "staging")>.operational == true;
messages
Subscribes to MQTT topics with a specified Quality of Service level.
QoS levels and their trade-offs:
- QoS 0: At most once - fastest but may lose messages
- QoS 1: At least once - acknowledged delivery, may receive duplicates
- QoS 2: Exactly once - slowest but guaranteed
Example with QoS 1 for reliable monitoring:
policy "critical_alarm_monitoring"
permit
"alarms/critical".<mqtt.messages(1)>.severity == "HIGH";
Example with QoS 2 for command processing:
policy "device_command_processing"
permit
var commandTopics = ["device/shutdown", "device/restart"];
commandTopics.<mqtt.messages(2)>.confirmed == true;
Example with QoS 0 for high-frequency sensor data:
policy "sensor_stream"
permit
"sensors/motion/#".<mqtt.messages(0)> != undefined;