001package org.consensusj.jsonrpc;
002
003import java.io.IOException;
004import java.util.Optional;
005import java.util.concurrent.CompletableFuture;
006import java.util.concurrent.Executor;
007import java.util.function.BiFunction;
008import java.util.function.Consumer;
009import java.util.function.Function;
010import java.util.function.Predicate;
011import java.util.function.Supplier;
012
013/**
014 * Helper methods for creating asynchronous calls using {@link CompletableFuture}. Since the
015 * synchronous methods in {@link DefaultRpcClient} throw checked exceptions this interface
016 * provides wrapper support to make it easier to convert them to async calls.
017 */
018public interface AsyncSupport {
019    /**
020     * Supply async for a ThrowingSupplier by catching exceptions and completing exceptionally.
021     *
022     * @param throwingSupplier Supplier of T that can throw an {@code IOException}
023     * @param <T> return type
024     * @return A completable future, returning T
025     */
026    default <T> CompletableFuture<T> supplyAsync(ThrowingSupplier<T> throwingSupplier) {
027        return supplyAsync(throwingSupplier, getDefaultAsyncExecutor());
028    }
029
030    /**
031     * Supply async for a ThrowingSupplier by catching exceptions and completing exceptionally.
032     *
033     * @param throwingSupplier Supplier of T that can throw an {@code IOException}
034     * @param executor Executor to run the Supplier
035     * @param <T> return type
036     * @return A completable future, returning T
037     */
038    default <T> CompletableFuture<T> supplyAsync(ThrowingSupplier<T> throwingSupplier, Executor executor) {
039        return AsyncSupport.supplyCatchingAsync(throwingSupplier, executor);
040    }
041
042    /**
043     * Return the default executor for supplying asynchronicity.
044     *
045     * @return An executor.
046     */
047    default Executor getDefaultAsyncExecutor() {
048        return (Runnable r) -> new Thread(r).start();
049    }
050
051    private static <T> CompletableFuture<T> supplyCatchingAsync(ThrowingSupplier<T> throwingSupplier, Executor executor) {
052        CompletableFuture<T> future = new CompletableFuture<>();
053        executor.execute(() -> {
054            try {
055                T result = throwingSupplier.getThrows();
056                future.complete(result);
057            } catch (IOException e) {
058                future.completeExceptionally(e);
059            }
060        });
061        return future;
062    }
063
064    /**
065     * Subinterface of {@link Supplier} for Lambdas which throw {@link IOException}.
066     * Can be used for two purposes:
067     * <ol>
068     *     <li>To cast a lambda that throws an {@code IOException} to a {@link Supplier} while
069     *      automatically wrapping any exception thrown by the lambda with {@link RuntimeException}.</li>
070     *     <li>As a {@code FunctionalInterface} where a lambda that throws exceptions is
071     *      expected or allowed.</li>
072     * </ol>
073     * This is intended to be used to wrap JSON-RPC I/O methods of {@link JsonRpcClient} that throw {@link IOException} and
074     * subclasses such as {@link JsonRpcException}, so we have narrowed the allowed exceptions in {@link #getThrows()} to
075     * {@link IOException}.
076     *
077     * @param <T>
078     */
079    @FunctionalInterface
080    interface ThrowingSupplier<T> extends Supplier<T> {
081
082        /**
083         * Gets a result wrapping checked Exceptions with {@link RuntimeException}
084         * @return a result
085         */
086        @Override
087        default T get() {
088            try {
089                return getThrows();
090            } catch (final Exception e) {
091                throw new RuntimeException(e);
092            }
093        }
094
095        /**
096         * Gets a result.
097         *
098         * @return a result
099         * @throws IOException A (checked) exception
100         */
101        T getThrows() throws IOException;
102    }
103
104    /**
105     * Error filter for resilient polling. Uses a Predicate to specify what to ignore and a Consumer to log
106     * what is ignored.
107     */
108    interface TransientErrorFilter {
109        static TransientErrorFilter of(Predicate<Throwable> filter, Consumer<Throwable> logger) {
110            return new TransientErrorFilter() {
111                @Override
112                public boolean isTransient(Throwable t) {
113                    return filter.test(t);
114                }
115
116                @Override
117                public void log(Throwable t) {
118                    logger.accept(t);
119                }
120            };
121        }
122
123        static TransientErrorFilter none() {
124            return of(
125                    (t) -> false,   // No errors are consider transient
126                    (t) -> {}       // Nothing to log because we're not swallowing anything.
127            );
128        }
129
130        /**
131         * Handler to transpose to a "future maybe". Use with {@link CompletableFuture#handle(BiFunction)}
132         * followed by {@code .thenCompose(Function.identity())} (or if JDK 12+ {@code CompletableFuture#exceptionallyCompose(Function)})
133         * to swallow transient errors.
134         * @param result T
135         * @param t An error, possibly transient
136         * @return A completable future of future maybe
137         * @param <T> The desired return type
138         */
139        default <T> CompletableFuture<Optional<T>> handle(T result, Throwable t) {
140            if (result != null) {
141                return CompletableFuture.completedFuture(Optional.of(result));
142            } else if (isTransient(t)) {
143                log(t);
144                return CompletableFuture.completedFuture(Optional.empty());
145            } else {
146                return CompletableFuture.failedFuture(t);
147            }
148        }
149
150        // TODO: Should this be flipped to isFatal()?
151        boolean isTransient(Throwable t);
152        void log(Throwable t);
153
154    }
155}