Spring Batch: how to filter duplicate items before submitting to the ItemWriter
I read a flat file (e.g. a CSV file with one line per user, such as: UserId; Data1; Data2).
But how do I handle duplicate user records in the reader, given that there is no way to check against the previously read users?
stepBuilderFactory.get("createUserStep1")
        .<User, User>chunk(1000)
        .reader(flatFileItemReader) // FlatFileItemReader
        .writer(itemWriter)         // for example, a JDBC writer
        .build();
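For context, a FlatFileItemReader for a semicolon-delimited user file could be configured along these lines. This is a sketch: the resource path, the field names (`userId`, `data1`, `data2`), and the `User` target type are assumptions based on the file layout described above.

```java
// Sketch: reading a semicolon-delimited file into User objects.
// File path, field names, and the User bean are assumptions.
FlatFileItemReader<User> flatFileItemReader = new FlatFileItemReader<>();
flatFileItemReader.setResource(new FileSystemResource("users.csv"));

// Tokenize each line on ';' and name the resulting fields
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(";");
tokenizer.setNames("userId", "data1", "data2");

// Map the named fields onto User bean properties
BeanWrapperFieldSetMapper<User> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
fieldSetMapper.setTargetType(User.class);

DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSetMapper);
flatFileItemReader.setLineMapper(lineMapper);
```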
Filtering is typically done with an ItemProcessor. If the ItemProcessor returns null, the item is filtered out and is not passed to the ItemWriter; otherwise, it is passed along. In your case, you can keep the set of previously seen users in the ItemProcessor: if the user has not been seen before, pass it through; if it has been seen before, return null.

You can read more about filtering with an ItemProcessor here: http://docs.spring.io/spring-batch/trunk/reference/html/readersAndWriters.html#filiteringRecords
/**
 * This implementation assumes that there is enough room in memory to store the duplicate
 * Users. Otherwise, you'd want to store them somewhere you can do a look-up on.
 */
public class UserFilterItemProcessor implements ItemProcessor<User, User> {

    // This assumes that User.equals() identifies the duplicates
    private Set<User> seenUsers = new HashSet<User>();

    public User process(User user) {
        if (seenUsers.contains(user)) {
            return null;
        }
        seenUsers.add(user);
        return user;
    }
}
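To wire this processor into the step from the question, register it between the reader and the writer. This is a sketch reusing the names from the question's snippet:

```java
// Register the filtering processor between reader and writer
stepBuilderFactory.get("createUserStep1")
        .<User, User>chunk(1000)
        .reader(flatFileItemReader)
        .processor(new UserFilterItemProcessor())
        .writer(itemWriter)
        .build();
```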
As you can see here: http://docs.spring.io/spring-batch/trunk/reference/html/readersAndWriters.html#faultTolerant

When a chunk is rolled back, items that were cached during reading may be reprocessed. If a step is configured to be fault tolerant (typically by using skip or retry processing), any ItemProcessor used should be implemented in an idempotent way.

This means that in Michael's example, when a user is processed for the first time, it is cached in the Set. If a failure then occurs while writing the item and the step is fault tolerant, the processor runs again for the same user, and the filter would incorrectly filter that user out.
Improved code:
/**
 * This implementation assumes that there is enough room in memory to store the duplicate
 * Users. Otherwise, you'd want to store them somewhere you can do a look-up on.
 */
public class UserFilterItemProcessor implements ItemProcessor<User, User> {

    // This assumes that User.equals() identifies the duplicates
    private Set<User> seenUsers = new HashSet<User>();

    public User process(User user) {
        // Filter only genuine duplicates; a cached user that was already
        // processed once is being re-delivered after a rollback, so let it through.
        if (seenUsers.contains(user) && !user.hasBeenProcessed()) {
            return null;
        } else {
            seenUsers.add(user);
            user.setProcessed(true);
            return user;
        }
    }
}
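The improved processor relies on two assumptions about the `User` class: `equals()`/`hashCode()` must be based on the natural key only (here assumed to be `userId`), and the `processed` flag must be excluded from them, so that marking a cached instance as processed does not stop it from matching its entry in the Set. A hypothetical sketch of such a class (field names are assumptions, not from the original question):

```java
import java.util.Objects;

// Hypothetical User sketch supporting the idempotent filter above.
public class User {
    private final String userId;
    private final String data1;
    private final String data2;
    private boolean processed = false; // set by the processor on first pass

    public User(String userId, String data1, String data2) {
        this.userId = userId;
        this.data1 = data1;
        this.data2 = data2;
    }

    public boolean hasBeenProcessed() { return processed; }
    public void setProcessed(boolean processed) { this.processed = processed; }
    public String getUserId() { return userId; }

    // Identity is the userId only; the mutable processed flag is excluded
    // so flipping it never changes equality or the hash bucket.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof User)) return false;
        return userId.equals(((User) o).userId);
    }

    @Override
    public int hashCode() { return Objects.hash(userId); }
}
```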