Skip to content

Instantly share code, notes, and snippets.

@gloriousCode
Created September 3, 2021 01:32
Show Gist options
  • Save gloriousCode/e209074b514dfffc8b3f14d325e8527e to your computer and use it in GitHub Desktop.
Save gloriousCode/e209074b514dfffc8b3f14d325e8527e to your computer and use it in GitHub Desktop.
changes
diff --git a/engine/order_manager.go b/engine/order_manager.go
index 859e32b3c..b00df09a7 100644
--- a/engine/order_manager.go
+++ b/engine/order_manager.go
@@ -240,8 +240,8 @@ func (m *OrderManager) GetOrderInfo(exchangeName, orderID string, cp currency.Pa
return order.Detail{}, err
}
- err = m.orderStore.upsert(&result)
- if err != nil && err != ErrOrdersAlreadyExists {
+ _, err = m.orderStore.upsert(&result)
+ if err != nil {
return order.Detail{}, err
}
@@ -598,11 +598,11 @@ func (m *OrderManager) processOrders() {
filter := &order.Filter{
Exchange: exchanges[i].GetName(),
}
- ordersToCheck, err := m.orderStore.getActiveOrders(filter)
- order.FilterOrdersByCurrencies(&ordersToCheck, pairs)
- checkedOrderIDs := make(map[string]bool, len(ordersToCheck))
- for x := range ordersToCheck {
- checkedOrderIDs[ordersToCheck[x].InternalOrderID] = false
+ orders, err := m.orderStore.getActiveOrders(filter)
+ order.FilterOrdersByCurrencies(&orders, pairs)
+ requiresProcessing := make(map[string]bool, len(orders))
+ for x := range orders {
+ requiresProcessing[orders[x].InternalOrderID] = true
}
req := order.GetOrdersRequest{
@@ -620,73 +620,49 @@ func (m *OrderManager) processOrders() {
err)
continue
}
+ if len(orders) == 0 && len(result) == 0 {
+ continue
+ }
for z := range result {
- ord := &result[z]
- if !m.Exists(ord) {
- err = m.Add(ord)
- if err != nil {
- log.Errorf(log.OrderMgr,
- "Order manager: Exchange %s unable to add order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v: %s",
- ord.Exchange, ord.ID, ord.InternalOrderID, ord.Pair, ord.Price, ord.Amount, ord.Side, ord.Type, err)
- continue
- }
- msg := fmt.Sprintf("Order manager: Exchange %s added order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v.",
- ord.Exchange, ord.ID, ord.InternalOrderID, ord.Pair, ord.Price, ord.Amount, ord.Side, ord.Type)
- log.Debugf(log.OrderMgr, "%v", msg)
- m.orderStore.commsManager.PushEvent(base.Event{
- Type: "order",
- Message: msg,
- })
- checkedOrderIDs[ord.InternalOrderID] = true
- continue
- } else {
- err = m.UpdateExistingOrder(ord)
- if err != nil {
- log.Errorf(log.OrderMgr,
- "Order manager: Exchange %s unable to update order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v: %s",
- ord.Exchange, ord.ID, ord.InternalOrderID, ord.Pair, ord.Price, ord.Amount, ord.Side, ord.Type, err)
- continue
- }
- ord, _ = m.GetByExchangeAndID(ord.Exchange, ord.ID)
- checkedOrderIDs[ord.InternalOrderID] = true
- continue
+ err = m.UpsertOrder(&result[z])
+ if err != nil {
+ log.Error(log.OrderMgr, err)
}
+ requiresProcessing[result[z].InternalOrderID] = false
}
if !exchanges[i].GetBase().GetSupportedFeatures().RESTCapabilities.GetOrder {
continue
}
- for x := range ordersToCheck {
- curTime := time.Now()
- // Only check if at least 1 minute not updated
- if curTime.Sub(ordersToCheck[x].LastUpdated) < time.Minute {
- checkedOrderIDs[ordersToCheck[x].InternalOrderID] = true
- } else if checkedOrderIDs[ordersToCheck[x].InternalOrderID] == false {
- log.Debugf(log.OrderMgr, "Order manager: Exchange %s Unchecked order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v",
- ordersToCheck[x].Exchange, ordersToCheck[x].ID, ordersToCheck[x].InternalOrderID, ordersToCheck[x].Pair, ordersToCheck[x].Price,
- ordersToCheck[x].Amount, ordersToCheck[x].Side, ordersToCheck[x].Type)
- go m.FetchAndUpdateExchangeOrder(exchanges[i], &ordersToCheck[x], supportedAssets[y], curTime)
- }
+
+ go m.processMatchingOrders(exchanges[i], orders, requiresProcessing)
+ }
+ }
+}
+
+func (m *OrderManager) processMatchingOrders(exch exchange.IBotExchange, orders []order.Detail, requiresProcessing map[string]bool) {
+ for x := range orders {
+ if time.Since(orders[x].LastUpdated) < time.Minute {
+ continue
+ }
+ if requiresProcessing[orders[x].InternalOrderID] {
+ err := m.FetchAndUpdateExchangeOrder(exch, &orders[x], orders[x].AssetType)
+ if err != nil {
+ log.Error(log.OrderMgr, err)
}
}
}
}
-func (m *OrderManager) FetchAndUpdateExchangeOrder(exch exchange.IBotExchange, ord *order.Detail, assetType asset.Item, curTime time.Time) {
+// FetchAndUpdateExchangeOrder calls the exchange to upsert an order to the order store
+func (m *OrderManager) FetchAndUpdateExchangeOrder(exch exchange.IBotExchange, ord *order.Detail, assetType asset.Item) error {
fetchedOrder, err := exch.GetOrderInfo(ord.ID, ord.Pair, assetType)
if err != nil {
- log.Errorf(log.OrderMgr, "Unable to get additional order info: %s", err.Error())
ord.Status = order.UnknownStatus
- return
- }
- fetchedOrder.LastUpdated = curTime
- err = m.UpdateExistingOrder(&fetchedOrder)
- if err != nil {
- log.Errorf(log.OrderMgr,
- "Order manager: Exchange %s unable to update order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v: %s",
- ord.Exchange, ord.ID, ord.InternalOrderID, ord.Pair, ord.Price, ord.Amount, ord.Side, ord.Type, err)
- return
+ return err
}
+ fetchedOrder.LastUpdated = time.Now()
+ return m.UpsertOrder(&fetchedOrder)
}
// Exists checks whether an order exists in the order store
@@ -747,7 +723,34 @@ func (m *OrderManager) UpsertOrder(od *order.Detail) error {
if atomic.LoadInt32(&m.started) == 0 {
return fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
- return m.orderStore.upsert(od)
+ if od == nil {
+ return errNilOrder
+ }
+ var msg string
+ defer func(message string) {
+ m.orderStore.commsManager.PushEvent(base.Event{
+ Type: "order",
+ Message: message,
+ })
+ }(msg)
+
+ newOrder, err := m.orderStore.upsert(od)
+ if err != nil {
+ msg = fmt.Sprintf(
+ "Order manager: Exchange %s unable to upsert order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v: %s",
+ od.Exchange, od.ID, od.InternalOrderID, od.Pair, od.Price, od.Amount, od.Side, od.Type, err)
+ return err
+ }
+
+ status := "updated"
+ if newOrder {
+ status = "added"
+ }
+ msg = fmt.Sprintf("Order manager: Exchange %s %s order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v.",
+ od.Exchange, status, od.ID, od.InternalOrderID, od.Pair, od.Price, od.Amount, od.Side, od.Type)
+ log.Infof(log.OrderMgr, "%s", msg)
+
+ return nil
}
// get returns all orders for all exchanges
@@ -815,27 +818,30 @@ func (s *store) modifyExisting(id string, mod *order.Modify) error {
// upsert (1) checks if such an exchange exists in the exchangeManager, (2) checks if
// order exists and updates/creates it.
-func (s *store) upsert(od *order.Detail) error {
+func (s *store) upsert(od *order.Detail) (newOrder bool, err error) {
+ if od == nil {
+ return false, errNilOrder
+ }
lName := strings.ToLower(od.Exchange)
- _, err := s.exchangeManager.GetExchangeByName(lName)
+ _, err = s.exchangeManager.GetExchangeByName(lName)
if err != nil {
- return err
+ return false, err
}
s.m.Lock()
defer s.m.Unlock()
r, ok := s.Orders[lName]
if !ok {
s.Orders[lName] = []*order.Detail{od}
- return nil
+ return true, nil
}
for x := range r {
if r[x].ID == od.ID {
r[x].UpdateOrderFromDetail(od)
- return nil
+ return false, nil
}
}
s.Orders[lName] = append(s.Orders[lName], od)
- return nil
+ return true, nil
}
// getByExchange returns orders by exchange
@@ -956,35 +962,37 @@ func (s *store) getActiveOrders(f *order.Filter) ([]order.Detail, error) {
s.m.RLock()
defer s.m.RUnlock()
- var os []order.Detail
- if f == nil {
+ var orders []order.Detail
+ switch {
+ case f == nil:
for _, e := range s.Orders {
for i := range e {
if !e[i].IsActive() {
continue
}
- os = append(os, e[i].Copy())
+ orders = append(orders, e[i].Copy())
}
}
- } else if f.Exchange != "" {
+ case f.Exchange != "":
// optimization if Exchange is filtered
if e, ok := s.Orders[strings.ToLower(f.Exchange)]; ok {
for i := range e {
if !e[i].IsActive() || !e[i].MatchFilter(f) {
continue
}
- os = append(os, e[i].Copy())
+ orders = append(orders, e[i].Copy())
}
}
- } else {
+ default:
for _, e := range s.Orders {
for i := range e {
if !e[i].IsActive() || !e[i].MatchFilter(f) {
continue
}
- os = append(os, e[i].Copy())
+ orders = append(orders, e[i].Copy())
}
}
}
- return os, nil
+
+ return orders, nil
}
diff --git a/engine/order_manager_types.go b/engine/order_manager_types.go
index 711a993fd..df2d32619 100644
--- a/engine/order_manager_types.go
+++ b/engine/order_manager_types.go
@@ -22,6 +22,7 @@ var (
errNilCommunicationsManager = errors.New("cannot start with nil communications manager")
// ErrOrderIDCannotBeEmpty occurs when an order does not have an ID
ErrOrderIDCannotBeEmpty = errors.New("orderID cannot be empty")
+ errNilOrder = errors.New("nil order received")
)
type orderManagerConfig struct {
diff --git a/exchanges/order/orders.go b/exchanges/order/orders.go
index 2ee6fbe29..f91fdc896 100644
--- a/exchanges/order/orders.go
+++ b/exchanges/order/orders.go
@@ -419,9 +419,9 @@ func (d *Detail) IsActive() bool {
d.Status == AutoDeleverage || d.Status == Pending
}
-// IsActive returns true if an order has a status that indicates it is
+// IsInactive returns true if an order has a status that indicates it is
// currently not available on the exchange
-func (d *Detail) IsInActive() bool {
+func (d *Detail) IsInactive() bool {
if d.Amount > 0 && d.Amount == d.ExecutedAmount {
return true
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment