001package org.consensusj.bitcoin.rx.jsonrpc; 002 003import io.reactivex.rxjava3.core.BackpressureStrategy; 004import io.reactivex.rxjava3.core.Maybe; 005import io.reactivex.rxjava3.core.Observable; 006import org.consensusj.bitcoin.json.pojo.ChainTip; 007import org.consensusj.bitcoin.jsonrpc.ChainTipClient; 008import org.consensusj.bitcoin.rx.ChainTipService; 009import org.consensusj.rx.jsonrpc.RxJsonRpcClient; 010import org.reactivestreams.Publisher; 011import org.slf4j.Logger; 012import org.slf4j.LoggerFactory; 013 014/** 015 * Interface with {@link PollingChainTipService#pollForDistinctChainTip()} method. 016 * @deprecated Use the {@link PollingChainTipServiceImpl} implementation 017 */ 018@Deprecated 019public interface PollingChainTipService extends ChainTipService, ChainTipClient, RxJsonRpcClient { 020 Logger log = LoggerFactory.getLogger(PollingChainTipService.class); 021 022 /** 023 * Implement this method to provide a polling interval 024 * 025 * @return polling interval with desired frequency for polling for new ChainTips. 026 */ 027 Observable<Long> getPollingInterval(); 028 029 /** 030 * Using a polling interval provided by {@link PollingChainTipService#getPollingInterval()} provide a 031 * stream of distinct {@link ChainTip}s. 032 * 033 * @return A stream of distinct {@code ChainTip}s. 034 */ 035 default Publisher<ChainTip> pollForDistinctChainTip() { 036 return getPollingInterval() 037 .doOnNext(t -> log.debug("got interval")) 038 .flatMapMaybe(t -> this.currentChainTipMaybe()) 039 .doOnNext(tip -> log.debug("blockheight, blockhash = {}, {}", tip.getHeight(), tip.getHash())) 040 .distinctUntilChanged(ChainTip::getHash) 041 .doOnNext(tip -> log.info("** NEW ** blockheight, blockhash = {}, {}", tip.getHeight(), tip.getHash())) 042 // ERROR backpressure strategy is compatible with BehaviorProcessor since it subscribes to MAX items 043 .toFlowable(BackpressureStrategy.ERROR); 044 } 045 046 /** 047 * Get the active chain tip if there is one (useful for polling clients) 048 * 049 * @return The active ChainTip if available (onSuccess) otherwise onComplete (if not available) or onError (if error occurred) 050 */ 051 private Maybe<ChainTip> currentChainTipMaybe() { 052 return pollOnceAsync(this::getChainTipsAsync) 053 .mapOptional(ChainTip::findActiveChainTip); 054 } 055}