Skip to main content

MergeRecord

Description

This Processor merges together multiple record-oriented FlowFiles into a single FlowFile that contains all of the Records of the input FlowFiles. This Processor works by creating 'bins' and then adding FlowFiles to these bins until they are full. Once a bin is full, all of the FlowFiles will be combined into a single output FlowFile, and that FlowFile will be routed to the 'merged' Relationship. A bin will consist of potentially many 'like FlowFiles'. In order for two FlowFiles to be considered 'like FlowFiles', they must have the same Schema (as identified by the Record Reader) and, if the <Correlation Attribute Name> property is set, the same value for the specified attribute. See Processor Usage and Additional Details for more information. NOTE: this processor should NOT be configured with Cron Driven for the Scheduling Strategy.

Tags

content, correlation, event, merge, record, stream

Properties

In the list below required Properties are shown with an asterisk (*). Other properties are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

Display NameAPI NameDefault ValueAllowable ValuesDescription
Record Reader *record-readerController Service:
RecordReaderFactory

Implementations:
AvroReader
CEFReader
CSVReader
ExcelReader
GrokReader
JsonPathReader
JsonTreeReader
ReaderLookup
ScriptedReader
Syslog5424Reader
SyslogReader
WindowsEventLogReader
XMLReader
YamlTreeReader
Specifies the Controller Service to use for reading incoming data
Record Writer *record-writerController Service:
RecordSetWriterFactory

Implementations:
AvroRecordSetWriter
CSVRecordSetWriter
FreeFormTextRecordSetWriter
JsonRecordSetWriter
RecordSetWriterLookup
ScriptedRecordSetWriter
XMLRecordSetWriter
Specifies the Controller Service to use for writing out the records
Merge Strategy *merge-strategyBin-Packing Algorithm
  • Bin-Packing Algorithm
  • Defragment
Specifies the algorithm used to merge records. The 'Defragment' algorithm combines fragments that are associated by attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily chosen FlowFiles
Correlation Attribute Namecorrelation-attribute-nameIf specified, two FlowFiles will be binned together only if they have the same value for this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.
Attribute Strategy *Attribute StrategyKeep Only Common Attributes
  • Keep Only Common Attributes
  • Keep All Unique Attributes
Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same value, will be preserved.
Minimum Number of Records *min-records1The minimum number of records to include in a bin

Supports Expression Language, using Environment variables.
Maximum Number of Recordsmax-records1000The maximum number of Records to include in a bin. This is a 'soft limit' in that if a FlowFIle is added to a bin, all records in that FlowFile will be added, so this limit may be exceeded by up to the number of records in the last input FlowFile.

Supports Expression Language, using Environment variables.
Minimum Bin Size *min-bin-size0 BThe minimum size of for the bin
Maximum Bin Sizemax-bin-sizeThe maximum size for the bundle. If not specified, there is no maximum. This is a 'soft limit' in that if a FlowFile is added to a bin, all records in that FlowFile will be added, so this limit may be exceeded by up to the number of bytes in last input FlowFile.
Max Bin Agemax-bin-ageThe maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours
Maximum Number of Bins *max.bin.count10Specifies the maximum number of bins that can be held in memory at any one time. This number should not be smaller than the maximum number of concurrent threads for this Processor, or the bins that are created will often consist only of a single incoming FlowFile.

Dynamic Properties

This component does not support dynamic properties.

Relationships

NameDescription
failureIf the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure
mergedThe FlowFile containing the merged records
originalThe FlowFiles that were used to create the bundle

Reads Attributes

NameDescription
fragment.countApplicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected in the given bundle.
fragment.identifierApplicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together.

Writes Attributes

NameDescription
<Attributes from Record Writer>Any Attribute that the configured Record Writer returns will be added to the FlowFile.
merge.bin.ageThe age of the bin, in milliseconds, when it was merged and output. Effectively this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output
merge.completion.reasonThis processor allows for several thresholds to be configured for merging FlowFiles. This attribute indicates which of the Thresholds resulted in the FlowFiles being merged. For an explanation of each of the possible values and their meanings, see the Processor's Usage / documentation and see the 'Additional Details' page.
merge.countThe number of FlowFiles that were merged into this bundle
merge.uuidUUID of the merged FlowFile that will be added to the original FlowFiles attributes
mime.typeThe MIME Type indicated by the Record Writer
record.countThe merged FlowFile will have a 'record.count' attribute indicating the number of records that were written to the FlowFile.

State Management

This component does not store state.

Restricted

This component is not restricted.

Input Requirement

This component requires an incoming relationship.

Example Use Cases

Use Case 1

Combine together many arbitrary Records in order to create a single, larger file

Configuration

Configure the "Record Reader" to specify a Record Reader that is appropriate for the incoming data type.
Configure the "Record Writer" to specify a Record Writer that is appropriate for the desired output data type.
Set "Merge Strategy" to Bin-Packing Algorithm.
Set the "Minimum Bin Size" to desired file size of the merged output file. For example, a value of 1 MB will result in not merging data until at least
1 MB of data is available (unless the Max Bin Age is reached first). If there is no desired minimum file size, leave the default value of 0 B.
Set the "Minimum Number of Records" property to the minimum number of Records that should be included in the merged output file. For example, setting the value
to 10000 ensures that the output file will have at least 10,000 Records in it (unless the Max Bin Age is reached first).
Set the "Max Bin Age" to specify the maximum amount of time to hold data before merging. This can be thought of as a "timeout" at which time the Processor will
merge whatever data it is, even if the "Minimum Bin Size" and "Minimum Number of Records" has not been reached. It is always recommended to set the value.
A reasonable default might be 10 mins if there is no other latency requirement.

Connect the 'merged' Relationship to the next component in the flow. Auto-terminate the 'original' Relationship.

Example Use Cases Involving Other Components

Multiprocessor Use Case 1

Combine together many Records that have the same value for a particular field in the data, in order to create a single, larger file

Components Involved

  • PartitionRecord
    1. Configure the "Record Reader" to specify a Record Reader that is appropriate for the incoming data type.
    2. Configure the "Record Writer" to specify a Record Writer that is appropriate for the desired output data type.
    3. Add a single additional property. The name of the property should describe the field on which the data is being merged together.
    4. The property's value should be a RecordPath that specifies which output FlowFile the Record belongs to.
    5. For example, to merge together data that has the same value for the "productSku" field, add a property named productSku with a value of /productSku.
    6. Connect the "success" Relationship to MergeRecord.
    7. Auto-terminate the "original" Relationship.
  • MergeRecord
    1. Configure the "Record Reader" to specify a Record Reader that is appropriate for the incoming data type.
    2. Configure the "Record Writer" to specify a Record Writer that is appropriate for the desired output data type.
    3. Set "Merge Strategy" to Bin-Packing Algorithm.
    4. Set the "Minimum Bin Size" to desired file size of the merged output file. For example, a value of 1 MB will result in not merging data until at least
    5. 1 MB of data is available (unless the Max Bin Age is reached first). If there is no desired minimum file size, leave the default value of 0 B.
    6. Set the "Minimum Number of Records" property to the minimum number of Records that should be included in the merged output file. For example, setting the value
    7. to 10000 ensures that the output file will have at least 10,000 Records in it (unless the Max Bin Age is reached first).
    8. Set the "Maximum Number of Records" property to a value at least as large as the "Minimum Number of Records." If there is no need to limit the maximum number of
    9. records per file, this number can be set to a value that will never be reached such as 1000000000.
    10. Set the "Max Bin Age" to specify the maximum amount of time to hold data before merging. This can be thought of as a "timeout" at which time the Processor will
    11. merge whatever data it is, even if the "Minimum Bin Size" and "Minimum Number of Records" has not been reached. It is always recommended to set the value.
    12. A reasonable default might be 10 mins if there is no other latency requirement.
    13. Set the value of the "Correlation Attribute Name" property to the name of the property that you added in the PartitionRecord Processor. For example, if merging data
    14. based on the "productSku" field, the property in PartitionRecord was named productSku so the value of the "Correlation Attribute Name" property should
    15. be productSku.
    16. Set the "Maximum Number of Bins" property to a value that is at least as large as the different number of values that will be present for the Correlation Attribute.
    17. For example, if you expect 1,000 different SKUs, set this value to at least 1001. It is not advisable, though, to set the value above 10,000.
    18. Connect the 'merged' Relationship to the next component in the flow.
    19. Auto-terminate the 'original' Relationship.

System Resource Considerations

This component does not specify system resource considerations.

See Also

MergeContent, PartitionRecord, SplitRecord