My First Camel Ride, Apache Camel That Is!

Apr 29 2012

I was preparing to attend the CamelOne conference, and since it had been a while since I had used Apache Camel, I thought I would take it for a spin.  For those of you who don’t know it, Apache Camel is the premier open source integration framework, and it implements most of the Enterprise Integration Patterns.  For a good overall introduction, I would recommend the following post.  I would also highly recommend the Camel in Action book.

This desire to use Camel happened to coincide with a problem I was attempting to solve at my current client.  There, we have what I will call poor man’s performance monitoring via log4j-based aspects.  These aspects start and stop a timer on each annotated method invocation and write a record to our log files.  We were running our performance tests, and I wanted to parse the log file and write the records to a database table so that we could analyze the results.

Apache Camel supports Java, Spring XML, and Scala DSLs, among others.  As a recent convert to Scala, I chose the Scala DSL.  The process I was trying to model needed to accomplish the following:

  1. Read a file from the file system.
  2. Parse each line of the file into the individual pieces and parts that are to be stored into the database.
  3. Load the resulting lines into the table.

To accomplish this, I used the following components:

  1. File processor with a Splitter
  2. Custom processor to parse each line.
  3. JDBC Processor

To implement a custom processor, you need to implement the Processor interface.  Because I’m using Scala and I’m just constructing an inline app, I defined it as a function instead:

import java.text.MessageFormat
import org.apache.camel.Exchange

val processor = (exchange: Exchange) => {
    // Get the incoming line from the message exchange
    val line = exchange.getIn.getBody(classOf[String])
    // Remove the first part of the log message (everything up to the literal "[]: ")
    val lineSplit = line.split("\\[\\]\\: ")
    // Break it down into the individual components
    val split = lineSplit(1).split(" ")
    // Build the insert statement; `format` is the MessageFormat pattern, defined elsewhere (not shown)
    val x = MessageFormat.format(format, split(0), split(1), split(2), split(3), split(4), split(5), split(7), split(10))
    // Replace the incoming message with the insert statement
    exchange.getIn.setBody(x)
  }
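
The parsing step can be tried on its own, outside of Camel. The following is a minimal sketch, not the code used at the client: the log line layout, the `perf_log` table, its column names, and the `toInsert` helper are all assumptions made for illustration, and only three fields are used. One detail worth showing is that `MessageFormat` treats a single quote as an escape character, so SQL string literals have to be written with doubled quotes in the pattern.

```scala
import java.text.MessageFormat

object ParseLineSketch {
  // Hypothetical insert template; the table and column names are assumptions.
  // A single quote is an escape character in MessageFormat patterns, so each
  // SQL quote is doubled ('') to come out as one quote in the result.
  val format: String =
    "insert into perf_log (class_name, method_name, elapsed_ms) values (''{0}'', ''{1}'', {2})"

  def toInsert(line: String): String = {
    // Keep only what follows the literal "[]: " marker
    val payload = line.split("\\[\\]\\: ")(1)
    // Break the payload into space-separated fields
    val fields = payload.split(" ")
    // Substitute the fields into the template
    MessageFormat.format(format, fields(0), fields(1), fields(2))
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical log line; the real layout is not shown in this post
    val sample = "2012-04-29 10:15:00 INFO []: OrderService placeOrder 42"
    println(toInsert(sample))
  }
}
```

Building SQL by string substitution is workable for a trusted log file like this one, but it would be open to injection if the input came from an untrusted source.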

Next up, I need to establish the database connection so that the JDBC component can perform the inserts.  I used Apache Commons DBCP to set up the connection pool and registered it with Camel:

// Build the connection pool
  val ds = new BasicDataSource
  ds.setDriverClassName("oracle.jdbc.OracleDriver")
  ds.setUrl("jdbc:oracle:thin:@localhost:1521:xe")
  ds.setUsername("*****")
  ds.setPassword("*****")
  // Register it with Camel
  val reg = new SimpleRegistry 
  reg.put("dataSource", ds)

The next part of the exercise is to set up the Camel route builder:

// Create the Camel Context
  val context = new DefaultCamelContext(reg)
  // Build the Route
  context.addRoutes(new RouteBuilder {
    // Read the file(s) from the directory
    "file:perf" ==> {
      // Split the file up at each line
      split(_.getIn().getBody(classOf[String]).split("\n")) {
        // Then further split up the processing across multiple parsers
        loadbalance roundrobin {
        // Reference to my internal components
          to("direct:x")
          to("direct:y")
          to("direct:z")
        }
      // Join back the results from above,
      // then split them out again, turning off auto-commit resets
      }.loadbalance roundrobin {
        to("jdbc:dataSource?resetAutoCommit=false")
        to("jdbc:dataSource?resetAutoCommit=false")
        to("jdbc:dataSource?resetAutoCommit=false")
      }
      }
    }

    // Setup my internal processors with references to my custom component
    "direct:x" process (processor)
    "direct:y" process (processor)
    "direct:z" process (processor)
  })
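
To make the round-robin idea in the route concrete, here is a plain-Scala sketch of the distribution pattern. It only illustrates the concept and is not how Camel implements its load balancer; the `distribute` helper is hypothetical, and the endpoint names are reused from the route purely as labels.

```scala
object RoundRobinSketch {
  // Assign each item to the targets in turn: item i goes to target (i mod n)
  def distribute[A](items: Seq[A], targets: Seq[String]): Map[String, Seq[A]] =
    items.zipWithIndex
      .groupBy { case (_, i) => targets(i % targets.size) }
      .map { case (target, pairs) => target -> pairs.map(_._1) }

  def main(args: Array[String]): Unit = {
    val lines = Seq("line1", "line2", "line3", "line4", "line5")
    val targets = Seq("direct:x", "direct:y", "direct:z")
    // Print each target with the lines it would receive
    distribute(lines, targets).toSeq.sortBy(_._1).foreach {
      case (target, assigned) => println(s"$target <- ${assigned.mkString(", ")}")
    }
  }
}
```

With five lines and three targets, the first target receives the first and fourth lines, the second receives the second and fifth, and the third receives the third.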

The last part of the exercise is to execute this from my main program:

// Start the camel process
  context.start
  // Wait for the process to execute
  Thread.sleep(10000)
  // Stop the camel process
  context.stop

I was impressed that with a little bit of code (fewer than 70 lines), I was able to process a 50MB file in under 10 seconds.  One note: the Scala DSL does not support all of the features of the Java DSL.  In particular, the splitter does not delete files after processing them, which required me to process one file at a time.

My next exercise is to use the SFTP component to automatically fetch the files for me.

About the Author

Mr. DiFrango has over 15 years experience specializing in architecture, design, and construction of distributed, integrated systems in the enterprise and web environments. This experience includes expertise in Enterprise Systems Integration, Application Development, Service Oriented Architecture, Content Management Integration and Portal Solutions. Mr. DiFrango specializes in the JBoss Portal, Alfresco, Weblogic and Tibco product suites. Product expertise pertains primarily to Application Servers, Service Enablement Products, Content Management Integration, and Portal frameworks. Mr. DiFrango is experienced in architecting, designing and developing J2EE standardized Applications. This experience includes full lifecycle development from creating OO designs through to product testing. Mr. DiFrango also has experience leading teams of analysts, developers, and testers through troubleshooting complex development, mentoring staff, managing development schedules and work assignments, and providing architectural guidance. He is experienced in software development methodologies including procedural or waterfall, and Agile projects. He also has extensive experience with modeling systems during analysis, design, and construction using UML.

 

Disclaimer

The words and opinions expressed here are those of each article's respective author, and do not necessarily represent the views of CapTech Ventures.