001package org.consensusj.bitcoin.rx.jsonrpc; 002 003import org.bitcoinj.base.Network; 004import org.consensusj.bitcoin.json.pojo.ChainTip; 005import org.consensusj.bitcoin.jsonrpc.BitcoinClient; 006import io.reactivex.rxjava3.core.Flowable; 007import org.consensusj.bitcoin.jsonrpc.BitcoinExtendedClient; 008import org.consensusj.bitcoin.rx.ChainTipPublisher; 009import org.consensusj.bitcoin.rx.ChainTipService; 010import org.consensusj.bitcoin.rx.zeromq.RxBitcoinZmqService; 011import org.consensusj.jsonrpc.JsonRpcTransport; 012import org.consensusj.rx.jsonrpc.RxJsonRpcClient; 013import org.reactivestreams.Publisher; 014 015import javax.net.ssl.SSLContext; 016import java.net.URI; 017import java.time.Duration; 018import java.util.concurrent.CompletionStage; 019import java.util.concurrent.TimeUnit; 020import java.util.function.Supplier; 021 022/** 023 * A {@link BitcoinClient} enhanced with Reactive features. Can use either ZeroMQ or polling 024 * to implement {@link ChainTipService}. 025 * <p> 026 * TODO: answer the below questions 027 * <p> 028 * Should this class eventually implement {@link org.consensusj.bitcoin.rx.RxBlockchainService} 029 * or {@link org.consensusj.bitcoin.rx.RxBlockchainBinaryService}? 030 * <p> 031 * Should this class be renamed to {@code RxBitcoinJsonRpcClient} and the {@code RxBitcoinClient} interface be moved 032 * to {@code cj-btc-rx?} 033 */ 034public class RxBitcoinClient extends BitcoinExtendedClient implements ChainTipService, RxJsonRpcClient { 035 private final boolean useZmq; 036 private /* Lazy */ ChainTipService chainTipService; 037 038 public RxBitcoinClient(Network network, URI server, String rpcuser, String rpcpassword) { 039 this(network, server, rpcuser, rpcpassword, true); 040 } 041 042 public RxBitcoinClient(Network network, URI server, String rpcuser, String rpcpassword, boolean useZmq) { 043 this(JsonRpcTransport.getDefaultSSLContext(), network, server, rpcuser, rpcpassword, useZmq); 044 } 045 046 public RxBitcoinClient(SSLContext sslContext, Network network, URI server, String rpcuser, String rpcpassword, boolean useZmq) { 047 super(sslContext, network, server, rpcuser, rpcpassword); 048 // TODO: Determine if ZMQ is available by querying the server 049 this.useZmq = useZmq; 050 // TODO: Determine whether server is up or down -- add a session re-establishment service 051 } 052 053 private void initChainTipService(Duration timeout) { 054 if (chainTipService == null) { 055 if (useZmq) { 056 log.warn("(useZmq enabled) Initiating server connection (with timeout of {} seconds)", timeout.toSeconds()); 057 this.connectToServer(timeout).join(); 058 chainTipService = new RxBitcoinZmqService(this); 059 } else { 060 chainTipService = new PollingChainTipServiceImpl(this); 061 } 062 } 063 } 064 065 /** 066 * Repeatedly once-per-new-block poll an async method 067 * 068 * @param supplier A supplier (should be an RPC Method) of a CompletionStage 069 * @param <RSLT> The type of the expected result 070 * @return An Observable for the expected result type, so we can expect one call to {@code onNext} per block. 071 */ 072 public <RSLT> Publisher<RSLT> pollOnNewBlockAsync(Supplier<CompletionStage<RSLT>> supplier) { 073 return Flowable.fromPublisher(chainTipPublisher()).flatMap(tip -> pollOnceAsPublisher(supplier)); 074 } 075 076 /** 077 * The BitcoinClient must have "connected once" before this is called. This means something else 078 * needs to have called something to do that. 079 * @return a publisher of Chain Tips 080 */ 081 @Override 082 public ChainTipPublisher chainTipPublisher() { 083 initChainTipService(Duration.ofMinutes(60)); 084 return chainTipService.chainTipPublisher(); 085 } 086}