001package org.consensusj.bitcoin.rx.jsonrpc;
002
003import io.reactivex.rxjava3.core.BackpressureStrategy;
004import io.reactivex.rxjava3.core.Maybe;
005import io.reactivex.rxjava3.core.Observable;
006import org.consensusj.bitcoin.json.pojo.ChainTip;
007import org.consensusj.bitcoin.jsonrpc.ChainTipClient;
008import org.consensusj.bitcoin.rx.ChainTipService;
009import org.consensusj.rx.jsonrpc.RxJsonRpcClient;
010import org.reactivestreams.Publisher;
011import org.slf4j.Logger;
012import org.slf4j.LoggerFactory;
013
014/**
015 * Interface with {@link PollingChainTipService#pollForDistinctChainTip()} method.
016 * @deprecated Use the {@link PollingChainTipServiceImpl} implementation
017 */
018@Deprecated
019public interface PollingChainTipService extends ChainTipService, ChainTipClient, RxJsonRpcClient {
020    Logger log = LoggerFactory.getLogger(PollingChainTipService.class);
021
022    /**
023     * Implement this method to provide a polling interval
024     *
025     * @return polling interval with desired frequency for polling for new ChainTips.
026     */
027    Observable<Long> getPollingInterval();
028
029    /**
030     * Using a polling interval provided by {@link PollingChainTipService#getPollingInterval()} provide a
031     * stream of distinct {@link ChainTip}s.
032     *
033     * @return A stream of distinct {@code ChainTip}s.
034     */
035    default Publisher<ChainTip> pollForDistinctChainTip() {
036        return getPollingInterval()
037                .doOnNext(t -> log.debug("got interval"))
038                .flatMapMaybe(t -> this.currentChainTipMaybe())
039                .doOnNext(tip -> log.debug("blockheight, blockhash = {}, {}", tip.getHeight(), tip.getHash()))
040                .distinctUntilChanged(ChainTip::getHash)
041                .doOnNext(tip -> log.info("** NEW ** blockheight, blockhash = {}, {}", tip.getHeight(), tip.getHash()))
042                // ERROR backpressure strategy is compatible with BehaviorProcessor since it subscribes to MAX items
043                .toFlowable(BackpressureStrategy.ERROR);
044    }
045
046    /**
047     * Get the active chain tip if there is one (useful for polling clients)
048     *
049     * @return The active ChainTip if available (onSuccess) otherwise onComplete (if not available) or onError (if error occurred)
050     */
051    private Maybe<ChainTip> currentChainTipMaybe() {
052        return pollOnceAsync(this::getChainTipsAsync)
053                .mapOptional(ChainTip::findActiveChainTip);
054    }
055}