How to keep your content repository and Solr in synch using Camel

With recent contributions to Camel, the camel-jcr component now has a consumer which allows monitoring a Java Content Repository for changes. If your JCR implementation supports OPTION_OBSERVATION_SUPPORTED, the consumer will register an EventListener and get notified of all kinds of events. Chances are you are not interested in every event from the whole repository, and in that case it is possible to narrow down the notifications by further specifying the path of interest, event types, node UUIDs, node types and so on.
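For illustration, a narrowed-down consumer endpoint might look like the sketch below. The nodeTypeNames and noLocal option names are assumptions that mirror the parameters of ObservationManager.addEventListener; check the camel-jcr documentation for your version before relying on them.

// Hypothetical narrowed-down consumer: watch only nt:file nodes added or removed
// under /path/site and its children.
import org.apache.camel.builder.RouteBuilder;

public class NarrowedJcrRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("jcr://username:password@repository/path/site"
                + "?deep=true"                 // include the whole sub-tree
                + "&eventTypes=3"              // NODE_ADDED | NODE_REMOVED
                + "&nodeTypeNames=nt:file"     // assumed option: only nodes of this type
                + "&noLocal=true")             // assumed option: ignore events from this session
            .log("JCR event: ${body}");
    }
}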

How can this consumer be useful? (hmmm, you tell me) Let's say we have a CMS and we want to keep our external Solr index in synch with the content updates. So whenever a new node is added to the content repository, all of its properties get indexed in Solr, and if the node is deleted from the content repository, the corresponding document is removed from Solr.

Here is a Camel route that will listen for changes under the /path/site folder and all its children. This route will get notified only of two kinds of events, NODE_ADDED and NODE_REMOVED, because the value of the eventTypes option is a bit mask of the event types of interest (in this case 3, masking 1 and 2 respectively).
from("jcr://username:password@repository/path/site?deep=true&eventTypes=3")
   .split(body())
   .choice()

   .when(script("beanshell", "request.getBody().getType() == 1"))
   .to("direct:index")

   .when(script("beanshell", "request.getBody().getType() == 2"))
   .to("direct:delete")

   .otherwise()
   .log("Event type not recognized" + body().toString());
The route splits each event into a separate message and, depending on the event type, sends the node creation events to the direct:index route and the node deletion events to the direct:delete route.
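As a side note, the eventTypes value is nothing more than a bit mask of the javax.jcr.observation.Event constants; a quick sketch to make the arithmetic explicit:

import javax.jcr.observation.Event;

// NODE_ADDED is 0x1 and NODE_REMOVED is 0x2, so combining them gives the
// value 3 used in the endpoint URI above.
public class EventTypeMask {
    public static void main(String[] args) {
        int eventTypes = Event.NODE_ADDED | Event.NODE_REMOVED;
        System.out.println(eventTypes); // prints 3
    }
}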

The delete route is a simple one: it sets the Solr operation to delete_by_id in the message header and puts the node identifier into the message body, which in our case also represents the uniqueKey in the Solr schema. This is followed by a Solr commit.
from("direct:delete")
   .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_DELETE_BY_ID))
   .setBody(script("beanshell", "request.getBody().getIdentifier()"))

   .log("Deleting node with id: ${body}")
   .to(SOLR_URL)

   .setHeader("SolrOperation", constant("COMMIT"))
   .to(SOLR_URL);
The indexing part consists of two routes, where the nodeRetriever route actually fetches the node from the content repository using the identifier carried by the update event:
from("direct:nodeRetriever")
   .setHeader(JcrConstants.JCR_OPERATION, constant(JcrConstants.JCR_GET_BY_ID))
   .setBody(script("beanshell", "request.getBody().getIdentifier()"))

   .log("Reading node with id: ${body}")
   .to("jcr://admin:admin@repository");
After the node is retrieved from the repository using the Content Enricher EIP, there is also a processor that extracts the node properties and copies them onto the Camel message so that they get indexed as Solr document fields.
from("direct:index")
   .enrich("direct:nodeRetriever", nodeEnricher)
   .process(jcrSolrPropertyMapper)

   .log("Indexing node with id: ${body}")
   .setHeader("SolrOperation", constant("INSERT"))
   .to(SOLR_URL);
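The jcrSolrPropertyMapper processor is not shown in the route above; here is a minimal sketch of what it might look like, assuming the enriched message body is a javax.jcr.Node and relying on camel-solr's convention of indexing headers prefixed with "SolrField." as document fields.

import javax.jcr.Node;
import javax.jcr.Property;
import javax.jcr.PropertyIterator;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

// Hypothetical mapper: copies every single-valued JCR property into a
// "SolrField."-prefixed header so that camel-solr indexes it as a document field.
public class JcrSolrPropertyMapper implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {
        Node node = exchange.getIn().getBody(Node.class);

        // the node identifier becomes the uniqueKey field in the Solr schema
        exchange.getIn().setHeader("SolrField.id", node.getIdentifier());

        PropertyIterator properties = node.getProperties();
        while (properties.hasNext()) {
            Property property = properties.nextProperty();
            if (!property.isMultiple()) {
                exchange.getIn().setHeader("SolrField." + property.getName(), property.getString());
            }
        }
    }
}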
You can find the complete working example on github. In case your CMS is not JCR but CMIS compliant, have a look at the cmis component on my github account.

Indexing data in Solr from disparate sources using Camel

Apache Solr is "the popular, blazing fast open source enterprise search platform" built on top of Lucene. In order to do a search (and find results), there is the initial requirement of data ingestion, usually from disparate sources like content management systems, relational databases, legacy systems, you name it... Then there is also the challenge of keeping the index up to date by adding new data, updating existing records, and removing obsolete data. The new sources of data could be the same as the initial ones, but could also be sources like Twitter, AWS, or REST endpoints.

Solr can understand different file formats and provides a fair amount of options for data indexing:
  1. Direct HTTP and remote streaming - allows you to interact with Solr over HTTP by posting a file for direct indexing, or the path to the file for remote streaming.
  2. DataImportHandler - a module that enables both full and incremental delta imports from relational databases or the file system.
  3. SolrJ - a Java client for accessing Solr using Apache Commons HTTP Client (a minimal SolrJ sketch follows this list).
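For reference, indexing a single document through SolrJ takes only a few lines. A minimal sketch, assuming a Solr instance at the default localhost:8983 URL and a SolrJ version that provides HttpSolrServer:

import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class SolrJIndexingExample {
    public static void main(String[] args) throws Exception {
        // the URL is an assumption: point it at your own Solr instance
        SolrServer server = new HttpSolrServer("http://localhost:8983/solr");

        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", "example-1");          // uniqueKey field
        doc.addField("name", "Example document");

        server.add(doc);    // send the document to Solr
        server.commit();    // make it visible to searches
    }
}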
But in real life, indexing data from different sources with millions of documents, dozens of transformations, filtering, content enriching, replication, and parallel processing requires much more than that. One way to cope with such a challenge is to reinvent the wheel: write a few custom applications and combine them with some scripts or cron jobs. Another approach is to use a tool that is flexible, designed to be configurable and pluggable, and that can help you scale and distribute the load with ease. Such a tool is Apache Camel, which now also has a Solr connector.

It all started a few months ago, during the basecamp days at Sourcesense, where my colleague Alex and I were experimenting with different projects to implement a pipeline for indexing data into Solr. As expected we discovered Camel, and after a few days of pairing we were ready with the initial version of the Solr component, which got committed to Camel and extended further by Ben Oday. At the moment it is a full-featured Solr connector that uses SolrJ behind the scenes and lets you configure all parameters of SolrServer and StreamingUpdateSolrServer; it supports the operations insert, add_bean, delete_by_id, delete_by_query, commit, rollback and optimize; and it can index files, SolrInputDocument instances, beans with annotations, or individual message headers.
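As a small illustration of the bean option mentioned above, a POJO annotated with SolrJ's @Field annotations can be sent as the message body with the operation header set to add_bean. The Product class below is made up for illustration:

import org.apache.solr.client.solrj.beans.Field;

// Hypothetical annotated bean: each @Field maps a POJO property to a Solr field.
public class Product {

    @Field("id")
    private String id;

    @Field("name")
    private String name;

    public Product(String id, String name) {
        this.id = id;
        this.name = name;
    }
}

// A route feeding such beans to Solr could look roughly like this:
// from("direct:insertBean")
//     .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_ADD_BEAN))
//     .to(SOLR_URL);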

Creating a Camel route to index all the data from a relational database table and local file system is simple:
public void configure() {
from("timer://clear?repeatCount=1")
        .to("direct:clearIndex");

from("file:src/data?noop=true")
        .to("direct:insert");

from("timer://database?repeatCount=1")
        .to("sql:select * from products?dataSourceRef=productDataSource")
        .split(body())
        .process(new SqlToSolrMapper())
        .to("direct:insert");

from("direct:insert")
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_INSERT))
        .to(SOLR_URL)
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
        .to(SOLR_URL);

from("direct:clearIndex")
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_DELETE_BY_QUERY))
        .setBody(constant("*:*"))
        .to(SOLR_URL)
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
        .to(SOLR_URL);
}
The above route will first clear the index by deleting all the documents, followed by a commit. Then it will start polling files from the src/data folder, reading each file and sending it to the Solr endpoint. Assuming the files are in a format Solr can understand, they will be indexed and committed. The third route will retrieve all the products from the database (in memory), split them into individual records, map each record to Solr fields, and digest :)

Luckily, in 2012, the life of a software developer is not that simple and boring. Instead, nowadays a more realistic indexing requirement would look something like this:

1. Get the backup files from Amazon S3 and index them. If a document is approved, commit it as soon as possible, otherwise commit every 10 minutes.

How can Camel help you with this requirement? Camel supports the most popular Amazon APIs, including S3. Using the aws-s3 component, it is possible to read files from an S3 bucket and then apply a filter for approved documents, in order to send them to a separate route for instant commit.
<route>
  <from uri="aws-s3://MyBucket?delay=5000&maxMessagesPerPoll=5"/>
  <choice>
    <when>
      <xpath>/add/doc[@status='approved']</xpath>
      <to uri="direct:indexAndCommit"/>
    </when>
    <otherwise>
      <to uri="direct:index"/>
    </otherwise>
  </choice>
</route>
<route>
  <from uri="timer://commit?fixedRate=true&amp;period=600000"/>
  <to uri="direct:commit"/>
</route>
2. Retrieve customer data from the database every 5 seconds, reading 10 records at a time. Also look for deltas. Enrich the address data with latitude/longitude by calling an external geocoding service, to facilitate spatial search in Solr.
<route id="fromDB">
  <from uri="jpa://com.ofbizian.pipeline.Customer?consumer.namedQuery= newCustomers&amp;maximumResults=10&amp;delay=5000"/>
  <enrich uri="direct:coordinateEnricher" strategyRef="latLongAggregationStrategy"/>
  <to uri="direct:index"/>
</route>

<route>
  <from uri="direct:coordinateEnricher"/>
  <setHeader headerName="CamelHttpQuery">
    <simple>address='${body.address}'&amp;sensor=false</simple>
  </setHeader>
  <to uri="http://maps.google.com/maps/api/geocode/xml"/>
  <setHeader headerName="lat">
    <xpath resultType="java.lang.Double">//result[1]/geometry/location/lat/text()</xpath>
  </setHeader>
  <setHeader headerName="lng">
    <xpath resultType="java.lang.Double">//result[1]/geometry/location/lng/text()</xpath>
  </setHeader>
</route>
The above route reads 10 records at a time from the Customer table, and for each one calls Google's Maps API to get the latitude and longitude for the customer address field. The coordinates are extracted from the response using XPath and merged back into the Customer object. Simple, isn't it?
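The latLongAggregationStrategy referenced in the route is not shown above; here is a minimal sketch of what it might look like, assuming the Customer entity from the route exposes latitude/longitude setters (the setter names are hypothetical).

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Hypothetical aggregation strategy: copies the lat/lng headers produced by the
// enricher route back onto the original Customer body. The setter names on the
// Customer JPA entity are assumptions.
public class LatLongAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange original, Exchange enrichment) {
        Customer customer = original.getIn().getBody(Customer.class);
        customer.setLatitude(enrichment.getIn().getHeader("lat", Double.class));
        customer.setLongitude(enrichment.getIn().getHeader("lng", Double.class));
        original.getIn().setBody(customer);
        return original;
    }
}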

3. Index the content under this/that/path in our content management system and also monitor for updates.
<route>
  <from uri="jcr://user:pass@repository/import/inbox/signal?eventTypes=3&deep=true&synchronous=false"/>
  <to uri="direct:index"/>
</route>
Camel has a jcr connector which allows you to create content in any Java Content Repository. There is also an improvement submitted in CAMEL-5155 which will soon allow reading content from JCR v2 compliant repositories.
If you are lucky and your CMS supports CMIS, you can use my camel-cmis connector from github for the same purpose.

4. Listen for tweets about our product/company, do sentiment analysis, and index only positive tweets.
<route id="fromTwitter">
  <from uri="twitter://streaming/filter?type=event&keywords=productName&consumerKey={{consumer.key}}&consumerSecret={{consumer.secret}}"/>
  <setHeader headerName="CamelHttpQuery">
    <language language="beanshell">
      "q=" + java.net.URLEncoder.encode(request.getBody().getText(), "UTF-8")
    </language>
  </setHeader>
  <throttle timePeriodMillis="1500">
    <constant>1</constant>
    <to uri="http://data.tweetsentiments.com:8080/api/analyze.xml"/>
    <setHeader headerName="sentiment">
      <xpath resultType="java.lang.Double">/sentiment/value/text()</xpath>
    </setHeader>
    <filter>
      <simple>${in.header.sentiment} > 0</simple>
      <to uri="direct:index"/>
    </filter>
  </throttle>
</route>
This route listens for tweets using Twitter's real-time API, URL-encodes the tweet text, and calls the tweetsentiments API for sentiment analysis. In addition it applies throttling, so that at most one request is made every 1500 milliseconds, because there is a restriction on the number of calls per second. Then the route applies a filter to drop the negative tweets before indexing.

As you can see, Camel can easily interact with many disparate systems (including Solr), and even if you have a very custom application, writing a connector for it would not be difficult. But this is only one side of the story. On the other side, there is the full list of Enterprise Integration Patterns implemented by Camel which are needed for any serious data ingestion pipeline: Routers, Translators, Filters, Splitters, Aggregators, Content Enrichers, Load Balancers... Last but not least: exception handling, logging, monitoring, DSLs... In two words: Camel rocks!

PS: The full source code of the examples can be found on my github account.
