001package org.consensusj.bitcoin.rx.jsonrpc; 002 003import io.reactivex.rxjava3.core.BackpressureStrategy; 004import io.reactivex.rxjava3.core.Flowable; 005import io.reactivex.rxjava3.core.Observable; 006import io.reactivex.rxjava3.disposables.Disposable; 007import io.reactivex.rxjava3.processors.BehaviorProcessor; 008import io.reactivex.rxjava3.processors.FlowableProcessor; 009import org.consensusj.bitcoin.json.pojo.ChainTip; 010import org.consensusj.bitcoin.jsonrpc.BitcoinClient; 011import org.consensusj.bitcoin.jsonrpc.ChainTipClient; 012import org.consensusj.bitcoin.rx.ChainTipPublisher; 013import org.consensusj.bitcoin.rx.ChainTipService; 014import org.consensusj.jsonrpc.JsonRpcStatusException; 015import org.consensusj.rx.jsonrpc.RxJsonRpcClient; 016import org.reactivestreams.Publisher; 017import org.slf4j.Logger; 018import org.slf4j.LoggerFactory; 019 020import java.io.Closeable; 021import java.io.IOError; 022import java.io.IOException; 023import java.util.List; 024import java.util.concurrent.CompletableFuture; 025import java.util.concurrent.Flow; 026import java.util.concurrent.TimeUnit; 027 028// TODO: Rewrite using ScheduledThreadExecutor (instead of an Observable interval) and SubmissionPublisher instead of FlowableProcessor. 029// Then we can merge it into BitcoinClient (as a default component.) 030// We may need to use atomic object (or something else?) to replace distinctUntilChanged 031/** 032 * Provides {@link ChainTipService} a using a {@link BitcoinClient} and a polling interval. 033 * This can be used as a fallback if ZeroMQ is not available. 034 */ 035public class PollingChainTipServiceImpl implements ChainTipService, ChainTipClient, RxJsonRpcClient, Closeable { 036 private static final Logger log = LoggerFactory.getLogger(PollingChainTipServiceImpl.class); 037 private final BitcoinClient client; 038 private final Observable<Long> interval; 039 // How will we properly use backpressure here? 040 private final Flowable<ChainTip> chainTipSource; 041 private Disposable chainTipSubscription; 042 private final FlowableProcessor<ChainTip> chainTipProcessor = BehaviorProcessor.create(); 043 044 /** 045 * Construct from a {@link BitcoinClient} or subclass and a polling interval 046 * @param bitcoinClient a client instance 047 * @param interval a polling interval 048 */ 049 public PollingChainTipServiceImpl(BitcoinClient bitcoinClient, Observable<Long> interval) { 050 client = bitcoinClient; 051 this.interval = interval; 052 log.info("Constructing polling ChainTipService: {}, {}", client.getNetwork().id(), client.getServerURI()); 053 chainTipSource = Flowable.fromPublisher(pollForDistinctChainTip()); 054 } 055 056 public PollingChainTipServiceImpl(BitcoinClient bitcoinClient) { 057 this(bitcoinClient, Observable.interval(2,10, TimeUnit.SECONDS)); 058 } 059 060 public synchronized void start() { 061 if (chainTipSubscription == null) { 062 chainTipSubscription = chainTipSource.subscribe(chainTipProcessor::onNext, chainTipProcessor::onError, chainTipProcessor::onComplete); 063 } 064 } 065 066 @Override 067 public ChainTipPublisher chainTipPublisher() { 068 start(); 069 return ChainTipPublisher.of(chainTipProcessor); 070 } 071 072 /** 073 * Provide a polling interval 074 * 075 * @return polling interval with desired frequency for polling for new ChainTips. 076 */ 077 public Observable<Long> getPollingInterval() { 078 return interval; 079 } 080 081 /** 082 * Using a polling interval provided by {@link #getPollingInterval()} provide a 083 * stream of distinct {@link ChainTip}s. 084 * 085 * @return A stream of distinct {@code ChainTip}s. 086 */ 087 Publisher<ChainTip> pollForDistinctChainTip() { 088 return getPollingInterval() 089 // ERROR backpressure strategy is compatible with BehaviorProcessor since it subscribes to MAX items 090 .toFlowable(BackpressureStrategy.ERROR) 091 .doOnNext(t -> log.debug("got interval")) 092 .flatMap(t -> this.currentChainTipMaybe()) 093 .doOnNext(tip -> log.debug("blockheight, blockhash = {}, {}", tip.getHeight(), tip.getHash())) 094 .distinctUntilChanged(ChainTip::getHash) 095 .doOnNext(tip -> log.info("** NEW ** blockheight, blockhash = {}, {}", tip.getHeight(), tip.getHash())); 096 } 097 098 /** 099 * Get the active chain tip if there is one (useful for polling clients) 100 * 101 * @return The active ChainTip if available (onSuccess) otherwise onComplete (if not available) or onError (if error occurred) 102 */ 103 private Publisher<ChainTip> currentChainTipMaybe() { 104 return Flowable.fromPublisher(pollOnceAsPublisher(client::getChainTipsAsync, TransientErrorFilter.of(this::isTransientError, this::logError))) 105 .mapOptional(ChainTip::findActiveChainTip); 106 } 107 108 @Override 109 public void close() { 110 chainTipSubscription.dispose(); 111 } 112 113 @Override 114 public CompletableFuture<List<ChainTip>> getChainTipsAsync() { 115 return client.getChainTipsAsync(); 116 } 117 118 @Override 119 @Deprecated 120 public List<ChainTip> getChainTips() throws JsonRpcStatusException, IOException { 121 return client.syncGet(client.getChainTipsAsync()); 122 } 123 124 private boolean isTransientError(Throwable t) { 125 return t instanceof IOError; 126 } 127 private void logError(Throwable throwable) { 128 log.error("Exception in RPCCall", throwable); 129 } 130}