Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Improvements for SubscribeTransactionStatuses statuses handling #6736

Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
2e8d95b
change tx result retrieving
Guitarheroua Nov 15, 2024
de3851a
Fixed unit tests
Guitarheroua Nov 19, 2024
dfb40cf
Merge branch 'master' into AndriiSlisarchuk/6574-improve-tx-sub-statuses
Guitarheroua Nov 19, 2024
edb0636
fixed comment
Guitarheroua Nov 19, 2024
23a2214
simplify unit test
Guitarheroua Nov 19, 2024
e3a43a0
remove second block extraction
Guitarheroua Nov 19, 2024
ef8136d
removed unnecessary comment
Guitarheroua Nov 19, 2024
543fb9e
added missing documentation
Guitarheroua Nov 19, 2024
9ee3d6a
Merge branch 'master' into AndriiSlisarchuk/6574-improve-tx-sub-statuses
Guitarheroua Nov 20, 2024
b49bb7c
Merge branch 'master' into AndriiSlisarchuk/6574-improve-tx-sub-statuses
Guitarheroua Dec 2, 2024
2baa1a3
Merge branch 'master' into AndriiSlisarchuk/6574-improve-tx-sub-statuses
Guitarheroua Dec 4, 2024
8c0c247
Fixed remark
Guitarheroua Dec 4, 2024
2ecd739
Merge branch 'master' into AndriiSlisarchuk/6574-improve-tx-sub-statuses
Guitarheroua Dec 5, 2024
8f0e956
Merge branch 'master' into AndriiSlisarchuk/6574-improve-tx-sub-statuses
Guitarheroua Dec 12, 2024
35d3da9
Merge branch 'master' into AndriiSlisarchuk/6574-improve-tx-sub-statuses
Guitarheroua Dec 17, 2024
86e6096
Merge branch 'master' into AndriiSlisarchuk/6574-improve-tx-sub-statuses
peterargue Dec 17, 2024
5ba75c8
Merge branch 'master' into AndriiSlisarchuk/6574-improve-tx-sub-statuses
Guitarheroua Dec 19, 2024
2a1f2f8
Merge branch 'master' into AndriiSlisarchuk/6574-improve-tx-sub-statuses
Guitarheroua Dec 27, 2024
40a3323
Merge branch 'master' into AndriiSlisarchuk/6574-improve-tx-sub-statuses
Guitarheroua Dec 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions engine/access/rpc/backend/backend_stream_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type backendSubscribeTransactions struct {
type TransactionSubscriptionMetadata struct {
*access.TransactionResult
txReferenceBlockID flow.Identifier
blockWithTx *flow.Header
blockWithTx *flow.Block
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have checked underlying code path and couldn"t find a place why we essentially need a flow.Block. My suggestion here would be to stick to flow.Header and actually refactor usages of this field to accept flow.Header instead of flow.Block. I think that we should carry only the minimal needed information since extra info increases mental complexity and makes the APIs more convoluted and less intuitive.
Suppose I see a method signature:

GetTransactionResultFromExecutionNode(ctx context.Context,
	block *flow.Block,
	transactionID flow.Identifier,
	requiredEventEncodingVersion entities.EventEncodingVersion)

when I see such signature I assume that it does something with the block payload since it passes a block rather than a header and the difference between block and header is exactly the payload, in fact it is misleading since it utilizes only fields of flow.Header. To avoid such misleading APIs I would propose to make a refactoring to clean this.

txExecuted bool
eventEncodingVersion entities.EventEncodingVersion
}
Expand Down Expand Up @@ -126,7 +126,7 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran
// When a block with the transaction is available, it is possible to receive a new transaction status while
// searching for the transaction result. Otherwise, it remains unchanged. So, if the old and new transaction
// statuses are the same, the current transaction status should be retrieved.
txInfo.Status, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.blockWithTx.Height, txInfo.txExecuted)
txInfo.Status, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.BlockHeight, txInfo.txExecuted)
}
if err != nil {
if !errors.Is(err, state.ErrUnknownSnapshotReference) {
Expand Down Expand Up @@ -229,7 +229,7 @@ func (b *backendSubscribeTransactions) checkBlockReady(height uint64) error {
func (b *backendSubscribeTransactions) searchForTransactionBlockInfo(
height uint64,
txInfo *TransactionSubscriptionMetadata,
) (*flow.Header, flow.Identifier, uint64, flow.Identifier, error) {
) (*flow.Block, flow.Identifier, uint64, flow.Identifier, error) {
block, err := b.txLocalDataProvider.blocks.ByHeight(height)
if err != nil {
return nil, flow.ZeroID, 0, flow.ZeroID, fmt.Errorf("error looking up block: %w", err)
Expand All @@ -241,37 +241,34 @@ func (b *backendSubscribeTransactions) searchForTransactionBlockInfo(
}

if collectionID != flow.ZeroID {
return block.Header, block.ID(), height, collectionID, nil
return block, block.ID(), height, collectionID, nil
}

return nil, flow.ZeroID, 0, flow.ZeroID, nil
}

// searchForTransactionResult searches for the transaction result of a block. It retrieves the execution result for the specified block ID.
// Expected errors:
// - codes.Internal if an internal error occurs while retrieving execution result.
// searchForTransactionResult searches for the transaction result of a block. It retrieves the transaction result from
// storage and, in case of failure, attempts to fetch the transaction result directly from the execution node.
// This is necessary to ensure data availability despite sync storage latency.
//
// No errors expected during normal operations.
func (b *backendSubscribeTransactions) searchForTransactionResult(
ctx context.Context,
txInfo *TransactionSubscriptionMetadata,
) (*access.TransactionResult, error) {
_, err := b.executionResults.ByBlockID(txInfo.BlockID)
txResult, err := b.backendTransactions.GetTransactionResultFromStorage(ctx, txInfo.blockWithTx, txInfo.TransactionID, txInfo.eventEncodingVersion)
peterargue marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
Guitarheroua marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at this point you can receive multiple sentinel errors and I assume an exception as well. We are basically sweeping all errors under one if statement but I would suggest to explicitly filter if it"s an expected error. Otherwise we might get into a situation where a node observes a critical failure(lets say a storage failure where the badger DB is corrupted) but we are still trying to operate on best-effort scenario which is unacceptable.

if errors.Is(err, storage.ErrNotFound) {
return nil, nil
}
return nil, fmt.Errorf("failed to get execution result for block %s: %w", txInfo.BlockID, err)
// If any error occurs with local storage - request transaction result from EN
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as I said in previously comment, "any" error is a bit to extreme

txResult, err = b.backendTransactions.GetTransactionResultFromExecutionNode(
ctx,
txInfo.blockWithTx,
txInfo.TransactionID,
txInfo.eventEncodingVersion,
)
}

txResult, err := b.backendTransactions.GetTransactionResult(
ctx,
txInfo.TransactionID,
txInfo.BlockID,
txInfo.CollectionID,
txInfo.eventEncodingVersion,
)

if err != nil {
// if either the storage or execution node reported no results or there were not enough execution results
// if either the storage or execution node reported no results
if status.Code(err) == codes.NotFound {
// No result yet, indicate that it has not been executed
return nil, nil
Expand Down
59 changes: 39 additions & 20 deletions engine/access/rpc/backend/backend_stream_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"testing"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/dgraph-io/badger/v2"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
Expand All @@ -22,6 +25,7 @@ import (
connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock"
"github.com/onflow/flow-go/engine/access/subscription"
subscriptionmock "github.com/onflow/flow-go/engine/access/subscription/mock"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
Expand All @@ -30,6 +34,7 @@ import (
syncmock "github.com/onflow/flow-go/module/state_synchronization/mock"
protocolint "github.com/onflow/flow-go/state/protocol"
protocol "github.com/onflow/flow-go/state/protocol/mock"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
storagemock "github.com/onflow/flow-go/storage/mock"
"github.com/onflow/flow-go/utils/unittest"
Expand Down Expand Up @@ -98,9 +103,6 @@ func (s *TransactionStatusSuite) SetupTest() {
s.tempSnapshot = &protocol.Snapshot{}
s.db, s.dbDir = unittest.TempBadgerDB(s.T())

params := protocol.NewParams(s.T())
s.state.On("Params").Return(params)

s.blocks = storagemock.NewBlocks(s.T())
s.headers = storagemock.NewHeaders(s.T())
s.transactions = storagemock.NewTransactions(s.T())
Expand All @@ -121,11 +123,24 @@ func (s *TransactionStatusSuite) SetupTest() {
s.blockTracker = subscriptionmock.NewBlockTracker(s.T())
s.resultsMap = map[flow.Identifier]*flow.ExecutionResult{}

s.execClient.On("GetTransactionResult", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, "not found")).Maybe()
s.connectionFactory.On("GetExecutionAPIClient", mock.Anything).Return(s.execClient, &mocks.MockCloser{}, nil).Maybe()

// generate blockCount consecutive blocks with associated seal, result and execution data
s.rootBlock = unittest.BlockFixture()
rootResult := unittest.ExecutionResultFixture(unittest.WithBlock(&s.rootBlock))
s.resultsMap[s.rootBlock.ID()] = rootResult

params := protocol.NewParams(s.T())
params.On("FinalizedRoot").Return(s.rootBlock.Header).Maybe()
s.state.On("Params").Return(params).Maybe()

var receipts flow.ExecutionReceiptList
executionNodes := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution))
receipts = unittest.ReceiptsForBlockFixture(&s.rootBlock, executionNodes.NodeIDs())
s.receipts.On("ByBlockID", mock.AnythingOfType("flow.Identifier")).Return(receipts, nil).Maybe()
s.finalSnapshot.On("Identities", mock.Anything).Return(executionNodes, nil).Maybe()

var err error
s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight),
Expand Down Expand Up @@ -219,6 +234,14 @@ func (s *TransactionStatusSuite) backendParams() Params {
TxResultQueryMode: IndexQueryModeLocalOnly,
EventsIndex: index.NewEventsIndex(s.indexReporter, s.events),
LastFullBlockHeight: s.lastFullBlockHeight,
ExecNodeIdentitiesProvider: commonrpc.NewExecutionNodeIdentitiesProvider(
s.log,
s.state,
s.receipts,
nil,
nil,
),
ConnFactory: s.connectionFactory,
}
}

Expand Down Expand Up @@ -246,36 +269,20 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() {
finalizedHeader := s.finalizedBlock.Header
return finalizedHeader.Height, nil
}, nil)
s.blocks.On("ByID", mock.AnythingOfType("flow.Identifier")).Return(func(blockID flow.Identifier) (*flow.Block, error) {
for _, block := range s.blockMap {
if block.ID() == blockID {
return block, nil
}
}

return nil, nil
}, nil)
s.sealedSnapshot.On("Head").Return(func() *flow.Header {
return s.sealedBlock.Header
}, nil)
s.state.On("Sealed").Return(s.sealedSnapshot, nil)
s.results.On("ByBlockID", mock.AnythingOfType("flow.Identifier")).Return(mocks.StorageMapGetter(s.resultsMap))

// Generate sent transaction with ref block of the current finalized block
transaction := unittest.TransactionFixture()
transaction.SetReferenceBlockID(s.finalizedBlock.ID())
s.transactions.On("ByID", mock.AnythingOfType("flow.Identifier")).Return(&transaction.TransactionBody, nil)

col := flow.CollectionFromTransactions([]*flow.Transaction{&transaction})
guarantee := col.Guarantee()
light := col.Light()
txId := transaction.ID()
txResult := flow.LightTransactionResult{
TransactionID: txId,
Failed: false,
ComputationUsed: 0,
}

eventsForTx := unittest.EventsFixture(1, flow.EventAccountCreated)
eventMessages := make([]*entities.Event, 1)
for j, event := range eventsForTx {
Expand All @@ -288,11 +295,21 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() {
mock.AnythingOfType("flow.Identifier"),
).Return(eventsForTx, nil)

hasTransactionResultInStorage := false
s.transactionResults.On(
"ByBlockIDTransactionID",
mock.AnythingOfType("flow.Identifier"),
mock.AnythingOfType("flow.Identifier"),
).Return(&txResult, nil)
).Return(func(blockID flow.Identifier, transactionID flow.Identifier) (*flow.LightTransactionResult, error) {
if hasTransactionResultInStorage {
return &flow.LightTransactionResult{
TransactionID: txId,
Failed: false,
ComputationUsed: 0,
}, nil
}
return nil, storage.ErrNotFound
}).Twice()

// Create a special common function to read subscription messages from the channel and check converting it to transaction info
// and check results for correctness
Expand Down Expand Up @@ -328,6 +345,8 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() {
// 3. Add one more finalized block on top of the transaction block and add execution results to storage
finalizedResult := unittest.ExecutionResultFixture(unittest.WithBlock(s.finalizedBlock))
s.resultsMap[s.finalizedBlock.ID()] = finalizedResult
// init transaction result for storage
hasTransactionResultInStorage = true
s.addNewFinalizedBlock(s.finalizedBlock.Header, true)
checkNewSubscriptionMessage(sub, flow.TransactionStatusExecuted)

Expand Down
12 changes: 9 additions & 3 deletions engine/access/rpc/backend/backend_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,14 +651,14 @@ func (b *backendTransactions) lookupTransactionResult(
var err error
switch b.txResultQueryMode {
case IndexQueryModeExecutionNodesOnly:
txResult, err = b.getTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion)
txResult, err = b.GetTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion)
case IndexQueryModeLocalOnly:
txResult, err = b.GetTransactionResultFromStorage(ctx, block, txID, requiredEventEncodingVersion)
case IndexQueryModeFailover:
txResult, err = b.GetTransactionResultFromStorage(ctx, block, txID, requiredEventEncodingVersion)
if err != nil {
// If any error occurs with local storage - request transaction result from EN
txResult, err = b.getTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion)
txResult, err = b.GetTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion)
}
default:
return nil, status.Errorf(codes.Internal, "unknown transaction result query mode: %v", b.txResultQueryMode)
Expand Down Expand Up @@ -742,7 +742,13 @@ func (b *backendTransactions) registerTransactionForRetry(tx *flow.TransactionBo
b.retry.RegisterTransaction(referenceBlock.Height, tx)
}

func (b *backendTransactions) getTransactionResultFromExecutionNode(
// GetTransactionResultFromExecutionNode retrieves the result of a specified transaction
// from the execution nodes for a given block.
//
// Error returns:
// - `codes.NotFound` if no execution receipt were found.
// - Returns internal errors if event conversion or status derivation fails.
func (b *backendTransactions) GetTransactionResultFromExecutionNode(
ctx context.Context,
block *flow.Block,
transactionID flow.Identifier,
Expand Down
Loading