My Second Camel Ride, Apache Camel That Is!

May 16 2012

This post is a follow-up to my first post on using Apache Camel.  As I mentioned in that post, my next exercise was to use the SFTP component to fetch the files for me automatically.  As I started testing, I realized that I would need to handle multiple files, so I started looking at how to do that in Camel.  Initially I thought, why not just use the route I previously created:

// Build the Route
  context.addRoutes(new RouteBuilder {
    // Read the file(s) from the directory
    "file:perf?delete=true&idempotent=true" ==> {
      // Split the file up at each line
      split(_.getIn().getBody(classOf[String]).split("\n")) {
        // Then further split up the processing across multiple parsers
        loadbalance roundrobin {
          // Reference to my internal components
          to("direct:x")
          to("direct:y")
          to("direct:z")
        }
      // Join back the results from above,
      // then split them out again with resetAutoCommit turned off
      }.loadbalance roundrobin {
        to("jdbc:dataSource?resetAutoCommit=false")
        to("jdbc:dataSource?resetAutoCommit=false")
        to("jdbc:dataSource?resetAutoCommit=false")
      }
    }

    // Setup my internal processors with references to my custom component
    "direct:x" process (processor)
    "direct:y" process (processor)
    "direct:z" process (processor)
  })

 

Much to my surprise, this did not work as expected.  The first thing I noticed was that none of the files (two of them initially) got deleted.  The second was that twice as many records landed in the database as I expected.  I was perplexed as to why this was occurring, so I posted a message on the Apache Camel users list.  After some back and forth, we realized that I was hitting a bug in Apache Camel.  One of the main committers on the project, Claus Ibsen, then posted a patch for the pre-release version 2.10.

 

I went ahead, downloaded it, and attempted to run the same route, but I got the same behavior.  After some more back and forth, we never really came to a final conclusion, and I was about to punt on using Apache Camel.  Then a revelation came to me: why not split the single route into multiple routes that are, in effect, chained off one another?  After some further thought, I came up with the following set of routes:

 

val context = new DefaultCamelContext(reg)
  context.addRoutes(new RouteBuilder {
    //
    // Fetch performance metrics for the first server and store them locally
    //
    "sftp://myid@myserver1//prod/msp/logs/prtlf_logs/msp_prtlf_qps_07/msp_prtlf_qps_m00?include=epf_perf.log.*&password=Mypassword&localWorkDirectory=tmp/yashin&idempotent=true" --> "file:perf?fileName=${file:name.noext}_${in.header.CamelFileHost}.txt"
    //
    // Fetch performance metrics for the second server and store them locally
    //
    "sftp://myid@myserver2//prod/msp/logs/prtlf_logs/msp_prtlf_qps_09/msp_prtlf_qps_m00?include=epf_perf.log.*&password=Mypassword&localWorkDirectory=tmp/recoba&idempotent=true" --> "file:perf?fileName=${file:name.noext}_${in.header.CamelFileHost}.txt"
    //
    // Read the files that land in the perf folder and convert them into a CSV file
    //
    "file:perf?delete=true&idempotent=true&initialDelay=1500&delay=500" ==> {
      process(myProcessor).to("file:perf_outbox")
    }
    //
    // Read the files that land in the perf_outbox folder and convert them into a file
    // that contains a series of insert statements.
    //
    "file:perf_outbox?delete=true&idempotent=true&initialDelay=2000&delay=500" ==> {
      process(insertProcessor).to("file:perf_insert")
    }
    //
    // Read the files that land in the perf_insert folder and execute the insert
    // statements.  Because this route is processing a single file at a time, the 
    // split operation works as expected.
    //
    "file:perf_insert?delete=true&idempotent=true&initialDelay=2500&delay=500" ==> {
      split(_.getIn().getBody(classOf[String]).split("\n")).to("jdbc:dataSource")
    }
  })
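
To make the last route a bit more concrete, here is a minimal sketch in plain Scala (no Camel dependency) of what the split expression does with the body of one perf_insert file.  The object name InsertSplitter, both function names, and the sample input are hypothetical, made up for illustration.  The second function, which trims lines and drops blank ones, is my own assumption about what a more defensive version might look like; it is not something the route above does.

```scala
object InsertSplitter {
  // Same expression the route uses: one element per line of the file body.
  def splitBody(body: String): Array[String] = body.split("\n")

  // Hypothetical, more defensive variant (an assumption, not part of the route):
  // trims each line, which also removes a trailing '\r' left by Windows line
  // endings, and drops blank lines so they are never sent on as statements.
  def splitStatements(body: String): Seq[String] =
    body.split("\n").toSeq.map(_.trim).filter(_.nonEmpty)
}
```

For example, InsertSplitter.splitStatements("a\r\n\nb\n") yields two elements, a and b, whereas splitBody on the same input also keeps the empty line and the trailing carriage return.  In the route, each element of the split becomes its own message to the JDBC endpoint.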

A couple of things to note: I’m now fetching the files with the SFTP component and, as the routes indicate, I’m doing this from multiple servers, so I need a way to name the files uniquely.  This is easily accomplished in Apache Camel because the SFTP component sets a message header called CamelFileHost that I can use to name the files as they are processed.  This is done with the following:

${file:name.noext}_${in.header.CamelFileHost}.txt

This essentially takes the incoming file name, strips off the extension, and appends the host name followed by a .txt extension.
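
As a rough illustration of that naming rule, the sketch below does the same thing in plain Scala, outside of Camel.  The object PerfFileNames and its functions are hypothetical names of my own, and the sketch assumes that only the last extension is removed; how Camel itself treats names with several dots may differ between versions, so check the File Language documentation for the release you use.

```scala
object PerfFileNames {
  // Drop the last extension, if any: "report.log" becomes "report".
  // Assumption: only the final ".xyz" is removed; names without a dot
  // (or with only a leading dot) are returned unchanged.
  def stripExtension(name: String): String = {
    val dot = name.lastIndexOf('.')
    if (dot > 0) name.substring(0, dot) else name
  }

  // Mirrors the shape of ${file:name.noext}_${in.header.CamelFileHost}.txt
  def localName(remoteName: String, host: String): String =
    s"${stripExtension(remoteName)}_$host.txt"
}
```

With this sketch, PerfFileNames.localName("epf_perf.log", "myserver1") gives epf_perf_myserver1.txt, and the same file name fetched from myserver2 gives epf_perf_myserver2.txt, so files from the two servers no longer collide in the perf folder.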

This processing now allows me to fetch the files remotely and process the performance log files.  In my particular case, this resulted in moving 300MB of log files and processing approximately 300,000 records in a very efficient fashion.

About the Author

Mr. DiFrango has over 15 years of experience specializing in architecture, design, and construction of distributed, integrated systems in enterprise and web environments. This experience includes expertise in Enterprise Systems Integration, Application Development, Service Oriented Architecture, Content Management Integration and Portal Solutions. Mr. DiFrango specializes in the JBoss Portal, Alfresco, Weblogic and Tibco product suites. Product expertise pertains primarily to Application Servers, Service Enablement Products, Content Management Integration, and Portal frameworks. Mr. DiFrango is experienced in architecting, designing and developing J2EE standardized Applications. This experience includes full lifecycle development from creating OO designs through to product testing. Mr. DiFrango also has experience leading teams of analysts, developers, and testers through troubleshooting complex development, mentoring staff, managing development schedules and work assignments, and providing architectural guidance. He is experienced in software development methodologies including procedural or waterfall, and Agile projects. He also has extensive experience with modeling systems during analysis, design, and construction using UML.

 

Disclaimer

The words and opinions expressed here are those of each article's respective author, and do not necessarily represent the views of CapTech Ventures.