package org.consensusj.exchange;

import org.consensusj.exchange.knowm.KnowmExchangeRateProvider;
import org.knowm.xchange.Exchange;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.money.CurrencyUnit;
import javax.money.convert.ExchangeRateProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/**
 * Base ExchangeRateProvider using the XChange library.
 * Currently supports current DEFERRED rates only.
 * <p>
 * Once {@link #start()} has been called, the exchange is polled on a fixed delay using a
 * {@link ScheduledExecutorService} and registered {@link ExchangeRateObserver}s are notified
 * after every poll.
 *
 * @deprecated Will be removed in the next release.
 */
@Deprecated
public final class BaseXChangeExchangeRateProvider extends KnowmExchangeRateProvider implements ExchangeRateProvider, ObservableExchangeRateProvider {
    private static final Logger log = LoggerFactory.getLogger(BaseXChangeExchangeRateProvider.class);
    /** Delay, in seconds, before the first poll after {@link #start()}. */
    private static final int INITIAL_DELAY_SECONDS = 0;
    /** Delay, in seconds, between the end of one poll and the start of the next. */
    private static final int POLL_PERIOD_SECONDS = 60;

    private final ScheduledExecutorService stpe;
    private volatile boolean started = false;
    private volatile boolean stopping = false;
    /** Handle of the scheduled polling task; assigned by {@link #start()}. */
    private ScheduledFuture<?> future;

    /**
     * Construct using an XChange Exchange class name for a set of currencies
     *
     * @param exchangeClassName Classname of XChange exchange we are wrapping
     * @param scheduledExecutorService Executor service for running polling task; if {@code null},
     *                                 a single-threaded scheduled pool is created
     * @param tickerSymbolConversions passed through unchanged to {@link KnowmExchangeRateProvider};
     *                                the other constructors of this class pass {@code null}
     * @param pairs pairs to monitor
     */
    public BaseXChangeExchangeRateProvider(String exchangeClassName,
                                           ScheduledExecutorService scheduledExecutorService,
                                           Map<CurrencyUnit, String> tickerSymbolConversions,
                                           Collection<CurrencyUnitPair> pairs) {
        super(exchangeClassName, tickerSymbolConversions, pairs);
        stpe = (scheduledExecutorService != null) ? scheduledExecutorService : Executors.newScheduledThreadPool(1);
    }

    /**
     * Varargs variant of
     * {@link #BaseXChangeExchangeRateProvider(String, ScheduledExecutorService, Map, Collection)}.
     *
     * @param exchangeClassName Classname of XChange exchange we are wrapping
     * @param scheduledExecutorService Executor service for running polling task (may be {@code null})
     * @param tickerSymbolConversions passed through unchanged to {@link KnowmExchangeRateProvider}
     * @param pairs pairs to monitor
     */
    public BaseXChangeExchangeRateProvider(String exchangeClassName,
                                           ScheduledExecutorService scheduledExecutorService,
                                           Map<CurrencyUnit, String> tickerSymbolConversions,
                                           CurrencyUnitPair... pairs) {
        this(exchangeClassName, scheduledExecutorService, tickerSymbolConversions, Arrays.asList(pairs));
    }

    /**
     * Construct using an XChange Exchange class object for a set of currencies
     *
     * @param exchangeClass Class of XChange exchange we are wrapping
     * @param pairs pairs to monitor
     */
    public BaseXChangeExchangeRateProvider(Class<? extends Exchange> exchangeClass,
                                           CurrencyUnitPair... pairs) {
        this(exchangeClass.getName(),
                null,
                null,
                Arrays.asList(pairs));
    }

    /**
     * Construct using an XChange Exchange class object for a set of currencies
     *
     * @param exchangeClass Class of XChange exchange we are wrapping
     * @param scheduledExecutorService Executor service for running polling task
     * @param pairs pairs to monitor
     */
    public BaseXChangeExchangeRateProvider(Class<? extends Exchange> exchangeClass,
                                           ScheduledExecutorService scheduledExecutorService,
                                           CurrencyUnitPair... pairs) {
        this(exchangeClass.getName(), scheduledExecutorService, null, Arrays.asList(pairs));
    }

    /**
     * Construct using an XChange Exchange class object and pairs given as strings
     *
     * @param exchangeClass Class of XChange exchange we are wrapping
     * @param scheduledExecutorService Executor service for running polling task
     * @param pairs pairs to monitor, converted with {@code ExchangeUtils.pairsConvert}
     */
    public BaseXChangeExchangeRateProvider(Class<? extends Exchange> exchangeClass,
                                           ScheduledExecutorService scheduledExecutorService,
                                           String... pairs) {
        this(exchangeClass.getName(), scheduledExecutorService, null, ExchangeUtils.pairsConvert(pairs));
    }

    /**
     * Construct using an XChange Exchange class object and pairs given as strings
     *
     * @param exchangeClass Class of XChange exchange we are wrapping
     * @param pairs pairs to monitor, converted with {@code ExchangeUtils.pairsConvert}
     */
    public BaseXChangeExchangeRateProvider(Class<? extends Exchange> exchangeClass,
                                           String... pairs) {
        this(exchangeClass.getName(), null, null, ExchangeUtils.pairsConvert(pairs));
    }

    /**
     * Construct using an XChange Exchange class name and pairs given as strings
     *
     * @param exchangeClassName Classname of XChange exchange we are wrapping
     * @param scheduledExecutorService Executor service for running polling task
     * @param pairs pairs to monitor, converted with {@code ExchangeUtils.pairsConvert}
     */
    public BaseXChangeExchangeRateProvider(String exchangeClassName, ScheduledExecutorService scheduledExecutorService, String[] pairs) {
        this(exchangeClassName, scheduledExecutorService, null, ExchangeUtils.pairsConvert(pairs));
    }

    /**
     * Initialize the exchange provider and start polling thread.
     * Calling this method again after a successful start has no effect.
     */
    @Override
    public synchronized void start() {
        if (!started) {
            initialize();
            future = stpe.scheduleWithFixedDelay(this::poll, INITIAL_DELAY_SECONDS, POLL_PERIOD_SECONDS, TimeUnit.SECONDS);
            // Record the start only once the task is scheduled, so that stop() always sees a
            // non-null future and repeated start() calls do not schedule duplicate pollers.
            started = true;
        }
    }

    /**
     * stop the polling thread
     * <p>
     * Has no effect unless {@link #start()} has completed, and only the first call does anything.
     * The cancellation is submitted to the executor rather than performed on the calling thread.
     * NOTE(review): the executor itself is not shut down here, including the one this class
     * creates when none is supplied - confirm whether that is intended.
     */
    @Override
    public synchronized void stop() {
        if (started && !stopping) {
            stopping = true;
            final ScheduledFuture<?> handle = future;
            Runnable task = () -> handle.cancel(true);
            stpe.schedule(task, 0, TimeUnit.SECONDS);
        }
    }

    /**
     * Poll the exchange for updated Tickers
     * <p>
     * Observers of a pair are notified after each attempt, whether or not the fetch succeeded.
     * NOTE(review): an unchecked exception escaping this method (for example from an observer
     * callback) suppresses all further scheduled executions - consider guarding against that.
     */
    void poll() {
        monitoredCurrencies.forEach((key, monitor) -> {
            try {
                monitor.setTicker(marketDataService.getTicker(monitor.exchangePair));
            } catch (IOException e) {
                // TODO: Exceptions should not be swallowed here (or at least not all exceptions)
                // Some IOExceptions may warrant retries, but not all of them
                // log and ignore IOException (we'll try polling again next interval)
                // Actually I'm seeing that the CoinMarketCap ticker is returning IOException
                // when it should return NotAvailableFromExchangeException
                // The message fills the placeholder; the trailing exception is logged as the throwable.
                log.error("IOException in BaseXChangeExchangeRateProvider::poll: {}", e.getMessage(), e);
            }
            notifyExchangeRateObservers(monitor);
        });
    }

    /**
     * Register an observer for a monitored pair. If a ticker is already available for the pair,
     * the observer is called immediately on the calling thread.
     *
     * @param pair the pair to observe; NOTE(review): a pair that is not monitored is not
     *             validated yet (see TODO) and presumably fails with a NullPointerException
     * @param observer the observer to add
     */
    @Override
    public void registerExchangeRateObserver(CurrencyUnitPair pair, ExchangeRateObserver observer) {
        // TODO: validate rate as one this provider supports
        MonitoredCurrency monitor = monitoredCurrencies.get(pair);
        monitor.observerList.add(observer);
        // If we've got data already, call observer immediately
        if (monitor.isTickerAvailable()) {
            notifyObserver(observer, pair, monitor);
        }
    }

    /** Notify every observer registered on {@code monitor} of its current ticker. */
    private void notifyExchangeRateObservers(MonitoredCurrency monitor) {
        monitor.observerList.forEach(observer -> notifyObserver(observer, monitor.pair, monitor));
    }

    /**
     * Build an exchange rate change from the monitor's ticker and deliver it to one observer.
     *
     * @throws RuntimeException wrapping a {@link TimeoutException} raised while obtaining the ticker
     */
    private void notifyObserver(ExchangeRateObserver observer, CurrencyUnitPair pair, MonitoredCurrency monitor) {
        try {
            observer.onExchangeRateChange(buildExchangeRateChange(pair, monitor.getTicker()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // Restore interruption flag just in case
        } catch (TimeoutException e) {
            throw new RuntimeException(e);          // Unlikely to happen since ticker has usually been fetched
        }
    }

}