
Spring Integration 2.2 is out, so it is time for another comparison with Apache Camel

A year after v2.1 was released, Spring Integration v2.2 is out with three new components: MongoDB, Redis and JPA (the first two were already listed in v2.1 as message stores; now they are also available as Inbound and Outbound Channel Adapters), plus retry and other new features.
As a comparison, Apache Camel released v2.10 during the same period with around 18 new components. JPA support in Camel has been available for a long time, MongoDB since v2.10 and Redis since my commit a couple of days ago. But of course the number of connectors (more than 130 for Camel) is not the most important characteristic of an integration framework (some might argue that it is). There are many other factors that matter when choosing an open source project for your needs. A quick search on the internet will show you the main questions you should ask before deciding which open source project to use for your enterprise. Here are just a couple of them and my biased answers:

1. What is the license?
- Apache Camel (like all Apache projects) and Spring Integration are both released under Apache License version 2.0, which is one of the most permissive licenses. It basically says: do whatever you want with the software.

2. Is the project actively developed? - According to ohloh.net for the last 12 months Apache Camel has 2415 commits from 24 committers whereas Spring Integration has 949 commits from 17 committers.

3. How mature is the project? - Both projects started in 2007. Currently Spring Integration has 234K lines of code estimated at 60 years of effort, whereas Apache Camel has 800K lines of code estimated at 220 years of effort.

4. How big and responsive is the community? - If you are creating a project more complex than "Hello World", no matter how many books and tutorials there are, sooner or later you will have questions. And this is another area where Camel shines. According to Apache stats, the Camel users and developers lists have around 700 subscribers combined, who send 45 messages per day in total on average. If you ask a question, chances are you will get a response in a matter of minutes. On the Spring Integration forums there are 24,355 messages since the start of the project, which on average means 13 messages a day.

Jira stats for last 30 days
5. How long does it take to fix a bug? - Personally, this is a very important metric for open source projects. Imagine you find a bug in the project, a small bug that makes your day miserable. Then you go the extra mile to fix it, prove it with tests, submit a patch, describe it on the project forum/mailing list, but it never gets reviewed or included in the next release... Looking at the reported/resolved issues ratio for the last 30 days, as of mid January, Camel has resolved 80 out of 90 reported issues, whereas SI has resolved 3 out of 19. But I strongly believe this is due to the recent holiday season and the graph below will look better for SI later in the year.

6. How good is the documentation? - Both projects have quite extensive documentation, tutorials and plenty of blog posts. There are three books published covering Spring Integration - having read all of them, I can say they more or less talk about EIPs and repeat the Spring Integration documentation, but with better graphics. Camel for now has only one book published - Camel in Action by Claus Ibsen and Jonathan Anstey. There are also plenty of blog posts, tutorials and webinars for both projects out there. There is one thing that Spring Integration is missing though - a CamelOne conference ;)

7. Is there good tool support? - SpringSource develops the Eclipse based Spring Tool Suite (STS), which has visual development tools for SI. IntelliJ IDEA also has SI support, but it is limited to autocompleting various endpoint options.
Spring Tool Suite
The most popular tool for developing Camel routes is Fuse IDE from FuseSource, which is also an Eclipse based tool. There is another Eclipse based graphical tool for Camel - Talend ESB.
Fuse IDE
8. Can I get commercial support if I need it? - If you have more money than time, both projects have companies offering commercial support. This is useful when you don't have much time to spend asking questions on the mailing lists or browsing the source code. There are also plenty of independent consultants you can reach through the mailing lists.

These are only some of the questions you should ask yourself before adding one or the other dependency to your product. The more specific questions you should ask depend on your product portfolio, project stack, team abilities and motivation to learn new technologies.

Disclaimer: I'm an Apache Camel committer and I absolutely adore Camel. I've also worked on projects where we chose Spring Integration over Apache Camel.

Monitoring Camel applications on the Cloud

Apache Camel is the swiss army knife of application integration. If your application is processing data from different systems, sooner or later you will end up using some of the 130 (and growing number of) connectors available. Camel also has excellent cloud support. You can use either the jclouds component, which provides a portable service abstraction layer for Amazon, GoGrid, VMware, Azure and Rackspace, or the Amazon specific components for SQS, SNS, S3, SES, SDB, DDB (some of which were contributed by me).

In this tutorial I'm going to show you how to monitor Camel applications with JMX and the Amazon CloudWatch service. The need for such monitoring arose after I created livephotostream - the photo streaming application I built while playing with Camel. This application retrieves real time photos from the Twitter streaming API, and not so real time data from Instagram and Tumblr, then displays them using Bootstrap and sometimes WebSocket. I wanted to see how many photos it is receiving, how many of them are failing, how long it takes to process a photo and so on. So I ended up creating a camel-cloudwatch component and a simple application demonstrating how to use it to publish JMX metrics.

Amazon CloudWatch
Amazon CloudWatch provides monitoring for AWS cloud resources by tracking various metrics and activating alarms based on these metrics. It enables you to monitor AWS resources in real time, such as EC2 instances, EBS volumes, Load Balancers, RDS instances… But for my message-oriented application, in addition to CPU utilisation, memory usage, SQS messages, etc., I also want to see metrics specific to my Camel application, such as ExchangesCompleted, ExchangesFailed, MaxProcessingTime, MeanProcessingTime, or the number of exchanges processed by a given route or even by a specific processor in a route. Luckily Camel tracks and exposes these metrics over JMX. You can read about how to access these metrics with jConsole or Fuse IDE in Michał Warecki's excellent blog post. But connecting to a Java application running on an AWS instance over JMX, using jConsole or another tool, is not a joy. Instead I'd rather see these metrics on the nice graphs CloudWatch provides and even get notified if any of the metrics reaches a threshold value.

This is where the new cloudwatch producer becomes useful. It lets you send custom metrics to the Amazon CloudWatch service. All you have to do is give a namespace to your metrics, and send each metric by specifying its name, value and optionally the unit and timestamp. If you don't specify the unit and timestamp, it will use Count as the unit and the current time as the metric time.
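Here is a minimal sketch of sending a custom metric through the producer. The aws-cw scheme, namespace, credentials and endpoint options are my assumptions and may differ from the actual component; the header names are the ones described in the next section:

from("direct:metric")
    // name and value of the custom metric; unit and timestamp are optional
    .setHeader("CamelAwsCwMetricName", constant("PhotosProcessed"))
    .setHeader("CamelAwsCwMetricValue", simple("${body}"))
    .to("aws-cw://livephotostream?accessKey=XXX&secretKey=XXX");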

Camel-JMX
Let's say we want to track the number of failed exchanges in a Camel route. This metric can be retrieved using the jmx component, which lets you subscribe to an MBean's notifications. The jmx consumer below will cause a new MonitorBean to be created and deployed to the local MBean server that monitors the "ExchangesFailed" attribute on the "processingRoute" bean. If that attribute value changes, the consumer will send a new message containing the change - i.e. the number of failures so far. After that, all you have to do is set that value as the CamelAwsCwMetricValue message header and give it a name with the CamelAwsCwMetricName header. The rest will be handled by the cloudwatch producer.
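A rough sketch of such a route - the jmx consumer options, MBean key values and AWS credentials below are indicative placeholders rather than the exact ones from the demo app:

from("jmx:platform?format=raw&objectDomain=org.apache.camel&key.context=camel-1"
        + "&key.type=routes&key.name=\"processingRoute\""
        + "&observedAttribute=ExchangesFailed&monitorType=counter"
        + "&initThreshold=1&offset=1&granularityPeriod=500")
    // the monitor notification carries the changed attribute value, i.e. the failure count so far
    .setHeader("CamelAwsCwMetricName", constant("ExchangesFailed"))
    .setHeader("CamelAwsCwMetricValue", simple("${body.derivedGauge}"))
    .to("aws-cw://livephotostream?accessKey=XXX&secretKey=XXX");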

The route above doesn't require any additional code. It will send any changes to the ExchangesFailed attribute to the CloudWatch service, and you can see the error count (which, luckily, is not changing) in the CloudWatch graph:

The downside of using the jmx consumer to monitor a bean is that you need a separate consumer and route for each bean and attribute monitored. Another approach is to have a processor that accesses the beans from the JMX server, reads all the attributes of interest and sends them all at once, again using the cloudwatch component.

In this scenario you will also need a timer to trigger the polling. The cloudwatch producer can generate a metric based on message headers as in the first example, or simply send the metric objects from the message body if present, as in the second example. The processor for retrieving data from Camel JMX is also straightforward and can be seen in the demo app on my github account.
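A sketch of that polling variant, where JmxMetricsProcessor is a made-up name for a processor that reads the interesting MBean attributes and puts a collection of MetricDatum objects into the message body:

from("timer://jmxMetrics?period=60000")
    // read the JMX attributes of interest and build the list of MetricDatum objects
    .process(new JmxMetricsProcessor())
    // the cloudwatch producer sends the metric objects found in the body
    .to("aws-cw://livephotostream?accessKey=XXX&secretKey=XXX");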

The result of the last route is a couple of metrics, all of which are sent to CloudWatch every minute. As you can see from the next image, the number of exchanges completed successfully is increasing all the time, but at a varying rate:

On the other hand, the mean processing time (in millis) is different for each message, but stays within a narrow range and is not growing drastically.

If a metric doesn't exist in CloudWatch yet, it will be created for you, but if you want to set up alarms and trigger automatic actions, you still have to log into the Amazon console and do it manually. And finally, don't forget to check your "cloud bill" after adding all these metrics, they are not free ;)

Live Photo Streaming from Twitter, Instagram and Tumblr

The Good
UPDATE: this project is dead now. It didn't turn into a successful startup in the end :)
Streaming real time images from Twitter during the London Olympics was fun. Camelympics turned out to be an interesting (mostly for the developers) project:

Claus Ibsen (the core Camel rider) blogged about it and it was mentioned on DZone.

Bruno Borges' talk on "Leverage Enterprise Integration Patterns with Apache Camel and Twitter" at the JavaOne conference (slides available here) had a demo inspired by Camelympics.

It streamed millions of images, brought more than 1024 visitors to my blog and 5 comments - not bad. All that with less than 128 lines of Camel code, a little javascript to rotate the images on the client side, and powered by a free AWS micro instance. Camel is tiny enough to do real time streaming, filtering, throttling, serializing, and serving pages with half a gig of memory.

The Bad
The Olympics are over. Limiting photos to a certain topic or tag is fine during an event, but not good enough in the long term.

Websockets are not widely supported yet. IE doesn't support them, and neither do most mobile browsers. The only Android browser I found with websocket support is Dolphin Browser, but then 3G networks are not fast enough to cope with real time (even throttled) image streaming. It needs at least another year before Websockets become widely useful.

Twitter introduced new Developer Rules and limits to make life harder.

As Bruno proved during the live Twitter demo at JavaOne conference, Twitter is the world's largest nsfw photo streaming service, and there is nothing you can do about it (yet).

The Pivot

I extended the application, so in addition to Twitter it also retrieves photos from Instagram and Tumblr in "real time". Tumblr doesn't have a real time API, and Instagram's real time API, based on tags, is useless. So I had to come up with some clever ways to give the same real time experience to the user.

Added search functionality. It is actually real time filtering functionality - the application will monitor Twitter, Instagram and Tumblr for photos with a given tag and stream only those images as soon as they are posted. I am not aware of another application doing this.

Decided to replace the custom styling with Bootstrap, it just works.

and the result is...
The experiment continues; looking forward to hearing your constructive feedback.

Olympics image loader powered by Camel

Live #Olympics Image Stream
This is a very short post about a very simple application. Inspired by the London 2012 Olympics and based on Apache Camel examples, I've created an application that displays Twitter images related to the Olympics in real time. It listens for tweets containing images, filters out duplicates, and sends the images to the browser over websockets every 5 seconds. See how simple the application is on my github account.

This is an end-to-end push application: once a user pushes an image to Twitter, Twitter pushes the image to the Camel application, and the Camel application pushes the image to all the clients. It is built using only "free stuff": Twitter's free streaming API, the Apache Camel framework, Amazon's free micro instance, and my free time. Here is the essence:

Line by line explanation
Twitter pushes tweets containing the #Olympics and #London2012 tags to the app:
from("twitter://streaming/filter?type=event&keywords=" + searchTerm)
Log statistical information about the number of messages every minute. Not visible to the users:
.to("log:tweetStream?level=INFO&groupInterval=60000&groupDelay=60000&groupActiveOnly=false")
Extract images from the tweets which have media attached:
.process(new ImageExtractor())
Put the current number of tweets and images in the message:
.process(new Statistics())
Filter out all the tweets which don't contain images:
.filter(body().isInstanceOf(Tweet.class))
Filter out duplicated images, identified by their url:
.idempotentConsumer(header(UNIQUE_IMAGE_URL), MemoryIdempotentRepository.memoryIdempotentRepository(10000))
Log again the messages which made it this far in the route:
.to("log:imageStream?level=INFO&groupInterval=60000&groupDelay=60000&groupActiveOnly=false")
Let images through with a 5 second gap between them, so the user can enjoy each one. Also important - don't block the Twitter listener: callerRunsWhenRejected is turned off, because if the image buffer fills up and the listener ends up running the rejected task itself, Twitter will block you:
.throttle(1).timePeriodMillis(5000).asyncDelayed().callerRunsWhenRejected(false)
Serialize into json:
.marshal().json(JsonLibrary.Jackson)
Push it to the users:
.to("websocket:camelympics?sendToAll=true");
The application can also be run locally, and it allows filtering images not only for the Olympics but for any keywords passed as arguments. Don't forget to use your own Twitter OAuth tokens when running it locally, though.

Enjoy the Olympics.

Content Migration with Camel-CMIS

Some time ago (actually quite a long time ago now) I was playing with CMIS using Apache Chemistry, Camel, Talend… and created a camel-cmis connector. Recently this component reached Camel trunk, so here are some examples of how it can be used.
Motivation
Content Management Interoperability Services (CMIS) is an open standard that defines an abstraction layer for accessing diverse document management systems and repositories using web protocols. Administered by OASIS and backed by Adobe, Alfresco, HP, IBM, Microsoft, Oracle and many more, it defines a data model and query language for accessing content repositories in a language agnostic way.
If you are already familiar with the Java Content Repository API (JCR), CMIS is kind of similar. JCR specifies an API while CMIS specifies protocol bindings. Much like the Servlet API in Java and the HTTP protocol are complementary, so are JCR and CMIS.

The Apache Chemistry project provides open source implementations of the CMIS specification, with implementations in Java, Python, PHP and .Net.

CAMEL-CMIS
The component uses OpenCMIS (Apache Chemistry's Java implementation) and contains a producer and a consumer. The producer can be used in two ways:

Create nodes (documents with content data, folders and other node types) from Camel messages. In the example below, the route copies all of the files from the local file system into a demo directory on the Alfresco demo CMIS server, using the file name as the node name.
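A rough sketch of such a route - the demo server URL, credentials and header names are indicative, so double check them against the component documentation:

from("file://local/data?noop=true")
    // the CMIS node name is taken from the file name
    .setHeader("cmis:name", simple("${file:name}"))
    .setHeader("CamelCMISFolderPath", constant("/demo"))
    .to("cmis://http://cmis.alfresco.com/cmisatom?username=admin&password=admin");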

Query the content repository using the CMIS query language. The result of the query is a list in the body of the message. The route below retrieves all documents whose name contains the word 'camel', splits them and stores each one in the local file system, using the document name as the file name.
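A sketch of the query scenario, assuming each query result is a map of document properties (again, the queryMode option and the property access shown here are indicative rather than exact):

from("timer://query?repeatCount=1")
    .setBody(constant("SELECT * FROM cmis:document WHERE cmis:name LIKE '%camel%'"))
    .to("cmis://http://cmis.alfresco.com/cmisatom?username=admin&password=admin&queryMode=true")
    .split(body())
    // use the document name as the target file name
    .setHeader(Exchange.FILE_NAME, simple("${body['cmis:name']}"))
    .to("file://local/result");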

The consumer accepts a CMIS query through the options. If the query string is not provided, the consumer will iterate the complete content tree recursively, starting from the root node. Using this option you can replicate the complete CMIS structure into another CMIS repository (as in the following example, which tries to copy the complete Alfresco repo into a Nuxeo one - don't do this at home) or back it up into the local file system. Notice that to retrieve the actual content data for document nodes, you additionally have to set the readContent option.
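A sketch of replicating one repository into another; the endpoint URLs and credentials are placeholders (and really, don't run this against servers you care about):

from("cmis://http://alfresco-host/cmisatom?username=admin&password=admin&readContent=true")
    .to("cmis://http://nuxeo-host/atom/cmis?username=Administrator&password=secret");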

If the target repository doesn't support CMIS, but JCR, try replacing the endpoint with the camel-jcr component, or choose something else from one of the 120 existing Camel components.

Connect Apache OFBiz with the Real World

What would you expect from someone who is an OFBiz and Camel committer? To integrate them for fun? Fine, here it is. In addition to being fun, I believe this integration will be of real benefit to the OFBiz community, because despite being a complete ERP suite, OFBiz lacks the ability to easily integrate with external systems. The goal of this project is, instead of reinventing the wheel and trying to integrate OFBiz with each system separately, to integrate it with Camel and let Camel do what it does best: connect your application with every possible protocol and system out there.

Quick OFBiz introduction
The Apache Open For Business Project is open source, enterprise automation software. It consists mainly of two parts:
A full-stack framework for rapid business application development, which itself is made up of several layers:
Entity Engine - the data layer (imagine something like iBATIS and Hibernate combined). It is the same entity engine that powers millions of Atlassian Jira instances. But don't get me wrong, it is not meant for use outside of the OFBiz framework, so use it only as the OFBiz data layer.
Service Engine - this might be hard to grasp for someone with only a DDD background, but OFBiz doesn't have any domain objects. Instead, the service layer uses SOA and has thousands of services that contain the business logic. A service is an atomic bit of isolated business logic, usually reading and updating the database. If you need to, you can make services trigger each other using ECAs (event-condition-action), a kind of rule engine that lets you define pre/post conditions for triggering other service calls when a service is executed. A service can be written in Java, Groovy or simple language (an XML DSL for simple database manipulation), usually requires authentication and authorisation, and is executed in a transaction.
UI widgets - an XML DSL which allows you to easily create complex pages with tables, forms and trees.
And the really great thing about this framework is that 'the whole is greater than the sum of its parts' - all of the above layers work together amazingly well: if you have an entity definition (a table) in your data layer, you can use it in your service layer in your service interface definition or its implementation. It takes one line of code (a long one) to create a service which takes the table columns as input parameters and returns the primary key as the service result. Then, if you are creating a screen with tables or forms, you can base it on your entity or service definitions. Again, it is only a few lines of code to create a form with fields mapping to service or entity fields.

Out of the box business applications. These are vertical applications for managing the full life cycle of a business domain like ordering, accounting, manufacturing and many more, in a horizontally integrated manner. So creating an order from the order or ecommerce application will interact with the facility application to check whether a product is available, and after the order is created it will create an accounting transaction in the accounting application. Before the order is shipped from the facility, it will create invoices, and once the invoice is paid, it will close the order. You get the idea.

Camel in 30 seconds
Apache Camel is an integration framework based on the well known Enterprise Integration Patterns (EIP). Camel can also be presented as consisting of two artifacts:
The routing framework, with Java, Scala and XML DSLs covering all the EIPs like Pipe, Filter, Router, Splitter, Aggregator, Throttler, Normalizer and many more.
Components and transformers, i.e. all the different connectors to more than 100 different applications and protocols like AMQP, AWS, web services, REST, MongoDB, Twitter, Websocket, you name it.
If you can imagine a tool, that enables you to consume data from one system, then manipulate the data (transform, filter, split, aggregate) and send it to other systems, using a declarative, concise, English-like DSL without any boilerplate code - that's Apache Camel.

Let OFBiz talk to all of the systems Camel does
The main interaction points with OFBiz are either using the Entity Engine for direct data manipulation or calling services through the Service Engine. The latter is preferred because it ensures that the user executing the service is authorised to do so, that the operation is transactional to ensure data integrity, and that all the business rules are satisfied (there might be other services that have to be executed via ECA rules). So if we can create an OFBiz endpoint in Camel and execute OFBiz services from Camel messages, that would allow OFBiz to receive notifications from Camel endpoints. What about the other way around - making OFBiz notify Camel endpoints? The ideal way would be to have an OFBiz service that sends its IN parameters to Camel endpoints as message body and headers and returns the reply message as the OFBiz service response.
If you are wondering: why is this so great, what is an endpoint, where is the real world, who is gonna win Euro2012... have a look at the complete list of available Camel components, and you will find the answer.

Running Camel in OFBiz container
I've started an experimental ofbiz-camel project on github which allows you to do all of the above. It demonstrates how to poll files from a directory using Camel and create notes in OFBiz with the content of each file using the createNote service. The project also has an OFBiz service that enables sending messages from OFBiz to Camel. For example, using that service it is possible to send a message to Camel's file://data endpoint, and Camel will create a file in the data folder using the service parameters.
The integration between OFBiz and Camel is achieved by running Camel in an OFBiz container as part of the OFBiz framework. This makes for quite a tight integration, but ensures that there is no HTTP, RMI or any other overhead in between. It is still WIP and may change completely.
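A purely illustrative sketch of the file-to-note flow described above; the ofbiz: endpoint syntax and the service parameter name are hypothetical and may differ from the actual ofbiz-camel code:

from("file://data/notes?noop=true")
    // assume noteInfo is the createNote service parameter carrying the note text
    .setHeader("noteInfo", simple("${body}"))
    .to("ofbiz:createNote?login=admin&password=ofbiz");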

Running Camel and OFBiz separately
Another approach is KISS: run Camel and OFBiz as they are - separate applications - and let them interact over RMI, WS-* or something else. This doesn't require much coding, only configuring both systems to talk to each other. I've created a simple demo, camel-ofbiz-rmi, which demonstrates how to listen for tweets with a specific keyword and store them in OFBiz as notes by calling the createNote service over RMI. It uses Camel's twitter and rmi components and requires only configuration. Notice that this example demonstrates only one way interaction: from Camel to OFBiz. In order to invoke a Camel endpoint from OFBiz you have to write some RMI, WS-* or other code.

PS: I'm looking forward to hearing your real world integration requirements for OFBiz.

How to keep your content repository and Solr in synch using Camel

With recent contributions to Camel, the camel-jcr component now has a consumer which allows monitoring a Java Content Repository for changes. If your JCR supports OPTION_OBSERVATION_SUPPORTED, the consumer will register an EventListener and get notified about all kinds of events. Chances are you are not interested in all the events from the whole repository, and in that case it is possible to narrow down the notifications by further specifying the path of interest, event types, node UUIDs, node types, etc.

How can this consumer be useful? (hmmm, you tell me) Let's say we have a CMS and we want to keep our external Solr index in sync with the content updates. So whenever a new node is added to the content repository, all of its properties get indexed in Solr, and if the node is deleted from the content repository, the corresponding document is removed from Solr.

Here is a Camel route that will listen for changes under the /path/site folder and all its children. This route will get notified only about two kinds of events: NODE_ADDED and NODE_REMOVED, because the value of the eventTypes option is a bit mask of the event types of interest (in this case 3, masking 1 and 2 respectively).
from("jcr://username:password@repository/path/site?deep=true&eventTypes=3")
   .split(body())
   .choice()

   .when(script("beanshell", "request.getBody().getType() == 1"))
   .to("direct:index")

   .when(script("beanshell", "request.getBody().getType() == 2"))
   .to("direct:delete")

   .otherwise()
   .log("Event type not recognized" + body().toString());
The route will split each event into a separate message and, depending on the event type, send node creation events to the direct:index route and node deletion events to the direct:delete route.

The delete route is a simple one: it sets the Solr operation to delete_by_id in the message header and puts the node identifier into the message body, which in our case also represents the uniqueKey in the Solr schema. This is followed by a Solr commit.
from("direct:delete")
   .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_DELETE_BY_ID))
   .setBody(script("beanshell", "request.getBody().getIdentifier()"))

   .log("Deleting node with id: ${body}")
   .to(SOLR_URL)

   .setHeader("SolrOperation", constant("COMMIT"))
   .to(SOLR_URL);
The indexing part consists of two routes, where the nodeRetriever route actually gets the node from the content repository using its identifier from the update event:
from("direct:nodeRetriever")
   .setHeader(JcrConstants.JCR_OPERATION, constant(JcrConstants.JCR_GET_BY_ID))
   .setBody(script("beanshell", "request.getBody().getIdentifier()"))

   .log("Reading node with id: ${body}")
   .to("jcr://admin:admin@repository");
After the node is retrieved from the repository using the content enricher EIP, there is also a processor to extract the node properties and set them on the Camel message so that they get indexed as Solr document fields.
from("direct:index")
   .enrich("direct:nodeRetriever", nodeEnricher)
   .process(jcrSolrPropertyMapper)

   .log("Indexing node with id: ${body}")
   .setHeader("SolrOperation", constant("INSERT"))
   .to(SOLR_URL);
You can find the complete working example on github. In case your CMS is not a JCR but is CMIS compliant, have a look at the cmis component on my github account.

Indexing data in Solr from disparate sources using Camel

Apache Solr is "the popular, blazing fast open source enterprise search platform" built on top of Lucene. In order to search (and find results) there is the initial requirement of data ingestion, usually from disparate sources like content management systems, relational databases, legacy systems, you name it... Then there is also the challenge of keeping the index up to date by adding new data, updating existing records and removing obsolete data. The new sources of data could be the same as the initial ones, but could also be sources like Twitter, AWS or REST endpoints.

Solr can understand different file formats and provides a fair number of options for data indexing:
  1. Direct HTTP and remote streaming - allows you to interact with Solr over HTTP by posting a file for direct indexing or the path to the file for remote streaming.
  2. DataImportHandler - a module that enables both full and incremental delta imports from relational databases or the file system.
  3. SolrJ - a java client to access Solr using Apache Commons HTTP Client.
But in real life, indexing data from different sources with millions of documents, dozens of transformations, filtering, content enriching, replication and parallel processing requires much more than that. One way to cope with such a challenge is by reinventing the wheel: write a few custom applications, combine them with some scripts or run cron jobs. Another approach is to use a tool that is flexible, designed to be configurable and pluggable, and that can help you scale and distribute the load with ease. Such a tool is Apache Camel, which now also has a Solr connector.

It all started a few months ago, during the basecamp days at Sourcesense, where my colleague Alex and I were experimenting with different projects to implement a pipeline for indexing data into Solr. As expected, we discovered Camel, and after a few days of pairing we were ready with the initial version of the Solr component, which got committed to Camel and was extended further by Ben O'Day. At the moment it is a full featured Solr connector that uses SolrJ behind the scenes and lets you: configure all parameters of SolrServer and StreamingUpdateSolrServer; use the operations insert, add_bean, delete_by_id, delete_by_query, commit, rollback and optimize; and index files, SolrInputDocument instances, annotated beans or individual message headers.

Creating a Camel route to index all the data from a relational database table and local file system is simple:
public void configure() {
from("timer://clear?repeatCount=1")
        .to("direct:clearIndex");

from("file:src/data?noop=true")
        .to("direct:insert");

from("timer://database?repeatCount=1")
        .to("sql:select * from products?dataSourceRef=productDataSource")
        .split(body())
        .process(new SqlToSolrMapper())
        .to("direct:insert");

from("direct:insert")
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_INSERT))
        .to(SOLR_URL)
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
        .to(SOLR_URL);

from("direct:clearIndex")
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_DELETE_BY_QUERY))
        .setBody(constant("*:*"))
        .to(SOLR_URL)
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
        .to(SOLR_URL);
}
The above route will first clear the index by deleting all documents, followed by a commit. Then it will start polling files from the src/data folder, read each file and send it to the Solr endpoint. Assuming the files are in a format Solr can understand, they will be indexed and committed. The third route will retrieve all the products from the database (in memory), split them into individual records, map each record to Solr fields, and digest :)

Luckily, in 2012, the life of a software developer is not that simple and boring. Nowadays a more realistic indexing requirement would look something like this:

1. Get the backup files from Amazon S3 and index them. If a document is approved, commit it as soon as possible, otherwise commit every 10 minutes.

How can Camel help you with this requirement? Camel supports the most popular Amazon APIs, including S3. Using the aws-s3 component, it is possible to read files from an S3 bucket and then apply a filter for approved documents, in order to send them to a separate route for instant commit.
<route>
  <from uri="aws-s3://MyBucket?delay=5000&maxMessagesPerPoll=5"/>
  <choice>
    <when>
      <xpath>/add/doc[@status='approved']</xpath>
      <to uri="direct:indexAndCommit"/>
    </when>
    <otherwise>
      <to uri="direct:index"/>
    </otherwise>
  </choice>
</route>
<route>
  <from uri="timer://commit?fixedRate=true&period=600s"/>
  <from uri="direct:commit"/>
</route>
2. Retrieve customer data from the database every 5 seconds, reading 10 records at a time. Also look for deltas. Enrich the address data with latitude/longitude by calling the XXX external service, to facilitate spatial search in Solr.
<route id="fromDB">
  <from uri="jpa://com.ofbizian.pipeline.Customer?consumer.namedQuery= newCustomers&amp;maximumResults=10&amp;delay=5000"/>
  <enrich uri="direct:coordinateEnricher" strategyRef="latLongAggregationStrategy"/>
  <to uri="direct:index"/>
</route>

<route>
  <from uri="direct:coordinateEnricher"/>
  <setHeader headerName="CamelHttpQuery">
    <simple>address='${body.address}'&amp;sensor=false</simple>
  </setHeader>
  <to uri="http://maps.google.com/maps/api/geocode/xml"/>
  <setHeader headerName="lat">
    <xpath resultType="java.lang.Double">//result[1]/geometry/location/lat/text()</xpath>
  </setHeader>
  <setHeader headerName="lng">
    <xpath resultType="java.lang.Double">//result[1]/geometry/location/lng/text()</xpath>
  </setHeader>
</route>
The above route reads 10 records at a time from the Customer table, and for each one calls Google's Maps API to get the latitude and longitude for the customer address field. The coordinates are extracted from the response using XPath and merged back into the Customer object. Simple, isn't it?

3. Index the content under this/that/path in our content management system and also monitor for updates.
<route>
  <from uri="jcr://user:pass@repository/import/inbox/signal?eventTypes=3&deep=true&synchronous=false"/>
  <to uri="direct:index"/>
</route>
Camel has a jcr connector which allows you to create content in any Java content repository. There is also an improvement submitted in CAMEL-5155 which will soon allow reading content from repositories supporting JCR v2.
If you are lucky and your CMS supports CMIS you can use my camel-cmis connector from github for the same purpose.

4. Listen for tweets about our product/company, do sentiment analysis, and index only positive tweets.
<route id="fromTwitter">
  <from uri="twitter://streaming/filter?type=event&keywords=productName&consumerKey={{consumer.key}}&consumerSecret={{consumer.secret}}"/>
  <setHeader headerName="CamelHttpQuery">
    <language language="beanshell">
      "q=" + java.net.URLEncoder.encode(request.getBody().getText(), "UTF-8")
    </language>
  </setHeader>
  <throttle timePeriodMillis="1500">
    <constant>1</constant>
    <to uri="http://data.tweetsentiments.com:8080/api/analyze.xml"/>
    <setHeader headerName="sentiment">
      <xpath resultType="java.lang.Double">/sentiment/value/text()</xpath>
    </setHeader>
    <filter>
      <simple>${in.header.sentiment} > 0</simple>
      <to uri="direct:index"/>
    </filter>
  </throttle>
</route>
This route listens for tweets using Twitter's real time API, URL-encodes the tweet text and calls the tweetsentiments API for sentiment analysis. In addition, it applies throttling, so at most one request is made every 1500 milliseconds, because there is a restriction on the number of calls per second. Then the route applies a filter to ignore all the negative tweets before indexing.

As you can see, Camel can easily interact with many disparate systems (including Solr), and even if you have a very custom application, writing a connector for it would not be difficult. But this is only one side of the story. On the other side, there is a full list of Enterprise Integration Patterns implemented by Camel which are needed for any serious data ingestion pipeline: Routers, Translators, Filters, Splitters, Aggregators, Content Enrichers, Load Balancers... Last but not least: exception handling, logging, monitoring, DSLs... In two words: Camel Rocks!

PS: The full source code of the examples can be found on my github account.

My first commit to Camel - FOP component

After being invited to join the Apache Camel team, it is time for my first commit. It is actually one of my very first contributions to Camel, which has been waiting in Jira for a long time - a camel-fop component.
This component uses the Apache FOP project to render messages into a variety of output formats such as PDF, PostScript, RTF, PNG, etc.

Camel-fop has only a producer, which expects messages containing XML data in XSL-FO format in the body. Assuming that your data and presentation are kept separate, the XSL-FO can be generated from XML data and an XSLT template, or alternatively by using Freemarker or Velocity templates and passing the data through message headers.
Once the content is ready in the message body, camel-fop will transform it into the output format specified in the endpoint URL or message header.

It is also possible to set some metadata for each document (like producer, author, creation date, resolution...) or encrypt the document with a password.

Additionally, it allows you to override the default FOP configuration file by specifying a new userConfig location in the endpoint URL. With a custom configuration you can control FOP's rendering behavior and help it find its resources.
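As a rough illustration (the header and option names below are my assumption, so check the component documentation), setting metadata, a password and a custom configuration could look like this:

from("direct:report")
    // assumed header names for document metadata and encryption
    .setHeader("CamelFop.Render.Author", constant("ofbizian"))
    .setHeader("CamelFop.Encrypt.userPassword", constant("secret"))
    .to("fop:application/pdf?userConfigURL=file:conf/fop-config.xml");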

Here is an example route that creates PDFs from XML data and an XSLT template:
from("file:source/data/xml")
    .to("xslt:xslt/template.xsl")
    .to("fop:application/pdf")
    .to("file:target/data");
To see the example files in action, check the test folder of the camel-fop component.
Let me know if you find this component useful.

REST with Apache Camel

There are many ways to expose an HTTP endpoint in Camel: jetty, tomcat, servlet, cxfrs and restlet. Two of these components - cxfrs and restlet - also support REST semantics with just a few lines of code. This simple example demonstrates how to do CRUD operations with camel-restlet and camel-jdbc.

The four HTTP verbs execute different operations and map to the following URI templates:
  •  POST - create a new user: /user
  •  GET - request the current state of the user specified by the URI: /user/{userId}
  •  PUT - update a user at the given URI with new information: /user/{userId}
  •  DELETE - remove the user identified by the given URI: /user/{userId}
  •  There is also a /users URI, which returns all the users regardless of the HTTP method used.
Creating such an application with Camel is straightforward. After adding all the necessary dependencies (restlet, spring, jdbc...), configure web.xml to load the Camel context:
<context-param>
  <param-name>contextConfigLocation</param-name>
  <param-value>classpath:camel-config.xml</param-value>
</context-param>
<listener>
  <listener-class>org.springframework.web.context.ContextLoaderListener   </listener-class>
</listener>
and map the Restlet servlet:
<servlet>
  <servlet-name>RestletServlet</servlet-name>
  <servlet-class>org.restlet.ext.spring.SpringServerServlet</servlet-class>
  <init-param>
    <param-name>org.restlet.component</param-name>
    <param-value>RestletComponent</param-value>
  </init-param>
</servlet>
<servlet-mapping>
  <servlet-name>RestletServlet</servlet-name>
  <url-pattern>/rs/*</url-pattern>
</servlet-mapping>
In the Spring context, there is a little more Restlet setup code and an in-memory datasource:
<bean id="RestletComponent" class="org.restlet.Component"/>
<bean id="RestletComponentService" class="org.apache.camel.component.restlet.RestletComponent">
   <constructor-arg index="0">
     <ref bean="RestletComponent"/>
   </constructor-arg>
</bean>
<jdbc:embedded-database id="dataSource" type="HSQL">
   <jdbc:script location="classpath:sql/init.sql"/>
</jdbc:embedded-database>

After all the setup is done, the next step is to create the Camel routes that will process the HTTP requests and execute the appropriate CRUD operations. The first one is the createUser route, which executes an SQL insert command with the parameters from POST requests and returns the newly created user in the response body:
<route id="createUser">
   <from uri="restlet:/user?restletMethod=POST"/>
   <setBody>
     <simple>insert into user(firstName, lastName) values('${header.firstName}','${header.lastName}');  </simple>
   </setBody>
   <to uri="jdbc:dataSource"/>
   <setBody>
     <simple>select * from user ORDER BY id desc LIMIT 1</simple>
   </setBody>
   <to uri="jdbc:dataSource"/>
</route>
The "manipulateUser" route handles GET, PUT and DELETE HTTP methods, but depending on the method used, it executes different SQL commands:

<route id="manipulateUser">
  <from uri="restlet:/user/{userId}?restletMethods=GET,PUT,DELETE"/>
  <choice>
    <when>
    <simple>${header.CamelHttpMethod} == 'GET'</simple>
    <setBody>
      <simple>select * from user where id = ${header.userId}</simple>
    </setBody>
   </when>
   <when>
     <simple>${header.CamelHttpMethod} == 'PUT'</simple>
       <setBody>
       <simple>update user set firstName='${header.firstName}', lastName='${header.lastName}' where id = ${header.userId}</simple>
       </setBody>
   </when>
   <when>
     <simple>${header.CamelHttpMethod} == 'DELETE'</simple>
     <setBody>
       <simple>delete from user where id = ${header.userId}</simple>
     </setBody>
   </when>
   <otherwise>
     <stop/>
   </otherwise>
  </choice>
  <to uri="jdbc:dataSource"/>
</route>
And the last route for listing all the users is self explanatory:
<route id="listUsers">
  <from uri="restlet:/users"/>
  <setBody>
    <constant>select * from user</constant>
  </setBody>
  <to uri="jdbc:dataSource"/>
</route>
If you want to see the application in action, grab the source code from github and run it with the embedded maven-jetty plugin by typing: mvn jetty:run. You can even try some quick queries if you have curl installed:

To create a user, make an HTTP POST request with firstName and lastName parameters:
curl -d "firstName=test&lastName=user" http://localhost:8080/rs/user/

To update an existing user, make an HTTP PUT request with firstName and lastName parameters:
curl -X PUT -d "firstName=updated&lastName=user" http://localhost:8080/rs/user/2

To retrieve an existing user, make an HTTP GET request with the userId as part of the URL:
curl -X GET  http://localhost:8080/rs/user/2

To delete an existing user, make an HTTP DELETE request with the userId as part of the URL:
curl -X DELETE  http://localhost:8080/rs/user/2

To retrieve all the existing users, make an HTTP GET request to the users URL:
curl -X GET  http://localhost:8080/rs/users
