ConsumeKafka

Description

Consumes messages from Apache Kafka using the Kafka Consumer API. The complementary NiFi processor for sending messages is PublishKafka. The Processor supports consumption of Kafka messages, optionally interpreted as NiFi records. Note that, at this time, in record mode the Processor assumes that all records retrieved from a given partition have the same schema. In this mode, if any Kafka message is pulled but cannot be parsed or written with the configured Record Reader or Record Writer, the contents of that message are written to a separate FlowFile, and that FlowFile is transferred to the 'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual messages. A 'record.count' attribute is added to indicate how many messages the FlowFile contains. No two Kafka messages are placed into the same FlowFile if they have different schemas, or if they have different values for a message header that is included by the <Headers to Add as Attributes> property.
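The bundling rule described above can be illustrated with a small sketch. This is not NiFi's implementation: the `Message` type, its `schema_id` field, and the `bundle` function are hypothetical names used only for illustration. The sketch assumes that messages are grouped by topic, partition, schema, and the values of the selected headers, and that each group corresponds to one FlowFile.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Message:
    """Hypothetical stand-in for a consumed Kafka message."""
    topic: str
    partition: int
    schema_id: str       # illustrative label for the record schema
    headers: tuple = ()  # (name, value) pairs
    value: bytes = b""


def bundle(messages, header_names):
    """Group messages so that each group could share one FlowFile."""
    groups = defaultdict(list)
    for msg in messages:
        # Only headers selected for use as attributes affect the grouping.
        selected = tuple(sorted((n, v) for n, v in msg.headers if n in header_names))
        groups[(msg.topic, msg.partition, msg.schema_id, selected)].append(msg)
    # One dict per would-be FlowFile; real FlowFile attributes are strings.
    return [
        {"kafka.topic": key[0], "kafka.partition": key[1], "record.count": len(msgs)}
        for key, msgs in groups.items()
    ]


messages = [
    Message("orders", 0, "v1", (("tenant", "a"),)),
    Message("orders", 0, "v1", (("tenant", "a"),)),
    Message("orders", 0, "v1", (("tenant", "b"),)),  # different header value
    Message("orders", 0, "v2", (("tenant", "a"),)),  # different schema
]
flowfiles = bundle(messages, header_names={"tenant"})
```

In this example the four messages fall into three groups: the first two share a schema and header value, while the other two differ in header value or schema and therefore get their own group.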

Tags

Consume, Get, Ingest, Ingress, Kafka, PubSub, Record, Topic, avro, csv, json

Properties

In the list below, required properties are shown with an asterisk (*). Other properties are considered optional. The table also indicates any default values and whether a property supports the NiFi Expression Language.

Display Name | API Name | Default Value | Allowable Values | Description
Kafka Connection Service * | Kafka Connection Service
Controller Service:
KafkaConnectionService

Implementations:
AmazonMSKConnectionService
Kafka3ConnectionService
Provides connections to the Kafka Broker for consuming Kafka Records
Group ID * | Group ID
Kafka Consumer Group Identifier corresponding to the Kafka group.id property
Topic Format * | Topic Format | names
  • names
  • pattern
Specifies whether the Topics provided are a comma-separated list of names or a single regular expression
Topics * | Topics
The name or pattern of the Kafka Topics from which the Processor consumes Kafka Records. More than one name can be supplied as a comma-separated list.

Supports Expression Language, using Environment variables.
Auto Offset Reset * | auto.offset.reset | latest
  • earliest
  • latest
  • none
Automatic offset configuration applied when no previous consumer offset is found. Corresponds to the Kafka auto.offset.reset property.
Commit Offsets * | Commit Offsets | true
  • true
  • false
Specifies whether this Processor should commit the offsets to Kafka after receiving messages. Typically, this value should be set to true so that messages that are received are not duplicated. However, in certain scenarios, it may be preferable not to commit the offsets, so that the data can be processed and later acknowledged by PublishKafka in order to provide Exactly Once semantics.
Max Uncommitted Time * | Max Uncommitted Time | 100 millis
Specifies the maximum amount of time that the Processor can consume from Kafka before it must transfer FlowFiles on through the flow and commit the offsets to Kafka (if appropriate). A larger time period can result in longer latency.
Header Name Pattern | Header Name Pattern
Regular Expression Pattern applied to Kafka Record Header Names for selecting Header Values to be written as FlowFile attributes
Header Encoding * | Header Encoding | UTF-8
Character encoding applied when reading Kafka Record Header values and writing FlowFile attributes
Processing Strategy * | Processing Strategy | FLOW_FILE
  • FLOW_FILE
  • DEMARCATOR
  • RECORD
Strategy for processing Kafka Records and writing serialized output to FlowFiles
Record Reader * | Record Reader
Controller Service:
RecordReaderFactory

Implementations:
AvroReader
CEFReader
CSVReader
ExcelReader
GrokReader
JsonPathReader
JsonTreeReader
ReaderLookup
ScriptedReader
Syslog5424Reader
SyslogReader
WindowsEventLogReader
XMLReader
YamlTreeReader
The Record Reader to use for incoming Kafka messages

This property is only considered if:
  • the property Processing Strategy has a value of RECORD
Record Writer * | Record Writer
Controller Service:
RecordSetWriterFactory

Implementations:
AvroRecordSetWriter
CSVRecordSetWriter
FreeFormTextRecordSetWriter
JsonRecordSetWriter
RecordSetWriterLookup
ScriptedRecordSetWriter
XMLRecordSetWriter
The Record Writer to use in order to serialize the outgoing FlowFiles

This property is only considered if:
  • the property Processing Strategy has a value of RECORD
Output Strategy * | Output Strategy | Use Content as Value
  • Use Content as Value
  • Use Wrapper
The format used to output the Kafka Record into a FlowFile Record.

This property is only considered if:
  • the property Processing Strategy has a value of RECORD
Key Attribute Encoding * | Key Attribute Encoding | UTF-8 Encoded
  • UTF-8 Encoded
  • Hex Encoded
  • Do Not Add Key as Attribute
Encoding for value of configured FlowFile attribute containing Kafka Record Key.

This property is only considered if:
  • the property Output Strategy has a value of USE_VALUE
Key Format * | Key Format | Byte Array
  • String
  • Byte Array
  • Record
Specifies how to represent the Kafka Record Key in the output FlowFile

This property is only considered if:
  • the property Output Strategy has a value of USE_WRAPPER
Key Record Reader * | Key Record Reader
Controller Service:
RecordReaderFactory

Implementations:
AvroReader
CEFReader
CSVReader
ExcelReader
GrokReader
JsonPathReader
JsonTreeReader
ReaderLookup
ScriptedReader
Syslog5424Reader
SyslogReader
WindowsEventLogReader
XMLReader
YamlTreeReader
The Record Reader to use for parsing the Kafka Record Key into a Record

This property is only considered if:
  • the property Key Format has a value of record
Message Demarcator * | Message Demarcator
Since KafkaConsumer receives messages in batches, this Processor can output FlowFiles that contain all Kafka messages in a single batch for a given topic and partition. This property provides the string (interpreted as UTF-8) used to demarcate multiple Kafka messages. This is an optional property; if it is not provided, each Kafka message received results in its own FlowFile. To enter a special character such as a new line, use CTRL+Enter or Shift+Enter, depending on the OS.

This property is only considered if:
  • the property Processing Strategy has a value of DEMARCATOR
Separate By Key * | Separate By Key | false
  • true
  • false
When this property is enabled, two messages will only be added to the same FlowFile if both of the Kafka Messages have identical keys.

This property is only considered if:
  • the property Message Demarcator has a value specified
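
The interaction between Message Demarcator and Separate By Key can be sketched as follows. This is an illustration under stated assumptions, not the Processor's actual code: the `demarcate` function and the tuple layout of `batch` are hypothetical, and the sketch assumes that messages with the same key are merged even when they are not adjacent in the batch.

```python
def demarcate(batch, demarcator, separate_by_key=False):
    """Join message values from one batch into would-be FlowFile contents.

    batch: iterable of (topic, partition, key, value) tuples with bytes values.
    Returns a list of byte strings, one per would-be FlowFile.
    """
    sep = demarcator.encode("utf-8")  # the demarcator is interpreted as UTF-8
    groups = {}
    for topic, partition, key, value in batch:
        # Messages are always grouped per topic and partition; the key is
        # added to the grouping only when separate_by_key is enabled.
        group_key = (topic, partition, key) if separate_by_key else (topic, partition)
        groups.setdefault(group_key, []).append(value)
    return [sep.join(values) for values in groups.values()]


batch = [
    ("events", 0, b"k1", b"a"),
    ("events", 0, b"k2", b"b"),
    ("events", 0, b"k1", b"c"),
    ("events", 1, b"k1", b"d"),
]
combined = demarcate(batch, "\n")
by_key = demarcate(batch, "\n", separate_by_key=True)
```

Here `combined` holds two payloads, one per partition, while `by_key` holds three, because the two keys in partition 0 are kept apart.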

Dynamic Properties

This component does not support dynamic properties.

Relationships

Name | Description
success | FlowFiles containing one or more serialized Kafka Records
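
The Description above also mentions a 'parse.failure' relationship for messages that cannot be parsed in record mode. A minimal sketch of that routing decision follows, using Python's `json` module as a stand-in for the configured Record Reader. The `route` function is a hypothetical name, and the real Processor works with FlowFiles rather than Python lists.

```python
import json


def route(raw_messages):
    """Split raw message bytes into parsed records and unparsable payloads."""
    success, parse_failure = [], []
    for raw in raw_messages:
        try:
            success.append(json.loads(raw.decode("utf-8")))
        except (UnicodeDecodeError, json.JSONDecodeError):
            # The original bytes are kept unchanged for the failure route.
            parse_failure.append(raw)
    return {"success": success, "parse.failure": parse_failure}


routed = route([b'{"id": 1}', b"not json", b'{"id": 2}'])
```

In this example the two valid JSON messages end up under `success`, and the unparsable payload is kept as raw bytes under `parse.failure`.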

Reads Attributes

This processor does not read attributes.

Writes Attributes

Name | Description
kafka.count | The number of messages written if more than one
kafka.key | The key of the message, if present and if the FlowFile contains a single message. How the key is encoded depends on the value of the 'Key Attribute Encoding' property.
kafka.offset | The offset of the message in the partition of the topic.
kafka.partition | The partition of the topic the message or message bundle is from
kafka.timestamp | The timestamp of the message in the partition of the topic.
kafka.tombstone | Set to true if the consumed message is a tombstone message
kafka.topic | The topic the message or message bundle is from
mime.type | The MIME Type that is provided by the configured Record Writer
record.count | The number of records received
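
As a rough illustration of how the kafka.key attribute value could depend on Key Attribute Encoding, the sketch below maps the three allowable values from the Properties table to an encoded string. The `encode_key_attribute` function is hypothetical, and the lowercase hex output is an assumption of this sketch rather than documented behavior.

```python
def encode_key_attribute(key, encoding):
    """Return the kafka.key attribute value, or None when no attribute is added."""
    if key is None or encoding == "Do Not Add Key as Attribute":
        return None
    if encoding == "UTF-8 Encoded":
        return key.decode("utf-8")
    if encoding == "Hex Encoded":
        return key.hex()  # lowercase hex digits (assumed formatting)
    raise ValueError(f"unknown encoding: {encoding}")


utf8_value = encode_key_attribute(b"order-1", "UTF-8 Encoded")
hex_value = encode_key_attribute(b"order-1", "Hex Encoded")
no_value = encode_key_attribute(b"order-1", "Do Not Add Key as Attribute")
```

Hex encoding is the safer choice when keys are arbitrary binary data, since decoding such bytes as UTF-8 can fail or produce unreadable attribute values.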

State Management

This component does not store state.

Restricted

This component is not restricted.

Input Requirement

This component does not allow an incoming relationship.

System Resource Considerations

This component does not specify system resource considerations.

See Also

PublishKafka