Content Migration with Camel-CMIS

Some time ago (actually quite a long time ago now) I was playing with CMIS using Apache Chemistry, Camel, Talend… and created a camel-cmis connector. Recently this component reached Camel trunk, so here are some examples of how it can be used.
Motivation
Content Management Interoperability Services (CMIS) is an open standard that defines an abstraction layer for accessing diverse document management systems and repositories using web protocols. Administered by OASIS and backed by Adobe, Alfresco, HP, IBM, Microsoft, Oracle and many more, it defines a data model and query language for accessing content repositories in a language-agnostic way.
If you are already familiar with the Java Content Repository API (JCR), CMIS is kind of similar: JCR specifies an API while CMIS specifies protocol bindings. Much like the Servlet API in Java and the HTTP protocol are complementary, so are JCR and CMIS.

The Apache Chemistry project provides open source implementations of the CMIS specification, with libraries in Java, Python, PHP and .NET.

CAMEL-CMIS
The component uses OpenCMIS (Apache Chemistry's Java implementation) and contains a producer and a consumer. The producer can be used in two ways:

Create nodes (documents with content data, folders and other node types) from Camel messages. In the example below, the route will copy all of the files from the local file system into a demo directory on the Alfresco demo CMIS server, using the file name as the node name.
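A minimal sketch of such a route (treat the Alfresco demo server URL, the credentials and the cmis:name header mapping as illustrative assumptions):
from("file://target/content")
   // use the original file name as the cmis:name property of the new node
   .setHeader("cmis:name", header("CamelFileName"))
   .to("cmis://http://cmis.alfresco.com/service/cmis?username=admin&password=admin");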

Query the content repository using the CMIS query language. The result of the query is a list in the body of the message. The route below will retrieve all documents whose name contains the word 'camel', split them and store each one in the local file system, using the document name as the file name.
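A hedged sketch of such a query route (the queryMode option and the map keys in the result are assumptions based on the description above):
from("timer://query?repeatCount=1")
   .setBody(constant("SELECT * FROM cmis:document WHERE cmis:name LIKE '%camel%'"))
   .to("cmis://http://cmis.alfresco.com/service/cmis?username=admin&password=admin&queryMode=true")
   // the result is a list of maps, one per matching node
   .split(body())
   .setHeader("CamelFileName", simple("${body[cmis:name]}"))
   .to("file://target/downloads");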

The consumer accepts a CMIS query through the options. If the query string is not provided, the consumer will iterate the complete content tree recursively, starting from the root node. Using this option you can replicate the complete CMIS structure into another CMIS repository (as in the following example, which tries to copy the complete Alfresco repo into a Nuxeo one - don't do this at home) or back it up into the local file system. Notice that to retrieve the actual content data for document nodes, you additionally have to set the readContent option.
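Something along these lines, with placeholder host names (the exact endpoint URLs are assumptions):
from("cmis://http://alfresco-host/service/cmis?username=admin&password=admin&readContent=true")
   .to("cmis://http://nuxeo-host/atom/cmis?username=Administrator&password=Administrator");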

If the target repository doesn't support CMIS but does support JCR, try replacing the endpoint with the camel-jcr component, or choose something else from the 120 existing Camel components.

Connect Apache OFBiz with the Real World

What would you expect from someone who is an OFBiz and Camel committer? To integrate them for fun? Fine, here it is. In addition to being fun, I believe this integration will be of real benefit to the OFBiz community because, despite being a complete ERP system, OFBiz lacks the ability to easily integrate with external systems. The goal of this project is, instead of reinventing the wheel and integrating OFBiz with each system separately, to integrate it with Camel and let Camel do what it does best: connect your application with every possible protocol and system out there.

Quick OFBiz introduction
The Apache Open For Business Project is open source enterprise automation software. It consists mainly of two parts:
1. A full-stack framework for rapid business application development, with three main layers:
  •  Entity Engine for the data layer (imagine something like iBATIS and Hibernate combined). It is the same entity engine that powers millions of Atlassian Jira instances. But don't get me wrong, it is not meant for usage outside of the OFBiz framework, so use it only as the OFBiz data layer.
  •  Service Engine - this might be hard to grasp for someone with only a DDD background, but OFBiz doesn't have any domain objects. Instead, for the service layer it uses SOA and has thousands of services that contain the business logic. A service is an atomic bit of isolated business logic, usually reading and updating the database. If you need to, you can make services trigger each other using ECAs (event-condition-action), a kind of rule engine that lets you define pre/post conditions for triggering other service calls when a service is executed. A service itself can be written in Java, Groovy or simple language (an XML DSL for simple database manipulation), usually requires authentication and authorisation, and is finally executed in a transaction.
  •  UI widgets - an XML DSL which allows you to easily create complex pages with tables, forms and trees.
And the really great thing about this framework is that 'the whole is greater than the sum of its parts' - all of the above layers work together amazingly: if you have an entity definition (a table) in your data layer, you can use it in your service layer, in your service interface definition or its implementation. It takes one line of code (a long one) to create a service which takes the table columns as input parameters and returns the primary key as the service result. Then, if you are creating a screen with tables or forms, you can base it on your entity or service definitions. It again takes only a few lines of code to create a form with fields mapping to a service's or entity's fields.

2. Out-of-the-box business applications. These are vertical applications for managing the full life cycle of a business domain like ordering, accounting, manufacturing and many more, in a horizontally integrated manner. So creating an order from the order or ecommerce application will interact with the facility application to check whether a product is available, and after the order is created it will create an accounting transaction in the accounting application. Before the order is shipped from the facility, it will create invoices, and once the invoice is paid, it will close the order. You get the idea.

Camel in 30 seconds
Apache Camel is an integration framework based on the well-known Enterprise Integration Patterns (EIPs). Camel can also be presented as consisting of two artifacts:
The routing framework, which can be defined using Java, Scala or XML DSLs, with all the EIPs like Pipes and Filters, Router, Splitter, Aggregator, Throttler, Normalizer and many more.
Components and transformers, i.e. all the different connectors to more than 100 different applications and protocols: AMQP, AWS, web services, REST, MongoDB, Twitter, WebSocket, you name it.
If you can imagine a tool that enables you to consume data from one system, then manipulate the data (transform, filter, split, aggregate) and send it to other systems, using a declarative, concise, English-like DSL without any boilerplate code - that's Apache Camel.

Let OFBiz talk to all of the systems Camel does
The main interaction points with OFBiz are either using the Entity Engine for direct data manipulation or calling services through the Service Engine. The latter is preferred because it ensures that the user executing the service is authorised to do so, that the operation is transactional to ensure data integrity, and that all the business rules are satisfied (there might be other services that have to be executed through ECA rules). So if we can create an OFBiz endpoint in Camel and execute OFBiz services from Camel messages, that would allow OFBiz to receive notifications from Camel endpoints. What about the other way around - making OFBiz notify Camel endpoints? The ideal way would be to have an OFBiz service that sends its IN parameters to Camel endpoints as message body and headers and returns the reply message as the OFBiz service response.
If you are wondering: why is it so great, what is an endpoint, where is the real world, who is gonna win Euro2012... have a look at the complete list of available Camel components, and you will find out the answer.

Running Camel in OFBiz container
I've started an experimental ofbiz-camel project on github which allows you to do all of the above. It demonstrates how to poll files from a directory using Camel and create notes in OFBiz with the content of each file using the createNote service. The project also has an OFBiz service that enables sending messages from OFBiz to Camel. For example, using that service it is possible to send a message to Camel's file://data endpoint, and Camel will create a file in the data folder from the service parameters.
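A purely illustrative sketch of the first scenario (the ofbiz: endpoint scheme and the noteInfo parameter mapping are my assumptions about this experimental component, not a confirmed API):
from("file://data?noop=true")
   // pass the file content as an IN parameter of the createNote service
   .setHeader("noteInfo", body())
   .to("ofbiz:createNote");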
The integration between OFBiz and Camel is achieved by running Camel in an OFBiz container as part of the OFBiz framework. This makes for quite a tight integration, but ensures that there is no HTTP, RMI or other overhead in between. It is still a work in progress and may change totally.

Running Camel and OFBiz separately
Another approach is KISS: run Camel and OFBiz as they are - separate applications - and let them interact over RMI, WS-* or something else. This doesn't require much coding, only configuring both systems to talk to each other. I've created a simple demo, camel-ofbiz-rmi, which demonstrates how to listen for tweets with a specific keyword and store them in OFBiz as notes by calling the createNote service over RMI. It uses Camel's twitter and rmi components and requires only configuration. Notice that this example demonstrates only one-way interaction: from Camel to OFBiz. In order to invoke a Camel endpoint from OFBiz you have to write some RMI, WS-* or other code.

PS: I'm looking forward to hearing your real-world integration requirements for OFBiz.

How to keep your content repository and Solr in sync using Camel

With recent contributions to Camel, the camel-jcr component now has a consumer which allows monitoring a Java Content Repository for changes. If your JCR supports OPTION_OBSERVATION_SUPPORTED, the consumer will register an EventListener and get notified of all kinds of events. Chances are you are not interested in all the events from the whole repository, in which case it is possible to narrow down the notifications by further specifying the path of interest, event types, node UUIDs, node types, etc.

How can this consumer be useful? (hhmmm, you tell me) Let's say we have a CMS and we want to keep our external Solr index in sync with the content updates. So whenever a new node is added to the content repository, all of its properties get indexed in Solr, and if the node is deleted from the content repository, the corresponding document is removed from Solr.

Here is a Camel route that will listen for changes under the /path/site folder and all its children. The route will be notified only of two kinds of events, NODE_ADDED and NODE_REMOVED, because the value of the eventTypes option is a bit mask of the event types of interest (in this case 3, masking 1 and 2 respectively).
from("jcr://username:password@repository/path/site?deep=true&eventTypes=3")
   .split(body())
   .choice()

   .when(script("beanshell", "request.getBody().getType() == 1"))
   .to("direct:index")

   .when(script("beanshell", "request.getBody().getType() == 2"))
   .to("direct:delete")

   .otherwise()
   .log("Event type not recognized" + body().toString());
The route splits each event into a separate message and, depending on the event type, sends node creation events to the direct:index route and node deletion events to the direct:delete route.

The delete route is a simple one: it sets the Solr operation to DELETE_BY_ID in the message header and the node identifier in the message body, which in our case also represents the uniqueKey in the Solr schema. This is followed by a Solr commit.
from("direct:delete")
   .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_DELETE_BY_ID))
   .setBody(script("beanshell", "request.getBody().getIdentifier()"))

   .log("Deleting node with id: ${body}")
   .to(SOLR_URL)

   .setHeader("SolrOperation", constant("COMMIT"))
   .to(SOLR_URL);
The indexing part consists of two routes. The nodeRetriever route actually gets the node from the content repository, using the node identifier from the update event:
from("direct:nodeRetriever")
   .setHeader(JcrConstants.JCR_OPERATION, constant(JcrConstants.JCR_GET_BY_ID))
   .setBody(script("beanshell", "request.getBody().getIdentifier()"))

   .log("Reading node with id: ${body}")
   .to("jcr://admin:admin@repository");
After the node is retrieved from the repository using the Content Enricher EIP, there is also a processor to extract the node properties and set them on the Camel message, so that they get indexed as Solr document fields.
from("direct:index")
   .enrich("direct:nodeRetriever", nodeEnricher)
   .process(jcrSolrPropertyMapper)

   .log("Indexing node with id: ${body}")
   .setHeader("SolrOperation", constant("INSERT"))
   .to(SOLR_URL);
You can find the complete working example on github. In case your CMS is not JCR but CMIS compliant, have a look at the cmis component on my github account.

Indexing data in Solr from disparate sources using Camel

Apache Solr is "the popular, blazing fast open source enterprise search platform" built on top of Lucene. In order to search (and find results), there is the initial requirement of data ingestion, usually from disparate sources like content management systems, relational databases, legacy systems, you name it... Then there is also the challenge of keeping the index up to date by adding new data, updating existing records and removing obsolete data. The new sources of data could be the same as the initial ones, but could also be sources like Twitter, AWS or REST endpoints.

Solr can understand different file formats and provides a fair amount of options for data indexing:
  1. Direct HTTP and remote streaming - allows you to interact with Solr over HTTP by posting a file for direct indexing or the path to the file for remote streaming.
  2. DataImportHandler - a module that enables both full imports and incremental delta imports from relational databases or the file system.
  3. SolrJ - a Java client to access Solr using Apache Commons HTTP Client (see the sketch just below this list).
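For example, indexing a single document with SolrJ takes only a few lines (a minimal sketch, assuming a Solr 3.x server running at localhost:8983):
// CommonsHttpSolrServer comes from SolrJ 3.x (org.apache.solr.client.solrj.impl)
SolrServer server = new CommonsHttpSolrServer("http://localhost:8983/solr");
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "1");
doc.addField("name", "camel in action");
server.add(doc);
server.commit();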
But in real life, indexing data from different sources, with millions of documents, dozens of transformations, filtering, content enriching, replication and parallel processing, requires much more than that. One way to cope with such a challenge is by reinventing the wheel: write a few custom applications, combine them with some scripts, or run cronjobs. Another approach is to use a tool that is flexible, designed to be configurable and pluggable, and that can help you scale and distribute the load with ease. Such a tool is Apache Camel, which now also has a Solr connector.

It all started a few months ago, during the basecamp days at Sourcesense, where my colleague Alex and I were experimenting with different projects to implement a pipeline for indexing data into Solr. As expected, we discovered Camel, and after a few days of pairing we were ready with the initial version of the Solr component, which got committed to Camel and extended further by Ben O'Day. At the moment it is a full-featured Solr connector that uses SolrJ behind the scenes and lets you: configure all parameters of SolrServer and StreamingUpdateSolrServer; use the operations insert, add_bean, delete_by_id, delete_by_query, commit, rollback and optimize; and index files, SolrInputDocument instances, beans with annotations, or individual message headers.

Creating a Camel route to index all the data from a relational database table and local file system is simple:
public void configure() {
    from("timer://clear?repeatCount=1")
        .to("direct:clearIndex");

    from("file:src/data?noop=true")
        .to("direct:insert");

    from("timer://database?repeatCount=1")
        .to("sql:select * from products?dataSourceRef=productDataSource")
        .split(body())
        .process(new SqlToSolrMapper())
        .to("direct:insert");

    from("direct:insert")
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_INSERT))
        .to(SOLR_URL)
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
        .to(SOLR_URL);

    from("direct:clearIndex")
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_DELETE_BY_QUERY))
        .setBody(constant("*:*"))
        .to(SOLR_URL)
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
        .to(SOLR_URL);
}
The above routes will first clear the index by deleting all documents, followed by a commit. Then the file route will start polling files from the src/data folder, reading each file and sending it to the Solr endpoint. Assuming the files are in a format Solr can understand, they will be indexed and committed. The third route will retrieve all the products from the database (in memory), split them into individual records, map each record to Solr fields, and digest :)

Luckily, in 2012, the life of a software developer is not that simple and boring. Instead, nowadays a more realistic indexing requirement would consist of something like this:

1. Get the backup files from Amazon S3 and index them. If a document is approved, commit it as soon as possible; otherwise commit every 10 minutes.

How can Camel help you with this requirement? Camel supports the most popular Amazon APIs, including S3. Using the aws-s3 component, it is possible to read files from an S3 bucket and then apply a filter for approved documents, in order to send them to a separate route for instant commit.
<route>
  <from uri="aws-s3://MyBucket?delay=5000&amp;maxMessagesPerPoll=5"/>
  <choice>
    <when>
      <xpath>/add/doc[@status='approved']</xpath>
      <to uri="direct:indexAndCommit"/>
    </when>
    <otherwise>
      <to uri="direct:index"/>
    </otherwise>
  </choice>
</route>
<route>
  <!-- commits every 10 minutes (600000 ms) and is also reachable via direct:commit -->
  <from uri="timer://commit?fixedRate=true&amp;period=600000"/>
  <from uri="direct:commit"/>
  <setHeader headerName="SolrOperation">
    <constant>COMMIT</constant>
  </setHeader>
  <!-- the Solr endpoint URL here is an assumption -->
  <to uri="solr://localhost:8983/solr"/>
</route>
2. Retrieve customer data from the database every 5 seconds, reading 10 records at a time. Also look for deltas. Enrich the address data with latitude/longitude by calling an external geocoding service (Google Maps in this example), to facilitate spatial search in Solr.
<route id="fromDB">
  <from uri="jpa://com.ofbizian.pipeline.Customer?consumer.namedQuery= newCustomers&amp;maximumResults=10&amp;delay=5000"/>
  <enrich uri="direct:coordinateEnricher" strategyRef="latLongAggregationStrategy"/>
  <to uri="direct:index"/>
</route>

<route>
  <from uri="direct:coordinateEnricher"/>
  <setHeader headerName="CamelHttpQuery">
    <simple>address='${body.address}'&amp;sensor=false</simple>
  </setHeader>
  <to uri="http://maps.google.com/maps/api/geocode/xml"/>
  <setHeader headerName="lat">
    <xpath resultType="java.lang.Double">//result[1]/geometry/location/lat/text()</xpath>
  </setHeader>
  <setHeader headerName="lng">
    <xpath resultType="java.lang.Double">//result[1]/geometry/location/lng/text()</xpath>
  </setHeader>
</route>
The above route reads from the Customer table 10 records at a time and, for each record, calls Google's Maps API to get the latitude and longitude for the customer address field. The coordinates are extracted from the response using XPath and merged back into the Customer object. Simple, isn't it?

3. Index the content under this/that/path in our content management system and also monitor for updates.
<route>
  <from uri="jcr://user:pass@repository/import/inbox/signal?eventTypes=3&deep=true&synchronous=false"/>
  <to uri="direct:index"/>
</route>
Camel has a jcr connector which allows you to create content in any Java content repository. There is also an improvement submitted in CAMEL-5155 which will soon allow reading content from JCR v2 repositories.
If you are lucky and your CMS supports CMIS, you can use my camel-cmis connector from github for the same purpose.

4. Listen for tweets about our product/company, do sentiment analysis, and index only positive tweets.
<route id="fromTwitter">
  <from uri="twitter://streaming/filter?type=event&keywords=productName&consumerKey={{consumer.key}}&consumerSecret={{consumer.secret}}"/>
  <setHeader headerName="CamelHttpQuery">
    <language language="beanshell">
      "q=" + java.net.URLEncoder.encode(request.getBody().getText(), "UTF-8")
    </language>
  </setHeader>
  <throttle timePeriodMillis="1500">
    <constant>1</constant>
    <to uri="http://data.tweetsentiments.com:8080/api/analyze.xml"/>
    <setHeader headerName="sentiment">
      <xpath resultType="java.lang.Double">/sentiment/value/text()</xpath>
    </setHeader>
    <filter>
      <simple>${in.header.sentiment} > 0</simple>
      <to uri="direct:index"/>
    </filter>
  </throttle>
</route>
This route is going to listen for tweets using Twitter's real-time API, URL-encode each tweet and call the tweetsentiments API for sentiment analysis. In addition, it applies throttling, so that at most one request is made every 1500 milliseconds, because there is a restriction on the number of calls per second. Then the route applies a filter to ignore all the negative tweets before indexing.

As you can see, Camel can interact with many disparate systems (including Solr) easily, and even if you have a very custom application, writing a connector for it would not be difficult. But this is only one side of the story. On the other side, there is the full list of Enterprise Integration Patterns implemented by Camel, which are needed for any serious data ingestion pipeline: Router, Translator, Filter, Splitter, Aggregator, Content Enricher, Load Balancer... Last but not least: exception handling, logging, monitoring, DSLs... In two words: Camel rocks!

PS: The full source code of the examples can be found on my github account.

My first commit to Camel - FOP component

After being invited to join the Apache Camel team, it is time for my first commit. It is actually one of my very first contributions to Camel, which had been waiting in Jira for a long time - a camel-fop component.
This component uses the Apache FOP project to render messages into a variety of output formats such as PDF, PostScript, RTF, PNG, etc.

Camel-fop has only a producer, which expects messages containing XML data in XSL-FO format in the body. Assuming that your data and presentation are separate, the XSL-FO can be generated from XML data and an XSLT template, or from Freemarker or Velocity templates, passing the data in message headers.
Once the content is ready in the message body, camel-fop will transform it into the output format specified in the endpoint URL or in a message header.
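For instance, a minimal sketch of the template-based variant (the Velocity template path and the target folder are assumptions for illustration):
from("direct:report")
   // generate XSL-FO from a Velocity template; the data comes in as message headers
   .to("velocity:templates/report-fo.vm")
   // render the XSL-FO document into PDF
   .to("fop:application/pdf")
   .to("file:target/reports");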

It is also possible to set some metadata for each document (like producer, author, creation date, resolution...) or to encrypt the document with a password.

Additionally, it allows you to override the default FOP configuration file by specifying a new userConfig location in the endpoint URL. With a custom configuration you can control FOP's rendering behavior and help it find its resources.
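Something like the following, if I read the option correctly (the userConfigURL parameter name and the paths are assumptions):
from("file:source/data/fo")
   // render with a custom FOP configuration instead of the default one
   .to("fop:application/pdf?userConfigURL=file:conf/fop-config.xml")
   .to("file:target/data");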

Here is an example route that creates PDFs from XML data and an XSLT template:
      .from("file:source/data/xml")
          .to("xslt:xslt/template.xsl")
          .to("fop:application/pdf")
          .to("file:target/data");
To see the example files in action, check the test folder of the camel-fop component.
Let me know if you find this component useful.

REST with Apache Camel

There are many ways to expose an HTTP endpoint in Camel: jetty, tomcat, servlet, cxfrs and restlet. Two of these components - cxfrs and restlet - also support REST semantics with just a few lines of code. This simple example demonstrates how to do CRUD operations with camel-restlet and camel-jdbc.

The four HTTP verbs execute different operations and map to the following URI templates:
  •  POST - create a new user: /user
  •  GET - request the current state of the user specified by the URI: /user/{userId}
  •  PUT - update a user at the given URI with new information: /user/{userId}
  •  DELETE - remove the user identified by the given URI: /user/{userId}
There is also a /users URI which returns all the users, regardless of the HTTP method used.
Creating such an application with Camel is straightforward. After adding all the necessary dependencies (restlet, spring, jdbc...), configure web.xml to load the Camel context:
<context-param>
  <param-name>contextConfigLocation</param-name>
  <param-value>classpath:camel-config.xml</param-value>
</context-param>
<listener>
  <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
and map the Restlet servlet:
<servlet>
  <servlet-name>RestletServlet</servlet-name>
  <servlet-class>org.restlet.ext.spring.SpringServerServlet</servlet-class>
  <init-param>
    <param-name>org.restlet.component</param-name>
    <param-value>RestletComponent</param-value>
  </init-param>
</servlet>
<servlet-mapping>
  <servlet-name>RestletServlet</servlet-name>
  <url-pattern>/rs/*</url-pattern>
</servlet-mapping>
In the Spring context, there is a little more Restlet setup and an in-memory datasource:
<bean id="RestletComponent" class="org.restlet.Component"/>
<bean id="RestletComponentService" class="org.apache.camel.component.restlet.RestletComponent">
   <constructor-arg index="0">
     <ref bean="RestletComponent"/>
   </constructor-arg>
</bean>
<jdbc:embedded-database id="dataSource" type="HSQL">
   <jdbc:script location="classpath:sql/init.sql"/>
</jdbc:embedded-database>

After all the setup is done, the next step is to create the Camel routes that will process the HTTP requests and execute the appropriate CRUD operations. The first one is the createUser route, which accepts only POST requests, executes an SQL insert command with the request parameters, and returns the newly created user in the response body:
<route id="createUser">
   <from uri="restlet:/user?restletMethod=POST"/>
   <setBody>
     <simple>insert into user(firstName, lastName) values('${header.firstName}','${header.lastName}')</simple>
   </setBody>
   <to uri="jdbc:dataSource"/>
   <setBody>
     <simple>select * from user ORDER BY id desc LIMIT 1</simple>
   </setBody>
   <to uri="jdbc:dataSource"/>
</route>
The "manipulateUser" route handles GET, PUT and DELETE HTTP methods, but depending on the method used, it executes different SQL commands:

<route id="manipulateUser">
  <from uri="restlet:/user/{userId}?restletMethods=GET,PUT,DELETE"/>
  <choice>
    <when>
    <simple>${header.CamelHttpMethod} == 'GET'</simple>
    <setBody>
      <simple>select * from user where id = ${header.userId}</simple>
    </setBody>
   </when>
   <when>
     <simple>${header.CamelHttpMethod} == 'PUT'</simple>
       <setBody>
       <simple>update user set firstName='${header.firstName}', lastName='${header.lastName}' where id = ${header.userId}</simple>
       </setBody>
   </when>
   <when>
     <simple>${header.CamelHttpMethod} == 'DELETE'</simple>
     <setBody>
       <simple>delete from user where id = ${header.userId}</simple>
     </setBody>
   </when>
   <otherwise>
     <stop/>
   </otherwise>
  </choice>
  <to uri="jdbc:dataSource"/>
</route>
And the last route, for listing all the users, is self-explanatory:
<route id="listUsers">
  <from uri="restlet:/users"/>
  <setBody>
    <constant>select * from user</constant>
  </setBody>
  <to uri="jdbc:dataSource"/>
</route>
If you want to see the application in action, grab the source code from github and run it with the embedded maven-jetty plugin by typing mvn jetty:run. You can even try some quick queries if you have curl installed:

To create a user, make an HTTP POST request with firstName and lastName parameters:
curl -d "firstName=test&lastName=user" http://localhost:8080/rs/user/

To update an existing user, make an HTTP PUT request with firstName and lastName parameters:
curl -X PUT -d "firstName=updated&lastName=user" http://localhost:8080/rs/user/2

To retrieve an existing user, make an HTTP GET request with the userId as part of the URL:
curl -X GET  http://localhost:8080/rs/user/2

To delete an existing user, make an HTTP DELETE request with the userId as part of the URL:
curl -X DELETE  http://localhost:8080/rs/user/2

To retrieve all the existing users, make an HTTP GET request to the users URL:
curl -X GET  http://localhost:8080/rs/users
