Spring Batch: how to process a list of data before writing in a step
I am trying to read customer data from a database and write the processed data to a flat file. But I need to process the whole result of the ItemReader before writing any data.
For example, I am reading Client rows from the database:
public class Client {
    private String id;
    private String subscriptionCode;
    private Boolean activated;
}
But I want to calculate and write how many users are activated grouped by subscription:
public class Subscription {
    private String subscriptionCode;
    private Integer activatedUserCount;
}
I don't know how to do this using ItemReader / ItemProcessor / ItemWriter. Can you help me?
BatchConfiguration:
@CommonsLog
@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Client, Client> chunk(1000)
                .reader(new ListItemReader<Client>(new ArrayList<Client>() { // Just for test
                    {
                        add(Client.builder().id("1").subscriptionCode("AA").activated(true).build());
                        add(Client.builder().id("2").subscriptionCode("BB").activated(true).build());
                        add(Client.builder().id("3").subscriptionCode("AA").activated(false).build());
                        add(Client.builder().id("4").subscriptionCode("AA").activated(true).build());
                    }
                }))
                .processor(new ItemProcessor<Client, Client>() {
                    public Client process(Client item) throws Exception {
                        log.info(item);
                        return item;
                    }
                })
                .writer(new ItemWriter<Client>() {
                    public void write(List<? extends Client> items) throws Exception {
                        // Only here can I use the List of Client
                        // How can I process this list beforehand to fill Subscription objects?
                    }
                })
                .build();
    }

    @Bean
    public Job job1(Step step1) throws Exception {
        return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer()).start(step1).build();
    }
}
Main application:
public class App {
    public static void main(String[] args) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
        System.exit(SpringApplication.exit(SpringApplication.run(BatchConfiguration.class, args)));
    }
}
I found a solution based on ItemProcessor:
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<Client, Subscription> chunk(1000)
            .reader(new ListItemReader<Client>(new ArrayList<Client>() {
                {
                    add(Client.builder().id("1").subscriptionCode("AA").activated(true).build());
                    add(Client.builder().id("2").subscriptionCode("BB").activated(true).build());
                    add(Client.builder().id("3").subscriptionCode("AA").activated(false).build());
                    add(Client.builder().id("4").subscriptionCode("AA").activated(true).build());
                }
            }))
            .processor(new ItemProcessor<Client, Subscription>() {
                // Subscriptions seen so far in this step (activatedUserCount is an AtomicInteger here)
                private List<Subscription> subscriptions;

                public Subscription process(Client item) throws Exception {
                    for (Subscription s : subscriptions) { // try to retrieve existing element
                        if (s.getSubscriptionCode().equals(item.getSubscriptionCode())) { // element found
                            if (item.getActivated()) {
                                s.getActivatedUserCount().incrementAndGet(); // increment user count
                                log.info("Incremented subscription : " + s);
                            }
                            return null; // existing element -> skip
                        }
                    }
                    // Create new Subscription; start at 1 only if this first client is activated
                    Subscription subscription = Subscription.builder().subscriptionCode(item.getSubscriptionCode()).activatedUserCount(new AtomicInteger(item.getActivated() ? 1 : 0)).build();
                    subscriptions.add(subscription);
                    log.info("New subscription : " + subscription);
                    // Note: the object is handed to the writer with the chunk in which it first appears,
                    // so the count is final only if all clients of that subscription are in that chunk.
                    return subscription;
                }

                @BeforeStep
                public void initList() {
                    subscriptions = Collections.synchronizedList(new ArrayList<Subscription>());
                }

                @AfterStep
                public void clearList() {
                    subscriptions.clear();
                }
            })
            .writer(new ItemWriter<Subscription>() {
                public void write(List<? extends Subscription> items) throws Exception {
                    log.info(items);
                    // do write stuff
                }
            })
            .build();
}
But I need to keep a second list of Subscription objects in the ItemProcessor (I don't know whether this is thread-safe and efficient). What do you think of this solution?
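On the thread-safety and efficiency point, one possible alternative to scanning a synchronized list is to key the counters by subscription code in a ConcurrentHashMap. The following is only a minimal sketch in plain Java with no Spring Batch dependency; the class name SubscriptionCounter and its methods record and snapshot are hypothetical and not part of any library. Thread safety would only matter if the step actually runs on several threads.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not a Spring Batch class): counts activated users per subscription code.
final class SubscriptionCounter {

    // One counter per subscription code; lookup is a hash access instead of a list scan.
    private final Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();

    // Registers one client. A code seen only with inactive clients keeps a count of 0.
    void record(String subscriptionCode, boolean activated) {
        AtomicInteger counter = counts.computeIfAbsent(subscriptionCode, code -> new AtomicInteger());
        if (activated) {
            counter.incrementAndGet();
        }
    }

    // Returns a sorted copy of the current counts, suitable for writing out.
    Map<String, Integer> snapshot() {
        Map<String, Integer> result = new TreeMap<>();
        counts.forEach((code, counter) -> result.put(code, counter.get()));
        return result;
    }
}
```

With the four test clients from the question, calling record for each one and then snapshot would give a count of 2 for "AA" and 1 for "BB".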
If I understand your comments correctly, you need to compile a summary of the activated accounts, right?
You can create a Subscription for each Client you process and, in ItemWriteListener.afterWrite, write the Subscription items created above to the database.
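The idea of accumulating per chunk and emitting the summary only after the last chunk can be sketched without Spring Batch. This is an illustration under assumptions, not the listener API itself: ChunkSummary, ClientRow, onChunkWritten and summaryLines are hypothetical names, and onChunkWritten merely stands in for whatever per-chunk callback is used in the real step.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: accumulates counts chunk by chunk, summary is read once at the end.
final class ChunkSummary {

    // Minimal stand-in for the Client class from the question (assumption, not the original class).
    static final class ClientRow {
        final String subscriptionCode;
        final boolean activated;

        ClientRow(String subscriptionCode, boolean activated) {
            this.subscriptionCode = subscriptionCode;
            this.activated = activated;
        }
    }

    // Insertion-ordered so the output follows the order in which codes were first seen.
    private final Map<String, Integer> activatedBySubscription = new LinkedHashMap<>();

    // Stand-in for a per-chunk callback: adds this chunk's activated clients to the running totals.
    void onChunkWritten(List<ClientRow> chunk) {
        for (ClientRow row : chunk) {
            activatedBySubscription.merge(row.subscriptionCode, row.activated ? 1 : 0, Integer::sum);
        }
    }

    // To be called once after the last chunk: one "code,count" line per subscription.
    List<String> summaryLines() {
        List<String> lines = new ArrayList<>();
        activatedBySubscription.forEach((code, count) -> lines.add(code + "," + count));
        return lines;
    }
}
```

Because the totals are only read after every chunk has been seen, the result does not depend on the chunk size, which is the main reason to defer the write.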