ConsumeAMQP
Description
Consumes AMQP Messages from an AMQP Broker using the AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be emitted as its own FlowFile to the 'success' relationship.
Tags
amqp, consume, get, message, rabbit, receive
Properties
In the list below, required properties are marked with an asterisk (*); all other properties are optional. The table also indicates any default values and whether a property supports the NiFi Expression Language.
Display Name | API Name | Default Value | Allowable Values | Description |
---|---|---|---|---|
Queue * | Queue | | | The name of the existing AMQP Queue from which messages will be consumed. Usually pre-defined by AMQP administrator. |
Auto-Acknowledge Messages * | auto.acknowledge | false | | If false (Non-Auto-Acknowledge), the messages will be acknowledged by the processor after transferring the FlowFiles to success and committing the NiFi session. Non-Auto-Acknowledge mode provides 'at-least-once' delivery semantics. If true (Auto-Acknowledge), messages that are delivered to the AMQP Client will be auto-acknowledged by the AMQP Broker just after sending them out. This generally provides better throughput but can also result in messages being lost upon restart/crash of the AMQP Broker, NiFi, or the processor. Auto-Acknowledge mode provides 'at-most-once' delivery semantics and is recommended only if losing messages is acceptable. |
Batch Size * | batch.size | 10 | | The maximum number of messages that should be processed in a single session. Once this many messages have been received (or once no more messages are readily available), the messages received will be transferred to the 'success' relationship and the messages will be acknowledged to the AMQP Broker. Setting this value to a larger number could result in better performance, particularly for very small messages, but can also result in more messages being duplicated upon sudden restart of NiFi. |
Prefetch Count * | prefetch.count | 0 | | The maximum number of unacknowledged messages for the consumer. If the consumer has this number of unacknowledged messages, the AMQP broker will send no new messages until the consumer acknowledges some of the messages already delivered to it. Allowed values: from 0 to 65535. 0 means no limit. |
Header Output Format * | header.format | Comma-Separated String | | Defines how to output headers from the received message |
Header Key Prefix * | header.key.prefix | consume.amqp | | Text to be prefixed to header keys as they are added to the FlowFile attributes. The processor will append '.' to the value of this property. This property is only considered if: |
Header Separator | header.separator | , | | The character used to separate key-value pairs of headers in the String output. The value must be exactly one character. This property is only considered if: |
Remove Curly Braces | remove.curly.braces | False | | If true, curly braces in the header will be automatically removed. This property is only considered if: |
Brokers | Brokers | | | A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is set, Host Name and Port are ignored. Only include hosts from the same AMQP cluster. Supports Expression Language, using Environment variables. |
Host Name | Host Name | localhost | | Network address of AMQP broker (e.g., localhost). If Brokers is set, then this property is ignored. Supports Expression Language, using Environment variables. |
Port | Port | 5672 | | Numeric value identifying Port of AMQP broker (e.g., 5671). If Brokers is set, then this property is ignored. Supports Expression Language, using Environment variables. |
Virtual Host | Virtual Host | | | Virtual Host name which segregates AMQP system for enhanced security. Supports Expression Language, using Environment variables. |
User Name | User Name | | | User Name used for authentication and authorization. Supports Expression Language, using Environment variables. |
Password | Password | | | Password used for authentication and authorization. |
AMQP Version * | AMQP Version | 0.9.1 | | AMQP Version. Currently only supports AMQP v0.9.1. |
SSL Context Service | ssl-context-service | | Controller Service: SSLContextService Implementations: StandardRestrictedSSLContextService StandardSSLContextService | The SSL Context Service used to provide client certificate information for TLS/SSL connections. |
Use Client Certificate Authentication | cert-authentication | false | | Authenticate using the SSL certificate rather than user name/password. |
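
The way Brokers takes precedence over Host Name and Port, and the allowed range for Prefetch Count, can be illustrated with a small sketch. The helper names `resolve_brokers` and `validate_prefetch_count` are hypothetical and are not part of NiFi; the logic only mirrors the property descriptions above and makes no claim about how the processor is actually implemented.

```python
from typing import List, Optional, Tuple


def resolve_brokers(brokers: Optional[str],
                    host_name: str = "localhost",
                    port: int = 5672) -> List[Tuple[str, int]]:
    """Return the (host, port) pairs a client would try to connect to.

    Hypothetical helper: when `brokers` is set, `host_name` and `port`
    are ignored, as the property descriptions state.
    """
    if brokers and brokers.strip():
        addresses = []
        for entry in brokers.split(","):
            # Split on the last ':' so the port is always the final field.
            host, sep, port_text = entry.strip().rpartition(":")
            if not sep or not host or not port_text.isdigit():
                raise ValueError(f"expected <host>:<port>, got {entry!r}")
            addresses.append((host, int(port_text)))
        return addresses
    # Brokers is unset: fall back to the single Host Name / Port pair.
    return [(host_name, port)]


def validate_prefetch_count(value: int) -> int:
    """Accept 0..65535; 0 means no limit on unacknowledged messages."""
    if not 0 <= value <= 65535:
        raise ValueError("Prefetch Count must be between 0 and 65535")
    return value
```

For example, `resolve_brokers("mq1:5672, mq2:5672", "ignored-host", 1234)` yields only the two listed brokers, while `resolve_brokers(None)` falls back to the Host Name and Port defaults.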
Dynamic Properties
This component does not support dynamic properties.
Relationships
Name | Description |
---|---|
success | All FlowFiles that are received from the AMQP queue are routed to this relationship |
Reads Attributes
This processor does not read attributes.
Writes Attributes
Name | Description |
---|---|
<Header Key Prefix>.<attribute> | Each message header will be inserted with this attribute name, if the processor is configured to output headers as attributes |
amqp$appId | The App ID field from the AMQP Message |
amqp$clusterId | The ID of the AMQP Cluster |
amqp$contentEncoding | The Content Encoding reported by the AMQP Message |
amqp$contentType | The Content Type reported by the AMQP Message |
amqp$correlationId | The Message's Correlation ID |
amqp$deliveryMode | The numeric indicator for the Message's Delivery Mode |
amqp$exchange | The exchange from which AMQP Message was received |
amqp$expiration | The Message Expiration |
amqp$headers | The headers present on the AMQP Message. Added only if processor is configured to output this attribute. |
amqp$messageId | The unique ID of the Message |
amqp$priority | The Message priority |
amqp$replyTo | The value of the Message's Reply-To field |
amqp$routingKey | The routingKey of the AMQP Message |
amqp$timestamp | The timestamp of the Message, as the number of milliseconds since epoch |
amqp$type | The type of message |
amqp$userId | The ID of the user |
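
As a rough illustration of the `<Header Key Prefix>.<attribute>` naming in the first row, the sketch below maps message headers to attribute names. The function `headers_to_attributes` is hypothetical, and converting header values with `str()` is an assumption; the processor's actual value formatting may differ.

```python
from typing import Dict, Mapping


def headers_to_attributes(headers: Mapping[str, object],
                          key_prefix: str = "consume.amqp") -> Dict[str, str]:
    """Map each AMQP header to a '<prefix>.<key>' attribute name.

    Hypothetical helper; the '.' is appended to the prefix as the
    Header Key Prefix description states. str() conversion is an assumption.
    """
    return {f"{key_prefix}.{key}": str(value) for key, value in headers.items()}


# Illustrative headers only; not taken from a real message.
attributes = headers_to_attributes({"tenant": "acme", "retries": 3})
```

With the default prefix, the header `tenant` becomes the attribute `consume.amqp.tenant`.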
State Management
This component does not store state.
Restricted
This component is not restricted.
Input Requirement
This component does not allow an incoming relationship.
System Resource Considerations
This component does not specify system resource considerations.