Nested Collectors in Java 8
I work with demographic data. I have a collection of records for different counties in a state (multiple records per county) that I want to aggregate by county.
I have implemented the following Consumer:
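import java.util.ArrayList;
import java.util.List;

// Aggregates the Demographic entries of all Population records for one county.
public class CountyPopulation implements java.util.function.Consumer<Population>
{
    private String countyId ;
    private List<Demographic> demographics ;

    public CountyPopulation()
    {
        demographics = new ArrayList<Demographic>() ;
    }

    public List<Demographic> getDemographics()
    {
        return demographics ;
    }

    // Remembers the county of the first record seen and collects its demographic data.
    @Override
    public void accept(Population pop)
    {
        if ( countyId == null )
        {
            countyId = pop.getCtyId() ;
        }
        demographics.add( pop.getDemographic() ) ;
    }

    // Merges another partial result into this one (only used for parallel streams).
    public void combine(CountyPopulation other)
    {
        demographics.addAll( other.getDemographics() ) ;
    }
}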
This CountyPopulation class is used to aggregate data for a specific county with the following code (where "089" is the county ID):
CountyPopulation ctyPop = populations
    .stream()
    .filter( e -> "089".equals( e.getCtyId() ) )
    .collect(CountyPopulation::new,
             CountyPopulation::accept,
             CountyPopulation::combine) ;
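For anyone who wants to run the filtered version end to end, here is a minimal self-contained sketch. Population and Demographic are hypothetical stand-ins (the question does not show them), assumed to expose only getCtyId() and getDemographic(); the county IDs and labels are invented sample values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class SingleCountyDemo {
    // Hypothetical stand-in for the asker's Demographic type.
    static final class Demographic {
        final String label;
        Demographic(String label) { this.label = label; }
        @Override public String toString() { return label; }
    }

    // Hypothetical stand-in for the asker's Population type.
    static final class Population {
        private final String ctyId;
        private final Demographic demographic;
        Population(String ctyId, String label) {
            this.ctyId = ctyId;
            this.demographic = new Demographic(label);
        }
        String getCtyId() { return ctyId; }
        Demographic getDemographic() { return demographic; }
    }

    // Condensed copy of the question's CountyPopulation (combine returns void).
    static final class CountyPopulation implements Consumer<Population> {
        private String countyId;
        private final List<Demographic> demographics = new ArrayList<>();
        List<Demographic> getDemographics() { return demographics; }
        @Override public void accept(Population pop) {
            if (countyId == null) countyId = pop.getCtyId();
            demographics.add(pop.getDemographic());
        }
        void combine(CountyPopulation other) { demographics.addAll(other.getDemographics()); }
    }

    // The question's filter plus the three-argument Stream.collect.
    static CountyPopulation collectCounty(List<Population> populations, String ctyId) {
        return populations.stream()
                .filter(e -> ctyId.equals(e.getCtyId()))
                .collect(CountyPopulation::new,       // Supplier<CountyPopulation>
                         CountyPopulation::accept,    // BiConsumer<CountyPopulation, Population>
                         CountyPopulation::combine);  // BiConsumer<CountyPopulation, CountyPopulation>
    }

    // Invented sample data for illustration only.
    static List<Population> sample() {
        return Arrays.asList(
                new Population("089", "age 0-17"),
                new Population("091", "age 18-64"),
                new Population("089", "age 65+"));
    }

    public static void main(String[] args) {
        CountyPopulation ctyPop = collectCounty(sample(), "089");
        System.out.println(ctyPop.getDemographics()); // [age 0-17, age 65+]
    }
}
```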
Now I would like to remove the "filter" and group the records by county before using my aggregator.
Based on your first answers, I understand that it can be done using the Collector.of static method like this:
Map<String,CountyPopulation> pop = populations
    .stream()
    .collect(
        Collectors.groupingBy(Population::getCtyId,
            Collector.of( CountyPopulation::new,
                          CountyPopulation::accept,
                          (a,b)->{a.combine(b); return a;} ))) ;
However, this code doesn't work because Collector.of() has a different signature than collect(). I suspect the solution involves modifying the CountyPopulation class so that it implements java.util.function.BiConsumer instead of java.util.function.Consumer, but my attempt to do this did not work and I do not understand why.
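On the BiConsumer question: a small sketch suggests CountyPopulation does not need to implement BiConsumer at all. The unbound method reference CountyPopulation::accept already has the shape (CountyPopulation, Population) -> void, which is a BiConsumer. The only real mismatch is the combiner: the three-argument Stream.collect takes a BiConsumer, while Collector.of takes a BinaryOperator that must return the merged container. Population and Demographic are hypothetical stand-ins, and the constant names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class AdapterDemo {
    // Hypothetical stand-ins for the asker's Demographic and Population types.
    static final class Demographic {
        final String label;
        Demographic(String label) { this.label = label; }
        @Override public String toString() { return label; }
    }

    static final class Population {
        private final String ctyId;
        private final Demographic demographic;
        Population(String ctyId, String label) {
            this.ctyId = ctyId;
            this.demographic = new Demographic(label);
        }
        String getCtyId() { return ctyId; }
        Demographic getDemographic() { return demographic; }
    }

    // Unchanged shape: still a Consumer<Population> with a void combine.
    static final class CountyPopulation implements Consumer<Population> {
        private final List<Demographic> demographics = new ArrayList<>();
        List<Demographic> getDemographics() { return demographics; }
        @Override public void accept(Population pop) { demographics.add(pop.getDemographic()); }
        void combine(CountyPopulation other) { demographics.addAll(other.getDemographics()); }
    }

    static final Supplier<CountyPopulation> SUPPLIER = CountyPopulation::new;

    // Unbound reference: (container, element) -> container.accept(element).
    // This is already a BiConsumer, so no interface change is required.
    static final BiConsumer<CountyPopulation, Population> ACCUMULATOR = CountyPopulation::accept;

    // Fits the three-argument Stream.collect, whose combiner returns nothing.
    static final BiConsumer<CountyPopulation, CountyPopulation> COLLECT_COMBINER = CountyPopulation::combine;

    // Collector.of needs a combiner that returns the merged container,
    // so the void combine is wrapped in a lambda that returns its first argument.
    static final BinaryOperator<CountyPopulation> COLLECTOR_COMBINER = (a, b) -> { a.combine(b); return a; };

    public static void main(String[] args) {
        CountyPopulation left = SUPPLIER.get();
        CountyPopulation right = SUPPLIER.get();
        ACCUMULATOR.accept(left, new Population("089", "age 0-17"));
        ACCUMULATOR.accept(right, new Population("089", "age 65+"));
        CountyPopulation merged = COLLECTOR_COMBINER.apply(left, right);
        System.out.println(merged == left);           // true
        System.out.println(merged.getDemographics()); // [age 0-17, age 65+]
    }
}
```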
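Calling Stream.collect with three arguments is not equivalent to using Collector.of: the three-argument collect takes its combiner as a BiConsumer (no return value), whereas Collector.of expects a BinaryOperator that returns the combined result.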
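So, if you change combine to return this (which makes it usable as a BinaryOperator), you can achieve your goal using:
Map<String,CountyPopulation> pop = populations
    .stream()
    .collect(
        Collectors.groupingBy(Population::getCtyId,
            Collector.of( CountyPopulation::new,
                          CountyPopulation::accept,
                          CountyPopulation::combine ))) ;
Alternatively, keep combine as void and pass the lambda (a,b)->{a.combine(b); return a;} as the combiner instead of the method reference.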
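A runnable sketch of the variant where combine returns this, so that CountyPopulation::combine fits Collector.of's BinaryOperator parameter directly. Population and Demographic are hypothetical stand-ins (not shown in the question), and the sample records are invented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class ReturningCombineDemo {
    // Hypothetical stand-ins for the asker's Demographic and Population types.
    static final class Demographic {
        final String label;
        Demographic(String label) { this.label = label; }
        @Override public String toString() { return label; }
    }

    static final class Population {
        private final String ctyId;
        private final Demographic demographic;
        Population(String ctyId, String label) {
            this.ctyId = ctyId;
            this.demographic = new Demographic(label);
        }
        String getCtyId() { return ctyId; }
        Demographic getDemographic() { return demographic; }
    }

    static final class CountyPopulation implements Consumer<Population> {
        private final List<Demographic> demographics = new ArrayList<>();
        List<Demographic> getDemographics() { return demographics; }
        @Override public void accept(Population pop) { demographics.add(pop.getDemographic()); }

        // Returning the receiver gives combine the (A, A) -> A shape of a BinaryOperator.
        CountyPopulation combine(CountyPopulation other) {
            demographics.addAll(other.getDemographics());
            return this;
        }
    }

    // One CountyPopulation per county key, built by the Collector.of downstream collector.
    static Map<String, CountyPopulation> byCounty(List<Population> populations) {
        return populations.stream()
                .collect(Collectors.groupingBy(
                        Population::getCtyId,
                        Collector.of(CountyPopulation::new,
                                     CountyPopulation::accept,
                                     CountyPopulation::combine)));
    }

    // Invented sample data for illustration only.
    static List<Population> sample() {
        return Arrays.asList(
                new Population("089", "age 0-17"),
                new Population("091", "age 18-64"),
                new Population("089", "age 65+"));
    }

    public static void main(String[] args) {
        Map<String, CountyPopulation> pop = byCounty(sample());
        System.out.println(pop.get("089").getDemographics()); // [age 0-17, age 65+]
        System.out.println(pop.get("091").getDemographics()); // [age 18-64]
    }
}
```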
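For best concurrent performance, it's worth looking into the optional Characteristics you can provide. If one or both of UNORDERED or CONCURRENT match the behavior of your CountyPopulation class, you can provide them (IDENTITY_FINISH is implied in your case). Note that CONCURRENT declares that the accumulator may be called on the same result container from several threads at once, which an ArrayList-backed class like CountyPopulation does not support.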
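A small sketch of what these flags look like in practice. The three-function form of Collector.of already reports IDENTITY_FINISH, and passing it explicitly yields the same set. UNORDERED is added here only under the assumption that the order of demographics within a county does not matter; CONCURRENT is left out because CountyPopulation appends to an ArrayList. Population, Demographic and the stripped-down CountyPopulation are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class CharacteristicsDemo {
    // Hypothetical minimal stand-ins; only their shapes matter here.
    static final class Demographic { }

    static final class Population {
        String getCtyId() { return "089"; }
        Demographic getDemographic() { return new Demographic(); }
    }

    static final class CountyPopulation implements Consumer<Population> {
        private final List<Demographic> demographics = new ArrayList<>();
        @Override public void accept(Population pop) { demographics.add(pop.getDemographic()); }
        void combine(CountyPopulation other) { demographics.addAll(other.demographics); }
    }

    // No flags passed: Collector.of still reports IDENTITY_FINISH.
    static Set<Collector.Characteristics> implicitFlags() {
        Collector<Population, CountyPopulation, CountyPopulation> c = Collector.of(
                CountyPopulation::new,
                CountyPopulation::accept,
                (a, b) -> { a.combine(b); return a; });
        return c.characteristics();
    }

    // IDENTITY_FINISH passed explicitly: the resulting set is the same as above.
    static Set<Collector.Characteristics> explicitFlags() {
        Collector<Population, CountyPopulation, CountyPopulation> c = Collector.of(
                CountyPopulation::new,
                CountyPopulation::accept,
                (a, b) -> { a.combine(b); return a; },
                Collector.Characteristics.IDENTITY_FINISH);
        return c.characteristics();
    }

    // UNORDERED added (assumes demographic order within a county is irrelevant).
    // CONCURRENT is deliberately omitted: ArrayList.add is not safe for concurrent callers.
    static Set<Collector.Characteristics> unorderedFlags() {
        Collector<Population, CountyPopulation, CountyPopulation> c = Collector.of(
                CountyPopulation::new,
                CountyPopulation::accept,
                (a, b) -> { a.combine(b); return a; },
                Collector.Characteristics.UNORDERED);
        return c.characteristics();
    }

    public static void main(String[] args) {
        System.out.println("implicit:  " + implicitFlags());
        System.out.println("explicit:  " + explicitFlags());
        System.out.println("unordered: " + unorderedFlags());
    }
}
```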
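And using groupingByConcurrent instead of groupingBy can improve performance with a parallel stream as well: groupingBy merges per-thread maps in its combiner, which can be expensive, whereas groupingByConcurrent accumulates into a single ConcurrentMap (at the cost of not preserving encounter order within each group).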
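A self-contained sketch of the parallel variant, assuming the same hypothetical Population and Demographic stand-ins and an invented data set where every third record belongs to county "089". The downstream collector is not CONCURRENT; in the OpenJDK implementation, groupingByConcurrent then synchronizes on each per-key container before calling the accumulator, so the ArrayList inside CountyPopulation is not mutated by two threads at once. Per-county totals are deterministic, but the order inside each list is not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelGroupingDemo {
    // Hypothetical stand-ins for the asker's Demographic and Population types.
    static final class Demographic {
        final String label;
        Demographic(String label) { this.label = label; }
    }

    static final class Population {
        private final String ctyId;
        private final Demographic demographic;
        Population(String ctyId, String label) {
            this.ctyId = ctyId;
            this.demographic = new Demographic(label);
        }
        String getCtyId() { return ctyId; }
        Demographic getDemographic() { return demographic; }
    }

    // Same shape as the question's class: not thread-safe on its own.
    static final class CountyPopulation implements Consumer<Population> {
        private final List<Demographic> demographics = new ArrayList<>();
        List<Demographic> getDemographics() { return demographics; }
        @Override public void accept(Population pop) { demographics.add(pop.getDemographic()); }
        void combine(CountyPopulation other) { demographics.addAll(other.getDemographics()); }
    }

    // All threads fill one shared ConcurrentMap keyed by county ID.
    static ConcurrentMap<String, CountyPopulation> byCountyParallel(List<Population> populations) {
        return populations.parallelStream()
                .collect(Collectors.groupingByConcurrent(
                        Population::getCtyId,
                        Collector.of(
                                CountyPopulation::new,
                                CountyPopulation::accept,
                                (a, b) -> { a.combine(b); return a; })));
    }

    // Invented sample: every third record belongs to county "089", the rest to "091".
    static List<Population> sample(int n) {
        return IntStream.range(0, n)
                .mapToObj(i -> new Population(i % 3 == 0 ? "089" : "091", "record " + i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ConcurrentMap<String, CountyPopulation> pop = byCountyParallel(sample(1000));
        System.out.println(pop.get("089").getDemographics().size()); // 334
        System.out.println(pop.get("091").getDemographics().size()); // 666
    }
}
```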
OK, finally I got it to work, but I had to explicitly add the characteristics parameter:
ConcurrentMap<String,CountyPopulation> pop = populations
    .parallelStream().collect(
        Collectors.groupingByConcurrent(
            Population::getCtyId,
            Collector.of(
                CountyPopulation::new,
                CountyPopulation::accept,
                (a,b)-> {a.combine(b); return a; },
                Characteristics.IDENTITY_FINISH ) ) ) ;