Spring Batch: how to filter duplicate items before passing them to the ItemWriter

I read a flat file (e.g. a CSV file with one line per user, such as: UserId; Data1; Data2).

How can I handle duplicate user records, given that the reader processes one line at a time and has no list of previously read users?

stepBuilderFactory.get("createUserStep1")
.<User, User>chunk(1000)
.reader(flatFileItemReader) // FlatFileItemReader
.writer(itemWriter) // For example JDBC Writer
.build();

      



2 answers


Filtering is usually done with an ItemProcessor. If the ItemProcessor returns null, the item is filtered and not passed to the ItemWriter; otherwise, it is passed along. In your case, you can keep a set of previously seen users in the ItemProcessor: if the user has not been seen before, pass it through; if it has been seen before, return null.

You can read more about filtering with an ItemProcessor here: http://docs.spring.io/spring-batch/trunk/reference/html/readersAndWriters.html#filiteringRecords



import java.util.HashSet;
import java.util.Set;

import org.springframework.batch.item.ItemProcessor;

/**
 * This implementation assumes that there is enough room in memory to store the
 * duplicate Users. Otherwise, you'd want to store them somewhere you can do a
 * look-up on.
 */
public class UserFilterItemProcessor implements ItemProcessor<User, User> {

    // This assumes that User.equals()/hashCode() identify the duplicates
    private Set<User> seenUsers = new HashSet<User>();

    @Override
    public User process(User user) {
        if (seenUsers.contains(user)) {
            return null; // duplicate: filtered, never reaches the writer
        }
        seenUsers.add(user);
        return user;
    }
}
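Spring aside, the core of this technique is just HashSet membership on a class with value-based equality. Here is a minimal, self-contained sketch of that behavior; the User class and its equals()/hashCode() on userId are illustrative assumptions, not part of the original code:

```java
import java.util.HashSet;
import java.util.Set;

public class DedupDemo {

    // Minimal stand-in for the domain class; equality is based on userId,
    // which is what lets HashSet.contains() detect duplicates.
    static final class User {
        final String userId;
        User(String userId) { this.userId = userId; }
        @Override public boolean equals(Object o) {
            return o instanceof User && ((User) o).userId.equals(userId);
        }
        @Override public int hashCode() { return userId.hashCode(); }
    }

    // Same logic as the processor above, without the Spring interface.
    static final Set<User> seenUsers = new HashSet<>();

    static User process(User user) {
        if (seenUsers.contains(user)) {
            return null; // duplicate: filtered
        }
        seenUsers.add(user);
        return user;
    }

    public static void main(String[] args) {
        System.out.println(process(new User("u1")) != null); // first occurrence passes
        System.out.println(process(new User("u2")) != null); // different user passes
        System.out.println(process(new User("u1")) == null); // duplicate is filtered
    }
}
```

Note that distinct User instances with the same userId are treated as duplicates, which matches the situation of a reader creating a fresh object per line.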

      



As you can see here http://docs.spring.io/spring-batch/trunk/reference/html/readersAndWriters.html#faultTolerant

When a chunk is rolled back, items that have been cached during reading may be reprocessed. If a step is configured to be fault tolerant (typically by using skip or retry processing), any ItemProcessor used should be implemented in a way that is idempotent.

This means that Michael's example is not idempotent: the first time a user passes through the processor, it is cached in the Set. If writing the chunk then fails and the step is fault tolerant, the chunk is reprocessed and the processor runs again for the same user; since the user is already in the Set, the filter now wrongly filters out a user that was never actually written.



Improved code:

import java.util.HashSet;
import java.util.Set;

import org.springframework.batch.item.ItemProcessor;

/**
 * This implementation assumes that there is enough room in memory to store the
 * duplicate Users. Otherwise, you'd want to store them somewhere you can do a
 * look-up on.
 */
public class UserFilterItemProcessor implements ItemProcessor<User, User> {

    // This assumes that User.equals()/hashCode() identify the duplicates
    private Set<User> seenUsers = new HashSet<User>();

    @Override
    public User process(User user) {
        // Filter the user only if it is a duplicate AND has not already been
        // through this processor. After a rollback, cached items are re-processed:
        // they are already in seenUsers, but their "processed" flag lets them pass
        // through again, which keeps the processor idempotent.
        if (seenUsers.contains(user) && !user.hasBeenProcessed()) {
            return null;
        } else {
            seenUsers.add(user);
            user.setProcessed(true);
            return user;
        }
    }
}
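For completeness, wiring this processor into a fault-tolerant version of the step from the question might look like the sketch below. The builder methods are standard Spring Batch calls, but the skip limit and skipped exception type are illustrative assumptions, not values from the original post:

```java
// Sketch only: reader and writer setup are omitted, as in the question.
stepBuilderFactory.get("createUserStep1")
        .<User, User>chunk(1000)
        .reader(flatFileItemReader)          // FlatFileItemReader
        .processor(new UserFilterItemProcessor())
        .writer(itemWriter)                  // for example, a JDBC writer
        .faultTolerant()                     // enables skip/retry handling
        .skipLimit(10)                       // illustrative value
        .skip(Exception.class)               // illustrative exception type
        .build();
```

With fault tolerance enabled, a failed chunk is rolled back and its items re-run through the processor, which is exactly why the idempotency fix above matters.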

      







