Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unstable #28

Merged
merged 8 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fixed CPU usage caused by polling inboxes
  • Loading branch information
anthdm committed Mar 6, 2023
commit 7a054c882e26c6480adcb7df5e9f34c89d4797a2
1 change: 1 addition & 0 deletions actor/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 53,6 @@ func (in *Inbox) Stop() error {
}

func (in *Inbox) Send(msg Envelope) {
in.ggq.Awake()
in.ggq.Write(msg)
}
25 changes: 23 additions & 2 deletions ggq/ggq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 2,8 @@ package ggq

import (
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/anthdm/hollywood/log"
Expand Down Expand Up @@ -39,11 39,13 @@ type GGQ[T any] struct {
_ [cacheLinePadding - unsafe.Sizeof(atomic.Uint32{})]byte
state atomic.Uint32
_ [cacheLinePadding - unsafe.Sizeof(atomic.Uint32{})]byte
isIdling atomic.Bool
buffer []slot[T]
_ [cacheLinePadding]byte
mask uint32
consumer Consumer[T]
itemBuffer []T
cond *sync.Cond
}

func New[T any](size uint32, consumer Consumer[T]) *GGQ[T] {
Expand All @@ -55,6 57,7 @@ func New[T any](size uint32, consumer Consumer[T]) *GGQ[T] {
mask: size - 1,
consumer: consumer,
itemBuffer: make([]T, size 1),
cond: sync.NewCond(nil),
}
}

Expand Down Expand Up @@ -86,7 89,13 @@ func (q *GGQ[T]) ReadN() (T, bool) {
} else if upper := q.written.Load(); lower <= upper {
runtime.Gosched()
} else if !q.state.CompareAndSwap(stateClosed, stateRunning) {
time.Sleep(time.Microsecond)
var mu sync.Mutex
q.cond.L = &mu
q.isIdling.Store(true)
mu.Lock()
q.cond.Wait()
mu.Unlock()
q.isIdling.Store(false)
} else {
break
}
Expand All @@ -95,6 104,17 @@ func (q *GGQ[T]) ReadN() (T, bool) {
return t, true
}

// Awake the queue if its in the idle state.
func (q *GGQ[T]) Awake() {
if q.isIdling.Load() {
q.cond.Signal()
}
}

func (q *GGQ[T]) IsIdle() bool {
return q.isIdling.Load()
}

func (q *GGQ[T]) Consume(lower, upper uint32) {
consumed := 0
for ; lower <= upper; lower {
Expand Down Expand Up @@ -139,6 159,7 @@ func (q *GGQ[T]) Read() (T, bool) {

func (q *GGQ[T]) Close() {
q.state.Store(stateClosed)
q.cond.Signal()
}

func isPOW2(n uint32) bool {
Expand Down
14 changes: 7 additions & 7 deletions ggq/ggq_test.go
Original file line number Diff line number Diff line change
@@ -1,24 1,24 @@
package ggq

import (
"fmt"
"testing"
)

type consumer[T any] struct{}

func (c *consumer[T]) Consume(t []T) {
// fmt.Println(len(t))
fmt.Println(t)
}

func TestSingleMessageNotConsuming(t *testing.T) {
q := New[int](1024, &consumer[int]{})

go func() {
for i := 0; i < 1; i {
q.Write(i)
}
q.Close()
}()
q.cond.Signal()
for i := 0; i < 10; i {
q.Write(i)
}
q.Close()

q.ReadN()
}
Expand Down