Creating a new Python Processor
Introducing a new Processor is a simple matter of implementing a Python class that extends one of the following classes: FlowFileSource or FlowFileTransform.
We recommend designing processors with modularity in mind, giving each processor a well-defined and narrow scope. Processors built this way can easily be mixed and matched on the canvas to build flexible data pipelines that solve complex problems. In the long term, this allows a higher degree of control and makes (inevitable) debugging easier. Because each processor has its own log output, it also gives the clearest overview of the actions taken by custom code.
Before the Processor can be used, its description, tags and dependencies need to be defined, for example:
```python
from nifiapi.flowfiletransform import (
    FlowFileTransform,
    FlowFileTransformResult,
)
from nifiapi.properties import ProcessContext


class ExamplePutSQL(FlowFileTransform):
    # Declares which Java interface this Python class implements
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    # Metadata shown in the NiFi UI; dependencies are installed via pip
    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = '''
            Executes a SQL INSERT command. The content of an incoming FlowFile is
            expected to be a JSON document containing a single dictionary
            describing a record to be processed. Keys of the dictionary must match
            field names defined for a collection.
            The content of the FlowFile is expected to be in UTF-8 format.
        '''
        tags = [
            "mysql", "database", "insert"
        ]
        dependencies = []

    def transform(
        self, context: ProcessContext, flow_file
    ) -> FlowFileTransformResult:
        '''
        Parameters:
            context (ProcessContext)
            flow_file
        Returns:
            FlowFileTransformResult
        '''
        # Placeholder: route the FlowFile unchanged to the 'success' relationship
        return FlowFileTransformResult('success')
```
See FlowFileTransformResult to learn more about modifying FlowFile attributes and contents.
Note that the code above is an example of a FlowFileTransform class, which requires an implementation of the transform method.
The required methods differ between processor classes. See Processor Types for details.
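The ExamplePutSQL description says the FlowFile content is a UTF-8 JSON document holding a single dictionary whose keys must match known field names. The sketch below shows one way the core of such a transform could turn that content into a parameterized INSERT statement. It is a hypothetical helper (`build_insert` is not part of the NiFi API), and it uses the standard-library `sqlite3` module as a stand-in database so the example runs anywhere. MySQL drivers such as PyMySQL use `%s` placeholders instead of `?`. In a real processor, the bytes would come from the incoming FlowFile; NiFi's Python API should expose them through a method such as `getContentsAsBytes()`.

```python
import json
import sqlite3
from typing import Any, Iterable, List, Tuple


def build_insert(table: str, allowed_columns: Iterable[str],
                 content: bytes) -> Tuple[str, List[Any]]:
    """Hypothetical helper: turn FlowFile content into a parameterized INSERT.

    `table` and `allowed_columns` are assumed to come from trusted
    configuration, because identifiers cannot be bound as parameters.
    """
    # The content is expected to be UTF-8 encoded JSON
    record = json.loads(content.decode("utf-8"))
    if not isinstance(record, dict):
        raise ValueError("FlowFile content must be a single JSON object")

    # Keys of the dictionary must match known field names
    unknown = set(record) - set(allowed_columns)
    if unknown:
        raise ValueError(f"Unknown fields: {sorted(unknown)}")

    columns = list(record)
    placeholders = ", ".join("?" for _ in columns)  # sqlite3 'qmark' style
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    return sql, [record[c] for c in columns]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
    sql, params = build_insert("users", ["id", "name"], b'{"id": 1, "name": "Ada"}')
    conn.execute(sql, params)
    print(conn.execute("SELECT id, name FROM users").fetchall())  # [(1, 'Ada')]
```

Values are always bound as parameters rather than formatted into the SQL string. Column names are checked against an allow-list instead, since they cannot be parameterized.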
ProcessorDetails Attributes
| Name | Type | Description |
|---|---|---|
| version | String | Specifies the version of the processor |
| description | String | A short, concise summary of the processor's main functionality |
| tags | List(String) | Keywords that allow quick search and access through the NiFi UI |
| dependencies | List(String) | Any external or additional packages the Processor requires to run. Dependencies are pulled automatically when a new Processor is added to the Flow, so all of them must be installable via pip |
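The table's attribute types can be written out as a ProcessorDetails class. The sketch below does that and adds a small sanity check that mirrors the table. The `check_processor_details` function is hypothetical; NiFi does not ship it. The attribute values, including the `pymysql>=1.0` dependency, are also illustrative assumptions.

```python
from typing import List


class ProcessorDetails:
    # Illustrative values only
    version = '0.0.1-SNAPSHOT'
    description = 'Executes a SQL INSERT command for each incoming JSON record.'
    tags = ['mysql', 'database', 'insert']
    # Each entry is a pip requirement specifier (name, optionally with a version)
    dependencies = ['pymysql>=1.0']


def check_processor_details(details: type) -> List[str]:
    """Hypothetical check: return the problems found (empty list if none)."""
    problems = []
    # version and description are plain strings
    for name in ('version', 'description'):
        if not isinstance(getattr(details, name, None), str):
            problems.append(f'{name} must be a string')
    # tags and dependencies are lists of strings
    for name in ('tags', 'dependencies'):
        value = getattr(details, name, None)
        if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
            problems.append(f'{name} must be a list of strings')
    return problems


if __name__ == '__main__':
    print(check_processor_details(ProcessorDetails))  # []
```

Because every dependency string is handed to pip, running `pip install` with the same strings in a local virtual environment is a quick way to confirm that they resolve before the processor is added to a Flow.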
Note that the class name is important, as it determines the processor name shown in the NiFi UI. By convention, the name puts the action (verb) first, followed by the destination (noun), for example PutSQL.