001package org.consensusj.rx.jsonrpc;
002
003import io.reactivex.rxjava3.core.Flowable;
004import io.reactivex.rxjava3.core.Maybe;
005import io.reactivex.rxjava3.core.Single;
006import org.consensusj.jsonrpc.DefaultRpcClient;
007import org.consensusj.jsonrpc.AsyncSupport;
008import org.reactivestreams.Publisher;
009import org.slf4j.Logger;
010import org.slf4j.LoggerFactory;
011
012import java.io.IOError;
013import java.util.Optional;
014import java.util.concurrent.CompletableFuture;
015import java.util.concurrent.CompletionStage;
016import java.util.function.Function;
017import java.util.function.Supplier;
018
019/**
020 * RxJava support for calling JSON-RPC clients. Extend/implement this interface to inherit {@code default} methods
021 * such as {@link #pollOnceAsPublisher(Supplier, DefaultRpcClient.TransientErrorFilter)}.
022 */
023public interface RxJsonRpcClient extends AsyncSupport {
024    Logger log = LoggerFactory.getLogger(RxJsonRpcClient.class);
025
026    /**
027     * Return a <i>cold</i> {@link Single} for calling a provided <b>synchronous</b> JSON-RPC method.
028     * <p>
029     *  A  <i>cold</i> stream does not begin processing until someone subscribes to it.
030     * @param method A {@link org.consensusj.jsonrpc.AsyncSupport.ThrowingSupplier} wrapper for a method call.
031     * @param <RSLT> The type of the expected result
032     * @return A <i>cold</i> {@link Single} for calling the method.
033     */
034    @Deprecated
035    default <RSLT> Single<RSLT> call(AsyncSupport.ThrowingSupplier<RSLT> method) {
036        return Single.defer(() -> Single.fromCompletionStage(supplyAsync(method)));
037    }
038
039    @Deprecated
040    default <RSLT> Single<RSLT> callAsync(Supplier<CompletionStage<RSLT>> supplier) {
041        return defer(supplier);
042    }
043
044    /**
045     * Return a <i>cold</i> {@link Single} for calling a provided <b>asynchronous</b> JSON-RPC method.
046     * (Uses a supplier to make sure the async call isn't made until subscription time)
047     * <p>
048     *  A  <i>cold</i> stream does not begin processing until someone subscribes to it.
049     * @param supplier of completable
050     * @param <RSLT> The type of the expected result
051     * @return A <i>cold</i> {@link Single} for calling the method.
052     */
053    @Deprecated
054    static <RSLT> Single<RSLT> defer(Supplier<CompletionStage<RSLT>> supplier) {
055        return Single.defer(() -> Single.fromCompletionStage(supplier.get()));
056    }
057
058    /**
059     * Takes a supplier of computable futures and returns a publisher that when subscribed to returns 0 or 1 items.
060     * @param supplier supplier for delaying invocation of "hot" futures so they can be "cold" publishers.
061     * @param filter Filters and logs transient errors
062     * @return A publisher of a "cold" stream of items (temporarily Flowable, but will change to Publisher, then Flow.Publisher)
063     * @param <T> result type
064     */
065    default <T> Publisher<T> pollOnceAsPublisher(Supplier<CompletionStage<T>> supplier, DefaultRpcClient.TransientErrorFilter filter) {
066        return Flowable.defer(() -> Flowable.fromCompletionStage(supplier.get()
067                        .handle(filter::handle)
068                        .thenCompose(Function.identity())))
069                .flatMapStream(Optional::stream);
070    }
071
072    // This version doesn't filter or log any exceptions
073    default <T> Publisher<T> pollOnceAsPublisher(Supplier<CompletionStage<T>> supplier) {
074        return Flowable.defer(() -> Flowable.fromCompletionStage(supplier.get()));
075    }
076
077    /**
078     * Poll a method, ignoring {@link IOError}.
079     * The returned {@link Maybe} will:
080     * <ol>
081     *     <li>Emit a value if successful</li>
082     *     <li>Empty Complete on IOError</li>
083     *     <li>Error out if any other Exception occurs</li>
084     * </ol>
085     *
086     * @param method A supplier (should be an RPC Method) that can throw {@link Exception}.
087     * @param <RSLT> The type of the expected result
088     * @return A Maybe for the expected result type
089     * @deprecated Use {@link #pollOnceAsPublisher(Supplier, TransientErrorFilter)} (Supplier)}
090     */
091    @Deprecated
092    default <RSLT> Maybe<RSLT> pollOnce(AsyncSupport.ThrowingSupplier<RSLT> method) {
093        return call(method)
094                .doOnSuccess(this::logSuccess)
095                .doOnError(this::logError)
096                .toMaybe()
097                .onErrorComplete(this::isTransientError);    // Empty completion if IOError
098    }
099
100    /**
101     * Poll a method, ignoring {@link IOError}.
102     * The returned {@link Maybe} will:
103     * <ol>
104     *     <li>Emit a value if successful</li>
105     *     <li>Empty Complete on IOError</li>
106     *     <li>Error out if any other Exception occurs</li>
107     * </ol>
108     *
109     * @param supplier A supplier (should call an async RPC Method and return a {@code CompletableFuture}).
110     * @param <RSLT> The type of the expected result
111     * @return A Maybe for the expected result type
112     */
113    @Deprecated
114    default <RSLT> Maybe<RSLT> pollOnceAsync(Supplier<CompletionStage<RSLT>> supplier) {
115        return Flowable.fromPublisher(pollOnceAsPublisher(supplier, TransientErrorFilter.of(this::isTransientError, this::logError)))
116                .firstElement();
117    }
118
119
120    /**
121     * Determine if error is transient and should be ignored.
122     * <p>
123     * TODO: Ignoring all IOError is too broad
124     * 
125     * @param t Error thrown from calling an RPC method
126     * @return true if the error is transient and can be ignored
127     */
128    private boolean isTransientError(Throwable t) {
129        return t instanceof IOError;
130    }
131    
132    private <RSLT> void logSuccess(RSLT result) {
133        log.debug("RPC call returned: {}", result);
134    }
135
136    private void logError(Throwable throwable) {
137        log.error("Exception in RPCCall", throwable);
138    }
139}