Read a CSV file in parallel using Spring Integration

I would like to process a CSV file in parallel using Spring Integration. Each line should be converted into a separate message. So, assuming I have 10K lines in a CSV file, I would like to start 10 threads and have each line handed to one of those threads. It would be great if someone could show me an example.

Thanks



2 answers


Since Spring Integration 4.0, the <splitter> supports an Iterator as the payload to split. Hence, you can convert the incoming File to a LineIterator and process the message for each line in parallel if the output-channel of the <splitter> is an ExecutorChannel:



<splitter input-channel="splitChannel" output-channel="executorChannel"
          expression="T(org.apache.commons.io.FileUtils).lineIterator(payload)"/>
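The idea here is that the splitter emits one message per line and the ExecutorChannel hands each message to a pool thread. The sketch below is not Spring Integration code; it is a plain java.util.concurrent analogue of that flow, assuming a fixed pool of 10 threads (the number from the question) and using BufferedReader.lines().iterator() as a lazy line iterator in place of Commons IO's LineIterator. The class and method names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelLineDemo {

    /**
     * Reads lines lazily and hands each one to a fixed pool of worker threads,
     * roughly what a splitter feeding an ExecutorChannel does.
     * Returns the number of lines processed.
     */
    public static int processInParallel(BufferedReader reader, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger processed = new AtomicInteger();
        Iterator<String> lines = reader.lines().iterator(); // lazy, like LineIterator
        while (lines.hasNext()) {
            String line = lines.next();
            pool.execute(() -> {
                // Placeholder for real per-line work, e.g. parsing the CSV fields.
                String[] fields = line.split(",");
                processed.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return processed.get();
    }

    public static void main(String[] args) throws Exception {
        StringBuilder csv = new StringBuilder();
        for (int i = 0; i < 10_000; i++) {
            csv.append(i).append(",value").append(i).append('\n');
        }
        try (BufferedReader reader = new BufferedReader(new StringReader(csv.toString()))) {
            System.out.println(processInParallel(reader, 10)); // 10000
        }
    }
}
```

As with an ExecutorChannel, lines may be processed out of order, because each one runs on whichever pool thread picks it up.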



I am using Spring Integration 4.1.0 and tried your suggestion, but it doesn't seem to work for me. I dug into it a bit today, and I am now inclined to think this is a bug in Spring Integration 4.1.0.

Let me explain, and see whether it makes sense.

If you try this example, you will see that it works (note that this does NOT use your SpEL example):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:int-stream="http://www.springframework.org/schema/integration/stream" xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
        http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:inbound-channel-adapter id="exchangeReplayFileAdapter" ref="exchangeReplayFileReadingMessageSource" method="receive" auto-startup="true" channel="channel1">
        <int:poller fixed-delay="10000000" />
    </int:inbound-channel-adapter>

    <bean id="exchangeReplayFileReadingMessageSource" class="org.springframework.integration.file.FileReadingMessageSource">
        <property name="directory" value="/tmp/inputdir" />
    </bean>

    <int:channel id="channel1">
        <int:dispatcher task-executor="taskExecutor" />
    </int:channel>

    <int:splitter input-channel="channel1" output-channel="channel2">
        <bean class="com.xxx.common.util.springintegration.FileSplitter" />
    </int:splitter>

    <int:channel id="channel2"></int:channel>
    <int-stream:stdout-channel-adapter channel="channel2"></int-stream:stdout-channel-adapter>

    <task:executor id="taskExecutor" pool-size="1" />
</beans>

      

With this Splitter implementation:

package com.xxx.common.util.springintegration;

import java.io.File;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class FileSplitter extends AbstractMessageSplitter {
    private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);

    @Override
    public Object splitMessage(Message<?> message) {
        if (log.isDebugEnabled()) {
            log.debug(message.toString());
        }
        try {

            Object payload = message.getPayload();
            Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");
            return org.apache.commons.io.FileUtils.lineIterator((File) payload);
        } catch (IOException e) {
            String msg = "Unable to transform file: " + e.getMessage();
            log.error(msg);
            throw new MessageTransformationException(msg, e);
        }
    }

}

      

Now using the SpEL example:

<int:splitter input-channel="exchangeReplayFiles" output-channel="exchangeSpringQueueChannel"  
    expression="T(org.apache.commons.io.FileUtils).lineIterator(payload)"/>

      

This is what the parser generates internally (note the List.class passed to the ExpressionEvaluatingMessageProcessor constructor):

/**
 * A Message Splitter implementation that evaluates the specified SpEL
 * expression. The result of evaluation will typically be a Collection or
 * Array. If the result is not a Collection or Array, then the single Object
 * will be returned as the payload of a single reply Message.
 *
 * @author Mark Fisher
 * @author Gary Russell
 * @since 2.0
 */
public class ExpressionEvaluatingSplitter extends AbstractMessageProcessingSplitter {

    @SuppressWarnings({"unchecked", "rawtypes"})
    public ExpressionEvaluatingSplitter(Expression expression) {
        super(new ExpressionEvaluatingMessageProcessor(expression, List.class));
    }

}

      



And here is the ExpressionEvaluatingMessageProcessor class:

/**
 * A {@link MessageProcessor} implementation that evaluates a SpEL expression
 * with the Message itself as the root object within the evaluation context.
 *
 * @author Mark Fisher
 * @author Artem Bilan
 * @since 2.0
 */
public class ExpressionEvaluatingMessageProcessor<T> extends AbstractMessageProcessor<T> {

    private final Expression expression;

    private final Class<T> expectedType;


  ...
    /**
     * Create an {@link ExpressionEvaluatingMessageProcessor} for the given expression
     * and expected type for its evaluation result.
     * @param expression The expression.
     * @param expectedType The expected type.
     */
    public ExpressionEvaluatingMessageProcessor(Expression expression, Class<T> expectedType) {
        Assert.notNull(expression, "The expression must not be null");
        try {
            this.expression = expression;
            this.expectedType = expectedType;
        }
        catch (ParseException e) {
            throw new IllegalArgumentException("Failed to parse expression.", e);
        }
    }

    /**
     * Processes the Message by evaluating the expression with that Message as the
     * root object. The expression evaluation result Object will be returned.
     * @param message The message.
     * @return The result of processing the message.
     */
    @Override
    public T processMessage(Message<?> message) {
        return this.evaluateExpression(this.expression, message, this.expectedType);
    }
...

}

      

What the sample expression returns ends up as an ArrayList (which implements the Collection interface) containing a single element: the LineIterator.

ExpressionEvaluatingSplitter is a subclass of AbstractMessageSplitter and does NOT override its handleRequestMessage(Message<?> message) method, which looks like this:

public abstract class AbstractMessageSplitter extends AbstractReplyProducingMessageHandler {
    protected final Object handleRequestMessage(Message<?> message) {
        Object result = this.splitMessage(message);
        // return null if 'null'
        if (result == null) {
            return null;
        }

        Iterator<Object> iterator;
        final int sequenceSize;
        if (result instanceof Collection) {
            Collection<Object> items = (Collection<Object>) result;
            sequenceSize = items.size();
            iterator = items.iterator();
        }
        else if (result.getClass().isArray()) {
            Object[] items = (Object[]) result;
            sequenceSize = items.length;
            iterator = Arrays.asList(items).iterator();
        }
        else if (result instanceof Iterable<?>) {
            sequenceSize = 0;
            iterator = ((Iterable<Object>) result).iterator();
        }
        else if (result instanceof Iterator<?>) {
            sequenceSize = 0;
            iterator = (Iterator<Object>) result;
        }
        else {
            sequenceSize = 1;
            iterator = Collections.singleton(result).iterator();
        }

        if (!iterator.hasNext()) {
            return null;
        }

        final MessageHeaders headers = message.getHeaders();
        final Object correlationId = headers.getId();
        final AtomicInteger sequenceNumber = new AtomicInteger(1);

        return new FunctionIterator<Object, AbstractIntegrationMessageBuilder<?>>(iterator,
                new Function<Object, AbstractIntegrationMessageBuilder<?>>() {
                    @Override
                    public AbstractIntegrationMessageBuilder<?> apply(Object object) {
                        return createBuilder(object, headers, correlationId, sequenceNumber.getAndIncrement(),
                                sequenceSize);
                    }
                });
    }

    // ...
}

      

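Since the ArrayList really is a Collection, the splitter takes the Collection branch and never reaches the Iterator branch. Therefore next() is never called on the LineIterator itself in produceOutput(...); the result is a single message whose payload is the whole LineIterator.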
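To make the effect concrete, here is a simplified, self-contained copy of the instanceof chain from handleRequestMessage (a sketch, not the real class). BufferedReader.lines().iterator() stands in for Commons IO's LineIterator, and the names SplitBranchDemo, toIterator and countMessages are hypothetical. It counts how many messages each kind of result would produce.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class SplitBranchDemo {

    /** Simplified copy of the branching in AbstractMessageSplitter.handleRequestMessage. */
    @SuppressWarnings("unchecked")
    static Iterator<Object> toIterator(Object result) {
        if (result instanceof Collection) {
            return ((Collection<Object>) result).iterator();
        } else if (result.getClass().isArray()) {
            return Arrays.asList((Object[]) result).iterator();
        } else if (result instanceof Iterable<?>) {
            return ((Iterable<Object>) result).iterator();
        } else if (result instanceof Iterator<?>) {
            return (Iterator<Object>) result;
        }
        return Collections.singleton(result).iterator();
    }

    /** Counts how many messages the splitter would emit for the given result. */
    static int countMessages(Object result) {
        int count = 0;
        Iterator<Object> it = toIterator(result);
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }

    /** Stand-in for FileUtils.lineIterator: a lazy iterator over the lines of a string. */
    static Iterator<String> lines(String text) {
        return new BufferedReader(new StringReader(text)).lines().iterator();
    }

    public static void main(String[] args) {
        String csv = "a,1\nb,2\nc,3\n";
        // A bare iterator takes the Iterator branch: one message per line.
        System.out.println(countMessages(lines(csv))); // 3
        // Wrapped in a List, as observed with the List.class expected type:
        // one message whose payload is the iterator itself.
        List<Object> wrapped = new ArrayList<>();
        wrapped.add(lines(csv));
        System.out.println(countMessages(wrapped)); // 1
    }
}
```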

So why is the LineIterator wrapped in an ArrayList? I believe the mistake is in ExpressionEvaluatingSplitter, which always does this:

public ExpressionEvaluatingSplitter(Expression expression) {
    super(new ExpressionEvaluatingMessageProcessor(expression, List.class));
}

      

I think that, as of Spring Integration 4, the splitter needs to look at the type the expression evaluates to (either a List or an Iterator) and only then call super(...). That may require some refactoring, because the type selection would have to happen before the call to super(...), which Java does not allow...
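On the constructor-ordering point: Java allows no statements before super(...), but the argument passed to super(...) may be the result of a static method call, so a choice that depends only on constructor arguments can still be made first. The sketch below assumes hypothetical stand-ins (Processor, BaseSplitter, TypeAwareSplitter) and a hypothetical boolean flag; it is not Spring Integration's API. Note that the real result type of a SpEL expression is only known when a message is evaluated, so a constructor-time choice only helps if the caller declares the type up front; the alternative is not to coerce at all and let the runtime instanceof checks decide.

```java
import java.util.List;

public class SuperArgumentDemo {

    // Hypothetical stand-in for ExpressionEvaluatingMessageProcessor: it only records the expected type.
    static class Processor {
        final Class<?> expectedType;

        Processor(Class<?> expectedType) {
            this.expectedType = expectedType;
        }
    }

    // Hypothetical stand-in for AbstractMessageProcessingSplitter, which receives its processor via super(...).
    static class BaseSplitter {
        final Processor processor;

        BaseSplitter(Processor processor) {
            this.processor = processor;
        }
    }

    // Hypothetical splitter: no statements may run before super(...),
    // but the argument passed to super(...) may be computed by a static method.
    static class TypeAwareSplitter extends BaseSplitter {
        TypeAwareSplitter(boolean expressionReturnsIterator) {
            super(chooseProcessor(expressionReturnsIterator));
        }

        private static Processor chooseProcessor(boolean expressionReturnsIterator) {
            // In this sketch, Object.class stands for "no coercion to List".
            return new Processor(expressionReturnsIterator ? Object.class : List.class);
        }
    }

    public static void main(String[] args) {
        System.out.println(new TypeAwareSplitter(true).processor.expectedType.getSimpleName());  // Object
        System.out.println(new TypeAwareSplitter(false).processor.expectedType.getSimpleName()); // List
    }
}
```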

What do you think?
