Skip to main content

ConsumeMQTT

Description

Subscribes to a topic and receives messages from an MQTT broker

Tags

IOT, MQTT, consume, listen, subscribe

Properties

In the list below required Properties are shown with an asterisk (*). Other properties are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

Display NameAPI NameDefault ValueAllowable ValuesDescription
Broker URI *Broker URIThe URI(s) to use to connect to the MQTT broker (e.g., tcp://localhost:1883). The 'tcp', 'ssl', 'ws' and 'wss' schemes are supported. In order to use 'ssl', the SSL Context Service property must be set. When a comma-separated URI list is set (e.g., tcp://localhost:1883,tcp://localhost:1884), the processor will use a round-robin algorithm to connect to the brokers on connection failure.

Supports Expression Language, using Environment variables.
MQTT Specification Version *MQTT Specification Versionv3 AUTO
  • v3 AUTO
  • v5.0
  • v3.1.1
  • v3.1.0
The MQTT specification version when connecting with the broker. See the allowable value descriptions for more details.
UsernameUsernameUsername to use when connecting to the broker

Supports Expression Language, using Environment variables.
PasswordPasswordPassword to use when connecting to the broker
SSL Context ServiceSSL Context ServiceController Service:
SSLContextService

Implementations:
StandardRestrictedSSLContextService
StandardSSLContextService
The SSL Context Service used to provide client certificate information for TLS/SSL connections.
Session state *Session stateClean Session
  • Clean Session
  • Resume Session
Whether to start a fresh or resume previous flows. See the allowable value descriptions for more details.
Session Expiry IntervalSession Expiry Interval24 hrsAfter this interval the broker will expire the client and clear the session state.

This property is only considered if:
  • the property MQTT Specification Version has a value of 5
  • the property Session state has a value of false
Client IDClient IDMQTT client ID to use. If not set, a UUID will be generated.

Supports Expression Language, using Environment variables.
Group IDGroup IDMQTT consumer group ID to use. If group ID not set, client will connect as individual consumer.
Topic Filter *Topic FilterThe MQTT topic filter to designate the topics to subscribe to.

Supports Expression Language, using Environment variables.
Quality of Service (QoS) *Quality of Service(QoS)0 - At most once
  • 0 - At most once
  • 1 - At least once
  • 2 - Exactly once
The Quality of Service (QoS) to receive the message with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.
Record Readerrecord-readerController Service:
RecordReaderFactory

Implementations:
AvroReader
CEFReader
CSVReader
ExcelReader
GrokReader
JsonPathReader
JsonTreeReader
ReaderLookup
ScriptedReader
Syslog5424Reader
SyslogReader
WindowsEventLogReader
XMLReader
YamlTreeReader
The Record Reader to use for parsing received MQTT Messages into Records.
Record Writerrecord-writerController Service:
RecordSetWriterFactory

Implementations:
AvroRecordSetWriter
CSVRecordSetWriter
FreeFormTextRecordSetWriter
JsonRecordSetWriter
RecordSetWriterLookup
ScriptedRecordSetWriter
XMLRecordSetWriter
The Record Writer to use for serializing Records before writing them to a FlowFile.
Add attributes as fields *add-attributes-as-fieldstrue
  • true
  • false
If setting this property to true, default fields are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.

This property is only considered if:
  • the property Record Reader has a value specified
Message Demarcatormessage-demarcatorWith this property, you have an option to output FlowFiles which contains multiple messages. This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart multiple messages. This is an optional property ; if not provided, and if not defining a Record Reader/Writer, each message received will result in a single FlowFile. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.

Supports Expression Language, using Environment variables.
Connection Timeout (seconds)Connection Timeout (seconds)30Maximum time interval the client will wait for the network connection to the MQTT server to be established. The default timeout is 30 seconds. A value of 0 disables timeout processing meaning the client will wait until the network connection is made successfully or fails.
Keep Alive Interval (seconds)Keep Alive Interval (seconds)60Defines the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available, without having to wait for the TCP/IP timeout. The client will ensure that at least one message travels across the network within each keep alive period. In the absence of a data-related message during the time period, the client sends a very small "ping" message, which the server will acknowledge. A value of 0 disables keepalive processing in the client.
Last Will MessageLast Will MessageThe message to send as the client's Last Will.
Last Will Topic *Last Will TopicThe topic to send the client's Last Will to.

This property is only considered if:
  • the property Last Will Message has a value specified
Last Will Retain *Last Will Retainfalse
  • true
  • false
Whether to retain the client's Last Will.

This property is only considered if:
  • the property Last Will Message has a value specified
Last Will QoS Level *Last Will QoS Level0 - At most once
  • 0 - At most once
  • 1 - At least once
  • 2 - Exactly once
QoS level to be used when publishing the Last Will Message.

This property is only considered if:
  • the property Last Will Message has a value specified
Max Queue Size *Max Queue SizeThe MQTT messages are always being sent to subscribers on a topic regardless of how frequently the processor is scheduled to run. If the 'Run Schedule' is significantly behind the rate at which the messages are arriving to this processor, then a back up can occur in the internal queue of this processor. This property specifies the maximum number of messages this processor will hold in memory at one time in the internal queue. This data would be lost in case of a NiFi restart.

Dynamic Properties

This component does not support dynamic properties.

Relationships

NameDescription
MessageThe MQTT message output
parse.failureIf a message cannot be parsed using the configured Record Reader, the contents of the message will be routed to this Relationship as its own individual FlowFile.

Reads Attributes

This processor does not read attributes.

Writes Attributes

NameDescription
mqtt.brokerMQTT broker that was the message source
mqtt.isDuplicateWhether or not this message might be a duplicate of one which has already been received.
mqtt.isRetainedWhether or not this message was from a current publisher, or was "retained" by the server as the last message published on the topic.
mqtt.qosThe quality of service for this message.
mqtt.topicMQTT topic on which message was received
record.countThe number of records received

State Management

This component does not store state.

Restricted

This component is not restricted.

Input Requirement

This component does not allow an incoming relationship.

System Resource Considerations

ScopeDescription
MEMORYThe 'Max Queue Size' specifies the maximum number of messages that can be hold in memory by NiFi by a single instance of this processor. A high value for this property could represent a lot of data being stored in memory.

See Also

PublishMQTT