
Publish/Subscribe Pattern with Apache Camel

Publish/Subscribe is a simple messaging pattern where a publisher sends messages to a channel without knowing who is going to receive them. It is then the responsibility of the channel to deliver a copy of each message to every subscriber. This messaging model enables the creation of loosely coupled and scalable systems.
It is a very common messaging pattern and there are many ways to implement some form of pub-sub in Apache Camel. But bear in mind that they are all different and have different characteristics. From the simplest to the most complex, here is a list:
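To make the idea concrete, here is a minimal plain-Java sketch of a pub-sub channel with no Camel involved. The class and method names are hypothetical. The publisher only knows the channel, and the channel hands the message to a separate queue for each subscriber. This is roughly the job SEDA with multipleConsumers does for you in Camel.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical in-memory publish/subscribe channel, for illustration only (not a Camel API). */
public class PubSubChannel<T> {
    // One queue per subscriber; the channel, not the publisher, does the fan-out.
    private final List<BlockingQueue<T>> subscribers = new CopyOnWriteArrayList<>();

    /** Registers a new subscriber and returns the queue it should read from. */
    public BlockingQueue<T> subscribe() {
        BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        subscribers.add(queue);
        return queue;
    }

    /** The publisher does not know who is listening; every subscriber's queue receives the message. */
    public void publish(T message) {
        for (BlockingQueue<T> queue : subscribers) {
            queue.offer(message); // unbounded queue, so offer always succeeds
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PubSubChannel<String> channel = new PubSubChannel<>();
        BlockingQueue<String> first = channel.subscribe();
        BlockingQueue<String> second = channel.subscribe();
        channel.publish("hello");
        System.out.println(first.take() + " " + second.take()); // prints "hello hello"
    }
}
```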
  • Multicast - works only with a static list of subscribers, can deliver the message to subscribers in parallel, and can either stop or continue when one of the subscribers fails.
  • Recipient List - similar to multicast, but allows the subscribers to be defined at run time, for example in a message header.
  • SEDA - this component provides asynchronous SEDA behaviour using a BlockingQueue. When the multipleConsumers option is set, it can be used for asynchronous pub-sub messaging. It can also block when the queue is full, limit the queue size, or time out publishing if the message is not consumed in time.
  • VM - same as SEDA, but works across multiple CamelContexts, as long as they are in the same JVM. It is a nice mechanism for sending messages between webapps in a web container or bundles in an OSGi container.
  • Spring-redis - Redis has a pub/sub feature that allows publishing messages to multiple receivers. You can subscribe to a channel by name or by pattern matching. With pattern matching, the subscriber receives messages from all channels that match the pattern. Keep in mind that in this case the same message can arrive more than once if several patterns match the channel it was sent to.
  • JMS (ActiveMQ) - probably the best-known way of doing pub-sub, including durable subscriptions. For a complete list of features, check the ActiveMQ website.
  • Amazon SNS/SQS - if you need a really scalable and reliable solution, SNS is the way to go. Subscribing an SQS queue to the topic turns it into a durable subscriber and allows polling the messages later. There are two caveats. It is not very fast, and, more importantly, Amazon doesn't guarantee FIFO order for your messages.
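The first three options could be wired up as follows. This is a sketch that assumes camel-core is on the classpath and uses the Java DSL. The endpoint names (direct:subscriberA, seda:news), the "subscribers" header and the PubSubRoutes class are illustrative and do not come from any existing application.

```java
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class PubSubRoutes extends RouteBuilder {
    @Override
    public void configure() {
        // Multicast: static list of subscribers, called in parallel, stopping if one fails.
        from("direct:multicast")
            .multicast().parallelProcessing().stopOnException()
            .to("direct:subscriberA", "direct:subscriberB");

        // Recipient List: subscribers resolved at run time from a comma-separated header.
        from("direct:recipientList")
            .recipientList(header("subscribers"));

        // SEDA with multipleConsumers=true: each consumer receives every message asynchronously.
        from("seda:news?multipleConsumers=true").to("log:subscriberA");
        from("seda:news?multipleConsumers=true").to("log:subscriberB");

        from("direct:subscriberA").to("log:subscriberA");
        from("direct:subscriberB").to("log:subscriberB");
    }

    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new PubSubRoutes());
        context.start();
        ProducerTemplate template = context.createProducerTemplate();
        template.sendBody("direct:multicast", "static subscribers");
        template.sendBodyAndHeader("direct:recipientList", "dynamic subscribers",
                "subscribers", "direct:subscriberA,direct:subscriberB");
        template.sendBody("seda:news?multipleConsumers=true", "async subscribers");
        Thread.sleep(1000); // give the SEDA consumers time to log before shutting down
        context.stop();
    }
}
```

The producer sends to the seda URI with the same multipleConsumers=true option as the consumers. Keeping the options identical on both sides avoids mismatches on the shared queue.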
There are also less popular Camel components which offer publish-subscribe messaging model:
  • websocket - it uses the Eclipse Jetty server and can send messages to all clients that are currently connected.
  • hazelcast - its SEDA endpoint implements a work queue in order to support asynchronous SEDA architectures.
  • guava-eventbus - integration bridge between Camel and Google Guava EventBus infrastructure.
  • spring-event - provides access to the Spring ApplicationEvent objects.
  • eventadmin - used in an OSGi environment to receive OSGi events.
  • xmpp - implements XMPP (Jabber) transport. Posting a message in a chat room is also pub-sub ;)
  • mqtt - for communicating with MQTT compliant message brokers.
  • amqp - supports the AMQP protocol using the Client API of the Qpid project.
  • javaspace - a transport for working with any JavaSpace compliant implementation.
Can you name any other way for doing publish-subscribe?


Accessing AWS without key and secret

If you are using Amazon Web Services (AWS), you probably know how to access resources like SNS, SQS and S3 using a key and secret. With the aws-java-sdk that is straightforward:
AmazonSNSClient snsClient = new AmazonSNSClient(
        new BasicAWSCredentials("your key", "your secret"));
One of the difficulties with this approach is storing the key/secret securely, especially when there are different sets of them for different environments. Java property files combined with Maven or Spring profiles might help a little to move the key/secret out of your source code, but they still don't solve the problem of accessing these resources securely.
Amazon has another service to help you here. No, this is not one more service you pay for in order to use the previous ones. It is free: it is actually a feature of the Amazon account. AWS Identity and Access Management (IAM) lets you securely control access to AWS services and resources for your users. You can manage users and groups and define permissions for AWS resources.
One interesting feature of IAM is the ability to assign roles to EC2 instances. The idea is that you create roles with sets of permissions and launch an EC2 instance with a role assigned to it. An application deployed on that instance then doesn't need an access key and secret to access other Amazon resources. Instead, it uses the role credentials to sign its requests. This has a number of benefits, such as a central place to control the credentials of all instances and reduced risk thanks to automatically refreshed credentials. Here is a short video demonstrating how to assign roles to an EC2 instance:



Once role-based security is enabled for an instance, you access other resources from that instance by creating an AWS client with the chained credentials provider:
AmazonSNSClient snsClient = new AmazonSNSClient(
        new DefaultAWSCredentialsProviderChain());
The provider searches the environment variables, then the Java system properties, and finally calls the instance metadata API to retrieve the role credentials, in chain-of-responsibility fashion. It also refreshes the credentials periodically in the background, depending on their expiration period.
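The chain-of-responsibility lookup can be illustrated with a small plain-Java sketch. This is not the AWS SDK's implementation, and the CredentialsChain class is hypothetical. It only shows the idea of asking each source in turn and using the first one that has credentials. The sketch uses a record, so it assumes Java 16 or later.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical illustration of a credentials provider chain (not the AWS SDK classes). */
public class CredentialsChain {
    /** Simple key/secret holder. */
    public record Credentials(String accessKey, String secretKey) {}

    private final List<Supplier<Optional<Credentials>>> providers;

    public CredentialsChain(List<Supplier<Optional<Credentials>>> providers) {
        this.providers = providers;
    }

    /** Asks each provider in order and returns the first credentials found. */
    public Credentials resolve() {
        for (Supplier<Optional<Credentials>> provider : providers) {
            Optional<Credentials> found = provider.get();
            if (found.isPresent()) {
                return found.get();
            }
        }
        throw new IllegalStateException("No credentials found in the chain");
    }

    /** A provider that looks up a key/secret pair in a map, e.g. System.getenv(). */
    public static Supplier<Optional<Credentials>> fromMap(Map<String, String> source,
                                                          String keyName, String secretName) {
        return () -> {
            String key = source.get(keyName);
            String secret = source.get(secretName);
            return (key != null && secret != null)
                    ? Optional.of(new Credentials(key, secret))
                    : Optional.empty(); // not found here: let the next provider try
        };
    }
}
```

A chain mirroring the order described above would put a System.getenv() lookup first, then system properties, and finally a source backed by the instance metadata API. That last source is hypothetical here.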
And finally, to use role-based security from Camel applications running on Amazon, create an instance of the client with the chained credentials provider and reference it from the endpoint URI. You don't specify any key or secret:
from("direct:start")
.to("aws-sns://MyTopic?amazonSNSClient=#snsClient");
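For the #snsClient reference to resolve, the client has to be bound in the Camel registry under that name. The minimal sketch below assumes Camel 2.x, where SimpleRegistry is a Map, together with camel-aws and the AWS SDK for Java v1. The class name SnsRoleBasedExample is hypothetical. With Spring, you would declare the client as a bean named snsClient instead.

```java
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.sns.AmazonSNSClient;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;

public class SnsRoleBasedExample {

    /** Binds the client under the name referenced by "#snsClient" in the endpoint URI. */
    static SimpleRegistry registryWith(AmazonSNSClient snsClient) {
        SimpleRegistry registry = new SimpleRegistry(); // Camel 2.x: a Map-backed registry
        registry.put("snsClient", snsClient);
        return registry;
    }

    public static void main(String[] args) throws Exception {
        // No key or secret here: the provider chain falls back to the EC2 instance role.
        AmazonSNSClient snsClient = new AmazonSNSClient(new DefaultAWSCredentialsProviderChain());

        CamelContext context = new DefaultCamelContext(registryWith(snsClient));
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("direct:start")
                    .to("aws-sns://MyTopic?amazonSNSClient=#snsClient");
            }
        });
        context.start();
        context.createProducerTemplate().sendBody("direct:start", "Hello from an EC2 role");
        context.stop();
    }
}
```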

Apache Camel meets Redis

The Lamborghini of Key-Value stores
Camel is the best-of-breed integration framework, and in this post I'm going to show you how to make it even more powerful by leveraging another great project: Redis. Camel 2.11 will be released soon with lots of new features, bug fixes and components. I authored a couple of these new components, and the redis component is my favourite. Redis, a light key/value store, is an amazing piece of Italian software designed for speed (like the Lamborghini, a two-seater Italian car designed for speed). Written in C, in-memory and close to the metal by nature, Redis performs extremely well (Lamborghini's motto is "Closer to the Road"). Redis is often referred to as a data structure server, since keys can contain strings, hashes, lists, sets and sorted sets. A fast and light data structure server is like a supercar for software engineers: it just flies. If you want to find out more about Redis' and Lamborghini's unique performance characteristics, google around and see for yourself.
Getting started with Redis is easy: download, make, and start a redis-server. After these steps, you are ready to use it from your Camel application. Internally, the component uses Spring Data Redis, which in turn uses the Jedis driver, with the possibility to switch to other Redis drivers. Here are a few use cases where the camel-redis component is a good fit:

Idempotent Repository
The term idempotent is used in mathematics to describe a function that produces the same result whether it is applied once or several times, i.e. f(f(x)) = f(x). In messaging, this concept translates into a message that has the same effect whether it is received once or multiple times. In Camel this pattern is implemented by the IdempotentConsumer class. It uses an Expression to calculate a unique message ID string for a given message exchange, and this ID is then looked up in the IdempotentRepository to see if it has been seen before. If it has, the message is consumed (filtered out). If it has not, the message is processed and the ID is added to the repository. RedisIdempotentRepository uses a set structure to store and check for existing IDs.
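The check-and-add logic can be sketched in plain Java, with a HashSet standing in for the Redis set. This illustrates the pattern under that assumption and is not the RedisIdempotentRepository source. With Redis, SADD returns 1 for a new member and 0 for an existing one, which answers the same "seen before?" question in a single atomic command. In a Camel route, the equivalent would be something like idempotentConsumer(header("messageId"), repository), where the messageId header name is an assumption.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Plain-Java sketch of the Idempotent Consumer pattern; the HashSet plays the role of the Redis set. */
public class IdempotentConsumer<M> {
    private final Set<String> seenIds = new HashSet<>();
    private final Function<M, String> idExpression; // computes the unique message ID
    private final List<M> processed = new ArrayList<>();

    public IdempotentConsumer(Function<M, String> idExpression) {
        this.idExpression = idExpression;
    }

    /** Processes the message only if its ID is new; returns true if it was processed. */
    public boolean onMessage(M message) {
        String id = idExpression.apply(message);
        if (!seenIds.add(id)) { // add() returns false when the ID is already present
            return false;       // duplicate: filtered out
        }
        processed.add(message); // stand-in for the real processing step
        return true;
    }

    public List<M> processed() {
        return processed;
    }
}
```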

Caching
One of the main uses of Redis is as an LRU cache. It can keep data in memory, like Memcached, or it can be tuned to be durable by flushing data to a log file that can be replayed if the node restarts. The various policies applied when maxmemory is reached allow creating caches for specific needs:
  • volatile-lru - removes a key among the ones with an expire set, trying to remove keys not recently used.
  • volatile-ttl - removes a key among the ones with an expire set, trying to remove keys with a short remaining time to live.
  • volatile-random - removes a random key among the ones with an expire set.
  • allkeys-lru - like volatile-lru, but will remove every kind of key, both normal keys and keys with an expire set.
  • allkeys-random - like volatile-random, but will remove every kind of key, both normal keys and keys with an expire set.
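Redis selects the policy with the maxmemory and maxmemory-policy directives in redis.conf (for example, maxmemory-policy allkeys-lru). As an in-process analogy only, allkeys-lru behaves like the Java cache below, which is built on LinkedHashMap in access order. Redis itself uses an approximated LRU that samples keys, whereas this sketch always evicts the exact least recently used entry.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** In-process analogy of Redis' allkeys-lru policy: evict the least recently used key when full. */
public class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public LruCache(int maxEntries) {
        super(16, 0.75f, true); // accessOrder = true: get() moves a key to the most recently used end
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries; // evict the least recently used entry once over capacity
    }
}
```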
Once your Redis server is configured with the right policies and running, the only operations you need are SET and GET.
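With the camel-spring-redis component, SET and GET are driven by message headers. The sketch below assumes a Redis server on localhost:6379 and the component's CamelRedis.Command, CamelRedis.Key and CamelRedis.Value headers. The cacheKey header, the direct: endpoint names and the RedisCacheRoutes class are hypothetical.

```java
import org.apache.camel.builder.RouteBuilder;

/** Sketch of SET and GET with camel-spring-redis, assuming Redis on localhost:6379. */
public class RedisCacheRoutes extends RouteBuilder {
    @Override
    public void configure() {
        // Store the message body under the key taken from the (hypothetical) "cacheKey" header.
        from("direct:cachePut")
            .setHeader("CamelRedis.Command", constant("SET"))
            .setHeader("CamelRedis.Key", header("cacheKey"))
            .setHeader("CamelRedis.Value", body())
            .to("spring-redis://localhost:6379");

        // Read the value back; the reply body should hold the cached value (null if absent).
        from("direct:cacheGet")
            .setHeader("CamelRedis.Command", constant("GET"))
            .setHeader("CamelRedis.Key", header("cacheKey"))
            .to("spring-redis://localhost:6379");
    }
}
```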

Inter-app pub/sub with Redis
Camel has various components for interacting between routes:
direct: provides direct, synchronous invocation in the same CamelContext.
seda: asynchronous behavior, where messages are exchanged on a BlockingQueue, again in the same CamelContext.
vm: asynchronous behavior like seda, but also supports communication across CamelContexts as long as they are in the same JVM.
Complex applications usually consist of several standalone Camel instances running on separate machines. For these scenarios, Camel offers the jms and activemq components, or a combination of AWS SNS and SQS, for messaging between instances.
Redis has a simpler solution for the Publish/Subscribe messaging paradigm. Subscribers subscribe to one or more channels, either by naming the channels or by using pattern matching to receive messages from multiple channels. The publisher then publishes messages to a channel, and Redis makes sure they reach all the matching subscribers.
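Both sides can be sketched with camel-spring-redis, assuming Redis on localhost:6379. The PUBLISH command uses the CamelRedis.Channel and CamelRedis.Message headers, and consumers choose SUBSCRIBE or PSUBSCRIBE through the command option. The channel names and the RedisPubSubRoutes class are illustrative. The subscriber routes could just as well live in other Camel instances on other machines.

```java
import org.apache.camel.builder.RouteBuilder;

/** Sketch of Redis pub/sub between Camel routes, assuming Redis on localhost:6379. */
public class RedisPubSubRoutes extends RouteBuilder {
    @Override
    public void configure() {
        // Publisher: sends the body to the "orders" channel without knowing the subscribers.
        from("direct:publish")
            .setHeader("CamelRedis.Command", constant("PUBLISH"))
            .setHeader("CamelRedis.Channel", constant("orders"))
            .setHeader("CamelRedis.Message", body())
            .to("spring-redis://localhost:6379");

        // Subscriber by channel name.
        from("spring-redis://localhost:6379?command=SUBSCRIBE&channels=orders")
            .to("log:orders");

        // Subscriber by pattern: receives messages from every channel matching "order*".
        from("spring-redis://localhost:6379?command=PSUBSCRIBE&channels=order*")
            .to("log:allOrders");
    }
}
```

Because "order*" also matches "orders", a message published there reaches both subscriber routes.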

Other usages
Guaranteed Delivery: Camel supports this EIP using JMS, File, JPA and a few other components. Here Redis can serve as a lightweight persistent key-value store with transaction support.
The Claim Check from the EIP patterns allows you to replace message content with a claim check (a unique key), which can be used to retrieve the message content at a later time. The message content can be stored temporarily in Redis.
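Here is a minimal plain-Java sketch of the Claim Check idea, with a ConcurrentHashMap standing in for Redis. Check-in would map to SET, and check-out to GET followed by DEL. The class name is hypothetical.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Plain-Java sketch of the Claim Check EIP; the map stands in for a Redis store. */
public class ClaimCheckStore {
    private final Map<String, String> store = new ConcurrentHashMap<>();

    /** Stores the payload and returns the claim check (a unique key) to send instead of it. */
    public String checkIn(String payload) {
        String claimCheck = UUID.randomUUID().toString();
        store.put(claimCheck, payload);
        return claimCheck;
    }

    /** Retrieves and removes the payload for a claim check, or returns null if unknown. */
    public String checkOut(String claimCheck) {
        return store.remove(claimCheck);
    }
}
```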
Redis is also very popular for implementing counters, leaderboards, tagging systems and many other features. Now, with two Swiss Army knives under your belt, the integrations you can build are limited only by your imagination.

Spring Integration 2.2 is out, it is time for another comparison with Apache Camel

A year after v2.1 was released, Spring Integration v2.2 is out with 3 new components: MongoDB, Redis and JPA. The first 2 were already listed in v2.1 as message stores, but they are now available as Inbound and Outbound Channel Adapters. The release also adds retry support and other new features.
By comparison, Apache Camel released v2.10 during the same period with around 18 new components. JPA support has been available in Camel for a long time, MongoDB since v2.10, and Redis since my commit a couple of days ago. But of course the number of connectors (more than 130 for Camel) is not the most important characteristic of an integration framework (some might argue with that). Many other factors matter when choosing an open source project for your needs. A quick search on the internet will show you the main questions to ask before deciding which open source project to use in your enterprise. Here are a couple of them, with my biased answers:

1. What is the license?
- Apache Camel (like all Apache projects) and Spring Integration both use the Apache License version 2.0, which is one of the most permissive licenses. It basically says: do whatever you want with the software.

2. Is the project actively developed? - According to ohloh.net for the last 12 months Apache Camel has 2415 commits from 24 committers whereas Spring Integration has 949 commits from 17 committers.

3. How mature is the project? - Both projects started in 2007. Currently Spring Integration has 234K lines of code, estimated at 60 years of effort, whereas Apache Camel has 800K lines of code, estimated at 220 years of effort.

4. How big and responsive is the community? - If you are building anything other than "Hello World", sooner or later you will have questions, no matter how many books and tutorials there are. This is another area where Camel shines. According to Apache stats, the Camel users and developers lists have around 700 subscribers combined, who send 45 messages per day in total on average. If you ask a question, chances are you will get a response in a matter of minutes. The Spring Integration forums have 24,355 messages since the start of the project, which works out to 13 messages a day on average.

Jira stats for last 30 days
5. How long does it take to fix a bug? - Personally, I consider this a very important metric for open source projects. Imagine you found a bug in the project, a small bug that makes your day miserable. Then you go the extra mile to fix it, prove it with tests, submit a patch and describe it on the project forum/mailing list, but it never gets reviewed or included in the next release... Looking at the reported/resolved issues ratio for the last 30 days, as of mid January, Camel has resolved 80 out of 90 reported issues, whereas SI resolved 3 out of 19. But I strongly believe this is due to the recent holiday season, and the graph below will look better for SI later in the year.

6. How good is the documentation? - Both projects have quite extensive documentation, tutorials and plenty of blog posts. There are 3 published books covering Spring Integration. Having read all of them, I can say they more or less talk about EIPs and repeat the Spring Integration documentation, but with better graphics. Camel has only one published book for now: Camel in Action by Claus Ibsen and Jonathan Anstey. There are also plenty of blog posts, tutorials and webinars for both projects out there. There is one thing Spring Integration is missing, though: the CamelOne conference ;)

7. Is there good tool support? - SpringSource develops the Eclipse-based Spring Tool Suite (STS), which has visual development tools for SI. IntelliJ IDEA also supports SI, but only for autocompleting various endpoint options.
Spring Tool Suite
The most popular tool for developing Camel routes is Fuse IDE from FuseSource, which is also an Eclipse-based tool. There is another Eclipse-based graphical tool for Camel as well: Talend ESB.
Fuse IDE
8. Can I get commercial support if I need it? - If you have more money than time, both projects have companies offering commercial support. This is useful when you don't have much time to spend asking questions on the mailing lists or browsing the source code. There are also plenty of independent consultants you can reach through the mailing lists.

These are only some of the questions you should ask yourself before adding one or the other dependency to your product. The more specific questions depend on your product portfolio, project stack, team abilities and motivation to learn new technologies.

Disclaimer: I'm Apache Camel committer and I absolutely adore Camel. I've also worked on projects where we chose Spring Integration over Apache Camel.

Monitoring Camel applications on the Cloud

Apache Camel is the Swiss Army knife of application integration. If your application processes data from different systems, sooner or later you will end up using some of the 130 (and growing) connectors available. Camel also has excellent cloud support. You can use the jclouds component, which provides a portable service abstraction layer for Amazon, GoGrid, VMWare, Azure and Rackspace. Alternatively, you can use the Amazon-specific components for SQS, SNS, S3, SES, SDB and DDB (some of which I contributed).

In this tutorial I'm going to show you how to monitor Camel applications with JMX and the Amazon CloudWatch service. The need for such monitoring arose after I created livephotostream, the photo streaming application I built while playing with Camel. This application retrieves real-time photos from the Twitter streaming API and not-so-real-time data from Instagram and Tumblr, then displays them using Bootstrap and sometimes WebSocket. I wanted to see how many photos it receives, how many of them fail, how long it takes to process a photo and so on. So I ended up creating a camel-cloudwatch component and a simple application demonstrating how to use it to display JMX metrics.

Amazon CloudWatch
Amazon CloudWatch provides monitoring for AWS cloud resources by tracking various metrics and triggering alarms based on them. It lets you monitor AWS resources such as EC2 instances, EBS volumes, Load Balancers and RDS instances in real time. But for my message-oriented application, in addition to CPU utilisation, memory usage, SQS messages, etc., I also want to see metrics specific to my Camel application. Examples are ExchangesCompleted, ExchangesFailed, MaxProcessingTime, MeanProcessingTime, or the number of exchanges processed by a given route or even by a specific processor in a route. Luckily, Camel tracks and exposes these metrics over JMX. You can read about how to access these metrics with jConsole or Fuse IDE in Michał Warecki's excellent blog post. But connecting over JMX to a Java application running on an AWS instance, with jConsole or any other tool, is not a joy. I'd rather see these metrics on the nice graphs CloudWatch provides, and even get notified if any of them reaches a threshold value.

This is where the new cloudwatch producer becomes useful. It lets you send custom metrics to the Amazon CloudWatch service. All you have to do is give your metrics a namespace and send each metric with its name, its value and, optionally, its unit and timestamp. If you don't specify the unit and timestamp, it uses Count as the unit and the current time as the metric time.

Camel-JMX
Let's say we want to track the number of failed exchanges in a Camel route. This metric can be retrieved using the jmx component, which lets you subscribe to an MBean's notifications. Such a jmx consumer creates a new monitor bean and deploys it to the local MBean server. The monitor watches the "ExchangesFailed" attribute of the "processingRoute" bean. When that attribute value changes, the consumer sends a new message containing the change, i.e. the number of failures so far. After that, all you have to do is set that value as the CamelAwsCwMetricValue message header and give it a name with the CamelAwsCwMetricName header. The cloudwatch producer handles the rest.
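The original route isn't reproduced here, so what follows is a hedged sketch of what such a route could look like. It assumes Camel 2.x with camel-jmx and camel-aws, and an AmazonCloudWatchClient bound in the registry as cwClient. The context key value, the MyCamelApp namespace and the threshold settings are assumptions. With format=raw, the body is the JMX monitor notification itself, whose derivedGauge property carries the observed value.

```java
import org.apache.camel.builder.RouteBuilder;

/** Sketch: forward changes of a route's ExchangesFailed attribute to CloudWatch (Camel 2.x). */
public class FailedExchangesToCloudWatch extends RouteBuilder {
    @Override
    public void configure() {
        from("jmx:platform?objectDomain=org.apache.camel"
                + "&key.context=localhost/camel-1"   // hypothetical CamelContext key
                + "&key.type=routes"
                + "&key.name=\"processingRoute\""    // route MBean names are quoted
                + "&monitorType=counter&observedAttribute=ExchangesFailed"
                + "&initThreshold=1&offset=1"        // offset raises the threshold after each crossing
                + "&granularityPeriod=60000"
                + "&format=raw")                     // body is the raw MonitorNotification
            .setHeader("CamelAwsCwMetricName", constant("ExchangesFailed"))
            // derivedGauge holds the observed attribute value at notification time
            .setHeader("CamelAwsCwMetricValue", simple("${body.derivedGauge}"))
            .to("aws-cw://MyCamelApp?amazonCwClient=#cwClient");
    }
}
```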

The route above doesn't require any additional code. It will send any changes to the ExchangesFailed attribute to CloudWatch service, and you can see the error count not changing in the CloudWatch graph:

The downside of using a jmx consumer to monitor a bean is that you need a separate consumer and route for each bean and attribute monitored. Another approach is a processor that accesses the beans in the JMX server, reads all the attributes of interest and sends them all at once, again using the cloudwatch component.

In this scenario you also need a timer to trigger the polling. The cloudwatch producer can build a metric from message headers, as in the first example. It can also send metric objects taken directly from the message body, if present, as in the second example. The processor for retrieving data from Camel JMX is also straightforward and can be seen in the demo app on my GitHub account.
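The attribute-reading part of such a processor can be written with the standard JMX API. The sketch below is plain Java (javax.management), not the code from the demo app, and the JmxAttributeReader class is hypothetical. It queries every MBean matching an ObjectName pattern and collects numeric attributes. A pattern like org.apache.camel:type=routes,* would select Camel's route MBeans.

```java
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

/** Reads selected numeric attributes from every MBean matching an ObjectName pattern. */
public class JmxAttributeReader {
    private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();

    /** Returns "objectName/attribute" -> value; assumes every matching MBean has these attributes. */
    public Map<String, Double> read(String pattern, String... attributes) {
        Map<String, Double> metrics = new HashMap<>();
        try {
            for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
                for (String attribute : attributes) {
                    Object value = server.getAttribute(name, attribute);
                    if (value instanceof Number) { // skip non-numeric attributes
                        metrics.put(name + "/" + attribute, ((Number) value).doubleValue());
                    }
                }
            }
        } catch (JMException e) {
            throw new IllegalStateException("Could not read JMX attributes for " + pattern, e);
        }
        return metrics;
    }
}
```

Each entry could then become a com.amazonaws.services.cloudwatch.model.MetricDatum (withMetricName, withValue). The resulting list would be placed in the message body of a timer-triggered route that ends in the aws-cw endpoint.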

The last route produces a couple of metrics, all of which are sent to CloudWatch every minute. As the next image shows, the number of exchanges completed successfully keeps increasing, but at different rates:

On the other hand, the mean processing time (in millis) differs for each message, but stays within a narrow interval and doesn't grow drastically.

If a new metric doesn't exist in CloudWatch yet, it is created for you. But to set up alarms and trigger automatic actions, you still have to log into the Amazon console and do it manually. And finally, don't forget to check your "cloud bill" after adding all these metrics; they are not free ;)