By Tianru Zhou

Apache Camel for Batching AWS Kinesis Records

Custom Apache Camel Component – Background information

In one of our ongoing projects we are developing a Java application that receives, processes and dispatches data to different endpoints. These requirements are a perfect fit for a message-based enterprise integration framework like Apache Camel.

Apache Camel

Apache Camel is an open source integration framework that lets you define routing and mediation rules in many flexible ways. Pluggable components make Camel work with almost any transport or messaging model. It ships with components for the most commonly used transport models and also allows you to implement your own custom component.

In one use case of this project we want to receive data from a source, process it and finally send it to an AWS Kinesis (see https://aws.amazon.com/de/kinesis/?nc1=h_ls) endpoint, for which a public Camel component already exists (see https://camel.apache.org/components/latest/aws-kinesis-component.html).

As our application grew, we received more and more data. It was not long before we realized that we were facing performance issues: we were not sending data to AWS Kinesis fast enough to cope with the incoming load in real time.

By looking at the source code of the Camel Kinesis component, we learned that it uses the putRecord method of the AWS Kinesis API to send messages.

AWS Kinesis API

The AWS Kinesis API provides two methods for sending data to an AWS Kinesis Stream:

  • putRecord: Writes a single data record into an Amazon Kinesis data stream.
  • putRecords: Writes multiple data records into a Kinesis data stream in a single call (also referred to as a PutRecords request).

Please also see https://docs.aws.amazon.com/de_de/kinesis/latest/APIReference/API_PutRecord.html and https://docs.aws.amazon.com/de_de/kinesis/latest/APIReference/API_PutRecords.html.

The putRecords method can take up to 500 records and send them as one batch, which typically improves performance. However, the Camel AWS Kinesis component only uses the putRecord method to send data. Once we had figured that out, our mission was quite clear: write a custom Camel component for AWS Kinesis and use putRecords to enable batch sending.
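To make the effect of the 500-record limit concrete, here is a minimal plain-Java sketch (no AWS SDK involved) that splits a list of records into chunks of at most 500, one chunk per request. The class and method names are hypothetical and only serve as an illustration.

```java
import java.util.ArrayList;
import java.util.List;

class BatchSplitter {
  // Maximum number of records a single PutRecords request accepts.
  static final int MAX_RECORDS_PER_REQUEST = 500;

  /** Splits the input into consecutive chunks of at most maxSize elements. */
  static <T> List<List<T>> split(List<T> records, int maxSize) {
    if (maxSize <= 0) {
      throw new IllegalArgumentException("maxSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int from = 0; from < records.size(); from += maxSize) {
      int to = Math.min(from + maxSize, records.size());
      // Copy the view so each batch is independent of the input list.
      batches.add(new ArrayList<>(records.subList(from, to)));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Integer> records = new ArrayList<>();
    for (int i = 0; i < 1200; i++) {
      records.add(i);
    }
    List<List<Integer>> batches = split(records, MAX_RECORDS_PER_REQUEST);
    // prints "3 requests instead of 1200"
    System.out.println(batches.size() + " requests instead of " + records.size());
  }
}
```

With one request per chunk, 1200 records need 3 calls instead of 1200, which is where the reduction in per-request overhead comes from.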

Our Solution using Apache Camel

Our solution can be broken into three main steps:

1. Aggregate the data into a batch. This is the payload we will send.
2. Create a custom Camel component.
3. Change the producer to use the putRecords method.

Camel Aggregator

As the name suggests, the aggregator combines a number of messages into a single message.

There are plenty of options to control the behavior of the Camel aggregator. For example, an aggregationStrategy defines how the exchanges (the message containers used inside Apache Camel) are aggregated.

To be more precise, the aggregation strategy defines how each incoming exchange is merged into the already aggregated exchange.
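The calling convention can be illustrated without Camel on the classpath. The sketch below is a plain-Java stand-in, not Camel's API: the hypothetical aggregate method receives the batch built so far (null for the very first message, just like the old exchange in Camel) together with the new element, and returns the merged batch.

```java
import java.util.ArrayList;
import java.util.List;

class AggregationSketch {

  /** Merges one incoming payload into the batch built so far (null on the first call). */
  static List<String> aggregate(List<String> aggregated, String incoming) {
    List<String> batch = (aggregated == null) ? new ArrayList<>() : aggregated;
    batch.add(incoming);
    return batch;
  }

  public static void main(String[] args) {
    List<String> batch = null; // nothing has been aggregated yet
    for (String payload : new String[] {"a", "b", "c"}) {
      // The result of each call is fed back in as the "old" value of the next call.
      batch = aggregate(batch, payload);
    }
    System.out.println(batch); // prints [a, b, c]
  }
}
```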

In our case, we just need to create a PutRecordsRequestEntry from the incoming exchange and add it to the putRecordsRequestEntryList in the already aggregated exchange.

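// Imports assume Camel 2.x and the AWS SDK for Java v1.
// In Camel 3.x the interface is org.apache.camel.AggregationStrategy.
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;

public class ListWithPutRecordsRequestEntry implements AggregationStrategy {

  @Override
  @SuppressWarnings("unchecked")
  public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    List<PutRecordsRequestEntry> putRecordsRequestEntryList;
    if (oldExchange == null) {
      // First message of a new batch: start with an empty list.
      putRecordsRequestEntryList = new ArrayList<>();
    } else {
      // Reuse the list built up by the previous aggregation steps.
      putRecordsRequestEntryList = oldExchange.getIn().getBody(ArrayList.class);
    }

    PutRecordsRequestEntry putRecordsRequestEntry = createPutRecordEntry(newExchange);

    putRecordsRequestEntryList.add(putRecordsRequestEntry);
    // The newest exchange carries the whole batch to the next aggregation step.
    newExchange.getIn().setBody(putRecordsRequestEntryList);

    return newExchange;
  }

  private PutRecordsRequestEntry createPutRecordEntry(Exchange newExchange) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(newExchange.getIn().getBody(byte[].class)));
    // Note: Kinesis requires a partition key on every entry; it is not set in this listing.

    return putRecordsRequestEntry;
  }
}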

You can also control when a batch is considered "ready", that is, when the aggregation completes and the aggregated exchange is passed on to the next step. In our case we have to consider both size (number of messages) and a time interval.

The size limit is needed because a single PutRecords request accepts at most 500 records and at most 5 MiB for the entire request.

In addition, we need a time limit for the aggregation: our application has to run in (near) real time, so we can't hold data back for long before dispatching it.
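In Camel, such conditions are configured on the aggregator with options like completionSize, completionTimeout, completionInterval or completionPredicate. Note that completionSize counts exchanges, not bytes, so the 5 MiB limit needs its own check. The following plain-Java sketch (independent of Camel, with hypothetical class and method names) shows one way to combine the three limits. As a simplification it only counts payload bytes, although the Kinesis request limit also includes the partition keys.

```java
import java.util.ArrayList;
import java.util.List;

class BatchLimits {
  static final int MAX_RECORDS = 500;             // records per PutRecords request
  static final long MAX_BYTES = 5L * 1024 * 1024; // 5 MiB per PutRecords request

  private final long maxAgeMillis;
  private final List<byte[]> records = new ArrayList<>();
  private long bytes = 0;
  private long firstRecordAt = -1;

  BatchLimits(long maxAgeMillis) {
    this.maxAgeMillis = maxAgeMillis;
  }

  /** True if adding the payload would break a limit, so the batch must be flushed first. */
  boolean wouldOverflow(byte[] payload) {
    return records.size() + 1 > MAX_RECORDS || bytes + payload.length > MAX_BYTES;
  }

  /** True if the oldest record has been waiting for at least maxAgeMillis. */
  boolean isExpired(long nowMillis) {
    return !records.isEmpty() && nowMillis - firstRecordAt >= maxAgeMillis;
  }

  void add(byte[] payload, long nowMillis) {
    if (records.isEmpty()) {
      firstRecordAt = nowMillis; // the age of a batch is measured from its first record
    }
    records.add(payload);
    bytes += payload.length;
  }

  /** Returns the current batch and resets the accumulator. */
  List<byte[]> flush() {
    List<byte[]> out = new ArrayList<>(records);
    records.clear();
    bytes = 0;
    return out;
  }

  public static void main(String[] args) {
    BatchLimits batch = new BatchLimits(1000); // flush after at most one second
    byte[] payload = new byte[1024 * 1024];    // 1 MiB dummy record
    long now = 0;                              // fixed clock: only the byte limit triggers here
    int flushes = 0;
    for (int i = 0; i < 12; i++) {
      if (batch.wouldOverflow(payload) || batch.isExpired(now)) {
        batch.flush();
        flushes++;
      }
      batch.add(payload, now);
    }
    System.out.println("flushes: " + flushes); // prints "flushes: 2"
  }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a real application would also need a timer that flushes an expired batch when no new message arrives.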

Custom Apache Camel Component

Writing a custom Camel component is very straightforward. The following is from the official Apache Camel documentation:

Here are the main steps to writing a component.

  • write a POJO which implements the Component interface. The simplest approach is just to derive from DefaultComponent
  • to support auto-discovery of your component add a file to META-INF/services/org/apache/camel/component/FOO where FOO is the URI scheme for your component and any related endpoints created on the fly. The latter file should contain the definition of the component class. For example if your component is implemented by the com.example.CustomComponent class, the service file should contain the following line – class=com.example.CustomComponent.

All we need to do is copy the existing Camel Kinesis component and create a file under the path mentioned above. In this file we must specify the class of the custom component:

class=de.woodmark.camel.BatchKinesisComponent

Custom Producer

The last step is to alter the producer so that it uses the putRecords method instead of putRecord to send our batched records. To do that, we implement the process method in the producer class. This void method takes an exchange as input, which in our case comes from the aggregator described above. We then create a PutRecordsRequest from the putRecordsRequestEntryList built in the aggregator and pass it to the putRecords method.

We also save the response we get back from Kinesis in the exchange for further processing.

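// Imports assume Camel 2.x and the AWS SDK for Java v1.
// In Camel 3.x, DefaultProducer lives in org.apache.camel.support.
import java.util.Collection;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.aws.common.AwsExchangeUtil;
import org.apache.camel.impl.DefaultProducer;

import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;

public class BatchKinesisProducer extends DefaultProducer {

  public BatchKinesisProducer(BatchKinesisEndpoint endpoint) {
    super(endpoint);
  }

  @Override
  public BatchKinesisEndpoint getEndpoint() {
    return (BatchKinesisEndpoint) super.getEndpoint();
  }

  @Override
  @SuppressWarnings("unchecked")
  public void process(Exchange exchange) throws Exception {
    // The body is the list of entries built by the aggregation strategy.
    PutRecordsRequest request = this.createRequest((Collection<PutRecordsRequestEntry>) exchange.getIn().getBody());

    // Send the whole batch in a single call.
    PutRecordsResult putRecordsResult = this.getEndpoint().getClient().putRecords(request);

    // Store the Kinesis response in the exchange for further processing.
    Message message = AwsExchangeUtil.getMessageForResponse(exchange);
    message.setBody(putRecordsResult);
  }

  private PutRecordsRequest createRequest(Collection<PutRecordsRequestEntry> putRecordsRequestEntries) {
    PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
    putRecordsRequest.setStreamName(this.getEndpoint().getConfiguration().getStreamName());
    putRecordsRequest.setRecords(putRecordsRequestEntries);

    return putRecordsRequest;
  }
}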
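One detail is worth keeping in mind when processing the saved response: a PutRecords call is not all-or-nothing. Individual records can fail while others succeed, and the response lists one result per request entry, in the same order, with an error code set on the failed ones. The following plain-Java sketch (no AWS SDK involved, hypothetical names) shows how the entries that need a retry could be picked out under that assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FailedRecordFilter {

  /**
   * Returns the request entries whose result carries an error code.
   * errorCodes.get(i) is the code reported for entries.get(i), or null on success.
   */
  static <T> List<T> failedEntries(List<T> entries, List<String> errorCodes) {
    if (entries.size() != errorCodes.size()) {
      throw new IllegalArgumentException("one result per request entry expected");
    }
    List<T> failed = new ArrayList<>();
    for (int i = 0; i < entries.size(); i++) {
      if (errorCodes.get(i) != null) {
        failed.add(entries.get(i));
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    List<String> entries = Arrays.asList("r0", "r1", "r2");
    // Arrays.asList is used because it permits null elements.
    List<String> errorCodes =
        Arrays.asList(null, "ProvisionedThroughputExceededException", null);
    System.out.println(failedEntries(entries, errorCodes)); // prints [r1]
  }
}
```

With the AWS SDK for Java v1, the error codes would come from getErrorCode() on the entries of PutRecordsResult.getRecords(), and getFailedRecordCount() tells whether the check is needed at all.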

These are basically all the steps we took, from identifying the problem and researching a solution to implementing it with Apache Camel. As a result, throughput increased from about 200 messages/second to about 500 messages/second, an improvement by a factor of about 2.5. We are currently also in the process of creating a pull request for the Camel GitHub repository.

______

Apache Camel is an Apache Software Foundation project, available under the Apache v2 license. It is a completely open community, always open to proposals and comments. Apache and Apache related brands are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.
AWS Kinesis is a registered trademark of Amazon.com in the United States and/or other countries. No endorsement by Amazon.com / Amazon Web Services, Inc. is implied by the use of these marks.

About the author

Tianru has been a software developer at Woodmark Consulting since 2017, focusing on Java, cloud (AWS certified, Azure), DevOps and the big data technologies Hadoop, Spark, Kafka, etc.