PublishKafka

Description

Sends the contents of a FlowFile as either a message or as individual records to Apache Kafka using the Kafka Producer API. The messages to send may be individual FlowFiles, may be delimited using a user-specified delimiter (such as a new-line), or may be record-oriented data that can be read by the configured Record Reader. The complementary NiFi processor for fetching messages is ConsumeKafka.
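As a rough illustration of the delimiter case described above, the sketch below splits FlowFile content on a demarcator and treats each section as one message. The function name `split_messages` is hypothetical and not part of NiFi. This page does not say how empty sections or a trailing delimiter are handled, so the sketch drops empty sections as an assumption.

```python
from typing import List, Optional


def split_messages(content: bytes, demarcator: Optional[str]) -> List[bytes]:
    """Return the message payloads for one FlowFile (illustrative only)."""
    if not demarcator:
        # No demarcator: the whole content is sent as a single message.
        return [content]
    # The demarcator string is interpreted as UTF-8.
    delimiter = demarcator.encode("utf-8")
    # Assumption: empty sections (e.g. after a trailing delimiter) are skipped.
    return [part for part in content.split(delimiter) if part]


messages = split_messages(b"a\nb\nc\n", "\n")
print(messages)  # [b'a', b'b', b'c']
```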

Tags

Apache, Kafka, Message, PubSub, Put, Record, Send, avro, csv, json, logs

Properties

In the list below, required properties are marked with an asterisk (*). All other properties are optional. The table also indicates any default values and whether a property supports the NiFi Expression Language.

Display Name | API Name | Default Value | Allowable Values | Description
Kafka Connection Service * | Kafka Connection Service | Controller Service:
KafkaConnectionService

Implementations:
AmazonMSKConnectionService
Kafka3ConnectionService
Provides connections to Kafka Broker for publishing Kafka Records
Topic Name * | Topic Name | Name of the Kafka Topic to which the Processor publishes Kafka Records

Supports Expression Language, using FlowFile attributes and Environment variables.
Failure Strategy * | Failure Strategy | Route to Failure
  • Route to Failure
  • Rollback
Specifies how the processor handles a FlowFile if it is unable to publish the data to Kafka
Delivery Guarantee * | acks | Guarantee Replicated Delivery
  • Guarantee Replicated Delivery
  • Guarantee Single Node Delivery
  • Best Effort
Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka Client acks property.
Compression Type * | compression.type | none
  • none
  • gzip
  • snappy
  • lz4
  • zstd
Specifies the compression strategy for records sent to Kafka. Corresponds to Kafka Client compression.type property.
Max Request Size * | max.request.size | 1 MB | The maximum size of a request in bytes. Corresponds to Kafka Client max.request.size property.
Transactions Enabled * | Transactions Enabled | true
  • true
  • false
Specifies whether to provide transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka and this property is set to false, the messages that have already been sent to Kafka will still be delivered to consumers. If this property is set to true, the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true requires that the Delivery Guarantee property be set to Guarantee Replicated Delivery.
Transactional ID Prefix | Transactional ID Prefix | Specifies a prefix for the KafkaProducer config transactional.id. The transactional.id will be a generated UUID prefixed with the configured string.

Supports Expression Language, using Environment variables.

This property is only considered if:
  • the property Transactions Enabled has a value of true
Partitioner Class * | partitioner.class | DefaultPartitioner
  • RoundRobinPartitioner
  • DefaultPartitioner
  • Expression Language Partitioner
Specifies which class to use to compute a partition id for a message. Corresponds to Kafka Client partitioner.class property.
Partition * | partition | Specifies the Kafka Partition destination for Records.

Supports Expression Language, using FlowFile attributes and Environment variables.

This property is only considered if:
  • the property Partitioner Class has a value of org.apache.nifi.processors.kafka.pubsub.Partitioners.ExpressionLanguagePartitioner
Message Demarcator | Message Demarcator | Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. To enter a special character such as a new line, use CTRL+Enter or Shift+Enter, depending on your OS.

Supports Expression Language, using FlowFile attributes and Environment variables.
Record Reader | Record Reader | Controller Service:
RecordReaderFactory

Implementations:
AvroReader
CEFReader
CSVReader
ExcelReader
GrokReader
JsonPathReader
JsonTreeReader
ReaderLookup
ScriptedReader
Syslog5424Reader
SyslogReader
WindowsEventLogReader
XMLReader
YamlTreeReader
The Record Reader to use for incoming FlowFiles
Record Writer | Record Writer | Controller Service:
RecordSetWriterFactory

Implementations:
AvroRecordSetWriter
CSVRecordSetWriter
FreeFormTextRecordSetWriter
JsonRecordSetWriter
RecordSetWriterLookup
ScriptedRecordSetWriter
XMLRecordSetWriter
The Record Writer to use in order to serialize the data before sending to Kafka
Publish Strategy * | Publish Strategy | Use Content as Record Value
  • Use Content as Record Value
  • Use Wrapper
The format used to publish the incoming FlowFile record to Kafka.

This property is only considered if:
  • the property Record Reader has a value specified
Message Key Field | Message Key Field | The name of a field in the Input Records that should be used as the Key for the Kafka message.

Supports Expression Language, using FlowFile attributes and Environment variables.

This property is only considered if:
  • the property Publish Strategy has a value of USE_VALUE
FlowFile Attribute Header Pattern | FlowFile Attribute Header Pattern | A Regular Expression that is matched against all FlowFile attribute names. Any attribute whose name matches the pattern will be added to the Kafka messages as a Header. If not specified, no FlowFile attributes will be added as headers.

This property is only considered if:
  • the property Publish Strategy has a value of USE_VALUE
Header Encoding * | Header Encoding | UTF-8 | For any attribute that is added as a Kafka Record Header, this property indicates the Character Encoding to use for serializing the headers.

This property is only considered if:
  • the property FlowFile Attribute Header Pattern has a value specified
Kafka Key | Kafka Key | The Key to use for the Message. If not specified, the FlowFile attribute 'kafka.key' is used as the message key, if it is present. Beware that setting the Kafka key and a demarcator at the same time may lead to many Kafka messages with the same key. Normally this is not a problem, as Kafka does not enforce or assume message and key uniqueness. Still, the combination poses a risk of data loss on Kafka: during topic compaction, messages are deduplicated based on this key.

Supports Expression Language, using FlowFile attributes and Environment variables.

This property is only considered if:
  • the property Publish Strategy has a value of USE_WRAPPER
Kafka Key Attribute Encoding * | Kafka Key Attribute Encoding | UTF-8 Encoded
  • UTF-8 Encoded
  • Hex Encoded
  • Do Not Add Key as Attribute
FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of the attribute should be encoded.

This property is only considered if:
  • the property Publish Strategy has a value of USE_WRAPPER
Record Key Writer | Record Key Writer | Controller Service:
RecordSetWriterFactory

Implementations:
AvroRecordSetWriter
CSVRecordSetWriter
FreeFormTextRecordSetWriter
JsonRecordSetWriter
RecordSetWriterLookup
ScriptedRecordSetWriter
XMLRecordSetWriter
The Record Key Writer to use for outgoing FlowFiles

This property is only considered if:
  • the property Publish Strategy has a value of USE_WRAPPER
Record Metadata Strategy * | Record Metadata Strategy | Use Configured Values
  • Metadata From Record
  • Use Configured Values
Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or if it should come from the configured Topic Name and Partition / Partitioner class properties

This property is only considered if:
  • the property Publish Strategy has a value of USE_WRAPPER
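
To make the relationship between processor properties and Kafka client settings more concrete, the following sketch assembles a producer configuration from a few of the properties listed above. It is illustrative only: `build_producer_config` and its parameters are hypothetical, the `acks` values `all`, `1`, and `0` are the standard Kafka client settings that the three Delivery Guarantee options are assumed to map to, and reading 1 MB as 1,048,576 bytes is likewise an assumption.

```python
import uuid
from typing import Optional

# Assumed mapping from Delivery Guarantee display names to Kafka `acks` values.
ACKS_BY_GUARANTEE = {
    "Guarantee Replicated Delivery": "all",
    "Guarantee Single Node Delivery": "1",
    "Best Effort": "0",
}


def build_producer_config(
    delivery_guarantee: str = "Guarantee Replicated Delivery",
    compression_type: str = "none",
    max_request_size: int = 1024 * 1024,  # assumes 1 MB = 1,048,576 bytes
    transactions_enabled: bool = True,
    transactional_id_prefix: Optional[str] = None,
) -> dict:
    """Build a dict of Kafka producer settings (hypothetical helper)."""
    acks = ACKS_BY_GUARANTEE[delivery_guarantee]
    if transactions_enabled and acks != "all":
        # Transactions require the Guarantee Replicated Delivery setting.
        raise ValueError(
            "Transactions Enabled requires Guarantee Replicated Delivery"
        )

    config = {
        "acks": acks,
        "compression.type": compression_type,
        "max.request.size": max_request_size,
    }
    if transactions_enabled:
        # transactional.id is a generated UUID prefixed with the configured string.
        prefix = transactional_id_prefix or ""
        config["transactional.id"] = prefix + str(uuid.uuid4())
    return config


config = build_producer_config(transactional_id_prefix="nifi-")
print(config["acks"], config["transactional.id"])
```

With the defaults shown in the table, the sketch yields `acks` set to `all` and a `transactional.id` that starts with the configured prefix. Disabling transactions omits `transactional.id` entirely.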

Dynamic Properties

This component does not support dynamic properties.

Relationships

Name | Description
failure | Any FlowFile that cannot be sent to Kafka will be routed to this Relationship
success | FlowFiles for which all content was sent to Kafka.

Reads Attributes

Name | Description
kafka.tombstone | If this attribute is set to 'true', the processor is not configured with a demarcator, and the FlowFile's content is null, then a tombstone message with zero bytes will be sent to Kafka.
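
The three conditions for a tombstone can be read as a single boolean check. The sketch below is only an illustration: `is_tombstone` is a hypothetical helper, and both the exact string match on 'true' and the treatment of empty content as null content are assumptions.

```python
from typing import Dict, Optional


def is_tombstone(
    attributes: Dict[str, str],
    demarcator: Optional[str],
    content: Optional[bytes],
) -> bool:
    """Return True when a zero-byte tombstone message would be sent."""
    return (
        # Assumption: the attribute must match the string 'true' exactly.
        attributes.get("kafka.tombstone") == "true"
        # No demarcator may be configured on the processor.
        and demarcator is None
        # Assumption: empty content is treated like null content.
        and not content
    )


print(is_tombstone({"kafka.tombstone": "true"}, None, None))  # True
print(is_tombstone({"kafka.tombstone": "true"}, "\n", None))  # False
```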

Writes Attributes

Name | Description
msg.count | The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to FlowFiles that are routed to success.

State Management

This component does not store state.

Restricted

This component is not restricted.

Input Requirement

This component requires an incoming relationship.

System Resource Considerations

This component does not specify system resource considerations.

See Also

ConsumeKafka