
Jan 3, 2013

Spring Integration -- polling for a file and processing the file



Q. What are the main differences and similarities between light-weight integration frameworks like Spring Integration, Apache Camel, etc. and ESBs like Oracle Service Bus, Mule, etc.?


A. The core concepts are about the same and are based on the Enterprise Integration Patterns (EIP) book, so you can implement the patterns described there with any of them. They all cover connectivity over different protocols, routing and messaging patterns, transformation, orchestration, business rules engines, and business and technical monitoring. The real difference is that ESBs are more powerful in the areas of orchestration, business rules engines, and business monitoring compared to the light-weight integration frameworks.

Some of the commercial ESBs provide graphical drag-and-drop features for modeling routing and orchestration, so ESBs are better suited for more complex orchestration and BPM (Business Process Management). This extra enterprise-level flexibility and these features come at additional cost and effort.

Q. Why do you need integration frameworks? Can you give an example where you used an integration framework?

A. Data exchanges between and within companies are very common. The growing number of applications that must be integrated requires different technologies, protocols and data formats to be handled uniformly and efficiently. Three popular integration frameworks are available in the JVM environment: Spring Integration, Mule ESB and Apache Camel. These frameworks implement the well-known Enterprise Integration Patterns (EIP) and therefore offer a standardized, domain-specific language (DSL) to integrate applications. They can be used in almost every integration project within the JVM environment, no matter which technologies, transport protocols or data formats are involved. If you know one of these frameworks, you can learn the others very easily as they use similar concepts. Each framework has its own pros and cons, so it always pays to do a "proof of concept" to verify that it serves your purpose.

This blog entry provides a simple example of how to use Spring Integration to write a file polling task. The Spring Integration framework watches for a file ending with ".end" or ".END" in a given folder and, once the file arrives, kicks off a method to process it.

Step 1: Define the applicationContext-myapp.xml file, which declares the Spring Integration channel, the file inbound-channel-adapter, the MyappInputFileHandler bean, etc.

 
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
 xmlns:task="http://www.springframework.org/schema/task" xmlns:context="http://www.springframework.org/schema/context"
 xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
 xmlns:int="http://www.springframework.org/schema/integration"
 xmlns:file="http://www.springframework.org/schema/integration/file"
 xmlns:util="http://www.springframework.org/schema/util"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
 http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
 http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
 http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
 http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
 http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd
 http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
 http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">


 <context:property-placeholder location="classpath:myapp.properties" />
 <!-- needed so that the @Value annotations in MyappInputFileHandler are processed -->
 <context:annotation-config />
 <tx:annotation-driven />

 <!-- INTEGRATION BEANS -->

 <bean id="myappInputFileHandler"
  class="com.myapp.MyappInputFileHandler">
  <constructor-arg value="${myapp.file.path}" />
  <constructor-arg value="${myapp.file.regex}" />
 </bean>
 
 <int:channel id="fileIn"></int:channel>

 <file:inbound-channel-adapter id="inputChannelAdapter"
  channel="fileIn" directory="${myapp.file.path}"
  prevent-duplicates="false" filename-regex="${myapp.file.regex}">

  <int:poller id="poller" fixed-delay="5000" />

 </file:inbound-channel-adapter>


 <int:service-activator id="inputFileServiceActivator"
  input-channel="fileIn" method="processFile" ref="myappInputFileHandler" />

  
</beans> 


Step 2: Define the myapp.properties file. These properties are used by both the Spring context file and the MyappInputFileHandler Java class.
 
#read from within spring context file
myapp.file.path=C:\\TEMP\\myapp\\
myapp.file.regex=.*\\.(end|END)

#properties read from the MyappInputFileHandler
myapp.job.lock.retry.count=5
myapp.job.lock.retry.wait.duration=20000


Step 3: Define the MyappInputFileHandler class with the processFile(..) method that the service activator invokes.
 
package com.myapp;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Semaphore;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;


public class MyappInputFileHandler {

 private final static Logger logger = LoggerFactory.getLogger(MyappInputFileHandler.class);

 private static final String CSV_EXT = ".CSV";
 private static final String METACSV_EXT = ".META_CSV";

 private static final Semaphore mutex = new Semaphore(1);

 @Value("${myapp.job.lock.retry.wait.duration}")
 private long lockRetryWaitDuration;

 @Value("${myapp.job.lock.retry.count}")
 private long lockRetryCount;

 public MyappInputFileHandler(String path, String fileRegEx) {

  logger.info("Launching Job CashForecasting. Polling Folder: ".concat(path).concat(" for file ").concat(fileRegEx));
 }

 public void processFile(File file) {
  try {
   // strip the last 8 characters (e.g. ".CSV.end") to get the base file name
   String fileNameNoExtension = file.getAbsolutePath().substring(0, file.getAbsolutePath().length() - 8);

   // Delete the .end file and identify the .META_CSV and .CSV files to move to the process folder
   File csvFile;
   File metaCsvFile;

   mutex.acquire();
   try {
    csvFile = new File(fileNameNoExtension.concat(CSV_EXT));
    metaCsvFile = new File(fileNameNoExtension.concat(METACSV_EXT));
    file.delete();
   } finally {
    // always release the mutex, even if an exception is thrown above
    mutex.release();
   }

   // Create Lock File
   File lockFile = createLockFile(csvFile);

   // further file processing ....................

  } catch (Exception ex) {
   logger.error("Error processing file: " + file.getAbsolutePath(), ex);
   throw new RuntimeException(ex);
  }
 }

 private File createLockFile(File csvFile) {

  if (!csvFile.exists()) {
   throw new RuntimeException("File not found: " + csvFile.getAbsolutePath());
  }

  String feedKey = "some key";

  File lockFile = new File(csvFile.getParent() + File.separator + feedKey.concat(".lock"));

  int count = 0;
  while (lockFile.exists()) {
   try {
    Thread.sleep(lockRetryWaitDuration);
   } catch (InterruptedException e) {
    // restore the interrupt status so that callers can detect the interruption
    Thread.currentThread().interrupt();
    logger.error("Interrupted ", e);
   }

   if (++count > lockRetryCount) {
    throw new RuntimeException("Timedout acquiring a lock file for file : " + csvFile.getAbsolutePath() + " and "
      + "input data: " + feedKey);
   }
  }

  try {
   // Apache Commons IO: creates the lock file (or updates its timestamp if it already exists)
   FileUtils.touch(lockFile);
  } catch (IOException e) {
   logger.error("Error creating a lock file: " + lockFile.getAbsolutePath(), e);
   throw new RuntimeException(e);
  }

  if (logger.isDebugEnabled()) {
   logger.debug("Lock file created: " + lockFile.getAbsolutePath());
  }

  return lockFile;

 }

}
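
To try the example end to end, the Spring context can be bootstrapped from a plain main class. The sketch below is not part of the original example: the class name MyappMain is made up, and it assumes applicationContext-myapp.xml and myapp.properties are on the classpath.

package com.myapp;

import org.springframework.context.support.ClassPathXmlApplicationContext;

// A minimal launcher sketch (MyappMain is a made-up name). Loading the context starts the
// file inbound-channel-adapter, which polls the configured folder every 5 seconds; dropping
// a file such as report.CSV.end into C:\TEMP\myapp should then trigger processFile(..).
public class MyappMain {

 public static void main(String[] args) throws Exception {
  ClassPathXmlApplicationContext context =
    new ClassPathXmlApplicationContext("applicationContext-myapp.xml");
  context.registerShutdownHook();

  // keep the JVM alive so that the poller keeps running
  Thread.currentThread().join();
 }
}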


An alternative way to achieve similar behavior is to use Apache Camel; both Apache Camel and Spring Integration are light-weight integration frameworks. Polling for a file is a very common requirement in batch processes, and Spring Batch is a good fit for writing batch jobs in Java. You could also use this approach to transfer internal data files to or from external parties, which usually requires format conversions and delivery via ftp/sftp/scp, etc., or attaching the file to an email and sending it out. A rough Camel equivalent of the file polling route is sketched below.
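
For illustration only, here is what such a Camel (2.x) route might look like; the endpoint options and the direct reuse of MyappInputFileHandler are assumptions, and the @Value fields of that class would not be injected outside a Spring context.

package com.myapp;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.main.Main;

// A sketch, not the original example: poll C:\TEMP\myapp every 5 seconds for files ending
// in .end/.END and hand each java.io.File to MyappInputFileHandler.processFile(..).
public class MyappFilePollingRoute extends RouteBuilder {

 @Override
 public void configure() throws Exception {
  from("file:C:/TEMP/myapp?include=.*\\.(end|END)&noop=true&delay=5000")
   .log("Picked up trigger file ${file:name}")
   .bean(new MyappInputFileHandler("C:/TEMP/myapp", ".*\\.(end|END)"), "processFile");
 }

 public static void main(String[] args) throws Exception {
  // Camel's standalone Main class can run the route for a quick manual test
  Main main = new Main();
  main.addRouteBuilder(new MyappFilePollingRoute());
  main.run();
 }
}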

Another typical use case is to write your own automated, custom end-to-end application integration testing framework using a light-weight integration framework like Apache Camel. Your custom testing framework would have routes defined for testing different tasks in an end-to-end manner, for example (a Camel sketch of such a route follows the list below):

1. Extracting files from System A.
2. Transforming the extracts to a format that Systems B and C can understand.
3. Publishing the messages to a JMS queue so that Systems B and C can load that data into their databases.
4. Invoking a RESTful web service on System B to produce a report that needs to be emailed to a number of recipients.
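
As a rough illustration only, such a test route in Camel might chain those steps together. All of the endpoint URIs, queue names and the transformer bean below are made-up placeholders, not real systems.

import org.apache.camel.builder.RouteBuilder;

// A hypothetical sketch of an end-to-end test route; every endpoint here is illustrative.
public class EndToEndTestRoute extends RouteBuilder {

 @Override
 public void configure() throws Exception {
  from("file:/data/systemA/extracts?include=.*\\.csv")          // 1. pick up extracts from System A
   .to("bean:extractTransformer?method=toSystemBAndCFormat")    // 2. transform them into a format B and C understand
   .to("jms:queue:systemBandC.load")                            // 3. publish to a JMS queue for loading
   .to("http://systemB.example.com/reports/trigger")            // 4. invoke a RESTful service to produce the report
   .to("smtp://mailhost.example.com?to=reports@example.com");   //    and email it to the recipients
 }
}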

So, good knowledge of integration frameworks like Spring Integration and Apache Camel, and of enterprise service buses like Mule, will be a plus when selling yourself in job interviews.

