001package org.consensusj.jsonrpc; 002 003import java.io.IOException; 004import java.util.Optional; 005import java.util.concurrent.CompletableFuture; 006import java.util.concurrent.Executor; 007import java.util.function.BiFunction; 008import java.util.function.Consumer; 009import java.util.function.Function; 010import java.util.function.Predicate; 011import java.util.function.Supplier; 012 013/** 014 * Helper methods for creating asynchronous calls using {@link CompletableFuture}. Since the 015 * synchronous methods in {@link DefaultRpcClient} throw checked exceptions this interface 016 * provides wrapper support to make it easier to convert them to async calls. 017 */ 018public interface AsyncSupport { 019 /** 020 * Supply async for a ThrowingSupplier by catching exceptions and completing exceptionally. 021 * 022 * @param throwingSupplier Supplier of T that can throw an {@code IOException} 023 * @param <T> return type 024 * @return A completable future, returning T 025 */ 026 default <T> CompletableFuture<T> supplyAsync(ThrowingSupplier<T> throwingSupplier) { 027 return supplyAsync(throwingSupplier, getDefaultAsyncExecutor()); 028 } 029 030 /** 031 * Supply async for a ThrowingSupplier by catching exceptions and completing exceptionally. 032 * 033 * @param throwingSupplier Supplier of T that can throw an {@code IOException} 034 * @param executor Executor to run the Supplier 035 * @param <T> return type 036 * @return A completable future, returning T 037 */ 038 default <T> CompletableFuture<T> supplyAsync(ThrowingSupplier<T> throwingSupplier, Executor executor) { 039 return AsyncSupport.supplyCatchingAsync(throwingSupplier, executor); 040 } 041 042 /** 043 * Return the default executor for supplying asynchronicity. 044 * 045 * @return An executor. 046 */ 047 default Executor getDefaultAsyncExecutor() { 048 return (Runnable r) -> new Thread(r).start(); 049 } 050 051 private static <T> CompletableFuture<T> supplyCatchingAsync(ThrowingSupplier<T> throwingSupplier, Executor executor) { 052 CompletableFuture<T> future = new CompletableFuture<>(); 053 executor.execute(() -> { 054 try { 055 T result = throwingSupplier.getThrows(); 056 future.complete(result); 057 } catch (IOException e) { 058 future.completeExceptionally(e); 059 } 060 }); 061 return future; 062 } 063 064 /** 065 * Subinterface of {@link Supplier} for Lambdas which throw {@link IOException}. 066 * Can be used for two purposes: 067 * <ol> 068 * <li>To cast a lambda that throws an {@code IOException} to a {@link Supplier} while 069 * automatically wrapping any exception thrown by the lambda with {@link RuntimeException}.</li> 070 * <li>As a {@code FunctionalInterface} where a lambda that throws exceptions is 071 * expected or allowed.</li> 072 * </ol> 073 * This is intended to be used to wrap JSON-RPC I/O methods of {@link JsonRpcClient} that throw {@link IOException} and 074 * subclasses such as {@link JsonRpcException}, so we have narrowed the allowed exceptions in {@link #getThrows()} to 075 * {@link IOException}. 076 * 077 * @param <T> 078 */ 079 @FunctionalInterface 080 interface ThrowingSupplier<T> extends Supplier<T> { 081 082 /** 083 * Gets a result wrapping checked Exceptions with {@link RuntimeException} 084 * @return a result 085 */ 086 @Override 087 default T get() { 088 try { 089 return getThrows(); 090 } catch (final Exception e) { 091 throw new RuntimeException(e); 092 } 093 } 094 095 /** 096 * Gets a result. 097 * 098 * @return a result 099 * @throws IOException A (checked) exception 100 */ 101 T getThrows() throws IOException; 102 } 103 104 /** 105 * Error filter for resilient polling. Uses a Predicate to specify what to ignore and a Consumer to log 106 * what is ignored. 107 */ 108 interface TransientErrorFilter { 109 static TransientErrorFilter of(Predicate<Throwable> filter, Consumer<Throwable> logger) { 110 return new TransientErrorFilter() { 111 @Override 112 public boolean isTransient(Throwable t) { 113 return filter.test(t); 114 } 115 116 @Override 117 public void log(Throwable t) { 118 logger.accept(t); 119 } 120 }; 121 } 122 123 static TransientErrorFilter none() { 124 return of( 125 (t) -> false, // No errors are consider transient 126 (t) -> {} // Nothing to log because we're not swallowing anything. 127 ); 128 } 129 130 /** 131 * Handler to transpose to a "future maybe". Use with {@link CompletableFuture#handle(BiFunction)} 132 * followed by {@code .thenCompose(Function.identity())} (or if JDK 12+ {@code CompletableFuture#exceptionallyCompose(Function)}) 133 * to swallow transient errors. 134 * @param result T 135 * @param t An error, possibly transient 136 * @return A completable future of future maybe 137 * @param <T> The desired return type 138 */ 139 default <T> CompletableFuture<Optional<T>> handle(T result, Throwable t) { 140 if (result != null) { 141 return CompletableFuture.completedFuture(Optional.of(result)); 142 } else if (isTransient(t)) { 143 log(t); 144 return CompletableFuture.completedFuture(Optional.empty()); 145 } else { 146 return CompletableFuture.failedFuture(t); 147 } 148 } 149 150 // TODO: Should this be flipped to isFatal()? 151 boolean isTransient(Throwable t); 152 void log(Throwable t); 153 154 } 155}