001package org.consensusj.bitcoin.rx.jsonrpc;
002
003import org.bitcoinj.base.Network;
004import org.consensusj.bitcoin.json.pojo.ChainTip;
005import org.consensusj.bitcoin.jsonrpc.BitcoinClient;
006import io.reactivex.rxjava3.core.Flowable;
007import org.consensusj.bitcoin.jsonrpc.BitcoinExtendedClient;
008import org.consensusj.bitcoin.rx.ChainTipPublisher;
009import org.consensusj.bitcoin.rx.ChainTipService;
010import org.consensusj.bitcoin.rx.zeromq.RxBitcoinZmqService;
011import org.consensusj.jsonrpc.JsonRpcTransport;
012import org.consensusj.rx.jsonrpc.RxJsonRpcClient;
013import org.reactivestreams.Publisher;
014
015import javax.net.ssl.SSLContext;
016import java.net.URI;
017import java.time.Duration;
018import java.util.concurrent.CompletionStage;
019import java.util.concurrent.TimeUnit;
020import java.util.function.Supplier;
021
022/**
023 * A {@link BitcoinClient} enhanced with Reactive features. Can use either ZeroMQ or polling
024 * to implement {@link ChainTipService}.
025 * <p>
026 * TODO: answer the below questions
027 * <p>
028 * Should this class eventually implement {@link org.consensusj.bitcoin.rx.RxBlockchainService}
029 * or {@link org.consensusj.bitcoin.rx.RxBlockchainBinaryService}?
030 * <p>
031 * Should this class be renamed to {@code RxBitcoinJsonRpcClient} and the {@code RxBitcoinClient} interface be moved
032 * to {@code cj-btc-rx?}
033 */
034public class RxBitcoinClient extends BitcoinExtendedClient implements ChainTipService, RxJsonRpcClient {
035    private final boolean useZmq;
036    private /* Lazy */ ChainTipService chainTipService;
037
038    public RxBitcoinClient(Network network, URI server, String rpcuser, String rpcpassword) {
039        this(network, server, rpcuser, rpcpassword, true);
040    }
041
042    public RxBitcoinClient(Network network, URI server, String rpcuser, String rpcpassword, boolean useZmq) {
043        this(JsonRpcTransport.getDefaultSSLContext(), network, server, rpcuser, rpcpassword, useZmq);
044    }
045
046    public RxBitcoinClient(SSLContext sslContext, Network network, URI server, String rpcuser, String rpcpassword, boolean useZmq) {
047        super(sslContext, network, server, rpcuser, rpcpassword);
048        // TODO: Determine if ZMQ is available by querying the server
049        this.useZmq = useZmq;
050        // TODO: Determine whether server is up or down -- add a session re-establishment service
051    }
052
053    private void initChainTipService(Duration timeout) {
054        if (chainTipService == null) {
055            if (useZmq) {
056                log.warn("(useZmq enabled) Initiating server connection (with timeout of {} seconds)", timeout.toSeconds());
057                this.connectToServer(timeout).join();
058                chainTipService = new RxBitcoinZmqService(this);
059            } else {
060                chainTipService = new PollingChainTipServiceImpl(this);
061            }
062        }
063    }
064
065    /**
066     * Repeatedly once-per-new-block poll an async method
067     *
068     * @param supplier A supplier (should be an RPC Method) of a CompletionStage
069     * @param <RSLT> The type of the expected result
070     * @return An Observable for the expected result type, so we can expect one call to {@code onNext} per block.
071     */
072    public <RSLT> Publisher<RSLT> pollOnNewBlockAsync(Supplier<CompletionStage<RSLT>> supplier) {
073        return Flowable.fromPublisher(chainTipPublisher()).flatMap(tip -> pollOnceAsPublisher(supplier));
074    }
075
076    /**
077     * The BitcoinClient must have "connected once" before this is called. This means something else
078     * needs to have called something to do that.
079     * @return a publisher of Chain Tips
080     */
081    @Override
082    public ChainTipPublisher chainTipPublisher() {
083        initChainTipService(Duration.ofMinutes(60));
084        return chainTipService.chainTipPublisher();
085    }
086}