Skip to content

Commit

Permalink
A few optimizations (eatonphil#2)
Browse files Browse the repository at this point in the history
* Profile

* Increase max append entries size

* Debounce appendEntries

* Get rid of race condition in sim

* More playing around

* Drop profile

* Fix kvapi example
  • Loading branch information
eatonphil authored May 5, 2023
1 parent 73f2dc8 commit 255d6c5
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 71 deletions.
6 changes: 3 additions & 3 deletions cmd/kvapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 74,7 @@ func (hs httpServer) setHandler(w http.ResponseWriter, r *http.Request) {
return
}

_, err = hs.raft.Apply(buf.Bytes())
_, err = hs.raft.Apply([][]byte{buf.Bytes()})
if err != nil {
log.Printf("Could not write key-value: %s", err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
Expand Down Expand Up @@ -133,9 133,9 @@ func getConfig() config {
if arg == "--cluster" {
cluster := os.Args[i 2]
var clusterEntry goraft.ClusterMember
for _, part := range strings.Split(cluster, ";") {
for i, part := range strings.Split(cluster, ";") {
idAddress := strings.Split(part, ",")
clusterEntry.Id = idAddress[0]
clusterEntry.Id = uint64(i)
clusterEntry.Address = idAddress[1]
cfg.cluster = append(cfg.cluster, clusterEntry)
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/sim/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 4,14 @@ go 1.20

replace github.com/eatonphil/goraft => ../../

require github.com/eatonphil/goraft v0.0.0-00010101000000-000000000000
require (
github.com/eatonphil/goraft v0.0.0-00010101000000-000000000000
github.com/pkg/profile v1.7.0
)

require (
github.com/felixge/fgprof v0.9.3 // indirect
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
github.com/sasha-s/go-deadlock v0.3.1 // indirect
)
25 changes: 25 additions & 0 deletions cmd/sim/go.sum
Original file line number Diff line number Diff line change
@@ -1,4 1,29 @@
github.com/chzyer/logex v1.1.10/go.mod h1: Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3 rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI c5H38=
github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g=
github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw=
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd h1:1FjCyPC syAzJ5/2S8fqdZK1R22vvA0J7JZKcuOIQ7Y=
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA=
github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD OHknK4YDY07LPUC6dEvqDjvNo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9 cV5Ey/OB71efZx0=
github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF rwdDfMAkV7OtwuqBVzrE8GR6GFx wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1 rVB AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO kdMU MU=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5 mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E C64Yfv1cQ7kz7rIZviUmN EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E C64Yfv1cQ7kz7rIZviUmN EgEM=
104 changes: 70 additions & 34 deletions cmd/sim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 8,7 @@ import (
"time"

"github.com/eatonphil/goraft"
// "github.com/pkg/profile"
)

type kvStateMachine struct {
Expand Down Expand Up @@ -82,6 83,7 @@ func randomString() string {
}

func main() {
//defer profile.Start(profile.MemProfile).Stop()
rand.Seed(0)

cluster := []goraft.ClusterMember{
Expand Down Expand Up @@ -127,67 129,101 @@ outer:
time.Sleep(time.Second)
}

N_ENTRIES := 3_000
N_CLIENTS := 1
N_ENTRIES := 8_000 / N_CLIENTS
BATCH_SIZE := goraft.MAX_APPEND_ENTRIES_BATCH / N_CLIENTS
fmt.Printf("Clients: %d. Entries: %d. Batch: %d.\n", N_CLIENTS, N_ENTRIES, BATCH_SIZE)

var wg sync.WaitGroup
wg.Add(N_CLIENTS)
var randKey, randValue string
t := time.Now()
var total time.Duration
for i := 0; i < N_ENTRIES; i {
if i%1000 == 0 && i > 0 {
fmt.Printf("%d entries inserted in %s.\n", i, time.Now().Sub(t))
t = time.Now()
}
key := randomString()
value := randomString()
var mu sync.Mutex
for j := 0; j < N_CLIENTS; j {
go func(j int) {
defer wg.Done()

//t := time.Now()
var entries [][]byte
for i := 0; i < N_ENTRIES; i {
// if i00 == 0 && i > 0 {
// fmt.Printf("%d entries inserted in %s.\n", i, time.Now().Sub(t))
// t = time.Now()
// }
key := randomString()
value := randomString()

if rand.Intn(100) > 90 || i == 0 && j == 0 {
randKey = key
randValue = value
}

if rand.Intn(100) > 90 || i == 0 {
randKey = key
randValue = value
}
entries = append(entries, kvsmMessage_Set(key, value))

foundALeader:
for {
for _, s := range []*goraft.Server{s1, s2, s3} {
t := time.Now()
_, err := s.Apply(kvsmMessage_Set(key, value))
total = time.Now().Sub(t)
if err == goraft.ErrApplyToLeader {
if len(entries) < BATCH_SIZE && i < N_ENTRIES-1 {
continue
} else if err != nil {
panic(err)
} else {
break foundALeader
}
foundALeader:
for {
for _, s := range []*goraft.Server{s1, s2, s3} {
t := time.Now()
_, err := s.Apply(entries)
if err == goraft.ErrApplyToLeader {
continue
} else if err != nil {
panic(err)
} else {
diff := time.Now().Sub(t)
mu.Lock()
total = diff
mu.Unlock()
fmt.Printf("Client: %d. %d entries (%d of %d: %d%%) inserted. Latency: %s. Throughput: %f entries/s.\n",
j,
len(entries),
i 1,
N_ENTRIES,
((i 1) * 100)/N_ENTRIES,
diff,
float64(len(entries)) / float64(diff / time.Second),
)
break foundALeader
}
}
time.Sleep(time.Second)
}

entries = [][]byte{}
}
}(j)
}

wg.Wait()
fmt.Printf("Total time: %s. Average insert/second: %s. Throughput: %f entries/s.\n", total, total/time.Duration(N_ENTRIES), float64(N_ENTRIES)/float64(total/time.Second))

for _, s := range []*goraft.Server{s1, s2, s3} {
for !s.AllCommitted() {
time.Sleep(time.Second)
fmt.Println("Waiting for commits to be applied.")
}

goraft.Assert("Quorum reached", s1.Entries() == s2.Entries() || s1.Entries() == s3.Entries() || s2.Entries() == s3.Entries(), true)
}
fmt.Printf("Total time: %s. Average insert/second: %s. Throughput: %f entries/s.\n", total, total/time.Duration(N_ENTRIES), float64(N_ENTRIES)/float64(total/time.Second))

var v []byte
var err error
for _, s := range []*goraft.Server{s1, s2, s3} {
v, err = s.Apply(kvsmMessage_Get(randKey))
res, err := s.Apply([][]byte{kvsmMessage_Get(randKey)})
if err == goraft.ErrApplyToLeader {
continue
} else if err != nil {
panic(err)
} else {
v = res[0].Result
break
}
}

// Ooph, but need to wait for the other state machines to catch up.
time.Sleep(2 * time.Second)

for _, sm := range []*kvStateMachine{sm1, sm2, sm3} {
goraft.Assert("Each node state machine is up-to-date", string(v), sm.kv[randKey])
goraft.Assert("Each node state machine is up-to-date", randValue, sm.kv[randKey])
}

goraft.Assert("Quorum reached", s1.Entries() == s2.Entries() || s1.Entries() == s3.Entries() || s2.Entries() == s3.Entries(), true)

fmt.Printf("%s = %s, expected: %s\n", randKey, string(v), randValue)
}
Loading

0 comments on commit 255d6c5

Please sign in to comment.