001package org.consensusj.bitcoin.rx.jsonrpc;
002
003import io.reactivex.rxjava3.core.BackpressureStrategy;
004import io.reactivex.rxjava3.core.Flowable;
005import io.reactivex.rxjava3.core.Observable;
006import io.reactivex.rxjava3.disposables.Disposable;
007import io.reactivex.rxjava3.processors.BehaviorProcessor;
008import io.reactivex.rxjava3.processors.FlowableProcessor;
009import org.consensusj.bitcoin.json.pojo.ChainTip;
010import org.consensusj.bitcoin.jsonrpc.BitcoinClient;
011import org.consensusj.bitcoin.jsonrpc.ChainTipClient;
012import org.consensusj.bitcoin.rx.ChainTipPublisher;
013import org.consensusj.bitcoin.rx.ChainTipService;
014import org.consensusj.jsonrpc.JsonRpcStatusException;
015import org.consensusj.rx.jsonrpc.RxJsonRpcClient;
016import org.reactivestreams.Publisher;
017import org.slf4j.Logger;
018import org.slf4j.LoggerFactory;
019
020import java.io.Closeable;
021import java.io.IOError;
022import java.io.IOException;
023import java.util.List;
024import java.util.concurrent.CompletableFuture;
025import java.util.concurrent.Flow;
026import java.util.concurrent.TimeUnit;
027
028// TODO: Rewrite using ScheduledThreadExecutor (instead of an Observable interval) and SubmissionPublisher instead of FlowableProcessor.
029//  Then we can merge it into BitcoinClient (as a default component.)
030//  We may need to use atomic object (or something else?) to replace distinctUntilChanged
031/**
032 * Provides {@link ChainTipService} a using a {@link BitcoinClient} and a polling interval.
033 * This can be used as a fallback if ZeroMQ is not available.
034 */
035public class PollingChainTipServiceImpl implements ChainTipService, ChainTipClient, RxJsonRpcClient, Closeable {
036    private static final Logger log = LoggerFactory.getLogger(PollingChainTipServiceImpl.class);
037    private final BitcoinClient client;
038    private final Observable<Long> interval;
039    // How will we properly use backpressure here?
040    private final Flowable<ChainTip> chainTipSource;
041    private Disposable chainTipSubscription;
042    private final FlowableProcessor<ChainTip> chainTipProcessor = BehaviorProcessor.create();
043
044    /**
045     * Construct from a {@link BitcoinClient} or subclass and a polling interval
046     * @param bitcoinClient a client instance
047     * @param interval a polling interval
048     */
049    public PollingChainTipServiceImpl(BitcoinClient bitcoinClient, Observable<Long> interval) {
050        client = bitcoinClient;
051        this.interval = interval;
052        log.info("Constructing polling ChainTipService: {}, {}", client.getNetwork().id(), client.getServerURI());
053        chainTipSource = Flowable.fromPublisher(pollForDistinctChainTip());
054    }
055
056    public PollingChainTipServiceImpl(BitcoinClient bitcoinClient) {
057        this(bitcoinClient, Observable.interval(2,10, TimeUnit.SECONDS));
058    }
059
060    public synchronized void start() {
061        if (chainTipSubscription == null) {
062            chainTipSubscription = chainTipSource.subscribe(chainTipProcessor::onNext, chainTipProcessor::onError, chainTipProcessor::onComplete);
063        }
064    }
065
066    @Override
067    public ChainTipPublisher chainTipPublisher() {
068        start();
069        return ChainTipPublisher.of(chainTipProcessor);
070    }
071
072    /**
073     * Provide a polling interval
074     *
075     * @return polling interval with desired frequency for polling for new ChainTips.
076     */
077    public Observable<Long> getPollingInterval() {
078        return interval;
079    }
080
081    /**
082     * Using a polling interval provided by {@link #getPollingInterval()} provide a
083     * stream of distinct {@link ChainTip}s.
084     *
085     * @return A stream of distinct {@code ChainTip}s.
086     */
087    Publisher<ChainTip> pollForDistinctChainTip() {
088        return getPollingInterval()
089                // ERROR backpressure strategy is compatible with BehaviorProcessor since it subscribes to MAX items
090                .toFlowable(BackpressureStrategy.ERROR)
091                .doOnNext(t -> log.debug("got interval"))
092                .flatMap(t -> this.currentChainTipMaybe())
093                .doOnNext(tip -> log.debug("blockheight, blockhash = {}, {}", tip.getHeight(), tip.getHash()))
094                .distinctUntilChanged(ChainTip::getHash)
095                .doOnNext(tip -> log.info("** NEW ** blockheight, blockhash = {}, {}", tip.getHeight(), tip.getHash()));
096    }
097
098    /**
099     * Get the active chain tip if there is one (useful for polling clients)
100     *
101     * @return The active ChainTip if available (onSuccess) otherwise onComplete (if not available) or onError (if error occurred)
102     */
103    private Publisher<ChainTip> currentChainTipMaybe() {
104        return Flowable.fromPublisher(pollOnceAsPublisher(client::getChainTipsAsync, TransientErrorFilter.of(this::isTransientError, this::logError)))
105                .mapOptional(ChainTip::findActiveChainTip);
106    }
107
108    @Override
109    public void close() {
110        chainTipSubscription.dispose();
111    }
112
113    @Override
114    public CompletableFuture<List<ChainTip>> getChainTipsAsync() {
115        return client.getChainTipsAsync();
116    }
117
118    @Override
119    @Deprecated
120    public List<ChainTip> getChainTips() throws JsonRpcStatusException, IOException {
121        return client.syncGet(client.getChainTipsAsync());
122    }
123
124    private boolean isTransientError(Throwable t) {
125        return t instanceof IOError;
126    }
127    private void logError(Throwable throwable) {
128        log.error("Exception in RPCCall", throwable);
129    }
130}