| from Johannes Peter

Making customized Apache NiFi processors configurable with configuration files using the @OnScheduled annotation

Making customized Apache NiFi processors configurable

Although Apache NiFi provides various out-of-the-box processors to route, read or transform content of flowfiles, developers repeatedly face situations, where the available processors are not sufficient to solve complex ETL-problems. To overcome this issue, Apache NiFi includes a well-defined interface to include customized processors. Several blogs have already been published providing programmers an easy and straightforward insight into this interface.

When writing a processor, the focus of interest usually lies on the onTrigger()-method, which is executed whenever a flowfile reaches the processor. Within the body of this method, attributes and content of flowfiles can be read and transformed. Subsequently, the flowfile can be routed to a certain destination. However, certain actions should not be included into this method, especially if they are likely to unnecessarily lower the performance of the workflow when executed repeatedly. For instance, you might want to make your processor configurable with a configuration file. In this case, the configuration file should not be read each time a flowfile passes the processor. Instead, reading the configuration file only when the processor is activated (says scheduled) and repeatedly using the object storing its content is much more reasonable. For this purpose, the @OnScheduled annotation can be used flagging methods to be executed in terms of preparing the processor.

Example for using the @OnScheduled annotation
The following example demonstrates the usage of the @OnScheduled annotation. First, the class is defined in a usual way, including some inherited methods (i. e. a PropertyDescriptor), where the path leading to the configuration file has to be passed, a Relationship, and an uninitialized Properties object. It is worth mentioning that for the PropertyDescriptor the FILE_EXISTS_VALIDATOR is applied ensuring that the configuration file actually exists even before the processor can be scheduled in the NiFi environment.

@SideEffectFree
@Tags({"OnScheduled", "Configuration file"})
@CapabilityDescription("Add properties from file to flowfiles.")
public class OnScheduledTestProcessor extends AbstractProcessor {
  private List<PropertyDescriptor> properties;
  private Set<Relationship> relationships;
  private Properties props;
 
  public static final PropertyDescriptor PATH_TO_FILE = new PropertyDescriptor.Builder()
    .name("Path to properties file")
    .required(true)
    .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    .build();
 
  public static final Relationship SUCCESS = new Relationship.Builder()
    .name("success")
    .description("Success relationship")
    .build();
 
  @Override
  public void init(final ProcessorInitializationContext context){
 List<PropertyDescriptor> properties = new ArrayList<>();
    properties.add(PATH_TO_FILE);
    this.properties = Collections.unmodifiableList(properties);
 
    Set<Relationship> relationships = new HashSet<>();
    relationships.add(SUCCESS);
    this.relationships = Collections.unmodifiableSet(relationships);
  }
  @Override
  public Set<Relationship> getRelationships(){
    return relationships;
  }
 
  @Override
  public List<PropertyDescriptor> getSupportedPropertyDescriptors(){
    return properties;
  }
}

Subsequently, a method prepare() with the @OnScheduled annotation is defined, which is supposed to be executed when the processor is activated / scheduled. Within this method, the Properties object is instantiated and properties are loaded from the file. The file has not to be checked for existence or for being a file as this has already been done by the validator of the PropertyDescriptor. When the properties have been loaded successfully, an INFO log message is recorded.

@OnScheduled
public void prepare(final ProcessContext context) throws IOException {
  props = new Properties();
  try (FileInputStream input = new FileInputStream(context.getProperty(PATH_TO_FILE).getValue())){
    props.load(input);
    getLogger().info("Properties file loaded");
  } catch (IOException e) {
    throw e;
  }
}

Finally, the onTrigger() method is defined. As this post focuses on the explanation for the preparation of processors, the body of this method is kept simple here. After the flowfile is checked for being null (which is helpful if the processor is supposed to run multi-threaded), the properties from the file are added to the attributes of the processor.

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
  FlowFile flowfile = session.get();
  if (flowfile == null)
    return;
  for (String propKey : props.stringPropertyNames()){
    flowfile = session.putAttribute(flowfile, propKey, props.getProperty(propKey));
  }
  getLogger().info("Added properties to flowfile");
  session.transfer(flowfile, SUCCESS);
  session.commit();
}
Workflow including a "GenerateFlowFile"

To test the processor within the NiFi environment, we create a small workflow including a “GenerateFlowFile”-processor, which creates a new flowfile and sends it to the new “OnScheduledTestProcessor”. This processor loads a .properties-file containing a single property (test.key=test.value) when it is scheduled and adds this property to the attributes of each flowfile that passes it.

Standard FlowFile Attributes

As we can see in the logs, the properties file is only loaded once. The “LogAttribute”-processor at the end of the workflow reveals that the property is actually added to the attributes of the passing flowfiles.

Finally, it is worth mentioning that there exist even more annotations within the NiFi interface for customized processors. For instance, methods can be defined with the “OnUnscheduled” annotation, to be executed whenever a processor is deactivated / unscheduled.

Apache, Apache NiFi, and NiFi are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.

Share this article with others

Tags

About the author

Johannes war bis Ende 2017 Berater und Architekt im Bereich Search und Big Data bei der Woodmark. Sein Spezialgebiet umfasste die Verarbeitung unstrukturierter Daten. Dazu zählen Suche, Log Analyse und Natural Language Processing.

To overview blog posts