package org.consensusj.analytics.util.collector;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractQueue;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Accumulates the {@code n} largest slices of a stream of objects of type {@code <E>}. Uses a {@link PriorityQueue}
 * implementation of {@link AbstractQueue} internally, ordered by slice size so that the smallest retained slice is
 * always at the head and is the first to be evicted. The sizes of evicted slices are summed into a running
 * "other" total (see {@link #getTotalOther()}).
 * <p>
 * This class is <b>not</b> thread-safe and is <b>not</b> designed for use with a
 * {@link java.util.stream.Collector.Characteristics#CONCURRENT} {@link java.util.stream.Collector}.
 * Switching the {@link AbstractQueue} implementation to {@link java.util.concurrent.PriorityBlockingQueue} would make
 * the queue itself thread-safe (with reduced performance), but the compound operations in {@link #accumulate(Object)},
 * {@link #combine(LargestSliceAccumulator)}, and {@link #drain()} (including the read-modify-write of the "other"
 * total) would still not be thread-safe. Even if this class were thread-safe, enabling {@code CONCURRENT}
 * in a {@code Collector} using this class would <b>reduce</b> performance because multiple threads would be trying
 * to merge objects into the same accumulator. The optimal way to use this class is with one accumulator per thread,
 * which is what a {@code Collector} does when the {@code CONCURRENT} characteristic is not set.
 *
 * @param <E> type of the slice elements being accumulated; elements are ordered only by their extracted slice size,
 *            so {@code E} itself need not be {@link Comparable}
 * @param <N> numeric type for Slice Size
 */
public final class LargestSliceAccumulator<E, N extends Number & Comparable<? super N>> {
    private static final Logger log = LoggerFactory.getLogger(LargestSliceAccumulator.class);
    private final int n; // Maximum number of slices to track
    private final Function<E, N> sliceSizeExtractor;
    private final BinaryOperator<N> additionOperator;
    // Min-heap by slice size: head is the smallest retained slice.
    private final AbstractQueue<E> sliceQueue;
    // Ascending order by extracted slice size; shared by the queue and getSortedSliceList().
    private final Comparator<E> comparator;
    // Sum of the slice sizes of every element evicted from (or never retained in) the queue.
    private N otherTotal;

    /**
     * Construct
     * @param n maximum number of keys (addresses) to track
     * @param sliceSizeExtractor Function to compute the slice size
     * @param zero The value zero for type N
     * @param additionOperator binary addition operator for type N
     * @throws IllegalArgumentException if {@code n} is less than 1
     * @throws NullPointerException if {@code sliceSizeExtractor}, {@code zero} or {@code additionOperator} is null
     */
    public LargestSliceAccumulator(int n,
                                   Function<E, N> sliceSizeExtractor,
                                   N zero,
                                   BinaryOperator<N> additionOperator) {
        if (n < 1) {
            throw new IllegalArgumentException("parameter must be 1 or greater");
        }
        this.n = n;
        // Fail fast here rather than with an NPE later inside accumulate()/drain()/plus().
        this.sliceSizeExtractor = Objects.requireNonNull(sliceSizeExtractor, "sliceSizeExtractor");
        this.additionOperator = Objects.requireNonNull(additionOperator, "additionOperator");
        this.otherTotal = Objects.requireNonNull(zero, "zero");
        this.comparator = Comparator.comparing(sliceSizeExtractor);
        log.trace("Creating accum queue");
        // Reuse the single comparator so queue order and sorted-output order are defined in one place.
        this.sliceQueue = new PriorityQueue<>(n, comparator);
    }

    /**
     * Add a new slice to the accumulator. If more than {@code n} slices are then held, the smallest
     * ones are evicted and their sizes are added to the "other" total.
     *
     * @param newSlice slice to accumulate
     */
    void accumulate(E newSlice) {
        sliceQueue.add(newSlice);
        drain(); // Remove extra elements.
    }

    /**
     * Combine two accumulators by re-accumulating every slice retained in {@code other} into this one
     * and adding {@code other}'s "other" total to this accumulator's "other" total.
     *
     * @param other the other accumulator
     * @return the combined accumulator (this)
     */
    LargestSliceAccumulator<E, N> combine(LargestSliceAccumulator<E, N> other) {
        other.sliceQueue.forEach(this::accumulate);
        otherTotal = plus(otherTotal, other.otherTotal);
        return this;
    }

    // It's simpler (and possibly faster) to let the queue do the size comparison
    // and just remove extra elements rather than implement our own checks _before_
    // adding to the queue.
    private void drain() {
        while (sliceQueue.size() > n) {
            E removed = sliceQueue.poll();  // smallest slice, since the queue is ordered ascending by size
            if (removed != null) {
                otherTotal = plus(otherTotal, sliceSizeExtractor.apply(removed));
            }
        }
    }

    /**
     * Sort the sliceQueue and return as a {@link List}.
     * Normally, this should only be called by the Collector finisher function.
     *
     * @return a new List of the retained slices, sorted in ascending order of slice size (smallest first)
     */
    List<E> getSortedSliceList() {
        return sliceQueue.stream()
                .sorted(comparator)
                .collect(Collectors.toList());
    }

    /**
     * Get the total "slice size" of processed elements that are not saved in this accumulator.
     *
     * @return total slice size of other elements.
     */
    N getTotalOther() {
        return otherTotal;
    }

    // Add two values of type N using the caller-supplied addition operator.
    private N plus(N n1, N n2) {
        return additionOperator.apply(n1, n2);
    }
}