001package org.consensusj.analytics.util.collector;
002
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractQueue;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
013
014/**
015 * Accumulates the {@code n} largest slices of a stream of objects of type {@code <E>}. Uses a {@link PriorityQueue}
016 * implementation of {@link AbstractQueue} internally.
017 * <p>
018 * This class is <b>not</b> thread-safe and is <b>not</b> designed for use with a {@link java.util.stream.Collector.Characteristics#CONCURRENT}
019 * {@link java.util.stream.Collector}. Switching the {@link AbstractQueue} implementation to {@link java.util.concurrent.PriorityBlockingQueue}
020 * will work (with reduced performance) but the implementations of {@link #accumulate(Object)}, {@link #combine(LargestSliceAccumulator)},
021 * and {@link #drain()} are not thread-safe. Even if this class were thread-safe, enabling {@code CONCURRENT}
022 * in a {@code Collector} using this class would <b>reduce</b> performance because multiple threads would be trying to merge Objects
023 * into the same accumulator. The optimal way to use this class is with one accumulator per thread
024 * and this is what Collector will do without the {@code CONCURRENT} flag.
025 * 
026 * @param <E> must be comparable because it is the second sort field
027 * @param <N> numeric type for Slice Size
028 */
029public final class LargestSliceAccumulator<E, N extends Number & Comparable<? super N>>  {
030    private static final Logger log = LoggerFactory.getLogger(LargestSliceAccumulator.class);
031    private final int n;  // Maximum number of slices to track
032    private final Function<E, N> sliceSizeExtractor;
033    private final BinaryOperator<N> additionOperator;
034    private final AbstractQueue<E> sliceQueue;
035    private final Comparator<E> comparator;
036    private N otherTotal;
037    
038    /**
039     * Construct
040     * @param n maximum number of keys (addresses) to track
041     * @param sliceSizeExtractor Function to compute the slice size
042     * @param zero The value zero for type N
043     * @param additionOperator binary addition operator for type N
044     */
045    public LargestSliceAccumulator(int n,
046                                   Function<E, N> sliceSizeExtractor,
047                                   N zero,
048                                   BinaryOperator<N> additionOperator) {
049        if (n < 1) {
050            throw new IllegalArgumentException("parameter must be 1 or greater");
051        }
052        this.n = n;
053        this.sliceSizeExtractor = sliceSizeExtractor;
054        this.additionOperator = additionOperator;
055        this.otherTotal = zero;
056        this.comparator = Comparator.comparing(sliceSizeExtractor);
057        log.trace("Creating accum queue");
058        sliceQueue = new PriorityQueue<>(n, Comparator.comparing(sliceSizeExtractor));
059    }
060
061    /**
062     * Add a new slice to the accumulator
063     *
064     * @param newSlice slice to accumulate
065     */
066    void accumulate(E newSlice) {
067        //log.trace("accumulating slice of size: {}", sliceSizeExtractor.apply(newSlice).doubleValue());
068        sliceQueue.add(newSlice);
069        drain();    // Remove extra elements.
070    }
071
072    /**
073     * Combine two accumulators
074     *
075     * @param other the other accumulator
076     * @return the combined accumulator (this)
077     */
078    LargestSliceAccumulator<E, N> combine(LargestSliceAccumulator<E, N > other) {
079        other.sliceQueue.forEach(this::accumulate);
080        otherTotal = plus(otherTotal, other.otherTotal);
081        return this;
082    }
083
084    // It's simpler (and possibly faster) to let the queue do the size comparison
085    // and just remove extra elements rather than implement our own checks _before_
086    // adding to the queue.
087    private void drain() {
088        while (sliceQueue.size() > n) {
089            E removed = sliceQueue.poll();
090            if (removed != null) {
091                otherTotal = plus(otherTotal, sliceSizeExtractor.apply(removed));
092            }
093        }
094    }
095    
096    /**
097     * Sort the sliceQueue and return as a {@link List}.
098     * Normally, This should only be called by the Collector finisher function.
099     * 
100     * @return List of slices sorted by their slice size (based on extractor)
101     */
102    List<E> getSortedSliceList() {
103        return sliceQueue.stream()
104                .sorted(comparator)
105                .collect(Collectors.toList());
106    }
107
108    /**
109     * Get the total "slice size" of processed elements that are not saved in this accumulator.
110     *
111     * @return total slice size of other elements.
112     */
113    N getTotalOther() {
114        return otherTotal;
115    }
116
117    private N plus(N n1, N n2) {
118        return additionOperator.apply(n1, n2);
119    }
120}