mqtt
Policy Information Point for subscribing to MQTT topics.
This Policy Information Point subscribes to MQTT topics and returns messages from MQTT brokers as a reactive stream of attribute values.
Attribute Invocation
The mqtt.messages attribute accepts up to three parameters:
| Policy syntax | Meaning |
|---|---|
topic.<mqtt.messages> |
Subscribe with QoS 0, using the default broker. |
topic.<mqtt.messages(qos)> |
Subscribe with the given QoS level, using the default broker. |
topic.<mqtt.messages(qos, "staging")> |
Subscribe with the given QoS, selecting the broker named "staging" from the brokerConfig array. |
topic.<mqtt.messages(qos, { ... })> |
Subscribe with the given QoS, using the inline object as broker configuration. |
The topic can be a single topic string or an array of topic strings.
Quality of Service Levels
MQTT QoS levels determine message delivery guarantees:
- QoS 0: At most once - fire and forget, no acknowledgment
- QoS 1: At least once - acknowledged delivery, possible duplicates
- QoS 2: Exactly once - assured delivery, no duplicates
Broker Configuration
Configure the PIP through the mqttPipConfig SAPL environment variable in pdp.json.
Top-level settings:
brokerConfig: A single broker configuration object, or an array of named broker configuration objects for multi-broker setupsdefaultBrokerConfigName: Thenameof the broker to use when no broker is specified in the policy (defaults to"default")defaultResponse: Response when no messages arrive before timeout –"undefined"or"error"(defaults to"undefined")defaultResponseTimeout: Timeout in milliseconds before emitting the default response (defaults to 1000)emitAtRetry: Emit value on reconnection –"true"or"false"(defaults to"false")
Each broker configuration object contains:
name: Broker identifier used for broker selection and secrets matching (see below)brokerAddress: Hostname or IP address of the MQTT brokerbrokerPort: Port number of the MQTT brokerclientId: Unique identifier for the MQTT client connection
Example pdp.json with two named brokers:
{
"variables": {
"mqttPipConfig": {
"defaultBrokerConfigName": "production",
"brokerConfig": [
{
"name": "production",
"brokerAddress": "mqtt.example.com",
"brokerPort": 1883,
"clientId": "sapl-prod"
},
{
"name": "staging",
"brokerAddress": "mqtt-staging.example.com",
"brokerPort": 1883,
"clientId": "sapl-staging"
}
]
}
}
}
Broker Selection
When the policy does not specify a broker (e.g. topic.<mqtt.messages>):
- The PDP reads
defaultBrokerConfigNamefrommqttPipConfig. If not set, the default name is"default". - The
brokerConfigarray is searched for a broker whosenamematches. - If
brokerConfigis a single object (not an array), it is used directly without name matching.
When the policy specifies a broker name (e.g. topic.<mqtt.messages(1, "staging")>):
- The
brokerConfigarray is searched for a broker whosenamematches"staging". - If no match is found, an error is returned.
When the policy provides an inline broker object
(e.g. topic.<mqtt.messages(1, { "brokerAddress": "...", ... })>):
- The inline object is used directly as the broker configuration.
Secrets Configuration
Broker credentials are sourced exclusively from the secrets section in pdp.json.
They are never read from broker configuration objects or policy parameters.
The broker name field is the join key between the broker configuration and the
secrets. For a broker with "name": "staging", the PDP looks up
secrets.mqtt.staging for that broker’s credentials.
Credential resolution order:
- Per-broker secrets: If the resolved broker config has a
namefield, look forsecrets.mqtt.<name>(e.g.secrets.mqtt.production). - Flat secrets: If no per-broker match, check whether
secrets.mqttdirectly containsusername/passwordfields. - Anonymous: If no secrets are found at all, connect with empty credentials.
Multi-broker secrets example:
{
"secrets": {
"mqtt": {
"production": { "username": "prod-user", "password": "prod-secret" },
"staging": { "username": "staging-user", "password": "staging-secret" }
}
}
}
Single-broker or flat secrets example (used when no per-broker key matches):
{
"secrets": {
"mqtt": { "username": "sapl-user", "password": "secure-password" }
}
}
Complete pdp.json Example
{
"variables": {
"mqttPipConfig": {
"defaultBrokerConfigName": "production",
"brokerConfig": [
{ "name": "production", "brokerAddress": "mqtt.example.com", "brokerPort": 1883, "clientId": "sapl-prod" },
{ "name": "staging", "brokerAddress": "mqtt-staging.example.com", "brokerPort": 1883, "clientId": "sapl-staging" }
]
}
},
"secrets": {
"mqtt": {
"production": { "username": "prod-user", "password": "prod-secret" },
"staging": { "username": "staging-user", "password": "staging-secret" }
}
}
}
With this configuration:
"sensors/#".<mqtt.messages>connects tomqtt.example.comasprod-user(default broker is"production")."sensors/#".<mqtt.messages(1, "staging")>connects tomqtt-staging.example.comasstaging-user."sensors/#".<mqtt.messages(1, { "brokerAddress": "other.host", "brokerPort": 1883, "clientId": "custom" })>connects toother.hostwith flat secrets fallback (or anonymous if no flat secrets).
Message Format
Received messages are automatically converted based on their MQTT payload format:
- Messages with content type
application/jsonare parsed as JSON values - UTF-8 encoded text messages are returned as text values
- Binary payloads are returned as arrays of byte values (as integers)
Topic Wildcards
The PIP supports MQTT topic wildcards for flexible subscriptions:
+- Single-level wildcard (matches one topic level)#- Multi-level wildcard (matches zero or more topic levels, must be last)
Examples:
sensors/+/temperaturematchessensors/room1/temperatureandsensors/room2/temperaturebuilding/#matchesbuilding/floor1/room1andbuilding/floor2/room3/sensor5
Reconnection Behavior
The PIP automatically handles broker reconnection in case of connection loss. When reconnection occurs, the PIP re-subscribes to all active topics and continues emitting messages.
messages
Subscribes to MQTT topics and emits messages as they arrive. Uses QoS level 0 (at most once) by default.
Accepts a single topic string or an array of topic strings. MQTT wildcards work in topic filters.
Example with single topic:
policy "single_temperature_sensor"
permit
"home/livingroom/temperature".<mqtt.messages>.celsius > 22.0;
Example with multiple topics:
policy "multiple_sensors"
permit
var topics = ["sensors/temperature", "sensors/humidity"];
topics.<mqtt.messages> != undefined;
Example with single-level wildcard:
policy "all_room_temperatures"
permit
"building/+/temperature".<mqtt.messages>.value > 25.0;
Example with multi-level wildcard:
policy "all_building_sensors"
permit
"building/#".<mqtt.messages>.alert == true;
messages
Subscribes to MQTT topics with custom broker configuration.
Reference a specific broker by name or provide an inline configuration. Supports multi-broker scenarios and per-subscription broker overrides.
The mqttPipConfig parameter accepts:
- A string referencing a broker configuration by name
- A broker configuration object with properties:
brokerAddress,brokerPort,clientId - An array of broker configuration objects for multi-broker subscriptions
Example referencing a broker by name:
policy "staging_environment_monitoring"
permit
var topics = ["sensors/data", "actuators/status"];
topics.<mqtt.messages(1, "staging")>.operational == true;
Example with inline broker configuration:
policy "custom_broker_connection"
permit
var brokerConfig = {
"brokerAddress": "mqtt.internal.example.com",
"brokerPort": 1883,
"clientId": "policy-specific-client"
};
"devices/status".<mqtt.messages(1, brokerConfig)>.online == true;
Example with multiple brokers:
policy "distributed_mqtt_network"
permit
var brokers = [
{
"name": "datacenter1",
"brokerAddress": "mqtt-dc1.example.com",
"brokerPort": 1883,
"clientId": "sapl-dc1"
},
{
"name": "datacenter2",
"brokerAddress": "mqtt-dc2.example.com",
"brokerPort": 1883,
"clientId": "sapl-dc2"
}
];
"sensors/#".<mqtt.messages(2, brokers)>.status == "OK";
messages
Subscribes to MQTT topics with a specified Quality of Service level.
QoS levels and their trade-offs:
- QoS 0: At most once - fastest but may lose messages
- QoS 1: At least once - acknowledged delivery, may receive duplicates
- QoS 2: Exactly once - slowest but guaranteed
Example with QoS 1 for reliable monitoring:
policy "critical_alarm_monitoring"
permit
"alarms/critical".<mqtt.messages(1)>.severity == "HIGH";
Example with QoS 2 for command processing:
policy "device_command_processing"
permit
var commandTopics = ["device/shutdown", "device/restart"];
commandTopics.<mqtt.messages(2)>.confirmed == true;
Example with QoS 0 for high-frequency sensor data:
policy "sensor_stream"
permit
"sensors/motion/#".<mqtt.messages(0)> != undefined;