diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingTradeService.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingTradeService.java index 9af3df1397..ea491fc6c8 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingTradeService.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingTradeService.java @@ -193,12 +193,14 @@ public Observable getPositionChanges(Instrument instrument) { } } - public Single placeMarketOrder(MarketOrder marketOrder) { - return placeOrder(marketOrder); + @Override + public Single placeMarketOrder(MarketOrder order, Object... args) { + return placeOrder(order); } - public Single placeLimitOrder(LimitOrder limitOrder) { - return placeOrder(limitOrder); + @Override + public Single placeLimitOrder(LimitOrder order, Object... args) { + return placeOrder(order); } public Single placeOrder(Order order) { @@ -261,7 +263,8 @@ private Observable placeOrderInternal(Order order) { }); } - public Single changeOrder(LimitOrder limitOrder, CancelOrderParams... orderParams) { + @Override + public Single changeOrder(LimitOrder limitOrder, Object... args) { if (binanceUserTradeStreamingService.isAuthorized()) { if (exchange.isFuturesEnabled()) { return binanceUserTradeStreamingService @@ -297,8 +300,7 @@ public Single changeOrder(LimitOrder limitOrder, CancelOrderParams... o .subscribeChannel( String.valueOf(System.nanoTime()), "order.cancelReplace", - limitOrder, - orderParams[0]) + limitOrder) .flatMap( node -> { TypeReference< @@ -330,13 +332,13 @@ public Single changeOrder(LimitOrder limitOrder, CancelOrderParams... o } else { throw new UnsupportedOperationException("Only spot and futures supported"); } - } else { throw new UnsupportedOperationException("binanceUserTradeStreamingService not authorized"); } } - public Single cancelOrder(CancelOrderParams orderParams) { + @Override + public Single cancelOrder(CancelOrderParams orderParams, Object... args) { if (binanceUserTradeStreamingService.isAuthorized()) { if (exchange.isFuturesEnabled() || exchange.isSpotEnabled()) { Observable observable = diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserTradeStreamingService.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserTradeStreamingService.java index 3f611d22d5..19597fd550 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserTradeStreamingService.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserTradeStreamingService.java @@ -231,7 +231,8 @@ public String getSubscribeMessage(String channelName, Object... args) throws IOE case "order.cancelReplace": { LimitOrder limitOrder = (LimitOrder) args[1]; - BinanceCancelOrderParams params = (BinanceCancelOrderParams) args[2]; + BinanceCancelOrderParams params = + new BinanceCancelOrderParams(limitOrder.getInstrument(), limitOrder.getId(), limitOrder.getUserReference()); Long cancelOrderId = null; if (params.getOrderId() != null && !params.getOrderId().isEmpty()) { cancelOrderId = Long.valueOf(params.getOrderId()); diff --git a/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceFutureStreamWebsocketTradeTest.java b/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceFutureStreamWebsocketTradeTest.java index d98624c2db..1296ce56a2 100644 --- a/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceFutureStreamWebsocketTradeTest.java +++ b/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceFutureStreamWebsocketTradeTest.java @@ -64,8 +64,6 @@ public void websocketTrade() throws InterruptedException, IOException { while (!exchange.isAlive()) { Thread.sleep(100L); } - BinanceStreamingTradeService binanceStreamingTradeService = - ((BinanceStreamingTradeService) exchange.getStreamingTradeService()); BigDecimal minAmount = exchange.getExchangeMetaData().getInstruments().get(instrument2).getMinimumAmount(); Ticker ticker = exchange.getMarketDataService().getTicker(instrument2); @@ -84,7 +82,7 @@ public void websocketTrade() throws InterruptedException, IOException { .userReference(limitOrderUserId) .build(); Disposable limitOrderDisposable = - binanceStreamingTradeService + exchange.getStreamingTradeService() .placeLimitOrder(limitOrder) .subscribe( result -> { @@ -102,7 +100,7 @@ public void websocketTrade() throws InterruptedException, IOException { .userReference(limitOrderUserId) .build(); Disposable changeOrderDisposable = - binanceStreamingTradeService + exchange.getStreamingTradeService() .changeOrder(changeOrder) .subscribe( result -> { @@ -115,7 +113,7 @@ public void websocketTrade() throws InterruptedException, IOException { LOG.info("changeOrder disposed: {}", changeOrderDisposable.isDisposed()); Disposable cancelOrderDisposable = - binanceStreamingTradeService + exchange.getStreamingTradeService() .cancelOrder(new BinanceCancelOrderParams(instrument2, null, limitOrderUserId)) .subscribe( result -> { @@ -133,7 +131,7 @@ public void websocketTrade() throws InterruptedException, IOException { .userReference(marketOrderUserId) .build(); Disposable marketOrderDisposable = - binanceStreamingTradeService + exchange.getStreamingTradeService() .placeMarketOrder(marketOrder) .doOnError(error -> LOG.error("placeMarketOrder error", error)) .subscribe( diff --git a/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceSpotStreamWebsocketTradeTest.java b/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceSpotStreamWebsocketTradeTest.java index f511aecb43..64152adada 100644 --- a/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceSpotStreamWebsocketTradeTest.java +++ b/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceSpotStreamWebsocketTradeTest.java @@ -64,8 +64,6 @@ public void websocketTrade() throws InterruptedException, IOException { while (!exchange.isAlive()) { Thread.sleep(100L); } - BinanceStreamingTradeService binanceStreamingTradeService = - ((BinanceStreamingTradeService) exchange.getStreamingTradeService()); BigDecimal minAmount = exchange.getExchangeMetaData().getInstruments().get(instrument2).getMinimumAmount(); Ticker ticker = exchange.getMarketDataService().getTicker(instrument2); @@ -84,7 +82,7 @@ public void websocketTrade() throws InterruptedException, IOException { .userReference(limitOrderUserId) .build(); Disposable limitOrderDisposable = - binanceStreamingTradeService + exchange.getStreamingTradeService() .placeLimitOrder(limitOrder) .subscribe( result -> { @@ -103,8 +101,8 @@ public void websocketTrade() throws InterruptedException, IOException { .userReference(limitOrderUserId) .build(); Disposable changeOrderDisposable = - binanceStreamingTradeService - .changeOrder(changeOrder, cancelOrderParams) + exchange.getStreamingTradeService() + .changeOrder(changeOrder) .subscribe( result -> { if (logOutput) { @@ -116,7 +114,7 @@ public void websocketTrade() throws InterruptedException, IOException { LOG.info("changeOrder disposed: {}", changeOrderDisposable.isDisposed()); Disposable cancelOrderDisposable = - binanceStreamingTradeService + exchange.getStreamingTradeService() .cancelOrder(cancelOrderParams) .subscribe( result -> { @@ -134,7 +132,7 @@ public void websocketTrade() throws InterruptedException, IOException { .userReference(marketOrderUserId) .build(); Disposable marketOrderDisposable = - binanceStreamingTradeService + exchange.getStreamingTradeService() .placeMarketOrder(marketOrder) .doOnError(error -> LOG.error("placeMarketOrder error", error)) .subscribe( diff --git a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingTradeService.java b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingTradeService.java index c25cbd6387..4291b89110 100644 --- a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingTradeService.java +++ b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingTradeService.java @@ -46,7 +46,8 @@ public BybitStreamingTradeService( this.userTradeService = userTradeService; } - public Single placeMarketOrder(MarketOrder order) { + @Override + public Single placeMarketOrder(MarketOrder order, Object... args) { BybitCategory category = BybitAdapters.getCategory(order.getInstrument()); Observable observable = userTradeService @@ -68,7 +69,8 @@ public Single placeMarketOrder(MarketOrder order) { .toSingle(); } - public Single placeLimitOrder(LimitOrder order) { + @Override + public Single placeLimitOrder(LimitOrder order, Object... args) { BybitCategory category = BybitAdapters.getCategory(order.getInstrument()); Observable observable = userTradeService @@ -90,7 +92,8 @@ public Single placeLimitOrder(LimitOrder order) { .toSingle(); } - public Single changeOrder(LimitOrder order) { + @Override + public Single changeOrder(LimitOrder order, Object... args) { BybitCategory category = BybitAdapters.getCategory(order.getInstrument()); Observable observable = userTradeService @@ -146,7 +149,8 @@ public Single> batchChangeOrder(List orders) { } } - public Single cancelOrder(CancelOrderParams params) { + @Override + public Single cancelOrder(CancelOrderParams params, Object... args) { BybitCancelOrderParams bybitParams = (BybitCancelOrderParams) params; BybitCategory category = BybitAdapters.getCategory(bybitParams.getInstrument()); Observable observable = diff --git a/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamSpotWebsocketTradeExample.java b/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamSpotWebsocketTradeExample.java index 423260f0f1..dfe0313625 100644 --- a/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamSpotWebsocketTradeExample.java +++ b/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamSpotWebsocketTradeExample.java @@ -4,6 +4,7 @@ import static info.bitrich.xchangestream.bybit.example.BaseBybitExchange.connectMainApi; import info.bitrich.xchangestream.bybit.BybitStreamingExchange; +import info.bitrich.xchangestream.core.StreamingExchange; import io.reactivex.rxjava3.disposables.CompositeDisposable; import io.reactivex.rxjava3.disposables.Disposable; import java.io.IOException; @@ -28,19 +29,21 @@ public class BybitStreamSpotWebsocketTradeExample { private static final Logger LOG = LoggerFactory.getLogger(BybitStreamSpotWebsocketTradeExample.class); static Instrument instrument = new CurrencyPair("XRP/USDT"); - static BybitStreamingExchange exchange; + static BybitStreamingExchange bybitExchange; + static StreamingExchange exchange; public static void main(String[] args) throws IOException { - exchange = (BybitStreamingExchange) connectMainApi(BybitCategory.SPOT, true); + exchange = connectMainApi(BybitCategory.SPOT, true); + bybitExchange = (BybitStreamingExchange) exchange; try { - while (!exchange.isAlive()) { + while (!bybitExchange.isAlive()) { TimeUnit.MILLISECONDS.sleep(100); } // main(not demo) api only websocketTradeExample(); Thread.sleep(1000); websocketBatchTradeExample(); - exchange.disconnect().blockingAwait(); + bybitExchange.disconnect().blockingAwait(); } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { @@ -52,7 +55,7 @@ private static void websocketBatchTradeExample() throws IOException, Interrupted CompositeDisposable compositeDisposable = new CompositeDisposable(); BigDecimal minAmount = exchange.getExchangeMetaData().getInstruments().get(instrument).getMinimumAmount(); - Ticker ticker = exchange.getMarketDataService().getTicker(instrument); + Ticker ticker = bybitExchange.getMarketDataService().getTicker(instrument); minAmount = getMinAmount( new BigDecimal("12"), @@ -104,7 +107,7 @@ private static void websocketBatchTradeExample() throws IOException, Interrupted .userReference(limitOrder2UserId) .build(); compositeDisposable.add( - exchange + bybitExchange .getStreamingTradeService() .batchChangeOrder(List.of(changeOrder1, changeOrder2)) .subscribe( @@ -117,7 +120,7 @@ private static void websocketBatchTradeExample() throws IOException, Interrupted ordersToCancel.add(new BybitCancelOrderParams(instrument, "", limitOrder1UserId)); ordersToCancel.add(new BybitCancelOrderParams(instrument, "", limitOrder2UserId)); compositeDisposable.add( - exchange + bybitExchange .getStreamingTradeService() .batchCancelOrder(ordersToCancel) .subscribe( @@ -132,7 +135,7 @@ private static void websocketBatchTradeExample() throws IOException, Interrupted private static void websocketTradeExample() throws IOException, InterruptedException { BigDecimal minAmount = exchange.getExchangeMetaData().getInstruments().get(instrument).getMinimumAmount(); - Ticker ticker = exchange.getMarketDataService().getTicker(instrument); + Ticker ticker = bybitExchange.getMarketDataService().getTicker(instrument); minAmount = getMinAmount( new BigDecimal("12"), diff --git a/xchange-stream-core/src/main/java/info/bitrich/xchangestream/core/StreamingTradeService.java b/xchange-stream-core/src/main/java/info/bitrich/xchangestream/core/StreamingTradeService.java index 426896d75c..9de5b5c7f5 100644 --- a/xchange-stream-core/src/main/java/info/bitrich/xchangestream/core/StreamingTradeService.java +++ b/xchange-stream-core/src/main/java/info/bitrich/xchangestream/core/StreamingTradeService.java @@ -1,14 +1,18 @@ package info.bitrich.xchangestream.core; import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.Order; import org.knowm.xchange.dto.account.OpenPosition; +import org.knowm.xchange.dto.trade.LimitOrder; +import org.knowm.xchange.dto.trade.MarketOrder; import org.knowm.xchange.dto.trade.UserTrade; import org.knowm.xchange.exceptions.ExchangeSecurityException; import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException; import org.knowm.xchange.instrument.Instrument; import org.knowm.xchange.service.trade.TradeService; +import org.knowm.xchange.service.trade.params.CancelOrderParams; public interface StreamingTradeService { @@ -81,4 +85,50 @@ default Observable getUserTrades() { default Observable getPositionChanges(Instrument instrument) { throw new NotYetImplementedForExchangeException("getPositionChanges"); } + + /** + * Places a market order on the exchange. A market order is an order to buy or sell a financial instrument immediately at the best available current price. Specific exchanges may require additional + * optional parameters to process the market order. + * + * @param marketOrder The details of the market order, including the instrument, amount, and side (buy/sell). + * @param args Optional additional arguments that may be required by specific exchanges. + * @return A {@link Single} that emits the operation result code, typically 0 for success. + */ + default Single placeMarketOrder(MarketOrder marketOrder, Object... args) { + throw new NotYetImplementedForExchangeException("placeMarketOrder"); + } + + /** + * Places a limit order on the exchange. A limit order specifies the price at which the user wants to buy or sell a financial instrument, potentially with additional optional parameters. + * + * @param limitOrder The details of the limit order, including instrument, price, volume, and side (buy/sell). + * @param args Optional additional arguments that may be required by specific exchanges. + * @return A {@link Single} that emits the operation result code, typically 0 for success. + */ + default Single placeLimitOrder(LimitOrder limitOrder, Object... args) { + throw new NotYetImplementedForExchangeException("placeLimitOrder"); + } + + /** + * Modifies an existing limit order on the exchange. This can include changes to the price, volume, or other adjustable parameters of the order, depending on the exchange's specific functionality + * and constraints. + * + * @param order The limit order to be modified, including the updated details such as price or/and volume. + * @param args Optional additional arguments that may be required by specific exchanges. + * @return A {@link Single} that emits the operation result code, typically 0 for success. + */ + default Single changeOrder(LimitOrder order, Object... args) { + throw new NotYetImplementedForExchangeException("changeOrder"); + } + + /** + * Cancels an existing order on the exchange. The operation requires specific parameters describing the order to be canceled, such as order ID or user reference ID. + * + * @param params An object implementing {@link CancelOrderParams} that contains order ID or user reference ID. + * @param args Optional additional arguments that may be required by specific exchanges. + * @return A {@link Single} that emits the operation result code, typically 0 for success. + */ + default Single cancelOrder(CancelOrderParams params, Object... args) { + throw new NotYetImplementedForExchangeException("cancelOrder"); + } } diff --git a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingTradeService.java b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingTradeService.java index ce3c953334..0e31a9d8ce 100644 --- a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingTradeService.java +++ b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingTradeService.java @@ -108,7 +108,8 @@ public Observable getPositionChanges(Instrument instrument) { }); } - public Single placeLimitOrder(LimitOrder order) { + @Override + public Single placeLimitOrder(LimitOrder order, Object... args) { if (privateStreamingService.isLoginDone()) { Observable observable = privateStreamingService @@ -137,7 +138,8 @@ public Single placeLimitOrder(LimitOrder order) { } } - public Single placeMarketOrder(MarketOrder order) { + @Override + public Single placeMarketOrder(MarketOrder order, Object... args) { if (privateStreamingService.isLoginDone()) { Observable observable = privateStreamingService @@ -166,7 +168,8 @@ public Single placeMarketOrder(MarketOrder order) { } } - public Single changeOrder(LimitOrder order) { + @Override + public Single changeOrder(LimitOrder order, Object... args) { if (privateStreamingService.isLoginDone()) { Observable observable = privateStreamingService @@ -195,7 +198,8 @@ public Single changeOrder(LimitOrder order) { } } - public Single cancelOrder(CancelOrderParams params) { + @Override + public Single cancelOrder(CancelOrderParams params, Object... args) { if (privateStreamingService.isLoginDone()) { Observable observable = privateStreamingService diff --git a/xchange-stream-okex/src/test/java/info/bitrich/xchangestream/okex/OkexWebsocketTradeTest.java b/xchange-stream-okex/src/test/java/info/bitrich/xchangestream/okex/OkexWebsocketTradeTest.java index e5e55ab195..8ca511e996 100644 --- a/xchange-stream-okex/src/test/java/info/bitrich/xchangestream/okex/OkexWebsocketTradeTest.java +++ b/xchange-stream-okex/src/test/java/info/bitrich/xchangestream/okex/OkexWebsocketTradeTest.java @@ -4,6 +4,7 @@ import static org.knowm.xchange.dto.Order.OrderType.BID; import info.bitrich.xchangestream.core.StreamingExchange; +import info.bitrich.xchangestream.core.StreamingTradeService; import io.reactivex.rxjava3.disposables.Disposable; import java.io.IOException; import java.math.BigDecimal; @@ -90,8 +91,8 @@ public void websocketSpotTradeTest() throws IOException, InterruptedException { } private void tradeTest(Instrument instrument) throws IOException, InterruptedException { - OkexStreamingTradeService tradeService = - (OkexStreamingTradeService) exchange.getStreamingTradeService(); + StreamingTradeService tradeService = + exchange.getStreamingTradeService(); Ticker ticker = exchange.getMarketDataService().getTicker(instrument); BigDecimal minAmount = exchange.getExchangeMetaData().getInstruments().get(instrument).getMinimumAmount();