001/*
002 * Copyright 2014-2026 ConsensusJ Developers.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.consensusj.rx.jsonrpc;
017
018import io.reactivex.rxjava3.core.Flowable;
019import io.reactivex.rxjava3.core.Maybe;
020import io.reactivex.rxjava3.core.Single;
021import org.consensusj.jsonrpc.DefaultRpcClient;
022import org.consensusj.jsonrpc.AsyncSupport;
023import org.reactivestreams.Publisher;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027import java.io.IOError;
028import java.util.Optional;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.CompletionStage;
031import java.util.function.Function;
032import java.util.function.Supplier;
033
034/**
035 * RxJava support for calling JSON-RPC clients. Extend/implement this interface to inherit {@code default} methods
036 * such as {@link #pollOnceAsPublisher(Supplier, DefaultRpcClient.TransientErrorFilter)}.
037 */
038public interface RxJsonRpcClient extends AsyncSupport {
039    Logger log = LoggerFactory.getLogger(RxJsonRpcClient.class);
040
041    /**
042     * Return a <i>cold</i> {@link Single} for calling a provided <b>synchronous</b> JSON-RPC method.
043     * <p>
044     *  A  <i>cold</i> stream does not begin processing until someone subscribes to it.
045     * @param method A {@link org.consensusj.jsonrpc.AsyncSupport.ThrowingSupplier} wrapper for a method call.
046     * @param <RSLT> The type of the expected result
047     * @return A <i>cold</i> {@link Single} for calling the method.
048     */
049    @Deprecated
050    default <RSLT> Single<RSLT> call(AsyncSupport.ThrowingSupplier<RSLT> method) {
051        return Single.defer(() -> Single.fromCompletionStage(supplyAsync(method)));
052    }
053
054    @Deprecated
055    default <RSLT> Single<RSLT> callAsync(Supplier<CompletionStage<RSLT>> supplier) {
056        return defer(supplier);
057    }
058
059    /**
060     * Return a <i>cold</i> {@link Single} for calling a provided <b>asynchronous</b> JSON-RPC method.
061     * (Uses a supplier to make sure the async call isn't made until subscription time)
062     * <p>
063     *  A  <i>cold</i> stream does not begin processing until someone subscribes to it.
064     * @param supplier of completable
065     * @param <RSLT> The type of the expected result
066     * @return A <i>cold</i> {@link Single} for calling the method.
067     */
068    @Deprecated
069    static <RSLT> Single<RSLT> defer(Supplier<CompletionStage<RSLT>> supplier) {
070        return Single.defer(() -> Single.fromCompletionStage(supplier.get()));
071    }
072
073    /**
074     * Takes a supplier of computable futures and returns a publisher that when subscribed to returns 0 or 1 items.
075     * @param supplier supplier for delaying invocation of "hot" futures so they can be "cold" publishers.
076     * @param filter Filters and logs transient errors
077     * @return A publisher of a "cold" stream of items (temporarily Flowable, but will change to Publisher, then Flow.Publisher)
078     * @param <T> result type
079     */
080    default <T> Publisher<T> pollOnceAsPublisher(Supplier<CompletionStage<T>> supplier, DefaultRpcClient.TransientErrorFilter filter) {
081        return Flowable.defer(() -> Flowable.fromCompletionStage(supplier.get()
082                        .handle(filter::handle)
083                        .thenCompose(Function.identity())))
084                .flatMapStream(Optional::stream);
085    }
086
087    // This version doesn't filter or log any exceptions
088    default <T> Publisher<T> pollOnceAsPublisher(Supplier<CompletionStage<T>> supplier) {
089        return Flowable.defer(() -> Flowable.fromCompletionStage(supplier.get()));
090    }
091
092    /**
093     * Poll a method, ignoring {@link IOError}.
094     * The returned {@link Maybe} will:
095     * <ol>
096     *     <li>Emit a value if successful</li>
097     *     <li>Empty Complete on IOError</li>
098     *     <li>Error out if any other Exception occurs</li>
099     * </ol>
100     *
101     * @param method A supplier (should be an RPC Method) that can throw {@link Exception}.
102     * @param <RSLT> The type of the expected result
103     * @return A Maybe for the expected result type
104     * @deprecated Use {@link #pollOnceAsPublisher(Supplier, TransientErrorFilter)} (Supplier)}
105     */
106    @Deprecated
107    default <RSLT> Maybe<RSLT> pollOnce(AsyncSupport.ThrowingSupplier<RSLT> method) {
108        return call(method)
109                .doOnSuccess(this::logSuccess)
110                .doOnError(this::logError)
111                .toMaybe()
112                .onErrorComplete(this::isTransientError);    // Empty completion if IOError
113    }
114
115    /**
116     * Poll a method, ignoring {@link IOError}.
117     * The returned {@link Maybe} will:
118     * <ol>
119     *     <li>Emit a value if successful</li>
120     *     <li>Empty Complete on IOError</li>
121     *     <li>Error out if any other Exception occurs</li>
122     * </ol>
123     *
124     * @param supplier A supplier (should call an async RPC Method and return a {@code CompletableFuture}).
125     * @param <RSLT> The type of the expected result
126     * @return A Maybe for the expected result type
127     */
128    @Deprecated
129    default <RSLT> Maybe<RSLT> pollOnceAsync(Supplier<CompletionStage<RSLT>> supplier) {
130        return Flowable.fromPublisher(pollOnceAsPublisher(supplier, TransientErrorFilter.of(this::isTransientError, this::logError)))
131                .firstElement();
132    }
133
134
135    /**
136     * Determine if error is transient and should be ignored.
137     * <p>
138     * TODO: Ignoring all IOError is too broad
139     * 
140     * @param t Error thrown from calling an RPC method
141     * @return true if the error is transient and can be ignored
142     */
143    private boolean isTransientError(Throwable t) {
144        return t instanceof IOError;
145    }
146    
147    private <RSLT> void logSuccess(RSLT result) {
148        log.debug("RPC call returned: {}", result);
149    }
150
151    private void logError(Throwable throwable) {
152        log.error("Exception in RPCCall", throwable);
153    }
154}