mqtt
Policy Information Point for subscribing to MQTT topics.
This Policy Information Point subscribes to MQTT topics and returns messages from MQTT brokers as a reactive stream of attribute values.
Subscribe to single or multiple topics with configurable Quality of Service levels and broker configurations.
Quality of Service Levels
MQTT QoS levels determine message delivery guarantees:
- QoS 0: At most once - fire and forget, no acknowledgment
- QoS 1: At least once - acknowledged delivery, possible duplicates
- QoS 2: Exactly once - assured delivery, no duplicates
Configuration
Configure the PIP through the SAPL environment variables. The mqttPipConfig
variable contains:
brokerConfig: Single object or array of broker configuration objectsdefaultBrokerConfigName: Default broker configuration name (optional)defaultResponse: Default response when no messages arrive - “undefined” or “error” (defaults to “undefined”)defaultResponseTimeout: Timeout in milliseconds before emitting default response (defaults to 1000ms)emitAtRetry: Emit value on reconnection - “true” or “false” (defaults to “false”)
Each broker configuration object contains:
name: Broker configuration identifier (optional)brokerAddress: Hostname or IP address of the MQTT brokerbrokerPort: Port number of the MQTT brokerclientId: Unique identifier for the MQTT client connectionusername: Username for broker authentication (optional, defaults to empty string)password: Password for broker authentication (optional, defaults to empty string)
Configuration example without authentication:
{
"defaultBrokerConfigName": "production",
"defaultResponse": "undefined",
"defaultResponseTimeout": 5000,
"emitAtRetry": "false",
"brokerConfig": [
{
"name": "production",
"brokerAddress": "mqtt.example.com",
"brokerPort": 1883,
"clientId": "sapl-client-prod"
},
{
"name": "staging",
"brokerAddress": "mqtt-staging.example.com",
"brokerPort": 1883,
"clientId": "sapl-client-staging"
}
]
}
Configuration example with authentication:
{
"defaultBrokerConfigName": "production",
"brokerConfig": {
"name": "production",
"brokerAddress": "mqtt.example.com",
"brokerPort": 1883,
"clientId": "sapl-client-prod",
"username": "sapl-user",
"password": "secure-password"
}
}
Message Format
Received messages are automatically converted based on their MQTT payload format:
- Messages with content type
application/jsonare parsed as JSON values - UTF-8 encoded text messages are returned as text values
- Binary payloads are returned as arrays of byte values (as integers)
Topic Wildcards
The PIP supports MQTT topic wildcards for flexible subscriptions:
+- Single-level wildcard (matches one topic level)#- Multi-level wildcard (matches zero or more topic levels, must be last)
Examples:
sensors/+/temperaturematchessensors/room1/temperatureandsensors/room2/temperaturebuilding/#matchesbuilding/floor1/room1andbuilding/floor2/room3/sensor5
Example Policy
policy "temperature_monitoring"
permit
action == "monitor";
var sensors = ["sensors/room1/temp", "sensors/room2/temp"];
sensors.<mqtt.messages>.celsius < 30.0;
Reconnection Behavior
The PIP automatically handles broker reconnection in case of connection loss. When reconnection occurs, the PIP re-subscribes to all active topics and continues emitting messages.
messages
Subscribes to MQTT topics and emits messages as they arrive. Uses QoS level 0 (at most once) by default.
Accepts a single topic string or an array of topic strings. MQTT wildcards work in topic filters.
Example with single topic:
policy "single_temperature_sensor"
permit
"home/livingroom/temperature".<mqtt.messages>.celsius > 22.0;
Example with multiple topics:
policy "multiple_sensors"
permit
var topics = ["sensors/temperature", "sensors/humidity"];
topics.<mqtt.messages> != undefined;
Example with single-level wildcard:
policy "all_room_temperatures"
permit
"building/+/temperature".<mqtt.messages>.value > 25.0;
Example with multi-level wildcard:
policy "all_building_sensors"
permit
"building/#".<mqtt.messages>.alert == true;
messages
Subscribes to MQTT topics with custom broker configuration.
Reference a specific broker by name or provide an inline configuration. Supports multi-broker scenarios and per-subscription broker overrides.
The mqttPipConfig parameter accepts:
- A string referencing a broker configuration by name
- A broker configuration object with properties:
brokerAddress,brokerPort,clientId, optionalusername, optionalpassword - An array of broker configuration objects for multi-broker subscriptions
Example referencing a broker by name:
policy "staging_environment_monitoring"
permit
var topics = ["sensors/data", "actuators/status"];
topics.<mqtt.messages(1, "staging")>.operational == true;
Example with inline broker configuration:
policy "custom_broker_connection"
permit
var brokerConfig = {
"brokerAddress": "mqtt.internal.example.com",
"brokerPort": 1883,
"clientId": "policy-specific-client",
"username": "device-monitor",
"password": "secure-token"
};
"devices/status".<mqtt.messages(1, brokerConfig)>.online == true;
Example with multiple brokers:
policy "distributed_mqtt_network"
permit
var brokers = [
{
"name": "datacenter1",
"brokerAddress": "mqtt-dc1.example.com",
"brokerPort": 1883,
"clientId": "sapl-dc1"
},
{
"name": "datacenter2",
"brokerAddress": "mqtt-dc2.example.com",
"brokerPort": 1883,
"clientId": "sapl-dc2"
}
];
"sensors/#".<mqtt.messages(2, brokers)>.status == "OK";
messages
Subscribes to MQTT topics with a specified Quality of Service level.
QoS levels and their trade-offs:
- QoS 0: At most once - fastest but may lose messages
- QoS 1: At least once - acknowledged delivery, may receive duplicates
- QoS 2: Exactly once - slowest but guaranteed
Example with QoS 1 for reliable monitoring:
policy "critical_alarm_monitoring"
permit
"alarms/critical".<mqtt.messages(1)>.severity == "HIGH";
Example with QoS 2 for command processing:
policy "device_command_processing"
permit
var commandTopics = ["device/shutdown", "device/restart"];
commandTopics.<mqtt.messages(2)>.confirmed == true;
Example with QoS 0 for high-frequency sensor data:
policy "sensor_stream"
permit
"sensors/motion/#".<mqtt.messages(0)> != undefined;