RecordTransform

Class nifiapi.recordtransform.RecordTransform

The RecordTransform class provides a way to interact with RecordReader and RecordWriter controller services in Python. Individual records are identified and passed to a transform method for further processing. Each record is routed individually to an outgoing relationship. Additionally, the RecordTransform class supports record partitioning.

Regardless of the input and output formatting, the record object received by the transform method is an instance of a dictionary.
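Because the record arrives as a plain dictionary, ordinary dict operations are enough to inspect or modify it. The following is a minimal sketch that runs outside NiFi; the helper name normalize_record and the field names first_name and last_name are assumptions made for illustration only.

```python
def normalize_record(record: dict) -> dict:
    """Return a cleaned copy of a record dictionary (hypothetical helper)."""
    # Work on a shallow copy so the incoming record is not mutated.
    cleaned = dict(record)

    # Trim surrounding whitespace from string values; other types are untouched.
    for key, value in cleaned.items():
        if isinstance(value, str):
            cleaned[key] = value.strip()

    # Add a derived field. 'first_name' and 'last_name' are assumed field names.
    first = cleaned.get("first_name", "")
    last = cleaned.get("last_name", "")
    cleaned["full_name"] = f"{first} {last}".strip()
    return cleaned


sample = {"first_name": " Ada ", "last_name": "Lovelace", "age": 36}
result = normalize_record(sample)
```

Inside a processor, a helper like this could be called from the transform method before the record is handed back.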

caution

The use of RecordReader and RecordWriter controller services in FlowFileSource and FlowFileTransform processors is not advised, as serializing and deserializing data passing between Java and Python is a time-consuming and inefficient process.

Processors that implement the RecordTransform class are expected to implement the transform(ProcessContext, record, RecordSchema, CachingAttributeMap) method, which should return an instance of RecordTransformResult.

The RecordTransform class supports both life-cycle methods: onScheduled and onStopped.

By default, three relationships are available for this class: success, failure and original. Additional relationships can be defined and declared using the getRelationships method. See Relationship for details.
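As an illustration, a getRelationships method that declares one extra relationship might look like the sketch below. The relationship name invalid is hypothetical, and the fallback Relationship class is only a stand-in so that the snippet runs outside NiFi; it assumes the real class accepts name and description arguments.

```python
from typing import List

try:
    from nifiapi.relationship import Relationship
except ImportError:
    # Stand-in for running outside NiFi. Assumption: the real class takes
    # a name and a description.
    class Relationship:
        def __init__(self, name, description, auto_terminated=False):
            self.name = name
            self.description = description
            self.auto_terminated = auto_terminated


class RelationshipsSketch:
    """Shows only the getRelationships part of a processor."""

    def getRelationships(self) -> List[Relationship]:
        # Only the additional relationship is listed here; success, failure
        # and original are available by default.
        return [
            Relationship(
                name="invalid",  # hypothetical relationship name
                description="Records that did not pass validation",
            )
        ]


extra = RelationshipsSketch().getRelationships()
```

The transform method could then route a record to this relationship by returning a RecordTransformResult with relationship="invalid".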

By default, all RecordTransform processors are supplied with 'Record Reader' and 'Record Writer' properties. Additional properties can be registered by defining a getPropertyDescriptors method. See PropertyDescriptor for details.
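A sketch of registering one additional property could look as follows. The property name Field Name and its default value are hypothetical, and the fallback PropertyDescriptor class is a simplified stand-in so that the snippet runs outside NiFi; it assumes the real class accepts name, description, required and default_value arguments.

```python
from typing import List

try:
    from nifiapi.properties import PropertyDescriptor
except ImportError:
    # Simplified stand-in for running outside NiFi; it models only a few
    # of the arguments a property descriptor may take.
    class PropertyDescriptor:
        def __init__(self, name, description, required=False,
                     default_value=None, **kwargs):
            self.name = name
            self.description = description
            self.required = required
            self.default_value = default_value


class PropertiesSketch:
    """Shows only the getPropertyDescriptors part of a processor."""

    # Hypothetical property: which record field the processor should act on.
    FIELD_NAME = PropertyDescriptor(
        name="Field Name",
        description="Name of the record field to process",
        required=True,
        default_value="id",
    )

    def getPropertyDescriptors(self) -> List[PropertyDescriptor]:
        # 'Record Reader' and 'Record Writer' are supplied by default,
        # so only the additional property is returned.
        return [self.FIELD_NAME]


descriptors = PropertiesSketch().getPropertyDescriptors()
```

Keeping the descriptor as a class attribute makes it easy to refer to the same object again when the property value is read from the ProcessContext.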

Limitations

The RecordTransform does not provide an interface for modifying FlowFile attributes.

Implementation

See a boilerplate implementation.

from nifiapi.recordtransform import (
    RecordTransform,
    RecordTransformResult
)
from nifiapi.relationship import Relationship
from nifiapi.properties import (
    ProcessContext,
    PropertyDescriptor
)
from typing import List


class Processor(RecordTransform):

    class Java:
        implements = ['org.apache.nifi.python.processor.RecordTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = '''
        '''
        tags = []
        dependencies = []

    def __init__(self, *args, **kwargs):
        super().__init__()

    def getRelationships(self) -> List[Relationship]:
        '''
        Register any additional relationships.

        Returns:
            list(nifiapi.relationship.Relationship)
        '''
        return []

    def getPropertyDescriptors(self) -> List[PropertyDescriptor]:
        '''
        Register property descriptor required to successfully run the
        processor.

        Returns:
            list(PropertyDescriptor)
        '''
        return []

    def onScheduled(self, context: ProcessContext) -> None:
        '''
        The onScheduled method in Apache NiFi is a life-cycle method that is
        called when a processor is scheduled to run, allowing for
        initialization tasks such as setting up resources, loading
        configurations, or validating properties.

        Parameters:
            context (ProcessContext)

        Return:
            None
        '''
        pass

    def onStopped(self, context: ProcessContext) -> None:
        '''
        The `onStopped` method in Apache NiFi is a life-cycle method invoked
        when a processor is stopped. It is used to perform cleanup tasks, such
        as releasing resources or closing connections.

        Parameters:
            context (ProcessContext)

        Returns:
            None
        '''
        pass

    def transform(
        self, context: ProcessContext, record: dict, schema, attribute_map
    ) -> RecordTransformResult:
        '''
        Parameters:
            context (ProcessContext)
            record (Dictionary)
            schema (org.apache.nifi.serialization.record.RecordSchema)
            attribute_map (nifiapi.recordtransform.CachingAttributeMap)

        Returns:
            RecordTransformResult
        '''
        return RecordTransformResult(
            relationship="success",
            record=record
        )
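The boilerplate above always routes to success. The sketch below shows one way a transform method might route individual records to different relationships and make use of the record partitioning mentioned earlier. It rests on several assumptions: the field name country is hypothetical, the partition is assumed to be passed as a dictionary through a partition argument, and the fallback RecordTransformResult class is only a stand-in so that the snippet runs outside NiFi.

```python
try:
    from nifiapi.recordtransform import RecordTransformResult
except ImportError:
    # Stand-in for running outside NiFi. Assumption: the real class accepts
    # record, schema, relationship and partition arguments.
    class RecordTransformResult:
        def __init__(self, record=None, schema=None,
                     relationship="success", partition=None):
            self.record = record
            self.schema = schema
            self.relationship = relationship
            self.partition = partition

        def getRecord(self):
            return self.record

        def getRelationship(self):
            return self.relationship

        def getPartition(self):
            return self.partition


class RoutingSketch:
    """Shows only the transform part of a processor."""

    def transform(self, context, record: dict, schema, attribute_map):
        # 'country' is a hypothetical field used as the routing key.
        country = record.get("country")
        if not country:
            # Records without the field are sent to the failure relationship.
            return RecordTransformResult(record=record, relationship="failure")

        # Records sharing a partition value are intended to be grouped together.
        return RecordTransformResult(
            record=record,
            relationship="success",
            partition={"country": country},
        )


sketch = RoutingSketch()
ok = sketch.transform(None, {"id": 1, "country": "PL"}, None, None)
bad = sketch.transform(None, {"id": 2}, None, None)
```

Here context, schema and attribute_map are passed as None only because the sketch does not use them; in a running processor they are provided by the framework.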