Skip to content
This repository has been archived by the owner on May 25, 2020. It is now read-only.

Commit

Permalink
Handle multiple orders/trades in same tx
Browse files Browse the repository at this point in the history
  • Loading branch information
thanhnguyennguyen committed Nov 11, 2019
1 parent 66cf894 commit b2bbacc
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 8 deletions.
2 changes: 1 addition & 1 deletion consensus/posv/posv.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 69,7 @@ type TomoXService interface {
GetTriegc() *prque.Prque
ApplyOrder(coinbase common.Address, chain consensus.ChainContext, statedb *state.StateDB, tomoXstatedb *tomox_state.TomoXStateDB, orderBook common.Hash, order *tomox_state.OrderItem) ([]map[string]string, []*tomox_state.OrderItem, error)
IsSDKNode() bool
SyncDataToSDKNode(txDataMatch tomox_state.TxDataMatch, txHash common.Hash, txMatchTime time.Time, statedb *state.StateDB) error
SyncDataToSDKNode(txDataMatch tomox_state.TxDataMatch, txHash common.Hash, txMatchTime time.Time, statedb *state.StateDB,dirtyOrderCount *uint64) error
RollbackReorgTxMatch(txhash common.Hash)
}

Expand Down
4 changes: 3 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2261,14 2261,16 @@ func (bc *BlockChain) logExchangeData(block *types.Block) {
// That's why we should put this log statement in an anonymous function
log.Debug("logExchangeData takes", "time", common.PrettyDuration(time.Since(start)), "blockNumber", block.NumberU64())
}()

for _, txMatchBatch := range txMatchBatchData {
dirtyOrderCount := uint64(0)
for _, txMatch := range txMatchBatch.Data {
// the smallest time unit in mongodb is millisecond
// hence, we should update time in millisecond
// old txData has been attached with nanosecond, to avoid hard fork, convert nanosecond to millisecond here
milliSecond := txMatchBatch.Timestamp / 1e6
txMatchTime := time.Unix(0, milliSecond * 1e6).UTC()
if err := tomoXService.SyncDataToSDKNode(txMatch, txMatchBatch.TxHash, txMatchTime, currentState); err != nil {
if err := tomoXService.SyncDataToSDKNode(txMatch, txMatchBatch.TxHash, txMatchTime, currentState, &dirtyOrderCount); err != nil {
log.Error("failed to SyncDataToSDKNode ", "blockNumber", block.Number(), "err", err)
return
}
Expand Down
16 changes: 10 additions & 6 deletions tomox/tomox.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 255,7 @@ func (tomox *TomoX) ProcessOrderPending(coinbase common.Address, chain consensus
// 2. txMatchData.Trades: includes information of matched orders.
// a. PutObject them to `trades` collection
// b. Update status of regrading orders to sdktypes.OrderStatusFilled
func (tomox *TomoX) SyncDataToSDKNode(txDataMatch tomox_state.TxDataMatch, txHash common.Hash, txMatchTime time.Time, statedb *state.StateDB) error {
func (tomox *TomoX) SyncDataToSDKNode(txDataMatch tomox_state.TxDataMatch, txHash common.Hash, txMatchTime time.Time, statedb *state.StateDB, dirtyOrderCount *uint64) error {
var (
// originTakerOrder: order get from db, nil if it doesn't exist
// takerOrderInTx: order decoded from txdata
Expand Down Expand Up @@ -298,10 298,12 @@ func (tomox *TomoX) SyncDataToSDKNode(txDataMatch tomox_state.TxDataMatch, txHas
if updatedTakerOrder.CreatedAt.IsZero() {
updatedTakerOrder.CreatedAt = txMatchTime
}
if !txMatchTime.After(updatedTakerOrder.UpdatedAt) {
log.Debug("Ignore old orders/trades", "txHash", txHash.Hex(), "txTime", txMatchTime)
if txMatchTime.Before(updatedTakerOrder.UpdatedAt) || (txMatchTime.Equal(updatedTakerOrder.UpdatedAt) && *dirtyOrderCount == 0) {
log.Debug("Ignore old orders/trades taker", "txHash", txHash.Hex(), "txTime", txMatchTime.UnixNano(), "updatedAt", updatedTakerOrder.UpdatedAt.UnixNano())
return nil
}
*dirtyOrderCount

tomox.UpdateOrderCache(updatedTakerOrder.BaseToken, updatedTakerOrder.QuoteToken, updatedTakerOrder.OrderID, txHash, lastState)
updatedTakerOrder.UpdatedAt = txMatchTime

Expand Down Expand Up @@ -386,7 388,8 @@ func (tomox *TomoX) SyncDataToSDKNode(txDataMatch tomox_state.TxDataMatch, txHas
makerOrders := db.GetListOrderByHashes(makerDirtyHashes)
log.Debug("Maker dirty orders", "len", len(makerOrders), "txhash", txHash.Hex())
for _, o := range makerOrders {
if !txMatchTime.After(o.UpdatedAt) {
if txMatchTime.Before(o.UpdatedAt) {
log.Debug("Ignore old orders/trades maker", "txHash", txHash.Hex(), "txTime", txMatchTime.UnixNano(), "updatedAt", updatedTakerOrder.UpdatedAt.UnixNano())
continue
}
lastState = tomox_state.OrderHistoryItem{
Expand Down Expand Up @@ -421,7 424,7 @@ func (tomox *TomoX) SyncDataToSDKNode(txDataMatch tomox_state.TxDataMatch, txHas
// updateRejectedOrders
for _, rejectedOrder := range rejectedOrders {
rejectedHashes = append(rejectedHashes, rejectedOrder.Hash.Hex())
if updatedTakerOrder.Hash == rejectedOrder.Hash && txMatchTime.After(updatedTakerOrder.UpdatedAt) {
if updatedTakerOrder.Hash == rejectedOrder.Hash && !txMatchTime.Before(updatedTakerOrder.UpdatedAt) {
// cache order history for handling reorg
orderHistoryRecord := tomox_state.OrderHistoryItem{
TxHash: updatedTakerOrder.TxHash,
Expand All @@ -438,7 441,8 @@ func (tomox *TomoX) SyncDataToSDKNode(txDataMatch tomox_state.TxDataMatch, txHas
}
dirtyRejectedOrders := db.GetListOrderByHashes(rejectedHashes)
for _, order := range dirtyRejectedOrders {
if !txMatchTime.After(order.UpdatedAt) {
if txMatchTime.Before(order.UpdatedAt) {
log.Debug("Ignore old orders/trades reject", "txHash", txHash.Hex(), "txTime", txMatchTime.UnixNano(), "updatedAt", updatedTakerOrder.UpdatedAt.UnixNano())
continue
}
// cache order history for handling reorg
Expand Down

0 comments on commit b2bbacc

Please sign in to comment.