package org.consensusj.jsonrpc;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import java.lang.reflect.Type;
import java.time.Duration;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

// We're overusing inheritance in this hierarchy. We are breaking Effective Java, Item 18: Favor composition over inheritance.
// We're using inheritance to configure:
// A. The set of JSON-RPC methods that are supported, e.g. `getblockcount()` (which creates potential conflicts with `send()` etc)
// B. The JSON mapping implementation. Which for now and the foreseeable future is Jackson only.
//
// TODO: Maybe look at extracting a class with some common "mapper" functions between the two transport implementations
//
// The proper separation for (A) is probably a complete separation. There should be no required inheritance to implement
// a client with a set of methods. Internally the client would have a transport and a mapper and those could optionally be made available
// via some accessor methods if the client application deems necessary.
// At the heart of (B) (at least as currently implemented) is the mapping from a Java (or Groovy) method name to (i) a JSON-RPC method
// name, (ii) optional parameter type-conversion for JSON serialization in the request, and (iii) type mapping for the deserialization
// of the `result` field in the JsonRpcResponse<RSLT>. It might be helpful to think of this as two functional mappings:
// (1) Map Java method name and parameters to a JSON-RPC request (either map to set of Java Objects _or_ all the way to JSON)
// (2) Map from received JSON-RPC response to JsonRpcResponse<RSLT> -- this response mapper function is configured as part of making the request.
//
// To abstract the specifics of Jackson from the (two) transport implementations. Basically methods/functions to
// map from request to string/stream and to map from string/stream to response. The java.net.http implementation has already defined
// some functional interfaces for this, so coming up with an interface that both the java.net.http implementation and the HttpUrlConnection
// implementation can use will lead to this "SECOND STEP"
//
// Now that JsonRpcClient is a generic with <T extends Type>, we have loosened the Jackson coupling somewhat.
//
/**
 * A strongly-typed, Jackson-based JSON-RPC client. {@link JsonRpcClient} provides many convenience send `default` methods in a
 * JSON-library-independent way. DefaultRpcClient provides the needed implementation support for Jackson. This class implements
 * the constructors, static fields, and getters, but delegates the core
 * {@link JsonRpcTransport#sendRequestForResponseAsync(JsonRpcRequest, Type)} method to a {@link JsonRpcTransport} implementation component.
 */
public class DefaultRpcClient implements JsonRpcClient<JavaType> {
    private static final Logger log = LoggerFactory.getLogger(DefaultRpcClient.class);
    private static final JsonRpcMessage.Version DEFAULT_JSON_RPC_VERSION = JsonRpcMessage.Version.V2;
    /** Error code used in the client-generated ("synthetic") "Server temporarily unavailable" response. */
    private static final int SERVER_TEMPORARILY_UNAVAILABLE_CODE = -2000;

    /**
     * Functional interface for creating JsonRpcTransport instances. This is used to prevent a circular
     * dependency on {@link ObjectMapper}.
     */
    @FunctionalInterface
    public interface TransportFactory {
        /**
         * @param mapper mapper that is shared between {@code DefaultRpcClient} and the {@link JsonRpcTransport}.
         * @return a transport instance
         */
        JsonRpcTransport<JavaType> create(ObjectMapper mapper);
    }

    protected final JsonRpcMessage.Version jsonRpcVersion;
    protected final ObjectMapper mapper;
    private final JavaType defaultType;
    private final JsonRpcTransport<JavaType> transport;

    /**
     * Create a client using {@link JsonRpcTransport#getDefaultSSLContext()} and the default JSON-RPC version (V2).
     *
     * @param server URI of the JSON-RPC server
     * @param rpcUser username passed to the transport
     * @param rpcPassword password passed to the transport
     */
    public DefaultRpcClient(URI server, final String rpcUser, final String rpcPassword) {
        this(JsonRpcTransport.getDefaultSSLContext(), DEFAULT_JSON_RPC_VERSION, server, rpcUser, rpcPassword);
    }

    /**
     * Create a client using {@link JsonRpcTransport#getDefaultSSLContext()} and an explicit JSON-RPC version.
     *
     * @param jsonRpcVersion JSON-RPC version reported by {@link #getJsonRpcVersion()}
     * @param server URI of the JSON-RPC server
     * @param rpcUser username passed to the transport
     * @param rpcPassword password passed to the transport
     */
    public DefaultRpcClient(JsonRpcMessage.Version jsonRpcVersion, URI server, final String rpcUser, final String rpcPassword) {
        this(JsonRpcTransport.getDefaultSSLContext(), jsonRpcVersion, server, rpcUser, rpcPassword);
    }

    /**
     * Create a client backed by a {@code JsonRpcClientJavaNet} transport.
     *
     * @param sslContext SSL context passed to the transport
     * @param jsonRpcVersion JSON-RPC version reported by {@link #getJsonRpcVersion()}
     * @param server URI of the JSON-RPC server
     * @param rpcUser username passed to the transport
     * @param rpcPassword password passed to the transport
     */
    public DefaultRpcClient(SSLContext sslContext, JsonRpcMessage.Version jsonRpcVersion, URI server, final String rpcUser, final String rpcPassword) {
        this((m) -> new JsonRpcClientJavaNet(m, sslContext, server, rpcUser, rpcPassword), jsonRpcVersion);
    }

    /**
     * Create a client with a caller-supplied transport. A new {@link ObjectMapper} is created here (configured to
     * ignore unknown properties when deserializing) and handed to the factory, so that the client and the
     * transport share the same mapper.
     *
     * @param transportFactory creates the transport from the shared mapper
     * @param jsonRpcVersion JSON-RPC version reported by {@link #getJsonRpcVersion()}
     */
    public DefaultRpcClient(TransportFactory transportFactory, JsonRpcMessage.Version jsonRpcVersion) {
        this.jsonRpcVersion = jsonRpcVersion;
        mapper = new ObjectMapper();
        // TODO: Provide external API to configure FAIL_ON_UNKNOWN_PROPERTIES
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        defaultType = mapper.getTypeFactory().constructType(Object.class);
        transport = transportFactory.create(mapper);
    }

    @Override
    public JsonRpcMessage.Version getJsonRpcVersion() {
        return jsonRpcVersion;
    }

    @Override
    public URI getServerURI() {
        return transport.getServerURI();
    }

    @Override
    public <R> CompletableFuture<JsonRpcResponse<R>> sendRequestForResponseAsync(JsonRpcRequest request, JavaType responseType) {
        return transport.sendRequestForResponseAsync(request, responseType);
    }

    /**
     * Convenience method for requesting an asynchronous response with a {@link JsonNode} for the result.
     * @param request The request to send
     * @return A future JSON RPC Response with `result` of type {@code JsonNode}
     */
    public CompletableFuture<JsonRpcResponse<JsonNode>> sendRequestForResponseAsync(JsonRpcRequest request) {
        return sendRequestForResponseAsync(request, responseTypeFor(JsonNode.class));
    }

    /**
     * @return the Jackson mapper shared by this client and its transport
     */
    public ObjectMapper getMapper() {
        return mapper;
    }

    @Override
    public JavaType defaultType() {
        return defaultType;
    }

    @Override
    public JavaType responseTypeFor(JavaType resultType) {
        return getMapper().getTypeFactory().
                constructParametricType(JsonRpcResponse.class, resultType);
    }

    @Override
    public JavaType responseTypeFor(Class<?> resultType) {
        return getMapper().getTypeFactory().
                constructParametricType(JsonRpcResponse.class, resultType);
    }

    @Override
    public JavaType typeForClass(Class<?> clazz) {
        return getMapper().constructType(clazz);
    }

    @Override
    public JavaType collectionTypeForClasses(Class<? extends Collection> collection, Class<?> clazz) {
        return getMapper().getTypeFactory()
                .constructCollectionType(collection, clazz);
    }

    @Override
    public JavaType collectionTypeForClasses(Class<? extends Collection> collection, JavaType type) {
        return getMapper().getTypeFactory()
                .constructCollectionType(collection, type);
    }

    /**
     * Send a single request and pass its outcome (response or failure) through {@code errorMapper}.
     *
     * @param request the request to send
     * @param resultType the type of the {@code result} field; wrapped with {@link #responseTypeFor(JavaType)}
     * @param errorMapper decides which future replaces the raw (response, throwable) outcome
     * @param <T> the expected result type
     * @return the future returned by {@code errorMapper}
     */
    public <T> CompletableFuture<JsonRpcResponse<T>> pollOnce(JsonRpcRequest request, JavaType resultType, TransientErrorMapper<T> errorMapper) {
        CompletableFuture<JsonRpcResponse<T>> f = sendRequestForResponseAsync(request, responseTypeFor(resultType));
        // In Java 12+ this can be replaced with exceptionallyCompose()
        return f.handle((r, t) -> errorMapper.map(request, r, t))
                .thenCompose(Function.identity());
    }

    /**
     * A wait-for-server routine that is agnostic about which RPC methods the server supports. In addition to two {@link Duration}
     * parameters, there are 2 lambda parameters to enable this method to work with any JSON-RPC server. This version will tell
     * the JSON-RPC Mapper to return an {@link JsonRpcResponse} with a result of type {@link Object}. If you need more precise
     * control over the type, use {@link #waitForServer(Duration, Duration, Supplier, JavaType, TransientErrorMapper)}.
     * @param timeout how long to wait
     * @param retry delay between retries
     * @param requestSupplier supplier of requests (needs to increment request ID at the very least)
     * @param errorMapper function that maps non-fatal errors (i.e. cases to keep polling)
     * @return A future that completes with the first non-null result, or fails on a fatal error or timeout
     * @param <T> The desired result type to be returned when the server is running
     */
    public <T> CompletableFuture<T> waitForServer(Duration timeout, Duration retry, Supplier<JsonRpcRequest> requestSupplier, TransientErrorMapper<T> errorMapper) {
        return waitForServer(timeout, retry, requestSupplier, typeForClass(Object.class), errorMapper);
    }

    /**
     * A wait-for-server routine that is agnostic about which RPC methods the server supports. In addition to two {@link Duration}
     * parameters, there are 3 parameters (2 functions and a generic type specifier) to enable this method to work with any JSON-RPC server.
     * <p>
     * Polling runs on the executor returned by {@code getDefaultAsyncExecutor()}. Elapsed time is measured with
     * {@link System#nanoTime()} from the moment polling starts, so it includes the time spent in each request as
     * well as the retry delays, and sub-second {@code retry} values are honored. If the returned future is
     * cancelled (or otherwise completed) by the caller, no further requests are started; a request or retry
     * delay already in progress is not interrupted.
     * @param timeout how long to wait
     * @param retry delay between retries
     * @param requestSupplier supplier of requests (needs to increment request ID at the very least)
     * @param resultType the result type for the response
     * @param errorMapper function that maps non-fatal errors (i.e. cases to keep polling)
     * @return A future that completes with the first non-null result; it fails with the {@link ExecutionException}
     *         or {@link InterruptedException} that aborted polling, with a {@link TimeoutException} on timeout,
     *         or with the unexpected {@link RuntimeException} thrown while polling
     * @param <T> The desired result type to be returned when the server is running
     */
    public <T> CompletableFuture<T> waitForServer(Duration timeout, Duration retry, Supplier<JsonRpcRequest> requestSupplier, JavaType resultType, TransientErrorMapper<T> errorMapper) {
        CompletableFuture<T> future = new CompletableFuture<>();
        getDefaultAsyncExecutor().execute(() -> {
            try {
                pollUntilReady(future, timeout, retry, requestSupplier, resultType, errorMapper);
            } catch (RuntimeException e) {
                // Without this, an unexpected failure in the polling task would leave the future incomplete forever.
                log.error("Unexpected exception while waiting for server: ", e);
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    /**
     * Polling loop behind {@code waitForServer}: polls until a result arrives, a fatal error occurs, the future is
     * completed externally, or the timeout elapses; completes {@code future} accordingly.
     */
    private <T> void pollUntilReady(CompletableFuture<T> future, Duration timeout, Duration retry, Supplier<JsonRpcRequest> requestSupplier, JavaType resultType, TransientErrorMapper<T> errorMapper) {
        log.debug("Waiting for server RPC ready...");
        String statusLast = null;
        final long start = System.nanoTime();
        // Compare as Durations so that very large timeouts cannot overflow a long conversion.
        while (!future.isDone() && Duration.ofNanos(System.nanoTime() - start).compareTo(timeout) < 0) {
            JsonRpcResponse<T> r;
            try {
                // The error mapper turns non-fatal failures into a response with an error (e.g. the synthetic
                // "Server temporarily unavailable" response with code -2000); anything else surfaces here.
                r = this.pollOnce(requestSupplier.get(), resultType, errorMapper).get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the executor thread, then fail our future and stop polling
                log.error("Fatal exception: ", e);
                Thread.currentThread().interrupt();
                future.completeExceptionally(e);
                return;
            } catch (ExecutionException e) {
                // If a fatal error occurred, fail our future and stop polling
                log.error("Fatal exception: ", e);
                future.completeExceptionally(e);
                return;
            }
            if (r.getResult() != null) {
                // We received a response with a result, server is ready and has returned a usable result
                log.debug("RPC Ready.");
                future.complete(r.getResult());
                return;
            }
            // We received a response with a non-fatal error, log it and wait to retry.
            String status = statusFromErrorResponse(r);
            // Log status messages only once, if new or updated (status may be null if the error has no message)
            if (!Objects.equals(status, statusLast)) {
                log.info("Waiting for server: RPC Status: {}", status);
                statusLast = status;
            }
            try {
                // Damnit, IntelliJ we're not busy-waiting we're polling!
                Thread.sleep(retry.toMillis());
            } catch (InterruptedException e) {
                log.error(e.toString());
                Thread.currentThread().interrupt();
                future.completeExceptionally(e);
                return;
            }
        }
        if (future.isDone()) {
            // The caller cancelled or completed the future; there is nothing left to report.
            log.debug("waitForServer() future was completed externally, polling stopped.");
            return;
        }
        String timeoutMessage = String.format("waitForServer() timed out after %d seconds", timeout.toSeconds());
        log.error(timeoutMessage);
        future.completeExceptionally(new TimeoutException(timeoutMessage));
    }

    /**
     * Functional interface for ignoring what are considered "transient" errors. The definition of what is transient
     * may vary depending upon the application. Different implementations of this function can be created for
     * different applications.
     * <p>
     * The {@code JsonRpcResponse} returned may be a "synthetic" response, that is generated by the client,
     * not by the server. The synthetic response will look like this:
     * <ul>
     *     <li>error.code: -2000</li>
     *     <li>error.message: "Server temporarily unavailable"</li>
     *     <li>error.data: Detailed string message, e.g. "Connection refused"</li>
     * </ul>
     * @param <T> The expected result type
     */
    @FunctionalInterface
    public interface TransientErrorMapper<T> {
        /**
         * @param request The request we're handling completions for
         * @param response response if one was successfully returned (or null)
         * @param throwable exception if the call failed (or null)
         * @return A completed or failed future that can replace the input (response, throwable) pair
         */
        CompletableFuture<JsonRpcResponse<T>> map(JsonRpcRequest request, JsonRpcResponse<T> response, Throwable throwable);
    }

    /**
     * Transient error mapper that is a no-op, i.e. it passes all errors through unchanged.
     *
     * @param request the request being handled (unused)
     * @param response response if one was successfully returned (or null)
     * @param t exception if the call failed (or null)
     * @param <T> the expected result type
     * @return a future completed with {@code response} if it is non-null, otherwise a future failed with {@code t}
     */
    protected <T> CompletableFuture<JsonRpcResponse<T>> identityTransientErrorMapper(JsonRpcRequest request, JsonRpcResponse<T> response, Throwable t) {
        return response != null
                ? CompletableFuture.completedFuture(response)
                : CompletableFuture.failedFuture(t);
    }

    /**
     * Build the client-generated ("synthetic") response used to represent a transient failure.
     *
     * @param request the request that failed
     * @param t the failure; its message becomes the {@code data} field of the error
     * @param <T> the expected result type
     * @return a response carrying an error with code -2000 and message "Server temporarily unavailable"
     */
    protected <T> JsonRpcResponse<T> temporarilyUnavailableResponse(JsonRpcRequest request, Throwable t) {
        return new JsonRpcResponse<T>(request, new JsonRpcError(SERVER_TEMPORARILY_UNAVAILABLE_CODE, "Server temporarily unavailable", t.getMessage()));
    }

    /**
     * @param response A response where {@code getResult() == null}
     * @return An error status string suitable for log messages
     * @throws IllegalStateException if the response has a non-null result
     */
    protected String statusFromErrorResponse(JsonRpcResponse<?> response) {
        Objects.requireNonNull(response);
        if (response.getResult() != null) {
            throw new IllegalStateException("This should only be called for responses with null result");
        }
        if (response.getError() == null) {
            return "Invalid response both result and error were null";
        } else if (response.getError().getData() != null) {
            // Has optional data, possibly the -2000 special case
            return response.getError().getData().toString();
        } else {
            return response.getError().getMessage();
        }
    }
}