Apache Camel

Recently I had the opportunity to play with Apache Camel and Spring Boot, and I found the result quite entertaining! If you haven’t heard of it before, Camel is an open source routing tool that can receive messages, process them and send them on to other destinations. In my case, I was reading payment messages from a topic, storing them in a database and then, on a schedule, sending them to another server with a POST call.

Now, I could just start talking about all the different commands you can use, but I prefer to show you the project step by step. Some of the code is omitted, such as the Java beans, the JPA entities and the Spring annotations; what I care about is getting across the idea of how things work.

  • First we read from a specific topic.
  • Then we log and direct our call to another route, where we get our server token. This route is omitted for now.
  • With .bean we simply call the specified bean’s method, in this case getUnsentMessages.
  • The split command is used because getUnsentMessages returns a List of results and we want to perform the REST call for each one individually.
  • Finally, we direct each split message to the next route.
from(props.getDispatchMessageScheduler())
    .log("Retrieve unsent messages")
    .to("direct:authService")
    .bean(topicMessageHandler, "getUnsentMessages") // injected bean
    .split(body())
    .to("direct:unsentTopics");
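Since the bean itself is omitted, here is a minimal sketch of what a handler behind .bean(...) might look like. Everything in it is an assumption for illustration: PaymentMessage stands in for MyBaseClass, InMemoryTopicMessageHandler stands in for the real handler, and a map replaces the JPA repository. The only point is that getUnsentMessages returns a List, which is what .split(body()) then breaks into one message per element.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for MyBaseClass (the real class is omitted in this post).
class PaymentMessage {
    private final Integer id;
    private final String message;
    private boolean sent;

    PaymentMessage(Integer id, String message) {
        this.id = id;
        this.message = message;
    }

    public Integer getId() { return id; }
    public String getMessage() { return message; }
    public boolean isSent() { return sent; }
    public void markSent() { this.sent = true; }
}

// Hypothetical handler: an in-memory map replaces the JPA repository.
class InMemoryTopicMessageHandler {
    private final Map<Integer, PaymentMessage> store = new LinkedHashMap<>();

    public void save(PaymentMessage message) {
        store.put(message.getId(), message);
    }

    // Target of .bean(handler, "getUnsentMessages"): the returned List becomes
    // the message body, so split(body()) can emit one message per element.
    public List<PaymentMessage> getUnsentMessages() {
        List<PaymentMessage> unsent = new ArrayList<>();
        for (PaymentMessage m : store.values()) {
            if (!m.isSent()) {
                unsent.add(m);
            }
        }
        return unsent;
    }

    // Target of .bean(handler, "successfullyDispatched"): receives the record id
    // as the message body and marks that record as sent.
    public void successfullyDispatched(Integer recordId) {
        PaymentMessage m = store.get(recordId);
        if (m != null) {
            m.markSent();
        }
    }
}
```

In the real project the handler would be a public Spring bean backed by JPA; the method names are the ones the routes in this post refer to.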

The route we just saw is the message generator for our consumer:

1 Receive message from direct
2..6 Log and process the message. In these lines we start seeing some Java too. With exchange.getIn() we get the incoming message and map its body to our base class. On line 5 we create a header element for later use.

1.    from("direct:unsentTopics")
2.        .log("Dispatch topic")
3.        .process(exchange -> {
4.            MyBaseClass baseClass = exchange.getIn().getBody(MyBaseClass.class);
5.            exchange.getIn().setHeader("RECORD_ID", baseClass.getId());
6.        })

7..14 Here we set all the headers prior to our POST request.

7.        .setHeader("CamelHttpMethod")
8.            .simple("POST")
9.        .setHeader("Content-Type")
10.           .simple("application/json;charset=UTF-8")
11.       .setHeader("Accept")
12.           .simple("application/json")
13.       .setHeader("Authorization")
14.           .simple("Bearer ${header.TOKEN}")

15..30 In these lines we determine the message type and the corresponding URL. One thing to keep in mind is that the URLs were in http4 format. A handy trick when working with HTTP status codes is to add the parameter ?throwExceptionOnFailure=false to the URL. By default, when you make a call and receive a response code outside the 100..299 range, an exception is thrown, which is not what I wanted.

15.       .process(new Processor() {
16.           @Override
17.           public void process(final Exchange exchange) throws Exception {
18.               String uri = "";
19.               MyBaseClass body = exchange.getIn().getBody(MyBaseClass.class);
20.               if (body.getMessage() == null)
21.                   throw new RuntimeException("Empty body message");
22.               if (body.getMessage().contains(FLOW_DESCRIMINATOR_DEPOSIT)) {
23.                   uri = props.getDepositUrl(); }
24.               // "else if": a deposit message must not fall through to the final else
25.               else if (body.getMessage().contains(FLOW_DESCRIMINATOR_LOAN)) {
26.                   uri = props.getLoanUrl(); }
27.               else {
28.                   throw new RuntimeException("Invalid message type:" + body.getMessage()); }
29.               exchange.getIn().setHeader("URI", uri);
30.           } })
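Because the URL decision is plain Java, it can also live in a small class of its own, which makes it testable without starting a Camel context. This is a minimal sketch, assuming made-up discriminator values and placeholder URLs. DispatchUriResolver and withoutFailureException are hypothetical names, not part of Camel. The decision is written as a single if / else-if chain, so each message matches at most one branch.

```java
// Hypothetical helper: discriminator values and URLs are placeholders.
final class DispatchUriResolver {
    static final String FLOW_DEPOSIT = "DEPOSIT"; // assumed value
    static final String FLOW_LOAN = "LOAN";       // assumed value

    private final String depositUrl;
    private final String loanUrl;

    DispatchUriResolver(String depositUrl, String loanUrl) {
        this.depositUrl = depositUrl;
        this.loanUrl = loanUrl;
    }

    // Picks the target URL for a message and disables exception-on-failure.
    String resolve(String message) {
        if (message == null) {
            throw new IllegalArgumentException("Empty body message");
        }
        String base;
        if (message.contains(FLOW_DEPOSIT)) {
            base = depositUrl;
        } else if (message.contains(FLOW_LOAN)) {
            base = loanUrl;
        } else {
            throw new IllegalArgumentException("Invalid message type: " + message);
        }
        return withoutFailureException(base);
    }

    // Appends throwExceptionOnFailure=false, respecting an existing query string.
    static String withoutFailureException(String uri) {
        String separator = uri.contains("?") ? "&" : "?";
        return uri + separator + "throwExceptionOnFailure=false";
    }
}
```

Inside the processor, the body would then shrink to a single call along the lines of exchange.getIn().setHeader("URI", resolver.resolve(body.getMessage())).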

31..45 In these lines we see two new commands, .recipientList() and .choice(). The first is meant for forwarding the same message to one or more dynamically specified recipients; in our case there is only one, the URL stored in the URI header. .choice() works like an if/else: when a specific condition is met do A, otherwise do B. The scope of the command ends with .end().

31.       .setBody().simple("${body.message}")
32.       .recipientList(header("URI"))
33.       .choice()
34.           .when().simple("${header.CamelHttpResponseCode} == 200")
35.               .process(exchange -> {
36.                   Integer recordId = exchange.getIn().getHeader("RECORD_ID", Integer.class);
37.                   exchange.getIn().setBody(recordId); })
38.               .bean(topicMessageHandler, "successfullyDispatched")
39.               .log("Topic dispatch completed successfully")
40.           .otherwise()
41.               .process(exchange -> {
42.                   MyErrorObject errorObj = createErrorObject(exchange);
43.                   exchange.getIn().setBody(errorObj); })
44.               .bean(topicMessageHandler, "unsuccessfullyDispatched")
45.       .end();

To sum it up, we managed to do a lot with very little code. Camel is a great tool for cases where you just want to route traffic. This was my first project, so I am pretty sure there is much optimization left to be done; I just wanted to show you the basics. One thing that I still haven’t figured out, though, is how to test Camel applications properly.