Adding Spring-Integration to a Boot application

Finally using Enterprise Integration Patterns ;)
June 29, 2016 by Michael

This week’s post has some anecdotal value for me. Nine years ago I wrote a horrible tutorial about threads, which I really liked back then. In the meantime I have often thought I should delete the post, and I only kept it around to prove some improvement on my side 😉

Anyway, the code in that tutorial has been the foundation for a module that has been running for nearly as long as the blog post is old. My company uses it on several occasions for data transfer of all kinds, be it files, checking emails or FTP servers, and it works pretty well. The downside: it’s pretty much unmaintainable, the code is bad, and half the stuff I wrote (or more) can be replaced with messaging or a real service bus.

In the first iteration of the rewrite I did exactly that: I used a Java Message Service implementation for moving stuff around and did the rest myself. Luckily this time I noticed…

Enter Spring Integration, which, according to its project page:

Extends the Spring programming model to support the well-known Enterprise Integration Patterns. Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters. Those adapters provide a higher-level of abstraction over Spring’s support for remoting, messaging, and scheduling. Spring Integration’s primary goal is to provide a simple model for building enterprise integration solutions while maintaining the separation of concerns that is essential for producing maintainable, testable code.

I really don’t care at the moment which backend or transport I use… As long as I know it’s swappable, I’m fine.

Getting up and running in a Spring Boot application is as easy as adding the dependencies and annotating the application class with @EnableIntegration:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-java-dsl</artifactId>
    <version>1.1.2.RELEASE</version>
</dependency>

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.integration.config.EnableIntegration;

@SpringBootApplication
@EnableIntegration
public class Application {
}

Nothing fancy here.

You can create your flows either through XML definitions, which has the benefit of being supported by Spring Tool Suite™, or as @Beans. For the latter, you’ll find a nice sample here.

I opted for the bean version, mainly for one reason: I have to support multiple, differently configured instances of one or more flows.

I have a list of workflow builders (a custom interface) in a @Configuration class that is populated automatically through @EnableConfigurationProperties, and in a loop I use the method described in this GitHub issue:

@PostConstruct
void initializeWorkflows() {
    this.workflowBuilders.stream()
        // Each builder returns a map of bean name -> flow
        .flatMap(workflowBuilder -> workflowBuilder.build().entrySet().stream())
        .forEach(workflow -> {
            // Register the flow under its name, then let the factory initialize it
            beanFactory.registerSingleton(workflow.getKey(), workflow.getValue());
            beanFactory.initializeBean(workflow.getValue(), workflow.getKey());
        });
}
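
To make the shape of that loop a bit clearer outside of Spring, here is a minimal plain-Java sketch of the same idea: several builders each contribute named flows, and all of them end up in one registry. WorkflowBuilder, WorkflowRegistrySketch and collect are names I made up for this illustration, the map stands in for the bean factory, and rejecting duplicate names is simply a choice made for the sketch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the custom builder interface:
// every builder contributes one or more named flows.
interface WorkflowBuilder {
    Map<String, Object> build();
}

class WorkflowRegistrySketch {

    // Flattens the flows of all builders into a single name -> flow map.
    // Rejecting duplicate names is a choice made for this sketch.
    static Map<String, Object> collect(List<WorkflowBuilder> builders) {
        Map<String, Object> registry = new LinkedHashMap<>();
        builders.stream()
            .flatMap(builder -> builder.build().entrySet().stream())
            .forEach(flow -> {
                if (registry.putIfAbsent(flow.getKey(), flow.getValue()) != null) {
                    throw new IllegalStateException("Duplicate flow name: " + flow.getKey());
                }
            });
        return registry;
    }

    public static void main(String[] args) {
        WorkflowBuilder tenantA = () -> Collections.<String, Object>singletonMap("tenant-a-import", "flow A");
        WorkflowBuilder tenantB = () -> Collections.<String, Object>singletonMap("tenant-b-import", "flow B");
        // Prints the registered names in builder order
        System.out.println(collect(Arrays.asList(tenantA, tenantB)).keySet());
    }
}
```

In the real application the registry is the bean factory, so each flow additionally goes through the normal bean initialization.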

The “Cafe Demo” helped a lot to get started with the Java DSL, especially the lambda version.

To demonstrate what is possible, I can show you my current task, which can be roughly explained as follows: “Watch one or more directories for a given tenant, detect CSV and Excel files, split the sheets of Excel workbooks into separate contents, load all contents into a database and call a stored procedure afterwards”.

The first part is really simple:

// 1
final FileListFilter<File> fileListFilter = new CompositeFileListFilter<>(Arrays.asList(
    new FileSystemPersistentAcceptOnceFileListFilter(this.metadataStore, "exceldatenaustausch:"),
    new FilesOnlyFileListFilter()		    
));
final FileReadingMessageSource reader = new FileReadingMessageSource();
reader.setDirectory(new File(s.directory));
reader.setFilter(fileListFilter);
 
// 2	            
IntegrationFlows
    .from(reader, endpoint -> endpoint.poller(Pollers.fixedRate(s.intervall, TimeUnit.SECONDS)))
    .enrich(enricher -> enricher
    	.header("tenant", s.tenant)
    	.<File>headerFunction("contentType", message -> contentTypeMap.getOrDefault(tika.detect(message.getPayload()), "unknown"))
    )
    .route(payload -> "exceldatenaustausch-incoming-channel")
    .get();

  1. Set up a file reading message source that watches a directory, accepts only files and accepts each of them only once
  2. Poll that reader every n seconds, add a tenant number and the content type to each file message and route them to a specific channel
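
If the filter combination in step 1 looks a bit magic, the following plain-Java sketch shows the idea behind it: a candidate is accepted only if every filter accepts it, and one of the filters remembers what it has already seen. This is a simplified, in-memory illustration under my own assumptions, not the Spring Integration implementation: AcceptOnceSketch, acceptOnce, allOf and poll are hypothetical names, candidates are plain strings, a trailing "/" marks a directory, and nothing is persisted (the real persistent filter keeps its state in the metadata store and also looks at the modification time).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Simplified, in-memory illustration; all names here are hypothetical.
class AcceptOnceSketch {

    // Stand-in for a "files only" filter: a trailing slash marks a directory.
    static final Predicate<String> FILES_ONLY = name -> !name.endsWith("/");

    // Accepts each name only the first time it is seen.
    static Predicate<String> acceptOnce() {
        Set<String> seen = new HashSet<>();
        return seen::add; // Set.add returns false for names already recorded
    }

    // A candidate passes only if every filter accepts it, checked in order.
    @SafeVarargs
    static <T> Predicate<T> allOf(Predicate<T>... filters) {
        return Arrays.stream(filters).reduce(candidate -> true, Predicate::and);
    }

    // One "poll": returns the candidates that survive the filter.
    static <T> List<T> poll(List<T> candidates, Predicate<T> filter) {
        List<T> accepted = new ArrayList<>();
        for (T candidate : candidates) {
            if (filter.test(candidate)) {
                accepted.add(candidate);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        Predicate<String> filter = allOf(acceptOnce(), FILES_ONLY);
        // First poll: both files are new, the directory is skipped
        System.out.println(poll(Arrays.asList("a.csv", "archive/", "b.xlsx"), filter));
        // Second poll: only the file that was not there before is accepted
        System.out.println(poll(Arrays.asList("a.csv", "archive/", "b.xlsx", "c.csv"), filter));
    }
}
```

The same filter instance has to be reused across polls, otherwise the "once" part is lost. That is also why the real filter gets a metadata store: its memory should survive a restart.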

And all the rest is easy as well:

IntegrationFlows
    .from("exceldatenaustausch-incoming-channel")
    // 1
    .filter("!headers.contentType.equals('unknown')")
    // 2
    .route("headers.contentType", mapping -> mapping
        .channelMapping("csv",   "read-csv-channel")
        .channelMapping("excel", "split-excel-channel")
    )
    .channel("read-csv-channel")
        // 3
        .transform(File.class, Strings::fromFile)
        .route(payload -> "store-content-channel")
    .channel("split-excel-channel")
        // 4
        .split(new ExcelSplitter(locale, linebreak, separator, dateFormat), "extractSheets")
        .enrich(enricher -> enricher
        	.<ExcelSheet>headerFunction("sheetName", message -> message.getPayload().getSheetName())
        	.<ExcelSheet>headerFunction("sheetNumber", message -> message.getPayload().getSheetNumber())
        )
        // 3
        .transform(ExcelSheet.class, ExcelSheet::getContent)
        .route(payload -> "store-content-channel")
    .channel("store-content-channel")
    // 5
    .<String>handle((content, headers) -> {
        return excelDatenaustauschRepository.add((Integer) headers.get("tenant"), (String) headers.get("fileName"), content);
    })
    // 6
    .handle(this.excelDatenaustauschService, "callfExcelDatenaustausch")
    .<Long>handle((status, headers) -> {
        LOGGER.info("Stored {} and called F_EXCEL_DATENAUSTAUSCH with result {}.", headers.get("fileName"), status);
        return null;
    })
    .get();

We see here:

  1. Filtering based on SpEL expressions
  2. Routing based on SpEL expressions and following channel mappings
  3. Transformation of messages from files to strings
  4. Splitting of one file message into several messages, one per sheet
  5. Handling of messages with an explicit handler
  6. Handling of messages by an implicit bean method
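
For readers who have not worked with these patterns yet, here is a rough plain-Java sketch of what steps 1, 2 and 4 do to a message. It only illustrates the patterns and does not reflect how Spring Integration implements them: Msg, FlowStepsSketch, route and split are hypothetical names, a workbook is modelled as a simple map from sheet name to content, the sheet numbering starts at 0 by assumption, and unmapped or missing content types are discarded here for simplicity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java illustration only; Msg is a hypothetical payload-plus-headers holder.
final class Msg<T> {
    final T payload;
    final Map<String, Object> headers;

    Msg(T payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = headers;
    }
}

class FlowStepsSketch {

    // Same header values and channel names as in the flow above.
    static final Map<String, String> CHANNELS = new HashMap<>();
    static {
        CHANNELS.put("csv", "read-csv-channel");
        CHANNELS.put("excel", "split-excel-channel");
    }

    // Steps 1 and 2: discard unknown content types, then map the header to a channel name.
    // Missing or unmapped values yield an empty result (a simplification of this sketch).
    static Optional<String> route(Msg<?> message) {
        Object contentType = message.headers.get("contentType");
        if (contentType == null || "unknown".equals(contentType)) {
            return Optional.empty();
        }
        return Optional.ofNullable(CHANNELS.get(contentType));
    }

    // Step 4: one workbook message becomes one message per sheet. Each new message
    // copies the original headers and gains sheetName and sheetNumber (zero-based here).
    static List<Msg<String>> split(Msg<Map<String, String>> workbook) {
        List<Msg<String>> sheets = new ArrayList<>();
        int number = 0;
        for (Map.Entry<String, String> sheet : workbook.payload.entrySet()) {
            Map<String, Object> headers = new HashMap<>(workbook.headers);
            headers.put("sheetName", sheet.getKey());
            headers.put("sheetNumber", number++);
            sheets.add(new Msg<>(sheet.getValue(), headers));
        }
        return sheets;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("tenant", 42);
        headers.put("contentType", "excel");

        Map<String, String> sheets = new LinkedHashMap<>();
        sheets.put("Orders", "id;amount");
        sheets.put("Customers", "id;name");

        Msg<Map<String, String>> workbook = new Msg<>(sheets, headers);
        System.out.println(route(workbook).orElse("(discarded)"));
        for (Msg<String> sheet : split(workbook)) {
            System.out.println(sheet.headers.get("sheetNumber") + " " + sheet.headers.get("sheetName"));
        }
    }
}
```

The point of the header copy in split is that everything added earlier in the flow, such as the tenant, is still available when the content is finally stored.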

I like the result of one week’s work very much: the stuff is actually readable. Even though I don’t have a nice visualization from an XML-based configuration, one can understand the flow just from the code, and that’s much more than I had with the nearly 10-year-old homegrown solution. I also have the infrastructure in place to distribute that flow (and others) across several services and to switch transports if necessary, without the need for any boilerplate code.
