About FlowFileTransform

Class nifiapi.flowfiletransform.FlowFileTransform

The FlowFileTransform class provides an interface for manipulating existing FlowFiles. Processors implementing the FlowFileTransform class are expected to implement the transform(ProcessContext, InputFlowFile) method, which should return an instance of FlowFileTransformResult.
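As a rough illustration of the transform contract described above, the sketch below upper-cases the text content of each FlowFile. The class name `UppercaseContent` and the `uppercased` attribute are hypothetical. The `getContentsAsBytes` call and the `contents`/`attributes` keyword arguments reflect the NiFi Python API as we understand it, and the fallback classes are stand-ins (an assumption) that exist only so the sketch can run outside NiFi.

```python
try:
    # Available inside the NiFi Python runtime.
    from nifiapi.flowfiletransform import (
        FlowFileTransform,
        FlowFileTransformResult,
    )
except ImportError:
    # Minimal stand-ins (assumption) so the sketch also runs outside NiFi.
    class FlowFileTransform:
        def __init__(self, **kwargs):
            pass

    class FlowFileTransformResult:
        def __init__(self, relationship, attributes=None, contents=None):
            self.relationship = relationship
            self.attributes = attributes
            self.contents = contents


class UppercaseContent(FlowFileTransform):  # hypothetical example processor

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = 'Upper-cases the text content of each FlowFile.'

    def __init__(self, **kwargs):
        super().__init__()

    def transform(self, context, flow_file):
        try:
            text = flow_file.getContentsAsBytes().decode('utf-8')
        except UnicodeDecodeError:
            # Content is not valid UTF-8: route to failure.
            return FlowFileTransformResult('failure')
        # Exactly one result is returned for the input FlowFile.
        return FlowFileTransformResult(
            'success',
            contents=text.upper(),
            attributes={'uppercased': 'true'},
        )
```

Returning only a relationship name, as the boilerplate does, is the simplest possible result; supplying contents or attributes is optional.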

The FlowFileTransform class supports both life-cycle methods: onScheduled and onStopped.
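A minimal sketch of how the two life-cycle methods might be paired: state is prepared in onScheduled and released in onStopped. The class name `MatchDigits`, the `has.digits` attribute and the regular expression are hypothetical, and the fallback classes are stand-ins (an assumption) used only when the code runs outside NiFi.

```python
import re

try:
    # Available inside the NiFi Python runtime.
    from nifiapi.flowfiletransform import (
        FlowFileTransform,
        FlowFileTransformResult,
    )
except ImportError:
    # Minimal stand-ins (assumption) so the sketch also runs outside NiFi.
    class FlowFileTransform:
        def __init__(self, **kwargs):
            pass

    class FlowFileTransformResult:
        def __init__(self, relationship, attributes=None, contents=None):
            self.relationship = relationship
            self.attributes = attributes
            self.contents = contents


class MatchDigits(FlowFileTransform):  # hypothetical example processor

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = 'Flags FlowFiles whose content contains digits.'

    def __init__(self, **kwargs):
        super().__init__()
        self.pattern = None

    def onScheduled(self, context):
        # Prepare read-only state once, before any FlowFile is processed.
        self.pattern = re.compile(r'\d+')

    def onStopped(self, context):
        # Release whatever onScheduled set up.
        self.pattern = None

    def transform(self, context, flow_file):
        text = flow_file.getContentsAsBytes().decode('utf-8', errors='replace')
        matched = self.pattern.search(text) is not None
        return FlowFileTransformResult(
            'success',
            attributes={'has.digits': str(matched).lower()},
        )
```

Doing the setup in onScheduled rather than in transform keeps per-FlowFile work small; the same pattern applies to heavier resources such as connections.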

By default there are three relationships available for this class: success, failure and original. Additional relationships can be defined and declared using the getRelationships method. See Relationship for details.
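The sketch below shows one way an additional relationship could be declared and then used for routing. The relationship name `large`, the class name `RouteBySize` and the 1024-byte threshold are hypothetical; the `Relationship(name, description)` constructor and the `getSize` call reflect the NiFi Python API as we understand it. The fallback classes are stand-ins (an assumption) for running outside NiFi.

```python
try:
    # Available inside the NiFi Python runtime.
    from nifiapi.flowfiletransform import (
        FlowFileTransform,
        FlowFileTransformResult,
    )
    from nifiapi.relationship import Relationship
except ImportError:
    # Minimal stand-ins (assumption) so the sketch also runs outside NiFi.
    class FlowFileTransform:
        def __init__(self, **kwargs):
            pass

    class FlowFileTransformResult:
        def __init__(self, relationship, attributes=None, contents=None):
            self.relationship = relationship
            self.attributes = attributes
            self.contents = contents

    class Relationship:
        def __init__(self, name, description, auto_terminated=False):
            self.name = name
            self.description = description
            self.auto_terminated = auto_terminated


THRESHOLD_BYTES = 1024  # hypothetical cut-off for the example


class RouteBySize(FlowFileTransform):  # hypothetical example processor

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = 'Routes large FlowFiles to a dedicated relationship.'

    def __init__(self, **kwargs):
        super().__init__()
        self.large = Relationship(
            name='large',
            description='FlowFiles larger than the threshold',
        )

    def getRelationships(self):
        # Only additional relationships are listed here; the default
        # ones (success, failure, original) are already available.
        return [self.large]

    def transform(self, context, flow_file):
        if flow_file.getSize() > THRESHOLD_BYTES:
            return FlowFileTransformResult(self.large.name)
        return FlowFileTransformResult('success')
```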

It is possible to register custom properties by defining a getPropertyDescriptors method. See PropertyDescriptor for details.
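A minimal sketch of registering one custom property and reading its value inside transform. The property name `Prefix`, its default value and the class name `AddPrefix` are hypothetical; reading the value through `context.getProperty(...).getValue()` reflects the NiFi Python API as we understand it. The fallback classes are stand-ins (an assumption) for running outside NiFi.

```python
try:
    # Available inside the NiFi Python runtime.
    from nifiapi.flowfiletransform import (
        FlowFileTransform,
        FlowFileTransformResult,
    )
    from nifiapi.properties import PropertyDescriptor
except ImportError:
    # Minimal stand-ins (assumption) so the sketch also runs outside NiFi.
    class FlowFileTransform:
        def __init__(self, **kwargs):
            pass

    class FlowFileTransformResult:
        def __init__(self, relationship, attributes=None, contents=None):
            self.relationship = relationship
            self.attributes = attributes
            self.contents = contents

    class PropertyDescriptor:
        def __init__(self, name, description, required=False,
                     default_value=None, **kwargs):
            self.name = name
            self.description = description
            self.required = required
            self.default_value = default_value


class AddPrefix(FlowFileTransform):  # hypothetical example processor

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = 'Prepends a configurable prefix to the content.'

    def __init__(self, **kwargs):
        super().__init__()
        self.prefix = PropertyDescriptor(
            name='Prefix',
            description='Text to prepend to the FlowFile content',
            required=True,
            default_value='> ',
        )

    def getPropertyDescriptors(self):
        # Every property the processor exposes must be listed here.
        return [self.prefix]

    def transform(self, context, flow_file):
        prefix = context.getProperty(self.prefix.name).getValue()
        text = flow_file.getContentsAsBytes().decode('utf-8')
        return FlowFileTransformResult('success', contents=prefix + text)
```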

Limitations

The transform method returns exactly one FlowFileTransformResult, so each input FlowFile yields a single result.
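One practical consequence, sketched below under stated assumptions: when a transform derives several items from one input, they have to be packed into the single result, for example as a JSON array. The class name `LinesToJsonArray` and the `line.count` attribute are hypothetical, and the fallback classes are stand-ins (an assumption) for running outside NiFi.

```python
import json

try:
    # Available inside the NiFi Python runtime.
    from nifiapi.flowfiletransform import (
        FlowFileTransform,
        FlowFileTransformResult,
    )
except ImportError:
    # Minimal stand-ins (assumption) so the sketch also runs outside NiFi.
    class FlowFileTransform:
        def __init__(self, **kwargs):
            pass

    class FlowFileTransformResult:
        def __init__(self, relationship, attributes=None, contents=None):
            self.relationship = relationship
            self.attributes = attributes
            self.contents = contents


class LinesToJsonArray(FlowFileTransform):  # hypothetical example processor

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = 'Packs the non-empty lines of a FlowFile into one JSON array.'

    def __init__(self, **kwargs):
        super().__init__()

    def transform(self, context, flow_file):
        text = flow_file.getContentsAsBytes().decode('utf-8')
        lines = [line for line in text.splitlines() if line.strip()]
        # All derived items travel together in the one allowed result.
        return FlowFileTransformResult(
            'success',
            contents=json.dumps(lines),
            attributes={'line.count': str(len(lines))},
        )
```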

If the processor needs RecordReader or RecordWriter type Controller Services, we recommend using RecordTransform instead of FlowFileTransform. RecordTransform is designed for high performance with RecordReader and RecordWriter, whereas using these Controller Services in a processor that implements FlowFileTransform performs poorly and is not advised. See RecordTransform for details.

Implementation

The following is a boilerplate implementation.

from nifiapi.flowfiletransform import (
    FlowFileTransform,
    FlowFileTransformResult
)
from nifiapi.relationship import Relationship
from nifiapi.properties import (
    ProcessContext,
    PropertyDescriptor
)
from typing import List


class Processor(FlowFileTransform):

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = '''
        '''
        tags = []
        dependencies = []

    def __init__(self, *args, **kwargs):
        super().__init__()

    def getRelationships(self) -> List[Relationship]:
        '''
        Register any additional relationships.

        Returns:
            list(Relationship)
        '''
        return []

    def getPropertyDescriptors(self) -> List[PropertyDescriptor]:
        '''
        Register the property descriptors required to successfully run the
        processor.

        Returns:
            list(PropertyDescriptor)
        '''
        return []

    def onScheduled(self, context: ProcessContext) -> None:
        '''
        The `onScheduled` method in Apache NiFi is a life-cycle method that is
        called when a processor is scheduled to run, allowing for
        initialization tasks such as setting up resources, loading
        configurations, or validating properties.

        Parameters:
            context (ProcessContext)

        Returns:
            None
        '''
        pass

    def onStopped(self, context: ProcessContext) -> None:
        '''
        The `onStopped` method in Apache NiFi is a life-cycle method invoked
        when a processor is stopped. It is used to perform cleanup tasks, such
        as releasing resources or closing connections.

        Parameters:
            context (ProcessContext)

        Returns:
            None
        '''
        pass

    def transform(
        self, context: ProcessContext, flow_file
    ) -> FlowFileTransformResult:
        '''
        Transform a single input FlowFile into a single result.

        Parameters:
            context (ProcessContext)
            flow_file

        Returns:
            FlowFileTransformResult
        '''
        return FlowFileTransformResult("success")