PublishKafka
Description
Sends the contents of a FlowFile to Apache Kafka using the Kafka Producer API, either as a single message or as individual records. The content may be sent as one message per FlowFile, split into multiple messages using a user-specified demarcator (such as a newline), or parsed as record-oriented data by the configured Record Reader. The complementary NiFi processor for fetching messages is ConsumeKafka.
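Under the hood the processor uses the standard Kafka Producer API. The following minimal Java sketch shows the equivalent direct Producer call for the simplest case, one FlowFile's content published as a single message. It is not the processor's actual implementation; the broker address, topic name, and content are illustrative assumptions (in NiFi they come from the Kafka Connection Service, the Topic Name property, and the FlowFile itself).

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // In NiFi, the broker address comes from the Kafka Connection Service;
        // "localhost:9092" is an illustrative assumption.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // The entire FlowFile content becomes the value of one ProducerRecord;
            // topic and content are illustrative assumptions.
            byte[] value = "flowfile content".getBytes(StandardCharsets.UTF_8);
            producer.send(new ProducerRecord<>("my-topic", value));
            producer.flush();
        }
    }
}
```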
Tags
Apache, Kafka, Message, PubSub, Put, Record, Send, avro, csv, json, logs
Properties
In the table below, required properties are marked with an asterisk (*); all other properties are optional. The table also indicates default values and whether each property supports the NiFi Expression Language.
Display Name | API Name | Default Value | Allowable Values | Description |
---|---|---|---|---|
Kafka Connection Service * | Kafka Connection Service | | Controller Service: KafkaConnectionService. Implementations: AmazonMSKConnectionService, Kafka3ConnectionService | Provides connections to the Kafka Broker for publishing Kafka Records. |
Topic Name * | Topic Name | | | Name of the Kafka Topic to which the Processor publishes Kafka Records. Supports Expression Language, using FlowFile attributes and Environment variables. |
Failure Strategy * | Failure Strategy | Route to Failure | Route to Failure, Rollback | Specifies how the processor handles a FlowFile if it is unable to publish the data to Kafka. |
Delivery Guarantee * | acks | Guarantee Replicated Delivery | Best Effort, Guarantee Single Node Delivery, Guarantee Replicated Delivery | Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to the Kafka Client acks property. |
Compression Type * | compression.type | none | none, gzip, snappy, lz4, zstd | Specifies the compression strategy for records sent to Kafka. Corresponds to the Kafka Client compression.type property. |
Max Request Size * | max.request.size | 1 MB | | The maximum size of a request in bytes. Corresponds to the Kafka Client max.request.size property. |
Transactions Enabled * | Transactions Enabled | true | true, false | Specifies whether to provide transactional guarantees when communicating with Kafka. If this property is set to false and a problem occurs while sending data to Kafka, messages that have already been sent will still be delivered to consumers. If it is set to true, the Kafka transaction is rolled back so that those messages are not visible to consumers. Setting this to true requires that the [Delivery Guarantee] property be set to [Guarantee Replicated Delivery]. A sketch of the transactional pattern follows this table. |
Transactional ID Prefix | Transactional ID Prefix | | | Specifies a prefix for the KafkaProducer transactional.id configuration; the transactional.id is a generated UUID prefixed with the configured string. Supports Expression Language, using Environment variables. This property is only considered if the [Transactions Enabled] property has a value of true. |
Partitioner Class * | partitioner.class | DefaultPartitioner | RoundRobinPartitioner, DefaultPartitioner, RecordPath Partitioner, Expression Language Partitioner | Specifies which class to use to compute a partition id for a message. Corresponds to the Kafka Client partitioner.class property. |
Partition * | partition | | | Specifies the Kafka Partition destination for Records. Supports Expression Language, using FlowFile attributes and Environment variables. This property is only considered if the [Partitioner Class] property has a value of [RecordPath Partitioner] or [Expression Language Partitioner]. |
Message Demarcator | Message Demarcator | | | Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within a single FlowFile. If not specified, the entire content of the FlowFile is used as a single message. If specified, the contents of the FlowFile are split on this delimiter and each section is sent as a separate Kafka message. To enter a special character such as a newline, use CTRL+Enter or Shift+Enter, depending on your OS. Supports Expression Language, using FlowFile attributes and Environment variables. A sketch of this splitting behavior follows this table. |
Record Reader | Record Reader | | Controller Service: RecordReaderFactory. Implementations: AvroReader, CEFReader, CSVReader, ExcelReader, GrokReader, JsonPathReader, JsonTreeReader, ReaderLookup, ScriptedReader, Syslog5424Reader, SyslogReader, WindowsEventLogReader, XMLReader, YamlTreeReader | The Record Reader to use for incoming FlowFiles. |
Record Writer | Record Writer | | Controller Service: RecordSetWriterFactory. Implementations: AvroRecordSetWriter, CSVRecordSetWriter, FreeFormTextRecordSetWriter, JsonRecordSetWriter, RecordSetWriterLookup, ScriptedRecordSetWriter, XMLRecordSetWriter | The Record Writer to use in order to serialize the data before sending it to Kafka. |
Publish Strategy * | Publish Strategy | Use Content as Record Value | Use Content as Record Value, Use Wrapper | The format used to publish the incoming FlowFile record to Kafka. This property is only considered if the [Record Reader] property has a value specified. |
Message Key Field | Message Key Field | | | The name of a field in the Input Records that should be used as the Key for the Kafka message. Supports Expression Language, using FlowFile attributes and Environment variables. This property is only considered if the [Publish Strategy] property has a value of [Use Content as Record Value]. |
FlowFile Attribute Header Pattern | FlowFile Attribute Header Pattern | | | A Regular Expression that is matched against all FlowFile attribute names. Any attribute whose name matches the pattern will be added to the Kafka messages as a Header. If not specified, no FlowFile attributes will be added as headers. This property is only considered if the [Publish Strategy] property has a value of [Use Content as Record Value]. |
Header Encoding * | Header Encoding | UTF-8 | | For any attribute that is added as a Kafka Record Header, this property indicates the Character Encoding to use for serializing the headers. This property is only considered if the [FlowFile Attribute Header Pattern] property has a value specified. |
Kafka Key | Kafka Key | | | The Key to use for the Message. If not specified, the value of the FlowFile attribute 'kafka.key' is used as the message key, if present. Beware that setting the Kafka key and a demarcator at the same time may lead to many Kafka messages with the same key. Normally this is not a problem, as Kafka does not enforce or assume uniqueness of messages or keys. Still, setting both the demarcator and the Kafka key poses a risk of data loss on Kafka: during topic compaction, messages will be deduplicated based on this key. Supports Expression Language, using FlowFile attributes and Environment variables. This property is only considered if the [Publish Strategy] property has a value of [Use Content as Record Value]. |
Kafka Key Attribute Encoding * | Kafka Key Attribute Encoding | UTF-8 Encoded | UTF-8 Encoded, Hex Encoded | FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of that attribute should be encoded. This property is only considered if the [Publish Strategy] property has a value of [Use Content as Record Value]. |
Record Key Writer | Record Key Writer | | Controller Service: RecordSetWriterFactory. Implementations: AvroRecordSetWriter, CSVRecordSetWriter, FreeFormTextRecordSetWriter, JsonRecordSetWriter, RecordSetWriterLookup, ScriptedRecordSetWriter, XMLRecordSetWriter | The Record Key Writer to use for outgoing FlowFiles. This property is only considered if the [Publish Strategy] property has a value of [Use Wrapper]. |
Record Metadata Strategy * | Record Metadata Strategy | Use Configured Values | Metadata From Record, Use Configured Values | Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or from the configured Topic Name and Partition / Partitioner Class properties. This property is only considered if the [Publish Strategy] property has a value of [Use Wrapper]. |
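When [Transactions Enabled] is true, publishing happens inside a Kafka transaction whose transactional.id is a generated UUID prefixed with the configured [Transactional ID Prefix], and the transaction is rolled back on failure. The following Java sketch shows that pattern against the plain Producer API; it is a hedged illustration, not NiFi's implementation, and the broker address, prefix, topic, and payload are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TransactionalPublishSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative assumption
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        props.put("acks", "all"); // transactions require replicated delivery
        // transactional.id = configured prefix + generated UUID, per the property
        // description; "my-prefix-" is an illustrative assumption.
        props.put("transactional.id", "my-prefix-" + UUID.randomUUID());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("my-topic",
                        "message".getBytes(StandardCharsets.UTF_8)));
                // On success, the messages become visible to read_committed consumers.
                producer.commitTransaction();
            } catch (Exception e) {
                // On failure, the transaction is rolled back and the messages
                // are never delivered to consumers.
                producer.abortTransaction();
                throw e;
            }
        }
    }
}
```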
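The [Message Demarcator] splits a single FlowFile's content into multiple Kafka messages. A minimal Java sketch of that splitting behavior follows; the demarcator value, sample content, broker address, and topic are illustrative assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class DemarcatorSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative assumption
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        String flowFileContent = "first\nsecond\nthird"; // illustrative FlowFile content
        String demarcator = "\n";                        // the configured Message Demarcator

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Each section between demarcators is sent as a separate Kafka message.
            for (String message : flowFileContent.split(Pattern.quote(demarcator))) {
                producer.send(new ProducerRecord<>("my-topic",
                        message.getBytes(StandardCharsets.UTF_8)));
            }
        }
    }
}
```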
Dynamic Properties
This component does not support dynamic properties.
Relationships
Name | Description |
---|---|
failure | Any FlowFile that cannot be sent to Kafka will be routed to this Relationship. |
success | FlowFiles for which all content was sent to Kafka. |
Reads Attributes
Name | Description |
---|---|
kafka.tombstone | If this attribute is set to 'true', the processor is not configured with a demarcator, and the FlowFile's content is empty (zero bytes), then a tombstone message will be sent to Kafka. |
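In Kafka terms, a tombstone is simply a record whose value is null; on a compacted topic it marks the key for deletion. The short Java sketch below shows what such a record looks like in the Producer API; the topic and key are illustrative assumptions.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;

public class TombstoneSketch {
    public static void main(String[] args) {
        // A tombstone is a record whose value is null; during topic compaction
        // it deletes earlier records with the same key. Topic and key are
        // illustrative assumptions.
        ProducerRecord<byte[], byte[]> tombstone = new ProducerRecord<>(
                "my-topic", "the-key".getBytes(StandardCharsets.UTF_8), null);
        System.out.println(tombstone);
    }
}
```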
Writes Attributes
Name | Description |
---|---|
msg.count | The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to FlowFiles that are routed to success. |
State Management
This component does not store state.
Restricted
This component is not restricted.
Input Requirement
This component requires an incoming relationship.
System Resource Considerations
This component does not specify system resource considerations.