Apache NiFi


-- crash handling
[4:26 PM] Joe Witt: set that to
[4:26 PM] Joe Witt: nifi.flowcontroller.autoResumeState=false
[4:26 PM] Joe Witt: then you can fix/change whatever is going into a hung thread state mode
[4:27 PM] Joe Witt: in latest release (or maybe in the next release?) you can kill hung processors too
[4:27 PM] Joe Witt: that will let you fix things and restart them as well
[4:27 PM] Joe Witt: that is new or about to be released

Create Processor

mvn archetype:generate \
-DarchetypeGroupId=org.apache.nifi \
-DarchetypeArtifactId=nifi-processor-bundle-archetype \
-DarchetypeVersion=1.5.0 -DnifiVersion=1.2.0

executescript-cookbook: part-1, part-2, part-3

Created on May 6, 2018
@author: yehuda
from import IOUtils
from java.nio.charset import StandardCharsets
from import OutputStreamCallback, InputStreamCallback
import json
class InputStreamCB(InputStreamCallback):
    """Parse a JSON flow file and collect the entries of its "messages" collection.

    After ``process`` has run, ``getItems()`` returns one element per entry
    found under the (dot-separated) collection path.  Dict entries are
    enriched with the top-level "source" and "batch" fields when the
    document has them.
    """

    def __init__(self):
        self.items = []

    def process(self, inputStream):
        """Read the whole stream as UTF-8 JSON and populate ``self.items``.

        inputStream -- stream handed in by the caller; it is read fully
                       through ``IOUtils.toString``.
        """
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        d = json.loads(text)
        collection_name = "messages"
        attach_keys = [key.strip() for key in "source , batch".split(",")]

        # Top-level fields that get copied onto every emitted item.
        attach_data = {}
        if isinstance(d, dict):
            for key in attach_keys:
                if key in d:
                    attach_data[key] = d[key]

        # Filter: walk the dotted path down to the collection.  A missing
        # segment means there is nothing to emit (rather than silently
        # iterating over the parent object's keys).
        selected_obj = d
        for col_name_part in collection_name.split("."):
            if not isinstance(selected_obj, dict) or col_name_part not in selected_obj:
                selected_obj = []
                break
            selected_obj = selected_obj[col_name_part]

        if isinstance(selected_obj, dict):
            # A single object instead of a list: treat it as one item.
            selected_obj = [selected_obj]
        elif not isinstance(selected_obj, list):
            selected_obj = []

        # Enrich each item.
        # NOTE(review): the original enrichment lines are missing from this
        # file; merging attach_data under the item's own fields (item wins on
        # a key clash) is the presumed intent -- confirm.
        out = []
        for item in selected_obj:
            if isinstance(item, dict):
                enriched = dict(attach_data)
                enriched.update(item)
                out.append(enriched)
            else:
                # Non-object entries cannot carry extra fields; pass through.
                out.append(item)
        self.items = out

    def getItems(self):
        """Return the items collected by the last ``process`` call."""
        return self.items
class OutputStreamCB(OutputStreamCallback):
    """Write a single item to an output stream as UTF-8 encoded JSON.

    Call ``setItem`` before the callback is handed to the writer; until then
    the item is ``None`` and ``process`` emits the JSON literal ``null``.
    """

    def __init__(self):
        self.item = None

    def setItem(self, i):
        """Store the object that ``process`` will serialise."""
        self.item = i

    def process(self, outputStream):
        """Serialise the stored item with ``json.dumps`` and write it out."""
        str_out = json.dumps(self.item)
        # The stream expects bytes, so encode the JSON text before writing.
        outputStream.write(bytearray(str_out.encode("utf-8")))
# Split the incoming flow file: every item collected by InputStreamCB becomes
# its own child flow file, written as JSON and routed to REL_SUCCESS.
ff = session.get()
if ff is not None:
    # Read and parse the incoming content; the callback keeps the items.
    isCB = InputStreamCB()
    session.read(ff, isCB)
    for itm in isCB.getItems():
        # Create the child from the parent flow file.
        itm_ff = session.create(ff)
        outCB = OutputStreamCB()
        outCB.setItem(itm)
        itm_ff = session.write(itm_ff, outCB)
        session.transfer(itm_ff, REL_SUCCESS)
    # NOTE(review): the original flow file is always sent to REL_FAILURE, even
    # when the split succeeded.  Kept as written -- presumably a way to park
    # the original; confirm this is intended rather than session.remove(ff).
    session.transfer(ff, REL_FAILURE)
