001package org.consensusj.exchange;
002
003import org.consensusj.exchange.knowm.KnowmExchangeRateProvider;
004import org.knowm.xchange.Exchange;
005import org.knowm.xchange.currency.CurrencyPair;
006import org.knowm.xchange.dto.marketdata.Ticker;
007import org.slf4j.Logger;
008import org.slf4j.LoggerFactory;
009
010import javax.money.CurrencyUnit;
011import javax.money.convert.ExchangeRateProvider;
012import java.io.IOException;
013import java.util.ArrayList;
014import java.util.Arrays;
015import java.util.Collection;
016import java.util.HashMap;
017import java.util.List;
018import java.util.Map;
019import java.util.concurrent.CountDownLatch;
020import java.util.concurrent.Executors;
021import java.util.concurrent.ScheduledExecutorService;
022import java.util.concurrent.ScheduledFuture;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.TimeoutException;
025import java.util.stream.Collectors;
026
027/**
028 *  Base ExchangeRateProvider using XChange library
029 *  Currently supports current DEFERRED rates only
030 * @deprecated Will be removed in the next release.
031 */
032@Deprecated
033public final class BaseXChangeExchangeRateProvider extends KnowmExchangeRateProvider implements ExchangeRateProvider, ObservableExchangeRateProvider {
034    private static final Logger log = LoggerFactory.getLogger(BaseXChangeExchangeRateProvider.class);
035    private static final int initialDelay = 0;
036    private static final int period = 60;
037
038    private final ScheduledExecutorService stpe;
039    private volatile boolean started = false;
040    private volatile boolean stopping = false;
041    private ScheduledFuture<?> future;
042
043    /**
044     * Construct using an XChange Exchange class object for a set of currencies
045     * @param exchangeClassName Classname of XChange exchange we are wrapping
046     * @param scheduledExecutorService Executor service for running polling task
047     * @param pairs pairs to monitor
048     */
049    public BaseXChangeExchangeRateProvider(String exchangeClassName,
050                                              ScheduledExecutorService scheduledExecutorService,
051                                              Map<CurrencyUnit, String> tickerSymbolConversions,
052                                              Collection<CurrencyUnitPair> pairs) {
053        super(exchangeClassName, tickerSymbolConversions, pairs);
054        stpe = (scheduledExecutorService != null) ? scheduledExecutorService : Executors.newScheduledThreadPool(1);
055    }
056
057    public BaseXChangeExchangeRateProvider(String exchangeClassName,
058                                              ScheduledExecutorService scheduledExecutorService,
059                                              Map<CurrencyUnit, String> tickerSymbolConversions,
060                                              CurrencyUnitPair... pairs) {
061        this(exchangeClassName, scheduledExecutorService, tickerSymbolConversions, Arrays.asList(pairs));
062    }
063
064    /**
065     * Construct using an XChange Exchange class object for a set of currencies
066     * @param exchangeClass Class of XChange exchange we are wrapping
067     * @param pairs pairs to monitor
068     */
069    public BaseXChangeExchangeRateProvider(Class<? extends Exchange> exchangeClass,
070                                              CurrencyUnitPair... pairs) {
071        this(exchangeClass.getName(),
072                null,
073                null,
074                Arrays.asList(pairs));
075    }
076
077    /**
078     * Construct using an XChange Exchange class object for a set of currencies
079     * @param exchangeClass Class of XChange exchange we are wrapping
080     * @param scheduledExecutorService Executor service for running polling task
081     * @param pairs pairs to monitor
082     */
083    public BaseXChangeExchangeRateProvider(Class<? extends Exchange> exchangeClass,
084                                              ScheduledExecutorService scheduledExecutorService,
085                                              CurrencyUnitPair... pairs) {
086        this(exchangeClass.getName(), scheduledExecutorService, null, Arrays.asList(pairs));
087    }
088
089    public BaseXChangeExchangeRateProvider(Class<? extends Exchange> exchangeClass,
090                                              ScheduledExecutorService scheduledExecutorService,
091                                              String... pairs) {
092        this(exchangeClass.getName(), scheduledExecutorService, null, ExchangeUtils.pairsConvert(pairs));
093    }
094
095    public BaseXChangeExchangeRateProvider(Class<? extends Exchange> exchangeClass,
096                                              String... pairs) {
097        this(exchangeClass.getName(), null, null, ExchangeUtils.pairsConvert(pairs));
098    }
099
100    public BaseXChangeExchangeRateProvider(String exchangeClassName, ScheduledExecutorService scheduledExecutorService, String[] pairs) {
101        this(exchangeClassName, scheduledExecutorService, null, ExchangeUtils.pairsConvert(pairs));
102    }
103    
104
105
106    /**
107     * Initialize the exchange provider and start polling thread
108     */
109    @Override
110    public synchronized void start()  {
111        if (!started) {
112            initialize();
113            future = stpe.scheduleWithFixedDelay(this::poll, initialDelay, period, TimeUnit.SECONDS);
114        }
115    }
116
117    /**
118     * stop the polling thread
119     */
120    @Override
121    public synchronized void stop() {
122        if (started && !stopping) {
123            stopping = true;
124            final ScheduledFuture<?> handle = future;
125            Runnable task = () -> handle.cancel(true);
126            stpe.schedule(task, 0, TimeUnit.SECONDS);
127        }
128    }
129
130    /**
131     * Poll the exchange for updated Tickers
132     */
133    void poll() {
134        monitoredCurrencies.forEach((key, monitor) -> {
135            try {
136                monitor.setTicker(marketDataService.getTicker(monitor.exchangePair));
137            } catch (IOException e) {
138                // TODO: Exceptions should not be swallowed here (or at least not all exceptions)
139                // Some IOExceptions may warrant retries, but not all of them
140                // log and ignore IOException (we'll try polling again next interval)
141                // Actually I'm seeing that the CoinMarketCap ticker is returning IOException
142                // when it should return NotAvailableFromExchangeException
143                log.error("IOException in BaseXChangeExchangeRateProvider::poll: {}", e);
144            }
145            notifyExchangeRateObservers(monitor);
146        });
147    }
148
149    @Override
150    public void registerExchangeRateObserver(CurrencyUnitPair pair, ExchangeRateObserver observer) {
151        // TODO: validate rate as one this provider supports
152        MonitoredCurrency monitor = monitoredCurrencies.get(pair);
153        monitor.observerList.add(observer);
154        // If we've got data already, call observer immediately
155        if (monitor.isTickerAvailable()) {
156            notifyObserver(observer, pair, monitor);
157        }
158    }
159
160    private void notifyExchangeRateObservers(MonitoredCurrency monitor) {
161        monitor.observerList.forEach(observer -> notifyObserver(observer, monitor.pair, monitor));
162    }
163
164    private void notifyObserver(ExchangeRateObserver observer, CurrencyUnitPair pair, MonitoredCurrency monitor) {
165        try {
166            observer.onExchangeRateChange(buildExchangeRateChange(pair, monitor.getTicker()));
167        } catch (InterruptedException e) {
168            Thread.currentThread().interrupt(); // Restore interruption flag just in case
169        } catch (TimeoutException e) {
170            throw new RuntimeException(e);      // Unlikely to happen since ticker has usually been fetched
171        }
172    }
173
174}