RecordTransform
Class
nifiapi.recordtransform.RecordTransform
The RecordTransform class provides a way to interact with RecordReader and RecordWriter controller services in Python. Individual records are identified and passed to a transform method for further processing. Each record is routed individually to an outgoing relationship. Additionally, the RecordTransform class supports record partitioning.
Regardless of the input and output formatting, the record object received by the transform method is an instance of a dictionary.
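Because the record arrives as a plain dictionary, ordinary dictionary operations are enough to reshape it. The following is a minimal sketch that runs outside NiFi; the function name `normalize_record` and the field names `name`, `age` and `is_adult` are hypothetical and chosen only for illustration.

```python
def normalize_record(record: dict) -> dict:
    """Return a cleaned copy of a record (field names are hypothetical)."""
    # Copy first so the caller's dictionary is left untouched.
    cleaned = dict(record)
    # Trim surrounding whitespace on a string field if it is present.
    if isinstance(cleaned.get("name"), str):
        cleaned["name"] = cleaned["name"].strip()
    # Add a derived field computed from an existing one.
    age = cleaned.get("age")
    cleaned["is_adult"] = isinstance(age, int) and age >= 18
    return cleaned


sample = {"name": "  Ada ", "age": 36}
result = normalize_record(sample)
```

Logic of this kind would typically live inside the transform method, with the returned dictionary handed back as the output record.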
The use of RecordReader and RecordWriter controller services in FlowFileSource and FlowFileTransform processors is not advised, because serializing and deserializing data passed between Java and Python is slow and inefficient.
Processors implementing the RecordTransform class are expected to implement the transform(ProcessContext, record, RecordSchema, CachingAttributeMap) method, which should return an instance of RecordTransformResult.
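The per-record routing and partitioning described above can be sketched as follows. To keep the example runnable outside NiFi, it defines a small stand-in for RecordTransformResult rather than importing the real class. The `partition` keyword and the `country` field are assumptions made for illustration, and `transform` is written as a plain function instead of a processor method.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RecordTransformResult:
    """Stand-in for nifiapi.recordtransform.RecordTransformResult.

    Hypothetical: defined here only so the sketch runs outside NiFi.
    The `partition` keyword is an assumption, not confirmed by this page.
    """
    record: Optional[dict] = None
    relationship: str = "success"
    partition: Optional[dict] = None


def transform(context, record: dict, schema, attribute_map) -> RecordTransformResult:
    # Route records missing the (hypothetical) 'country' field to failure.
    country = record.get("country")
    if not country:
        return RecordTransformResult(record=record, relationship="failure")
    # Records that share a partition value are meant to be grouped together.
    return RecordTransformResult(
        record=record,
        relationship="success",
        partition={"country": country},
    )


ok = transform(None, {"id": 1, "country": "PL"}, None, None)
bad = transform(None, {"id": 2}, None, None)
```

In a real processor the stand-in class would be replaced by the import from nifiapi.recordtransform, and the function would become a method that takes `self` as its first argument.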
The RecordTransform class supports both life-cycle methods: onScheduled and onStopped.
By default, three relationships are available for this class: success, failure and original. Additional relationships can be defined and declared using the getRelationships method. See Relationship for details.
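As a rough illustration of declaring one extra relationship, the sketch below uses a stand-in Relationship class so that it runs outside NiFi. The `name` and `description` fields are assumptions about the real nifiapi.relationship.Relationship, and the `invalid` relationship and the `ExampleProcessor` class are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Relationship:
    """Stand-in for nifiapi.relationship.Relationship (assumed fields)."""
    name: str
    description: str = ""


class ExampleProcessor:
    # Hypothetical fragment; a real processor would extend RecordTransform.
    def getRelationships(self) -> List[Relationship]:
        # Only additional relationships are listed here; success, failure
        # and original are available by default.
        return [
            Relationship(
                name="invalid",
                description="Records that failed validation",
            ),
        ]


extra = ExampleProcessor().getRelationships()
```

A transform method could then route a record to the new relationship by returning its name in the RecordTransformResult.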
By default, all RecordTransform processors are supplied with 'Record Reader' and 'Record Writer' properties. Additional properties can be registered by defining a getPropertyDescriptors method. See PropertyDescriptor for details.
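Registering one additional property might look like the sketch below. It uses a stand-in PropertyDescriptor so that it runs outside NiFi. The fields `name`, `description`, `required` and `default_value` are assumptions about the real nifiapi.properties.PropertyDescriptor, and the 'Target Field' property and the `ExampleProcessor` class are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class PropertyDescriptor:
    """Stand-in for nifiapi.properties.PropertyDescriptor (assumed fields)."""
    name: str
    description: str = ""
    required: bool = False
    default_value: Optional[str] = None


class ExampleProcessor:
    # Hypothetical fragment; a real processor would extend RecordTransform.
    def getPropertyDescriptors(self) -> List[PropertyDescriptor]:
        # 'Record Reader' and 'Record Writer' are supplied automatically,
        # so only the extra property is declared here.
        return [
            PropertyDescriptor(
                name="Target Field",
                description="Name of the record field to modify",
                required=True,
                default_value="name",
            ),
        ]


props = ExampleProcessor().getPropertyDescriptors()
```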
Limitations
The RecordTransform class does not provide an interface for modifying FlowFile attributes.
Implementation
A boilerplate implementation is shown below.
from nifiapi.recordtransform import (
    RecordTransform,
    RecordTransformResult
)
from nifiapi.relationship import Relationship
from nifiapi.properties import (
    ProcessContext,
    PropertyDescriptor
)
from typing import List


class Processor(RecordTransform):

    class Java:
        implements = ['org.apache.nifi.python.processor.RecordTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = '''
        '''
        tags = []
        dependencies = []

    def __init__(self, *args, **kwargs):
        super().__init__()

    def getRelationships(self) -> List[Relationship]:
        '''
        Register any additional relationships.

        Returns:
            list(nifiapi.relationship.Relationship)
        '''
        return []

    def getPropertyDescriptors(self) -> List[PropertyDescriptor]:
        '''
        Register the property descriptors required to successfully run the
        processor.

        Returns:
            list(PropertyDescriptor)
        '''
        return []

    def onScheduled(self, context: ProcessContext) -> None:
        '''
        The `onScheduled` method in Apache NiFi is a life-cycle method that is
        called when a processor is scheduled to run, allowing for
        initialization tasks such as setting up resources, loading
        configurations, or validating properties.

        Parameters:
            context (ProcessContext)

        Returns:
            None
        '''
        pass

    def onStopped(self, context: ProcessContext) -> None:
        '''
        The `onStopped` method in Apache NiFi is a life-cycle method invoked
        when a processor is stopped. It is used to perform cleanup tasks, such
        as releasing resources or closing connections.

        Parameters:
            context (ProcessContext)

        Returns:
            None
        '''
        pass

    def transform(
        self, context: ProcessContext, record: dict, schema, attribute_map
    ) -> RecordTransformResult:
        '''
        Transform a single record and route it to a relationship.

        Parameters:
            context (ProcessContext)
            record (dict)
            schema (org.apache.nifi.serialization.record.RecordSchema)
            attribute_map (nifiapi.recordtransform.CachingAttributeMap)

        Returns:
            RecordTransformResult
        '''
        # Pass the record through unchanged and route it to success.
        return RecordTransformResult(
            relationship="success",
            record=record
        )