001package org.consensusj.jsonrpc;
002
003import com.fasterxml.jackson.databind.DeserializationFeature;
004import com.fasterxml.jackson.databind.JavaType;
005import com.fasterxml.jackson.databind.JsonNode;
006import com.fasterxml.jackson.databind.ObjectMapper;
007import org.slf4j.Logger;
008import org.slf4j.LoggerFactory;
009
010import javax.net.ssl.SSLContext;
011import java.lang.reflect.Type;
012import java.time.Duration;
013import java.net.URI;
014import java.util.Collection;
015import java.util.List;
016import java.util.Objects;
017import java.util.concurrent.CompletableFuture;
018import java.util.concurrent.ExecutionException;
019import java.util.concurrent.TimeoutException;
020import java.util.function.Function;
021import java.util.function.Supplier;
022
023// We're overusing inheritance in this hierarchy. We are breaking Effective Java, Item 18: Favor composition over inheritance.
024// We're using inheritance to configure:
025// A. The set of JSON-RPC methods that are supported, e.g. `getblockcount()` (which creates potential conflicts with `send()` etc)
026// B. The JSON mapping implementation. Which for now and the foreseeable future is Jackson only.
027//
028// TODO: Maybe look at extracting a class with some common "mapper" functions between the two transport implementations
029//
030// The proper separation for (A) is probably a complete separation. There should be no required inheritance to implement
031// a client with a set of methods. Internally the client would have a transport and a mapper and those could optionally be made available
032// via some accessor methods if the client application deems necessary.
033// At the heart of (B) (at least as currently implemented) is the mapping from a Java (or Groovy) method name to (i) a JSON-RPC method
034// name, (ii) optional parameter type-conversion for JSON serialization in the request, and (iii) type mapping for the deserialization
035// of the `result` field in the JsonRpcResponse<RSLT>. It might be helpful to think of this as two functional mappings:
036// (1) Map Java method name and parameters to a JSON-RPC request (either map to set of Java Objects _or_ all the way to JSON)
037// (2) Map from received JSON-RPC response to JsonRpcResponse<RSLT> -- this response mapper function is configured as part of making the request.
038//
// A possible "SECOND STEP" is to abstract the specifics of Jackson away from the (two) transport implementations, i.e. to provide
// methods/functions that map from request to string/stream and from string/stream to response. The java.net.http implementation has
// already defined some functional interfaces for this, so coming up with an interface that both the java.net.http implementation and
// the HttpUrlConnection implementation can use would accomplish this second step.
043//
044// Now that JsonRpcClient is a generic with <T extends Type>, we have loosened the Jackson coupling somewhat.
045//
046/**
047 * A strongly-typed, Jackson-based JSON-RPC client. {@link JsonRpcClient} provides many convenience send `default` methods in a
048 * JSON-library-independent way. DefaultRpcClient provides the needed implementation support for Jackson. This class implements
049 * the constructors, static fields, and getters, but delegates the core
050 * {@link JsonRpcTransport#sendRequestForResponseAsync(JsonRpcRequest, Type)} method to a {@link JsonRpcTransport} implementation component.
051 */
052public class DefaultRpcClient implements JsonRpcClient<JavaType> {
053    private static final Logger log = LoggerFactory.getLogger(DefaultRpcClient.class);
054    private static final JsonRpcMessage.Version DEFAULT_JSON_RPC_VERSION = JsonRpcMessage.Version.V2;
055
056    /**
057     * Functional interface for creating JsonRpcTransport instances. This is used to prevent a circular
058     * dependency on {@link ObjectMapper}.
059     */
060    @FunctionalInterface
061    public interface TransportFactory {
062        /**
063         * @param mapper mapper that is shared between {@code DefaultRpcClient} and the {@link JsonRpcTransport}.
064         * @return a transport instance
065         */
066        JsonRpcTransport<JavaType> create(ObjectMapper mapper);
067    }
068    protected final JsonRpcMessage.Version jsonRpcVersion;
069    protected final ObjectMapper mapper;
070    private final JavaType defaultType;
071    private final JsonRpcTransport<JavaType> transport;
072
073    public DefaultRpcClient(URI server, final String rpcUser, final String rpcPassword) {
074        this(JsonRpcTransport.getDefaultSSLContext(), DEFAULT_JSON_RPC_VERSION, server, rpcUser, rpcPassword);
075    }
076
077    public DefaultRpcClient(JsonRpcMessage.Version jsonRpcVersion, URI server, final String rpcUser, final String rpcPassword) {
078        this(JsonRpcTransport.getDefaultSSLContext(), jsonRpcVersion, server, rpcUser, rpcPassword);
079    }
080    public DefaultRpcClient(SSLContext sslContext, JsonRpcMessage.Version jsonRpcVersion, URI server, final String rpcUser, final String rpcPassword) {
081        this((m) -> new JsonRpcClientJavaNet(m, sslContext, server, rpcUser, rpcPassword), jsonRpcVersion);
082    }
083
084    public DefaultRpcClient(TransportFactory transportFactory, JsonRpcMessage.Version jsonRpcVersion) {
085        this.jsonRpcVersion = jsonRpcVersion;
086        mapper = new ObjectMapper();
087        // TODO: Provide external API to configure FAIL_ON_UNKNOWN_PROPERTIES
088        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
089        defaultType = mapper.getTypeFactory().constructType(Object.class);
090        transport = transportFactory.create(mapper);
091    }
092
093    @Override
094    public JsonRpcMessage.Version getJsonRpcVersion() {
095        return jsonRpcVersion;
096    }
097
098    @Override
099    public URI getServerURI() {
100        return transport.getServerURI();
101    }
102
103    @Override
104    public <R> CompletableFuture<JsonRpcResponse<R>> sendRequestForResponseAsync(JsonRpcRequest request, JavaType responseType) {
105        return transport.sendRequestForResponseAsync(request, responseType);
106    }
107
108    /**
109     * Convenience method for requesting an asynchronous response with a {@link JsonNode} for the result.
110     * @param request The request to send
111     * @return A future JSON RPC Response with `result` of type {@code JsonNode}
112     */
113    public CompletableFuture<JsonRpcResponse<JsonNode>> sendRequestForResponseAsync(JsonRpcRequest request) {
114        return sendRequestForResponseAsync(request, responseTypeFor(JsonNode.class));
115    }
116
117    public ObjectMapper getMapper() {
118        return mapper;
119    }
120
121    @Override
122    public JavaType defaultType() {
123        return defaultType;
124    }
125
126    @Override
127    public JavaType responseTypeFor(JavaType resultType) {
128        return getMapper().getTypeFactory().
129                constructParametricType(JsonRpcResponse.class, resultType);
130    }
131
132    @Override
133    public JavaType responseTypeFor(Class<?> resultType) {
134        return getMapper().getTypeFactory().
135                constructParametricType(JsonRpcResponse.class, resultType);
136    }
137
138    @Override
139    public JavaType typeForClass(Class<?> clazz) {
140        return getMapper().constructType(clazz);
141    }
142
143    @Override
144    public JavaType collectionTypeForClasses(Class<? extends Collection> collection, Class<?> clazz) {
145        return getMapper().getTypeFactory()
146                .constructCollectionType(collection, clazz);
147    }
148
149    @Override
150    public JavaType collectionTypeForClasses(Class<? extends Collection> collection, JavaType type) {
151        return getMapper().getTypeFactory()
152                .constructCollectionType(collection, type);
153    }
154
155    public <T> CompletableFuture<JsonRpcResponse<T>> pollOnce(JsonRpcRequest request, JavaType resultType, TransientErrorMapper<T> errorMapper) {
156        CompletableFuture<JsonRpcResponse<T>> f = sendRequestForResponseAsync(request, responseTypeFor(resultType));
157        // In Java 12+ this can be replaced with exceptionallyCompose()
158        return f.handle((r, t) -> errorMapper.map(request, r, t))
159                .thenCompose(Function.identity());
160    }
161
162    /**
163     * A wait-for-server routine that is agnostic about which RPC methods the server supports. In addition to two {@link Duration}
164     * parameters, there are 2 lambda parameters to enable this method to work with any JSON-RPC server. This version will tell
165     * the JSON-RPC Mapper to return an {@link JsonRpcResponse} with a result of type {@link Object}. If you need more precise
166     * control over the type, use {@link #waitForServer(Duration, Duration, Supplier, JavaType, TransientErrorMapper)}.
167     * @param timeout how long to wait
168     * @param retry delay between retries
169     * @param requestSupplier supplier of requests (needs to increment request ID at the very least)
170     * @param errorMapper function that maps non-fatal errors (i.e. cases to keep polling)
171     * @return A future that returns a successful
172     * @param <T> The desired result type to be returned when the server is running
173     */
174    public <T> CompletableFuture<T> waitForServer(Duration timeout, Duration retry, Supplier<JsonRpcRequest> requestSupplier, TransientErrorMapper<T> errorMapper) {
175        return waitForServer(timeout,retry, requestSupplier, typeForClass(Object.class), errorMapper);
176    }
177
178    // TODO: What happens if you cancel this future?
179    /**
180    * A wait-for-server routine that is agnostic about which RPC methods the server supports. In addition to two {@link Duration}
181    * parameters, there are 3 parameters (2 functions and a generic type specifier) to enable this method to work with any JSON-RPC server.
182    * @param timeout how long to wait
183    * @param retry delay between retries
184    * @param requestSupplier supplier of requests (needs to increment request ID at the very least)
185    * @param resultType the result type for the response
186    * @param errorMapper function that maps non-fatal errors (i.e. cases to keep polling)
187    * @return A future that returns a successful
188    * @param <T> The desired result type to be returned when the server is running
189    */
190    public <T> CompletableFuture<T> waitForServer(Duration timeout, Duration retry, Supplier<JsonRpcRequest> requestSupplier, JavaType resultType, TransientErrorMapper<T> errorMapper) {
191        CompletableFuture<T> future = new CompletableFuture<>();
192        getDefaultAsyncExecutor().execute(() -> {
193            log.debug("Waiting for server RPC ready...");
194            String status;          // Status message for logging
195            String statusLast = null;
196            long seconds = 0;
197            while (seconds < timeout.toSeconds()) {
198                JsonRpcResponse<T> r;
199                try {
200                    // All non-fatal exceptions will be mapped to a JsonRpcError with code -20000
201                    r = this.pollOnce(requestSupplier.get(), resultType, errorMapper).get();
202                } catch (InterruptedException | ExecutionException e) {
203                    // If a fatal error occurred, fail our future and abort this thread
204                    log.error("Fatal exception: ", e);
205                    future.completeExceptionally(e);
206                    return;
207                }
208                if (r.getResult() != null) {
209                    // We received a response with a result, server is ready and has returned a usable result
210                    log.debug("RPC Ready.");
211                    future.complete(r.getResult());
212                    return;
213                }
214                // We received a response with a non-fatal error, log it and wait to retry.
215                status = statusFromErrorResponse(r);
216                // Log status messages only once, if new or updated
217                if (!status.equals(statusLast)) {
218                    log.info("Waiting for server: RPC Status: " + status);
219                    statusLast = status;
220                }
221                try {
222                    // Damnit, IntelliJ we're not busy-waiting we're polling!
223                    Thread.sleep(retry.toMillis());
224                    seconds += retry.toSeconds();
225                } catch (InterruptedException e) {
226                    log.error(e.toString());
227                    Thread.currentThread().interrupt();
228                    future.completeExceptionally(e);
229                    return;
230                }
231            }
232            String timeoutMessage = String.format("waitForServer() timed out after %d seconds", timeout.toSeconds());
233            log.error(timeoutMessage);
234            future.completeExceptionally(new TimeoutException(timeoutMessage));
235        });
236        return future;
237    }
238
239    /**
240     * Functional interface for ignoring what are considered "transient" errors. The definition of what is transient
241     * may vary depending upon the application. Different implementations of this function can be created for
242     * different applications.
243     * <p>
244     * The {@code JsonRpcResponse} returned may be a "synthetic" response, that is generated by the client,
245     * not by the server. The synthetic response will look like this:
246     *     <ul>
247     *         <li>error.code: -20000</li>
248     *         <li>error.message: "Server temporarily unavailable"</li>
249     *         <li>error.data: Detailed string message, e.g. "Connection refused"</li>
250     *     </ul>
251     * @param <T> The expected result type
252     */
253    @FunctionalInterface
254    public interface TransientErrorMapper<T> {
255        /**
256         * @param request The request we're handling completions for
257         * @param response response if one was successfully returned (or null)
258         * @param throwable exception if the call failed (or null)
259         * @return A completed or failed future than can replace the input (response, throwable) pair
260         */
261        CompletableFuture<JsonRpcResponse<T>> map(JsonRpcRequest request, JsonRpcResponse<T> response, Throwable throwable);
262    }
263
264    /**
265     * Transient error mapper that is a no-op, i.e. it passes all errors through unchanged.
266     */
267    protected  <T> CompletableFuture<JsonRpcResponse<T>> identityTransientErrorMapper(JsonRpcRequest request, JsonRpcResponse<T> response, Throwable t) {
268        return response != null
269                ? CompletableFuture.completedFuture(response)
270                : CompletableFuture.failedFuture(t);
271    }
272
273    protected <T> JsonRpcResponse<T> temporarilyUnavailableResponse(JsonRpcRequest request,  Throwable t) {
274        return new JsonRpcResponse<T>(request, new JsonRpcError(-2000, "Server temporarily unavailable", t.getMessage()));
275    }
276
277    /**
278     * @param response A response where {@code getResult() == null}
279     * @return An error status string suitable for log messages
280     */
281    protected String statusFromErrorResponse(JsonRpcResponse<?> response) {
282        Objects.requireNonNull(response);
283        if (response.getResult() != null) {
284            throw new IllegalStateException("This should only be called for responses with null result");
285        }
286        if (response.getError() == null) {
287            return "Invalid response both result and error were null";
288        } else if (response.getError().getData() != null) {
289            // Has option data, possibly the -2000 special case
290            return response.getError().getData().toString();
291        } else {
292            return response.getError().getMessage();
293        }
294    }
295}