001/* 002 * Copyright 2014-2026 ConsensusJ Developers. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.consensusj.rx.jsonrpc; 017 018import io.reactivex.rxjava3.core.Flowable; 019import io.reactivex.rxjava3.core.Maybe; 020import io.reactivex.rxjava3.core.Single; 021import org.consensusj.jsonrpc.DefaultRpcClient; 022import org.consensusj.jsonrpc.AsyncSupport; 023import org.reactivestreams.Publisher; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027import java.io.IOError; 028import java.util.Optional; 029import java.util.concurrent.CompletableFuture; 030import java.util.concurrent.CompletionStage; 031import java.util.function.Function; 032import java.util.function.Supplier; 033 034/** 035 * RxJava support for calling JSON-RPC clients. Extend/implement this interface to inherit {@code default} methods 036 * such as {@link #pollOnceAsPublisher(Supplier, DefaultRpcClient.TransientErrorFilter)}. 037 */ 038public interface RxJsonRpcClient extends AsyncSupport { 039 Logger log = LoggerFactory.getLogger(RxJsonRpcClient.class); 040 041 /** 042 * Return a <i>cold</i> {@link Single} for calling a provided <b>synchronous</b> JSON-RPC method. 043 * <p> 044 * A <i>cold</i> stream does not begin processing until someone subscribes to it. 045 * @param method A {@link org.consensusj.jsonrpc.AsyncSupport.ThrowingSupplier} wrapper for a method call. 046 * @param <RSLT> The type of the expected result 047 * @return A <i>cold</i> {@link Single} for calling the method. 048 */ 049 @Deprecated 050 default <RSLT> Single<RSLT> call(AsyncSupport.ThrowingSupplier<RSLT> method) { 051 return Single.defer(() -> Single.fromCompletionStage(supplyAsync(method))); 052 } 053 054 @Deprecated 055 default <RSLT> Single<RSLT> callAsync(Supplier<CompletionStage<RSLT>> supplier) { 056 return defer(supplier); 057 } 058 059 /** 060 * Return a <i>cold</i> {@link Single} for calling a provided <b>asynchronous</b> JSON-RPC method. 061 * (Uses a supplier to make sure the async call isn't made until subscription time) 062 * <p> 063 * A <i>cold</i> stream does not begin processing until someone subscribes to it. 064 * @param supplier of completable 065 * @param <RSLT> The type of the expected result 066 * @return A <i>cold</i> {@link Single} for calling the method. 067 */ 068 @Deprecated 069 static <RSLT> Single<RSLT> defer(Supplier<CompletionStage<RSLT>> supplier) { 070 return Single.defer(() -> Single.fromCompletionStage(supplier.get())); 071 } 072 073 /** 074 * Takes a supplier of computable futures and returns a publisher that when subscribed to returns 0 or 1 items. 075 * @param supplier supplier for delaying invocation of "hot" futures so they can be "cold" publishers. 076 * @param filter Filters and logs transient errors 077 * @return A publisher of a "cold" stream of items (temporarily Flowable, but will change to Publisher, then Flow.Publisher) 078 * @param <T> result type 079 */ 080 default <T> Publisher<T> pollOnceAsPublisher(Supplier<CompletionStage<T>> supplier, DefaultRpcClient.TransientErrorFilter filter) { 081 return Flowable.defer(() -> Flowable.fromCompletionStage(supplier.get() 082 .handle(filter::handle) 083 .thenCompose(Function.identity()))) 084 .flatMapStream(Optional::stream); 085 } 086 087 // This version doesn't filter or log any exceptions 088 default <T> Publisher<T> pollOnceAsPublisher(Supplier<CompletionStage<T>> supplier) { 089 return Flowable.defer(() -> Flowable.fromCompletionStage(supplier.get())); 090 } 091 092 /** 093 * Poll a method, ignoring {@link IOError}. 094 * The returned {@link Maybe} will: 095 * <ol> 096 * <li>Emit a value if successful</li> 097 * <li>Empty Complete on IOError</li> 098 * <li>Error out if any other Exception occurs</li> 099 * </ol> 100 * 101 * @param method A supplier (should be an RPC Method) that can throw {@link Exception}. 102 * @param <RSLT> The type of the expected result 103 * @return A Maybe for the expected result type 104 * @deprecated Use {@link #pollOnceAsPublisher(Supplier, TransientErrorFilter)} (Supplier)} 105 */ 106 @Deprecated 107 default <RSLT> Maybe<RSLT> pollOnce(AsyncSupport.ThrowingSupplier<RSLT> method) { 108 return call(method) 109 .doOnSuccess(this::logSuccess) 110 .doOnError(this::logError) 111 .toMaybe() 112 .onErrorComplete(this::isTransientError); // Empty completion if IOError 113 } 114 115 /** 116 * Poll a method, ignoring {@link IOError}. 117 * The returned {@link Maybe} will: 118 * <ol> 119 * <li>Emit a value if successful</li> 120 * <li>Empty Complete on IOError</li> 121 * <li>Error out if any other Exception occurs</li> 122 * </ol> 123 * 124 * @param supplier A supplier (should call an async RPC Method and return a {@code CompletableFuture}). 125 * @param <RSLT> The type of the expected result 126 * @return A Maybe for the expected result type 127 */ 128 @Deprecated 129 default <RSLT> Maybe<RSLT> pollOnceAsync(Supplier<CompletionStage<RSLT>> supplier) { 130 return Flowable.fromPublisher(pollOnceAsPublisher(supplier, TransientErrorFilter.of(this::isTransientError, this::logError))) 131 .firstElement(); 132 } 133 134 135 /** 136 * Determine if error is transient and should be ignored. 137 * <p> 138 * TODO: Ignoring all IOError is too broad 139 * 140 * @param t Error thrown from calling an RPC method 141 * @return true if the error is transient and can be ignored 142 */ 143 private boolean isTransientError(Throwable t) { 144 return t instanceof IOError; 145 } 146 147 private <RSLT> void logSuccess(RSLT result) { 148 log.debug("RPC call returned: {}", result); 149 } 150 151 private void logError(Throwable throwable) { 152 log.error("Exception in RPCCall", throwable); 153 } 154}