Created
September 3, 2021 01:32
-
-
Save gloriousCode/e209074b514dfffc8b3f14d325e8527e to your computer and use it in GitHub Desktop.
changes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/engine/order_manager.go b/engine/order_manager.go | |
index 859e32b3c..b00df09a7 100644 | |
--- a/engine/order_manager.go | |
+++ b/engine/order_manager.go | |
@@ -240,8 +240,8 @@ func (m *OrderManager) GetOrderInfo(exchangeName, orderID string, cp currency.Pa | |
return order.Detail{}, err | |
} | |
- err = m.orderStore.upsert(&result) | |
- if err != nil && err != ErrOrdersAlreadyExists { | |
+ _, err = m.orderStore.upsert(&result) | |
+ if err != nil { | |
return order.Detail{}, err | |
} | |
@@ -598,11 +598,11 @@ func (m *OrderManager) processOrders() { | |
filter := &order.Filter{ | |
Exchange: exchanges[i].GetName(), | |
} | |
- ordersToCheck, err := m.orderStore.getActiveOrders(filter) | |
- order.FilterOrdersByCurrencies(&ordersToCheck, pairs) | |
- checkedOrderIDs := make(map[string]bool, len(ordersToCheck)) | |
- for x := range ordersToCheck { | |
- checkedOrderIDs[ordersToCheck[x].InternalOrderID] = false | |
+ orders, err := m.orderStore.getActiveOrders(filter) | |
+ order.FilterOrdersByCurrencies(&orders, pairs) | |
+ requiresProcessing := make(map[string]bool, len(orders)) | |
+ for x := range orders { | |
+ requiresProcessing[orders[x].InternalOrderID] = true | |
} | |
req := order.GetOrdersRequest{ | |
@@ -620,73 +620,49 @@ func (m *OrderManager) processOrders() { | |
err) | |
continue | |
} | |
+ if len(orders) == 0 && len(result) == 0 { | |
+ continue | |
+ } | |
for z := range result { | |
- ord := &result[z] | |
- if !m.Exists(ord) { | |
- err = m.Add(ord) | |
- if err != nil { | |
- log.Errorf(log.OrderMgr, | |
- "Order manager: Exchange %s unable to add order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v: %s", | |
- ord.Exchange, ord.ID, ord.InternalOrderID, ord.Pair, ord.Price, ord.Amount, ord.Side, ord.Type, err) | |
- continue | |
- } | |
- msg := fmt.Sprintf("Order manager: Exchange %s added order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v.", | |
- ord.Exchange, ord.ID, ord.InternalOrderID, ord.Pair, ord.Price, ord.Amount, ord.Side, ord.Type) | |
- log.Debugf(log.OrderMgr, "%v", msg) | |
- m.orderStore.commsManager.PushEvent(base.Event{ | |
- Type: "order", | |
- Message: msg, | |
- }) | |
- checkedOrderIDs[ord.InternalOrderID] = true | |
- continue | |
- } else { | |
- err = m.UpdateExistingOrder(ord) | |
- if err != nil { | |
- log.Errorf(log.OrderMgr, | |
- "Order manager: Exchange %s unable to update order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v: %s", | |
- ord.Exchange, ord.ID, ord.InternalOrderID, ord.Pair, ord.Price, ord.Amount, ord.Side, ord.Type, err) | |
- continue | |
- } | |
- ord, _ = m.GetByExchangeAndID(ord.Exchange, ord.ID) | |
- checkedOrderIDs[ord.InternalOrderID] = true | |
- continue | |
+ err = m.UpsertOrder(&result[z]) | |
+ if err != nil { | |
+ log.Error(log.OrderMgr, err) | |
} | |
+ requiresProcessing[result[z].InternalOrderID] = false | |
} | |
if !exchanges[i].GetBase().GetSupportedFeatures().RESTCapabilities.GetOrder { | |
continue | |
} | |
- for x := range ordersToCheck { | |
- curTime := time.Now() | |
- // Only check if at least 1 minute not updated | |
- if curTime.Sub(ordersToCheck[x].LastUpdated) < time.Minute { | |
- checkedOrderIDs[ordersToCheck[x].InternalOrderID] = true | |
- } else if checkedOrderIDs[ordersToCheck[x].InternalOrderID] == false { | |
- log.Debugf(log.OrderMgr, "Order manager: Exchange %s Unchecked order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v", | |
- ordersToCheck[x].Exchange, ordersToCheck[x].ID, ordersToCheck[x].InternalOrderID, ordersToCheck[x].Pair, ordersToCheck[x].Price, | |
- ordersToCheck[x].Amount, ordersToCheck[x].Side, ordersToCheck[x].Type) | |
- go m.FetchAndUpdateExchangeOrder(exchanges[i], &ordersToCheck[x], supportedAssets[y], curTime) | |
- } | |
+ | |
+ go m.processMatchingOrders(exchanges[i], orders, requiresProcessing) | |
+ } | |
+ } | |
+} | |
+ | |
+func (m *OrderManager) processMatchingOrders(exch exchange.IBotExchange, orders []order.Detail, requiresProcessing map[string]bool) { | |
+ for x := range orders { | |
+ if time.Since(orders[x].LastUpdated) < time.Minute { | |
+ continue | |
+ } | |
+ if requiresProcessing[orders[x].InternalOrderID] { | |
+ err := m.FetchAndUpdateExchangeOrder(exch, &orders[x], orders[x].AssetType) | |
+ if err != nil { | |
+ log.Error(log.OrderMgr, err) | |
} | |
} | |
} | |
} | |
-func (m *OrderManager) FetchAndUpdateExchangeOrder(exch exchange.IBotExchange, ord *order.Detail, assetType asset.Item, curTime time.Time) { | |
+// FetchAndUpdateExchangeOrder calls the exchange to upsert an order to the order store | |
+func (m *OrderManager) FetchAndUpdateExchangeOrder(exch exchange.IBotExchange, ord *order.Detail, assetType asset.Item) error { | |
fetchedOrder, err := exch.GetOrderInfo(ord.ID, ord.Pair, assetType) | |
if err != nil { | |
- log.Errorf(log.OrderMgr, "Unable to get additional order info: %s", err.Error()) | |
ord.Status = order.UnknownStatus | |
- return | |
- } | |
- fetchedOrder.LastUpdated = curTime | |
- err = m.UpdateExistingOrder(&fetchedOrder) | |
- if err != nil { | |
- log.Errorf(log.OrderMgr, | |
- "Order manager: Exchange %s unable to update order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v: %s", | |
- ord.Exchange, ord.ID, ord.InternalOrderID, ord.Pair, ord.Price, ord.Amount, ord.Side, ord.Type, err) | |
- return | |
+ return err | |
} | |
+ fetchedOrder.LastUpdated = time.Now() | |
+ return m.UpsertOrder(&fetchedOrder) | |
} | |
// Exists checks whether an order exists in the order store | |
@@ -747,7 +723,34 @@ func (m *OrderManager) UpsertOrder(od *order.Detail) error { | |
if atomic.LoadInt32(&m.started) == 0 { | |
return fmt.Errorf("order manager %w", ErrSubSystemNotStarted) | |
} | |
- return m.orderStore.upsert(od) | |
+ if od == nil { | |
+ return errNilOrder | |
+ } | |
+ var msg string // set on every path below; published by the deferred event push | |
+ defer func() { // closure, not a by-value argument: defer arguments are evaluated immediately, when msg is still empty | |
+ m.orderStore.commsManager.PushEvent(base.Event{ | |
+ Type: "order", | |
+ Message: msg, // read at return time, so it carries the success or failure text | |
+ }) | |
+ }() | |
+ | |
+ newOrder, err := m.orderStore.upsert(od) | |
+ if err != nil { | |
+ msg = fmt.Sprintf( | |
+ "Order manager: Exchange %s unable to upsert order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v: %s", | |
+ od.Exchange, od.ID, od.InternalOrderID, od.Pair, od.Price, od.Amount, od.Side, od.Type, err) | |
+ return err | |
+ } | |
+ | |
+ status := "updated" | |
+ if newOrder { | |
+ status = "added" | |
+ } | |
+ msg = fmt.Sprintf("Order manager: Exchange %s %s order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v.", | |
+ od.Exchange, status, od.ID, od.InternalOrderID, od.Pair, od.Price, od.Amount, od.Side, od.Type) | |
+ log.Infof(log.OrderMgr, "%s", msg) | |
+ | |
+ return nil | |
} | |
// get returns all orders for all exchanges | |
@@ -815,27 +818,30 @@ func (s *store) modifyExisting(id string, mod *order.Modify) error { | |
// upsert (1) checks if such an exchange exists in the exchangeManager, (2) checks if | |
// order exists and updates/creates it. | |
-func (s *store) upsert(od *order.Detail) error { | |
+func (s *store) upsert(od *order.Detail) (newOrder bool, err error) { | |
+ if od == nil { | |
+ return false, errNilOrder | |
+ } | |
lName := strings.ToLower(od.Exchange) | |
- _, err := s.exchangeManager.GetExchangeByName(lName) | |
+ _, err = s.exchangeManager.GetExchangeByName(lName) | |
if err != nil { | |
- return err | |
+ return false, err | |
} | |
s.m.Lock() | |
defer s.m.Unlock() | |
r, ok := s.Orders[lName] | |
if !ok { | |
s.Orders[lName] = []*order.Detail{od} | |
- return nil | |
+ return true, nil | |
} | |
for x := range r { | |
if r[x].ID == od.ID { | |
r[x].UpdateOrderFromDetail(od) | |
- return nil | |
+ return false, nil | |
} | |
} | |
s.Orders[lName] = append(s.Orders[lName], od) | |
- return nil | |
+ return true, nil | |
} | |
// getByExchange returns orders by exchange | |
@@ -956,35 +962,37 @@ func (s *store) getActiveOrders(f *order.Filter) ([]order.Detail, error) { | |
s.m.RLock() | |
defer s.m.RUnlock() | |
- var os []order.Detail | |
- if f == nil { | |
+ var orders []order.Detail | |
+ switch { | |
+ case f == nil: | |
for _, e := range s.Orders { | |
for i := range e { | |
if !e[i].IsActive() { | |
continue | |
} | |
- os = append(os, e[i].Copy()) | |
+ orders = append(orders, e[i].Copy()) | |
} | |
} | |
- } else if f.Exchange != "" { | |
+ case f.Exchange != "": | |
// optimization if Exchange is filtered | |
if e, ok := s.Orders[strings.ToLower(f.Exchange)]; ok { | |
for i := range e { | |
if !e[i].IsActive() || !e[i].MatchFilter(f) { | |
continue | |
} | |
- os = append(os, e[i].Copy()) | |
+ orders = append(orders, e[i].Copy()) | |
} | |
} | |
- } else { | |
+ default: | |
for _, e := range s.Orders { | |
for i := range e { | |
if !e[i].IsActive() || !e[i].MatchFilter(f) { | |
continue | |
} | |
- os = append(os, e[i].Copy()) | |
+ orders = append(orders, e[i].Copy()) | |
} | |
} | |
} | |
- return os, nil | |
+ | |
+ return orders, nil | |
} | |
diff --git a/engine/order_manager_types.go b/engine/order_manager_types.go | |
index 711a993fd..df2d32619 100644 | |
--- a/engine/order_manager_types.go | |
+++ b/engine/order_manager_types.go | |
@@ -22,6 +22,7 @@ var ( | |
errNilCommunicationsManager = errors.New("cannot start with nil communications manager") | |
// ErrOrderIDCannotBeEmpty occurs when an order does not have an ID | |
ErrOrderIDCannotBeEmpty = errors.New("orderID cannot be empty") | |
+ errNilOrder = errors.New("nil order received") | |
) | |
type orderManagerConfig struct { | |
diff --git a/exchanges/order/orders.go b/exchanges/order/orders.go | |
index 2ee6fbe29..f91fdc896 100644 | |
--- a/exchanges/order/orders.go | |
+++ b/exchanges/order/orders.go | |
@@ -419,9 +419,9 @@ func (d *Detail) IsActive() bool { | |
d.Status == AutoDeleverage || d.Status == Pending | |
} | |
-// IsActive returns true if an order has a status that indicates it is | |
+// IsInactive returns true if an order has a status that indicates it is | |
// currently not available on the exchange | |
-func (d *Detail) IsInActive() bool { | |
+func (d *Detail) IsInactive() bool { | |
if d.Amount > 0 && d.Amount == d.ExecutedAmount { | |
return true | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment