Skip to content

Commit

Permalink
feat+cleanup: cleaner task handling for relayer dispatch and snapshot…
Browse files Browse the repository at this point in the history
… finalization
  • Loading branch information
anomit committed Nov 2, 2023
1 parent 858e8ba commit e830b95
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 130 deletions.
2 changes: 1 addition & 1 deletion go/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type DbCache interface {
GetStoredProjects(ctx context.Context) ([]string, error)
CheckIfProjectExists(ctx context.Context, projectID string) (bool, error)
StoreProjects(background context.Context, projects []string) error
AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport, incrCount bool) error
AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport) error
StoreLastFinalizedEpoch(ctx context.Context, projectID string, epochId int) error
StoreFinalizedSnapshot(ctx context.Context, msg *datamodel.PowerloomSnapshotFinalizedMessage) error
GetFinalizedSnapshotAtEpochID(ctx context.Context, projectID string, epochId int) (string, error)
Expand Down
74 changes: 28 additions & 46 deletions go/caching/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,22 @@ func (r *RedisCache) GetUnfinalizedSnapshotAtEpochID(ctx context.Context, projec
key := fmt.Sprintf(redisutils.REDIS_KEY_PROJECT_UNFINALIZED_SNAPSHOT_CIDS, projectID)
height := strconv.Itoa(epochId)

log.WithField("projectID", projectID).
l := log.WithField("projectID", projectID).
WithField("epochId", epochId).
WithField("key", key).
Debug("getting snapshotCid from redis at given epochId from the given projectId")
WithField("key", key)
l.Debug("getting snapshotCid from redis at given epochId from the given projectId")

res, err := r.readClient.ZRangeByScoreWithScores(ctx, key, &redis.ZRangeBy{
Min: height,
Max: height,
}).Result()
l.WithField("redis result ", res).Debug("Got result from redis")
if err != nil {
if errors.Is(err, redis.Nil) {
return nil, nil
}

log.Error("could not Get snapshot cid from redis error: ", err)
l.Error("could not Get snapshot cid from redis error: ", err)

return nil, err
}
Expand All @@ -91,17 +92,17 @@ func (r *RedisCache) GetUnfinalizedSnapshotAtEpochID(ctx context.Context, projec

val, ok := res[0].Member.(string)
if !ok {
log.Error("CRITICAL: could not convert snapshot cid data stored in redis to string")
l.Error("CRITICAL: could not convert snapshot cid data stored in redis to string")
}

err = json.Unmarshal([]byte(val), p)
if err != nil {
log.WithError(err).Error("CRITICAL: could not unmarshal snapshot cid data stored in redis")
l.WithError(err).Error("CRITICAL: could not unmarshal snapshot cid data stored in redis")

return nil, err
}

log.WithField("snapshotCid", p.SnapshotCID).WithField("epochId", epochId).Debug("got snapshot at given epochId")
l.WithField("snapshotCid", p.SnapshotCID).WithField("epochId", epochId).Debug("got snapshot at given epochId")

return p, nil
}
Expand Down Expand Up @@ -156,29 +157,9 @@ func (r *RedisCache) StoreProjects(background context.Context, projects []string
}

// AddSnapshotterStatusReport adds the snapshotter's status report to the given project and epoch ID.
func (r *RedisCache) AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport, incrCount bool) error {
func (r *RedisCache) AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport) error {
key := fmt.Sprintf(redisutils.REDIS_KEY_SNAPSHOTTER_STATUS_REPORT, projectId)

storedReport := new(datamodel.SnapshotterStatusReport)

reportJsonString, err := r.readClient.HGet(ctx, key, strconv.Itoa(epochId)).Result()
if err == nil || reportJsonString != "" {
_ = json.Unmarshal([]byte(reportJsonString), storedReport)
}

if report != nil {
if storedReport.SubmittedSnapshotCid != "" {
report.SubmittedSnapshotCid = storedReport.SubmittedSnapshotCid
}

if storedReport.Reason != "" {
report.Reason = storedReport.Reason
}

if storedReport.FinalizedSnapshotCid != "" {
report.FinalizedSnapshotCid = storedReport.FinalizedSnapshotCid
}

reportJson, err := json.Marshal(report)
if err != nil {
log.WithError(err).Error("failed to marshal snapshotter status report")
Expand All @@ -193,21 +174,21 @@ func (r *RedisCache) AddSnapshotterStatusReport(ctx context.Context, epochId int
return err
}

switch {
case report.State == datamodel.MissedSnapshotSubmission:
key = fmt.Sprintf(redisutils.REDIS_KEY_TOTAL_MISSED_SNAPSHOT_COUNT, projectId)
case report.State == datamodel.IncorrectSnapshotSubmission:
key = fmt.Sprintf(redisutils.REDIS_KEY_TOTAL_INCORRECT_SNAPSHOT_COUNT, projectId)
incrKey := ""
if report.State == datamodel.SuccessfulSnapshotSubmission {
incrKey = redisutils.REDIS_KEY_TOTAL_SUCCESSFUL_SNAPSHOT_COUNT
} else if report.State == datamodel.IncorrectSnapshotSubmission {
incrKey = redisutils.REDIS_KEY_TOTAL_INCORRECT_SNAPSHOT_COUNT
} else if report.State == datamodel.MissedSnapshotSubmission {
incrKey = redisutils.REDIS_KEY_TOTAL_MISSED_SNAPSHOT_COUNT
}
} else {
key = fmt.Sprintf(redisutils.REDIS_KEY_TOTAL_SUCCESSFUL_SNAPSHOT_COUNT, projectId)
}

if incrCount {
err = r.writeClient.Incr(ctx, key).Err()
if err != nil {
log.WithError(err).Error("failed to increment total snapshot count against key " + key)
if incrKey != "" {
err = r.writeClient.Incr(ctx, fmt.Sprintf(incrKey, projectId)).Err()
if err != nil {
log.WithError(err).Error("failed to increment snapshotter status report count in redis")
}
}

}

log.Debug("added snapshotter status report in redis")
Expand All @@ -221,9 +202,9 @@ func (r *RedisCache) StoreLastFinalizedEpoch(ctx context.Context, projectID stri
l := log.WithField("key", key)

lastEpochStr, err := r.readClient.Get(ctx, key).Result()
if err != nil {
l.WithError(err).Error("failed to get last finalized epoch from redis")
}
// if err != nil {
// l.WithError(err).Error("failed to get last finalized epoch from redis")
// }

lastEpoch := 0

Expand All @@ -247,7 +228,7 @@ func (r *RedisCache) StoreLastFinalizedEpoch(ctx context.Context, projectID stri
return err
}

l.WithField("projectID", projectID).WithField("epochId", epochId).Debug("stored last finalized epoch in redis")
l.Debug("stored last finalized epoch in redis")

return nil
}
Expand All @@ -272,10 +253,11 @@ func (r *RedisCache) StoreFinalizedSnapshot(ctx context.Context, msg *datamodel.
// get all the members
res, err := r.writeClient.ZRemRangeByScore(ctx, key, "-inf", strconv.Itoa(msg.EpochID-10240)).Result()
if err != nil {
log.WithField("epoch ID", msg.EpochID).WithError(err).Error("failed to remove finalized snapshots zset cache between -inf to epochId-10240")
log.WithField("epoch ID", msg.EpochID).WithField("Redis Op result:", res).WithError(err).Error("failed to remove finalized snapshots zset cache between -inf to epochId-10240")
// ignore error
return nil
}
log.WithField("epoch ID", msg.EpochID).WithField("Redis Op result:", res).Debug("removed finalized snapshots zset cache between -inf to epochId-10240")
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions go/goutils/datamodel/data_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import "github.com/ethereum/go-ethereum/signer/core/apitypes"
type SnapshotSubmissionState string

const (
MissedSnapshotSubmission SnapshotSubmissionState = "MISSED_SNAPSHOT"
IncorrectSnapshotSubmission SnapshotSubmissionState = "SUBMITTED_INCORRECT_SNAPSHOT"
MissedSnapshotSubmission SnapshotSubmissionState = "MISSED_SNAPSHOT"
IncorrectSnapshotSubmission SnapshotSubmissionState = "SUBMITTED_INCORRECT_SNAPSHOT"
OnlyFinalizedSnapshotSubmission SnapshotSubmissionState = "ONLY_FINALIZED_SNAPSHOT_RECIEVED"
SuccessfulSnapshotSubmission SnapshotSubmissionState = "SUCCESSFUL_SNAPSHOT_SUBMISSION"
)

type SnapshotterStateUpdate struct {
Expand Down
4 changes: 0 additions & 4 deletions go/goutils/ipfsutils/ipfs_utils.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package ipfsutils

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net"
Expand All @@ -17,7 +15,6 @@ import (
"github.com/swagftw/gi"
"golang.org/x/time/rate"

"audit-protocol/goutils/datamodel"
"audit-protocol/goutils/httpclient"
"audit-protocol/goutils/settings"
)
Expand Down Expand Up @@ -196,7 +193,6 @@ func ParseURL(ipfsUrl string) (string, error) {
return ipfsUrl, nil
}


// GetSnapshotFromIPFS returns the snapshot from IPFS.
func (client *IpfsClient) GetSnapshotFromIPFS(snapshotCID string, outputPath string) error {
err := client.readClientRateLimiter.Wait(context.Background())
Expand Down
Loading

0 comments on commit e830b95

Please sign in to comment.