package org.consensusj.rx.jsonrpc;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import org.consensusj.jsonrpc.DefaultRpcClient;
import org.consensusj.jsonrpc.AsyncSupport;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOError;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * RxJava support for calling JSON-RPC clients. Extend/implement this interface to inherit {@code default} methods
 * such as {@link #pollOnceAsPublisher(Supplier, DefaultRpcClient.TransientErrorFilter)}.
 */
public interface RxJsonRpcClient extends AsyncSupport {
    /**
     * Logger shared by the {@code default} and {@code private} methods of this interface.
     * (As an interface field it is implicitly {@code public static final}.)
     */
    Logger log = LoggerFactory.getLogger(RxJsonRpcClient.class);

    /**
     * Return a <i>cold</i> {@link Single} for calling a provided <b>synchronous</b> JSON-RPC method.
     * <p>
     * A <i>cold</i> stream does not begin processing until someone subscribes to it. The call is wrapped
     * in {@link Single#defer}, so {@code supplyAsync(method)} is invoked once per subscription rather than
     * when this method is called.
     *
     * @param method A {@link org.consensusj.jsonrpc.AsyncSupport.ThrowingSupplier} wrapper for a method call.
     * @param <RSLT> The type of the expected result
     * @return A <i>cold</i> {@link Single} for calling the method.
     */
    @Deprecated
    default <RSLT> Single<RSLT> call(AsyncSupport.ThrowingSupplier<RSLT> method) {
        return Single.defer(() -> Single.fromCompletionStage(supplyAsync(method)));
    }

    /**
     * Return a <i>cold</i> {@link Single} for calling a provided <b>asynchronous</b> JSON-RPC method.
     * <p>
     * Instance-level convenience that simply delegates to {@link #defer(Supplier)}.
     *
     * @param supplier supplier of the completion stage; not invoked until subscription time
     * @param <RSLT> The type of the expected result
     * @return A <i>cold</i> {@link Single} for calling the method.
     */
    @Deprecated
    default <RSLT> Single<RSLT> callAsync(Supplier<CompletionStage<RSLT>> supplier) {
        return defer(supplier);
    }

    /**
     * Return a <i>cold</i> {@link Single} for calling a provided <b>asynchronous</b> JSON-RPC method.
     * (Uses a supplier to make sure the async call isn't made until subscription time)
     * <p>
     * A <i>cold</i> stream does not begin processing until someone subscribes to it.
     *
     * @param supplier supplier of the completion stage; {@code get()} is called on each subscription
     * @param <RSLT> The type of the expected result
     * @return A <i>cold</i> {@link Single} for calling the method.
     */
    @Deprecated
    static <RSLT> Single<RSLT> defer(Supplier<CompletionStage<RSLT>> supplier) {
        return Single.defer(() -> Single.fromCompletionStage(supplier.get()));
    }

    /**
     * Takes a supplier of completable futures and returns a publisher that when subscribed to returns 0 or 1 items.
     * <p>
     * The outcome of the supplied stage (value or error) is passed to {@code filter.handle}; the stage that
     * {@code handle} returns is flattened with {@code thenCompose}, and the resulting {@link Optional} is
     * unwrapped so that an empty {@code Optional} produces a stream with no items.
     *
     * @param supplier supplier for delaying invocation of "hot" futures so they can be "cold" publishers.
     * @param filter Filters and logs transient errors
     * @param <T> result type
     * @return A publisher of a "cold" stream of items (temporarily Flowable, but will change to Publisher, then Flow.Publisher)
     */
    default <T> Publisher<T> pollOnceAsPublisher(Supplier<CompletionStage<T>> supplier, DefaultRpcClient.TransientErrorFilter filter) {
        return Flowable.defer(() -> Flowable.fromCompletionStage(supplier.get()
                        .handle(filter::handle)
                        .thenCompose(Function.identity())))
                .flatMapStream(Optional::stream);
    }

    /**
     * Takes a supplier of completable futures and returns a <i>cold</i> publisher for the supplied stage.
     * <p>
     * This version doesn't filter or log any exceptions: a failure of the supplied stage is passed
     * straight to the subscriber.
     *
     * @param supplier supplier for delaying invocation of "hot" futures so they can be "cold" publishers.
     * @param <T> result type
     * @return A publisher of a "cold" stream backed by the supplied completion stage
     */
    default <T> Publisher<T> pollOnceAsPublisher(Supplier<CompletionStage<T>> supplier) {
        return Flowable.defer(() -> Flowable.fromCompletionStage(supplier.get()));
    }

    /**
     * Poll a method, ignoring {@link IOError}.
     * The returned {@link Maybe} will:
     * <ol>
     *     <li>Emit a value if successful</li>
     *     <li>Empty Complete on IOError</li>
     *     <li>Error out if any other Exception occurs</li>
     * </ol>
     * Every error, including one that is subsequently treated as transient, is logged first because
     * the logging step sits upstream of the error-completion step.
     *
     * @param method A supplier (should be an RPC Method) that can throw {@link Exception}.
     * @param <RSLT> The type of the expected result
     * @return A Maybe for the expected result type
     * @deprecated Use {@link #pollOnceAsPublisher(Supplier, DefaultRpcClient.TransientErrorFilter)}
     */
    @Deprecated
    default <RSLT> Maybe<RSLT> pollOnce(AsyncSupport.ThrowingSupplier<RSLT> method) {
        return call(method)
                .doOnSuccess(this::logSuccess)
                .doOnError(this::logError)
                .toMaybe()
                .onErrorComplete(this::isTransientError);    // Empty completion if IOError
    }

    /**
     * Poll a method, ignoring {@link IOError}.
     * The returned {@link Maybe} will:
     * <ol>
     *     <li>Emit a value if successful</li>
     *     <li>Empty Complete on IOError</li>
     *     <li>Error out if any other Exception occurs</li>
     * </ol>
     * Implemented as a thin adapter over
     * {@link #pollOnceAsPublisher(Supplier, DefaultRpcClient.TransientErrorFilter)}, using a filter built
     * from this interface's transient-error predicate and error logger.
     *
     * @param supplier A supplier (should call an async RPC Method and return a {@code CompletableFuture}).
     * @param <RSLT> The type of the expected result
     * @return A Maybe for the expected result type
     */
    @Deprecated
    default <RSLT> Maybe<RSLT> pollOnceAsync(Supplier<CompletionStage<RSLT>> supplier) {
        // Qualified with DefaultRpcClient to match the parameter type declared by pollOnceAsPublisher.
        return Flowable.fromPublisher(pollOnceAsPublisher(supplier,
                        DefaultRpcClient.TransientErrorFilter.of(this::isTransientError, this::logError)))
                .firstElement();
    }

    /**
     * Determine if error is transient and should be ignored.
     * <p>
     * TODO: Ignoring all IOError is too broad
     * <p>
     * NOTE(review): {@link IOError} is {@code java.io.IOError}, a subclass of {@link Error}; it is not
     * {@code java.io.IOException}. Confirm that matching {@code IOError} (rather than {@code IOException})
     * is what is intended here.
     *
     * @param t Error thrown from calling an RPC method
     * @return true if the error is transient and can be ignored
     */
    private boolean isTransientError(Throwable t) {
        return t instanceof IOError;
    }

    /**
     * Log a successful RPC result at debug level.
     *
     * @param result the value returned by the RPC call
     * @param <RSLT> The type of the result
     */
    private <RSLT> void logSuccess(RSLT result) {
        log.debug("RPC call returned: {}", result);
    }

    /**
     * Log a failed RPC call at error level, including the stack trace.
     *
     * @param throwable the error raised by the RPC call
     */
    private void logError(Throwable throwable) {
        log.error("Exception in RPCCall", throwable);
    }
}