mqtt
Policy Information Point for subscribing to MQTT topics.
This Policy Information Point subscribes to MQTT topics and returns messages from MQTT brokers as a reactive stream of attribute values.
Subscribe to single or multiple topics with configurable Quality of Service levels and broker configurations.
Quality of Service Levels
MQTT QoS levels determine message delivery guarantees:
- QoS 0: At most once - fire and forget, no acknowledgment
- QoS 1: At least once - acknowledged delivery, possible duplicates
- QoS 2: Exactly once - assured delivery, no duplicates
Configuration
Configure the PIP through SAPL environment variables. The mqttPipConfig variable contains:
- brokerConfig: Single object or array of broker configuration objects
- defaultBrokerConfigName: Default broker configuration name (optional)
- defaultResponse: Default response when no messages arrive - “undefined” or “error” (defaults to “undefined”)
- defaultResponseTimeout: Timeout in milliseconds before emitting the default response (defaults to 1000 ms)
- emitAtRetry: Emit a value on reconnection - “true” or “false” (defaults to “false”)
Each broker configuration object contains:
- name: Broker configuration identifier (optional)
- brokerAddress: Hostname or IP address of the MQTT broker
- brokerPort: Port number of the MQTT broker
- clientId: Unique identifier for the MQTT client connection
- username: Username for broker authentication (optional, defaults to empty string)
- password: Password for broker authentication (optional, defaults to empty string)
Configuration example without authentication:
{
"defaultBrokerConfigName": "production",
"defaultResponse": "undefined",
"defaultResponseTimeout": 5000,
"emitAtRetry": "false",
"brokerConfig": [
{
"name": "production",
"brokerAddress": "mqtt.example.com",
"brokerPort": 1883,
"clientId": "sapl-client-prod"
},
{
"name": "staging",
"brokerAddress": "mqtt-staging.example.com",
"brokerPort": 1883,
"clientId": "sapl-client-staging"
}
]
}
Configuration example with authentication:
{
"defaultBrokerConfigName": "production",
"brokerConfig": {
"name": "production",
"brokerAddress": "mqtt.example.com",
"brokerPort": 1883,
"clientId": "sapl-client-prod",
"username": "sapl-user",
"password": "secure-password"
}
}
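The defaults and the "single object or array" rule described above can be illustrated with a small sketch. This is not the PIP's implementation; the function names normalize_config and select_broker are hypothetical, and falling back to the first broker when no name is given is an assumption that the documentation does not confirm.

```python
# Documented defaults for the optional top-level settings.
DEFAULTS = {
    "defaultResponse": "undefined",
    "defaultResponseTimeout": 1000,
    "emitAtRetry": "false",
}


def normalize_config(config):
    """Return a copy of mqttPipConfig with defaults applied (hypothetical helper)."""
    brokers = config["brokerConfig"]
    # brokerConfig may be a single object or an array of objects.
    if isinstance(brokers, dict):
        brokers = [brokers]
    # username and password are optional and default to the empty string.
    brokers = [{"username": "", "password": "", **b} for b in brokers]
    return {**DEFAULTS, **config, "brokerConfig": brokers}


def select_broker(config, name=None):
    """Pick a broker by name, else by defaultBrokerConfigName (hypothetical helper)."""
    normalized = normalize_config(config)
    wanted = name if name is not None else normalized.get("defaultBrokerConfigName")
    for broker in normalized["brokerConfig"]:
        # Assumption: with no name at all, the first broker is used.
        if wanted is None or broker.get("name") == wanted:
            return broker
    raise KeyError(f"no broker configuration named {wanted!r}")
```

Applied to the authentication-free example, select_broker(config, "staging") would return the staging entry with empty username and password fields filled in.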
Message Format
Received messages are automatically converted based on their MQTT payload format:
- Messages with content type application/json are parsed as JSON values
- UTF-8 encoded text messages are returned as text values
- Binary payloads are returned as arrays of byte values (as integers)
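The three conversion rules can be sketched as follows. This is an illustration only: the function name convert_payload is hypothetical, and deciding between text and binary by attempting a UTF-8 decode is an assumption; the PIP may rely on MQTT message properties instead.

```python
import json
from typing import Any, Optional


def convert_payload(payload: bytes, content_type: Optional[str] = None) -> Any:
    """Map a raw MQTT payload to a JSON value, text, or a list of byte values."""
    if content_type == "application/json":
        # JSON payloads are parsed into the corresponding value.
        return json.loads(payload.decode("utf-8"))
    try:
        # Valid UTF-8 is treated as a text value (assumed detection strategy).
        return payload.decode("utf-8")
    except UnicodeDecodeError:
        # Anything else is returned as an array of integers, one per byte.
        return list(payload)
```

With this mapping, a policy expression such as .celsius only works on messages that arrive as JSON objects; text and binary payloads have no such field.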
Topic Wildcards
The PIP supports MQTT topic wildcards for flexible subscriptions:
- +: Single-level wildcard (matches one topic level)
- #: Multi-level wildcard (matches zero or more topic levels, must be last)
Examples:
- sensors/+/temperature matches sensors/room1/temperature and sensors/room2/temperature
- building/# matches building/floor1/room1 and building/floor2/room3/sensor5
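Wildcard matching is performed by the broker, not by the policy, but a minimal sketch of the rules can help when choosing a topic filter. The function topic_matches is a hypothetical helper; it ignores the special handling of topics that start with $.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level,
            # and it matches zero or more remaining topic levels.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" matches any single level.
            return False
    # Without "#", both sides must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

For example, topic_matches("sensors/+/temperature", "sensors/room1/temperature") is true, while the same filter does not match sensors/room1/extra/temperature because + covers exactly one level.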
Example Policy
policy "temperature_monitoring"
permit
action == "monitor"
where
var sensors = ["sensors/room1/temp", "sensors/room2/temp"];
sensors.<mqtt.messages>.celsius < 30.0;
Reconnection Behavior
The PIP automatically handles broker reconnection in case of connection loss. When reconnection occurs, the PIP re-subscribes to all active topics and continues emitting messages.
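The re-subscription step can be pictured as replaying a registry of active subscriptions after the connection is restored. The class below is a hypothetical sketch, not the PIP's code; send_subscribe stands in for whatever call the underlying MQTT client uses to subscribe.

```python
class SubscriptionRegistry:
    """Tracks active topic filters so they can be replayed after a reconnect."""

    def __init__(self):
        self._active = {}  # topic filter -> QoS level

    def subscribe(self, topic, qos, send_subscribe):
        # Remember the subscription before sending it to the broker.
        self._active[topic] = qos
        send_subscribe(topic, qos)

    def unsubscribe(self, topic):
        # Dropped subscriptions are not replayed later.
        self._active.pop(topic, None)

    def on_reconnect(self, send_subscribe):
        # Re-issue every subscription that is still active.
        for topic, qos in self._active.items():
            send_subscribe(topic, qos)
```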
messages
Input Entity of Attribute Finder
Name: topic [TEXT | ARRAY]
Parameters of Attribute Finder
Name: qos [INTEGER]
Subscribes to MQTT topics with a specified Quality of Service level.
QoS levels and their trade-offs:
- QoS 0: At most once - fastest but may lose messages
- QoS 1: At least once - acknowledged delivery, may receive duplicates
- QoS 2: Exactly once - slowest, but delivers each message exactly once with no duplicates
Example with QoS 1 for reliable monitoring:
policy "critical_alarm_monitoring"
permit
where
"alarms/critical".<mqtt.messages(1)>.severity == "HIGH";
Example with QoS 2 for command processing:
policy "device_command_processing"
permit
where
var commandTopics = ["device/shutdown", "device/restart"];
commandTopics.<mqtt.messages(2)>.confirmed == true;
Example with QoS 0 for high-frequency sensor data:
policy "sensor_stream"
permit
where
"sensors/motion/#".<mqtt.messages(0)> != undefined;