diff --git a/server/market/bookrouter.go b/server/market/bookrouter.go index c82b54b447..dbf9516003 100644 --- a/server/market/bookrouter.go +++ b/server/market/bookrouter.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "sync" + "sync/atomic" "decred.org/dcrdex/dex" "decred.org/dcrdex/dex/msgjson" @@ -150,7 +151,6 @@ type BookSource interface { type subscribers struct { mtx sync.RWMutex conns map[uint64]comms.Link - seq uint64 } // add adds a new subscriber. @@ -171,37 +171,33 @@ func (s *subscribers) remove(id uint64) bool { return true } -// nextSeq gets the next sequence number by incrementing the counter. This -// should be used when the book and orders are modified. Currently this applies -// to the routes: book_order, unbook_order, update_remaining, and epoch_order, -// plus suspend if the book is also being purged (persist=false). -func (s *subscribers) nextSeq() uint64 { - s.mtx.Lock() - defer s.mtx.Unlock() - s.seq++ - return s.seq -} - -// lastSeq gets the last retrieved sequence number. -func (s *subscribers) lastSeq() uint64 { - s.mtx.RLock() - defer s.mtx.RUnlock() - return s.seq -} - // msgBook is a local copy of the order book information. The orders are saved // as msgjson.BookOrderNote structures. type msgBook struct { - name string - // mtx guards orders and epochIdx - mtx sync.RWMutex - running bool + running atomic.Bool // whether msgBook is initialized + name string + subs *subscribers + source BookSource + baseID uint32 + quoteID uint32 + + // mtx ensures orders, epochIdx and seq are changed atomically with respect + // to each other. + mtx sync.RWMutex + // seq is tracking current(latest) order book version. See nextSeq for info + // on what version change actually means. + seq uint64 orders map[order.OrderID]*msgjson.BookOrderNote epochIdx int64 - subs *subscribers - source BookSource - baseID uint32 - quoteID uint32 +} + +// nextSeq gets the next sequence number by incrementing the latest book version +// counter. This should be used when the book and orders are modified. Currently, +// this applies to these routes: book_order, unbook_order, update_remaining, and +// epoch_order, plus suspend if the book is also being purged (persist=false). +func (book *msgBook) nextSeq() uint64 { + book.seq++ + return book.seq } func (book *msgBook) setEpoch(idx int64) { @@ -219,7 +215,7 @@ func (book *msgBook) epoch() int64 { // insert adds the information for a new order into the order book. If the order // is already found, it is inserted, but an error is logged since update should // be used in that case. -func (book *msgBook) insert(lo *order.LimitOrder) *msgjson.BookOrderNote { +func (book *msgBook) insert(lo *order.LimitOrder) (note *msgjson.BookOrderNote) { msgOrder := limitOrderToMsgOrder(lo, book.name) book.mtx.Lock() defer book.mtx.Unlock() @@ -229,6 +225,7 @@ func (book *msgBook) insert(lo *order.LimitOrder) *msgjson.BookOrderNote { //panic("bad insert") } book.orders[lo.ID()] = msgOrder + msgOrder.Seq = book.nextSeq() return msgOrder } @@ -245,14 +242,16 @@ func (book *msgBook) update(lo *order.LimitOrder) *msgjson.BookOrderNote { //panic("bad update") } book.orders[lo.ID()] = msgOrder + msgOrder.Seq = book.nextSeq() return msgOrder } // Remove the order from the order book. -func (book *msgBook) remove(lo *order.LimitOrder) { +func (book *msgBook) remove(lo *order.LimitOrder) (nextSeq uint64) { book.mtx.Lock() defer book.mtx.Unlock() delete(book.orders, lo.ID()) + return book.nextSeq() } // addBulkOrders adds the lists of orders to the order book, and records the @@ -261,6 +260,7 @@ func (book *msgBook) addBulkOrders(epoch int64, orderSets ...[]*order.LimitOrder book.mtx.Lock() defer book.mtx.Unlock() book.epochIdx = epoch + // book.seq starts with 0 here. for _, set := range orderSets { for _, lo := range set { book.orders[lo.ID()] = limitOrderToMsgOrder(lo, book.name) @@ -333,20 +333,17 @@ func (r *BookRouter) runBook(ctx context.Context, book *msgBook) { // Get the initial book. feed := book.source.OrderFeed() book.addBulkOrders(book.source.Book()) + book.running.Store(true) // can serve order book to clients now subs := book.subs defer func() { + book.running.Store(false) // can stop serving order book to clients now book.mtx.Lock() - book.running = false book.orders = make(map[order.OrderID]*msgjson.BookOrderNote) book.mtx.Unlock() log.Infof("Book router terminating for market %q", book.name) }() - book.mtx.Lock() - book.running = true - book.mtx.Unlock() - out: for { select { @@ -376,9 +373,7 @@ out: if !ok { panic("non-limit order received with bookAction") } - n := book.insert(lo) - n.Seq = subs.nextSeq() - note = n + note = book.insert(lo) case sigDataUnbookedOrder: route = msgjson.UnbookOrderRoute @@ -386,10 +381,10 @@ out: if !ok { panic("non-limit order received with unbookAction") } - book.remove(lo) + nextSeq := book.remove(lo) oid := sigData.order.ID() note = &msgjson.UnbookOrderNote{ - Seq: subs.nextSeq(), + Seq: nextSeq, MarketID: book.name, OrderID: oid[:], } @@ -405,7 +400,6 @@ out: OrderNote: bookNote.OrderNote, Remaining: lo.Remaining(), } - n.Seq = subs.nextSeq() note = n case sigDataEpochReport: @@ -449,7 +443,9 @@ out: epochNote.TargetID = o.TargetOrderID[:] } - epochNote.Seq = subs.nextSeq() + book.mtx.Lock() // book snapshot can be taken concurrently + epochNote.Seq = book.nextSeq() + book.mtx.Unlock() epochNote.MarketID = book.name epochNote.Epoch = uint64(sigData.epochIdx) c := sigData.order.Commitment() @@ -490,9 +486,9 @@ out: } // Only set Seq if there is a book update. if !sigData.persistBook { - susp.Seq = subs.nextSeq() // book purge - book.mtx.Lock() - book.orders = make(map[order.OrderID]*msgjson.BookOrderNote) + book.mtx.Lock() // book snapshot can be taken concurrently + book.seq = book.nextSeq() + book.orders = make(map[order.OrderID]*msgjson.BookOrderNote) // book purge book.mtx.Unlock() // The router is "running" although the market is suspended. } @@ -559,21 +555,26 @@ func (r *BookRouter) sendBook(conn comms.Link, book *msgBook, msgID uint64) { } } +// msgOrderBook returns current (latest) order book snapshot. func (r *BookRouter) msgOrderBook(book *msgBook) *msgjson.OrderBook { - book.mtx.RLock() // book.orders and book.running - if !book.running { - book.mtx.RUnlock() + // Don't want to block client requests for too long, so return fast if order book + // isn't initialized yet instead of proceeding to wait on mutex below for who + // knows how long. + if !book.running.Load() { return nil } + + book.mtx.RLock() + defer book.mtx.RUnlock() + ords := make([]*msgjson.BookOrderNote, 0, len(book.orders)) for _, o := range book.orders { ords = append(ords, o) } epochIdx := book.epochIdx // instead of book.epoch() while already locked - book.mtx.RUnlock() return &msgjson.OrderBook{ - Seq: book.subs.lastSeq(), + Seq: book.seq, MarketID: book.name, Epoch: uint64(epochIdx), Orders: ords,