Spring Integration inbound-channel-adapter to read large files line by line

I am currently using Spring Integration 4.1.0 with Spring 4.1.2. I have a requirement to read a file line by line and use each line read as a message. Basically, I want to allow "replay" for one of our message sources, but the messages are not saved in individual files but rather in a single file. I have no transactional requirements for this use case. My requirements are similar to this post, except that the file resides on the same server as the one the JVM is running on: spring integration - reading remote file line by line

As I see it I have the following options:

1. Use int-file:inbound-channel-adapter to read the file, then "split" it so that 1 message becomes multiple messages. Sample config file:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

        <int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
        <int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
        <int:channel id="channel1"/>
        <int:splitter input-channel="channel2" output-channel="nullChannel"/>
        <int:channel id="channel2"/>
    </beans>

      

The problem is that the file is very large, and with the above technique the whole file is first read into memory and then split, so the JVM runs out of heap space. What is really needed is: read a line and convert it to a message, send the message, remove the message from memory, repeat.
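
The read, convert, send, discard loop described above can be sketched in plain Java, independent of Spring Integration. The class name `LineStreamer`, the UTF-8 charset, and the `Consumer<String>` standing in for "send the message" are assumptions made for illustration; the only point is that at most one line is held in memory at a time.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.function.Consumer;

// Hypothetical helper, not part of Spring Integration.
final class LineStreamer {

    private LineStreamer() {
    }

    /**
     * Reads the file one line at a time and hands each line to the sink.
     * Returns the number of lines that were passed on.
     */
    static long streamLines(File file, Consumer<String> sink) throws IOException {
        long count = 0;
        // Assumption: the file is UTF-8 encoded.
        try (BufferedReader reader = Files.newBufferedReader(file.toPath(), StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                sink.accept(line); // stands in for "send the message"
                count++;           // the line is eligible for GC once the sink returns
            }
        }
        return count;
    }
}
```

In a real flow the sink would wrap each line in a message and send it to a channel; here it is left abstract so the loop can be tried in isolation.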

2. Use int-file:tail-inbound-channel-adapter with end="false" (which basically means read from the beginning of the file). Start and stop this adapter as needed for each file (changing the file name before each start). Sample config file:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    
        <int-file:tail-inbound-channel-adapter id="apache"
            channel="exchangeSpringQueueChannel"
            task-executor="exchangeFileReplayTaskExecutor"
            file="C:\p2-test.txt"
            delay="1"
            end="false"
            reopen="true"
            file-delay="10000" />
    
        <int:channel id="exchangeSpringQueueChannel" />
        <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
    </beans>
    
          

3. Have Spring Integration call into Spring Batch and use an ItemReader to process the file. This certainly allows finer-grained control over the whole process, but it is a fair amount of work to set up, what with the job repository and such (and I don't care about the job history, so I would either tell the job not to log status and/or use the in-memory MapJobRepository).

4. Create my own FileLineByLineInboundChannelAdapter by extending MessageProducerSupport. Most of the code can be borrowed from ApacheCommonsFileTailingMessageProducer (also see http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter ). Below is a sample, but it needs some work to move the reading into its own Thread so that I honor the stop() command as I read line by line.

    package com.xxx.exchgateway.common.util.springintegration;

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import org.apache.commons.io.IOUtils;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.integration.endpoint.MessageProducerSupport;
    import org.springframework.integration.file.FileHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.util.Assert;

    /**
     * A lot of the logic for this class came from
     * {@link org.springframework.integration.file.tail.ApacheCommonsFileTailingMessageProducer}.
     * See http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter
     */
    public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
        private volatile File file;

        /**
         * The file you wish to read line by line.
         * @param file The file to read (must not be null).
         */
        public void setFile(File file) {
            // Assert.notNull takes the object to check first, then the message
            Assert.notNull(file, "'file' cannot be null");
            this.file = file;
        }

        protected File getFile() {
            if (this.file == null) {
                throw new IllegalStateException("No 'file' has been provided");
            }
            return this.file;
        }

        @Override
        public String getComponentType() {
            return "file:line-by-line-inbound-channel-adapter";
        }

        private void readFile() {
            BufferedReader br = null;
            try {
                br = new BufferedReader(new InputStreamReader(new FileInputStream(getFile())));

                String strLine;

                // Read the file line by line; check isRunning() first so that we honor a manual stop()
                // (for example via JMX) without reading a line that would then be dropped
                while (isRunning() && (strLine = br.readLine()) != null) {
                    send(strLine);
                }
            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                // Closing the reader also closes the underlying stream, even if reading failed
                IOUtils.closeQuietly(br);
            }
        }

        @Override
        protected void doStart() {
            super.doStart();

            // TODO this needs to be moved into its own thread, since isRunning() will return "false" until this method has completed
            // and we want to honor the stop() command while we read line by line
            readFile();
        }

        protected void send(String line) {
            Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
            super.sendMessage(message);
        }

        @Override
        public Message<String> receive() {
            // TODO Auto-generated method stub
            return null;
        }
    }
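
The TODO in doStart() above (moving the read loop onto its own thread so that stop() can interrupt it) could be approached along the following lines. This is a stand-alone sketch in plain Java rather than a drop-in change to the adapter: the class name `BackgroundLineReader`, its `start()`/`stop()`/`join()` methods, the UTF-8 charset, and the `Consumer<String>` sink are all assumptions for illustration. In the adapter itself, the same idea would presumably live in doStart()/doStop(), with send(...) as the sink.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.function.Consumer;

// Hypothetical stand-alone sketch; not a Spring Integration class.
final class BackgroundLineReader {

    private final File file;
    private final Consumer<String> sink;
    private volatile boolean running;
    private volatile Thread worker;

    BackgroundLineReader(File file, Consumer<String> sink) {
        this.file = file;
        this.sink = sink;
    }

    /** Returns immediately; the file is read on a separate thread. */
    synchronized void start() {
        if (running) {
            return;
        }
        running = true;
        Thread t = new Thread(this::readLoop, "line-reader");
        t.setDaemon(true);
        worker = t;
        t.start();
    }

    /** Asks the read loop to finish after the line currently being handled. */
    synchronized void stop() {
        running = false;
    }

    /** Waits for the reading thread to end (handy in tests). */
    void join() throws InterruptedException {
        Thread t = worker;
        if (t != null) {
            t.join();
        }
    }

    boolean isRunning() {
        return running;
    }

    private void readLoop() {
        // Assumption: the file is UTF-8 encoded.
        try (BufferedReader reader = Files.newBufferedReader(file.toPath(), StandardCharsets.UTF_8)) {
            String line;
            // The flag is checked before each read, so no line is read and then dropped.
            while (running && (line = reader.readLine()) != null) {
                sink.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because start() returns before the file has been read, the running flag is already true while lines are being sent, which is what allows a stop() issued from another thread (for example via JMX) to take effect between lines.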

      

I don't feel like my use case is outside the realm of typical things people might want to do, so I'm surprised that I can't find a solution for it out of the box. I've searched quite a bit and looked at lots of examples, and unfortunately have yet to find something that suits my needs.

My guess is that I have missed something obvious that the frameworks already offer (although perhaps this falls into the blurry line between Spring Integration and Spring Batch). Can anyone let me know if I am completely off base with my ideas, if there is a simple solution I missed, or offer alternative suggestions?

2 answers


Spring Integration 4.x comes with a nice new feature: an Iterator can be used as the source of messages.

From the Spring Integration reference:

Since version 4.1, AbstractMessageSplitter supports the Iterator type for splitting a value.

This allows the elements of an Iterator to be sent as messages one at a time, without reading the entire file into memory.

Here is a simple Spring context that splits CSV files into one message per line:



<int-file:inbound-channel-adapter 
        directory="${inputFileDirectory:/tmp}"
        channel="inputFiles"/>

<int:channel id="inputFiles">
    <int:dispatcher task-executor="executor"/>
</int:channel>

<int:splitter 
    input-channel="inputFiles" 
    output-channel="output">
    <bean 
        class="FileSplitter" 
        p:commentPrefix="${commentPrefix:#}" />
</int:splitter>

<task:executor 
    id="executor" 
    pool-size="${poolSize:8}" 
    queue-capacity="${queueCapacity:0}" 
    rejection-policy="CALLER_RUNS" />

<int:channel id="output"/>

      

And this is the splitter implementation:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class FileSplitter extends AbstractMessageSplitter {
    private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);

    private String commentPrefix = "#";

    public Object splitMessage(Message<?> message) {
        if(log.isDebugEnabled()) {
            log.debug(message.toString());
        }
        try {

            Object payload = message.getPayload();
            Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload"); 

            return new BufferedReaderFileIterator((File) payload);
        } 
        catch (IOException e) {
            String msg = "Unable to transform file: " + e.getMessage();
            log.error(msg);
            throw new MessageTransformationException(msg, e);
        }
    }

    public void setCommentPrefix(String commentPrefix) {
        this.commentPrefix = commentPrefix;
    }

    public class BufferedReaderFileIterator implements Iterator<String> {

        private File file;
        private BufferedReader bufferedReader;
        private String line;

        public BufferedReaderFileIterator(File file) throws IOException {
            this.file = file;
            this.bufferedReader = new BufferedReader(new FileReader(file));
            readNextLine();
        }

        @Override
        public boolean hasNext() {
            return line != null;
        }

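        @Override
        public String next() {
            // Honor the Iterator contract when the file is exhausted
            if (this.line == null) {
                throw new java.util.NoSuchElementException();
            }
            try {
                String res = this.line;
                readNextLine();
                return res;
            } 
            catch (IOException e) {
                log.error("Error reading file", e);
                throw new RuntimeException(e);
            }   
        }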

        void readNextLine() throws IOException {
            do {
                line = bufferedReader.readLine();
            }
            while(line != null && line.trim().startsWith(commentPrefix));

            if(log.isTraceEnabled()) {
                log.trace("Read next line: {}", line);
            }

            if(line == null) {
                close();
            }
        }

        void close() throws IOException {
            bufferedReader.close();
            // Note: the input file is deleted once it has been read to the end
            file.delete();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

    }

}

      

Note that the Iterator object is returned from the splitMessage() handler method.



I have this working as well. I am copying files to another folder and also reading the data from each file.

fileCopyApplicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:file="http://www.springframework.org/schema/integration/file"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file
            http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/context 
            http://www.springframework.org/schema/context/spring-context.xsd">

    <context:property-placeholder />

    <file:inbound-channel-adapter id="filesIn"
        directory="E:/usmandata/logs/input/" filter="onlyPropertyFiles"
        auto-startup="true">
        <int:poller id="poller" fixed-delay="500" />
    </file:inbound-channel-adapter>



    <int:service-activator input-channel="filesIn"
        output-channel="filesOut" ref="handler" />

    <file:outbound-channel-adapter id="filesOut"
        directory="E:/usmandata/logs/output/" />




    <bean id="handler" class="com.javarticles.spring.integration.file.FileHandler" />
    <bean id="onlyPropertyFiles"
        class="org.springframework.integration.file.config.FileListFilterFactoryBean"
        p:filenamePattern="*.log" />
</beans>

      

FileHandler.java



package com.javarticles.spring.integration.file;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class FileHandler {
    public File handleFile(File input) throws IOException {
       // System.out.println("Copying file: " + input.getAbsolutePath());


        RandomAccessFile file = new RandomAccessFile(input,"r");

        FileChannel channel = file.getChannel();

        //System.out.println("File size is: " + channel.size());

        ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());

        channel.read(buffer);

        buffer.flip();//Restore buffer to position 0 to read it

        System.out.println("Reading content and printing ... ");

        for (int i = 0; i < channel.size(); i++) {
            System.out.print((char) buffer.get());
        }

        channel.close();
        file.close();
        return input;
    }
}
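
The handler above allocates a buffer as large as the whole file, which is fine for small logs but not for the large files the question is about. A fixed-size buffer can be reused instead. The following is only a sketch under assumptions: the class name `ChunkedFileReader`, the 8 KB buffer size, and the `OutputStream` destination are chosen for illustration and are not part of the original answer.

```java
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

// Hypothetical helper showing chunked reads with a reusable buffer.
final class ChunkedFileReader {

    private static final int BUFFER_SIZE = 8 * 1024; // assumed chunk size

    private ChunkedFileReader() {
    }

    /** Copies the file's bytes to the stream in fixed-size chunks; returns the byte count. */
    static long copyTo(File input, OutputStream out) throws IOException {
        long total = 0;
        try (FileChannel channel = FileChannel.open(input.toPath(), StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            while (channel.read(buffer) != -1) {
                buffer.flip();                                // switch from filling to draining
                out.write(buffer.array(), 0, buffer.limit()); // heap buffer, so array() is available
                total += buffer.limit();
                buffer.clear();                               // reuse the same buffer for the next chunk
            }
        }
        return total;
    }
}
```

Memory use stays bounded by the buffer size regardless of how large the file is; passing `System.out` as the destination would approximate the printing done in handleFile.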

      

SpringIntegrationFileCopyExample.java

package com.javarticles.spring.integration.file;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringIntegrationFileCopyExample {

    public static void main(String[] args) throws InterruptedException, IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "fileCopyApplicationContext.xml");

    }

}

      
