Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/grpcsync: Provide an internal-only pub-sub type API #6167

Merged
merged 17 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
apply comments
  • Loading branch information
my4-dev committed Apr 8, 2023
commit c48bce59e36f10cd1b9deb9400ae980be08c9f8a
57 changes: 27 additions & 30 deletions internal/grpcsync/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 31,10 @@ type Subscriber interface {
}

// PubSub is a simple one-to-many publish-subscribe system that supports messages
// of arbitrary type.
// of arbitrary type.
//
// Publisher invokes the Publish() method to publish new messages, while
// subscribers interested in receiving these messages register a callback
// subscribers interested in receiving these messages register a callback
// via the Subscribe() method. It guarantees that messages are delivered in
// the same order in which they were published.
//
Expand All @@ -45,58 45,58 @@ type PubSub struct {
cancel context.CancelFunc

// Access to the below fields are guarded by this mutex.
mu sync.Mutex
msg interface{}
subscribers map[Watcher]bool
stopped bool
mu sync.Mutex
msg interface{}
subscribers map[Subscriber]bool
stopped bool
}

// NewPubSub returns a new PubSub instance to track msg changes.
// NewPubSub returns a new PubSub instance.
func NewPubSub() *PubSub {
ctx, cancel := context.WithCancel(context.Background())
return &PubSub{
cs: NewCallbackSerializer(ctx),
cancel: cancel,
watchers: map[Watcher]bool{},
cs: NewCallbackSerializer(ctx),
cancel: cancel,
subscribers: map[Subscriber]bool{},
}
}

// Subscribe adds the provided watcher to the set of watchers in PubSub.
// The Publish() callback will be invoked asynchronously with the current
// msg of the tracked entity to begin with, and subsequently for every msg
// change.
// Subscribe registers the provided Subscriber to the PubSub.
//
// Returns a function to remove the provided watcher from the set of watchers.
// The caller of this method is responsible for invoking this function when it
// no longer needs to monitor the msg changes on the channel.
func (ps *PubSub) Subscribe(watcher Watcher) func() {
// If the PubSub contains a previously published message, the Subscriber's
// OnMessage() callback will be invoked asynchronously with the existing
// message to begin with, and subsequently for every newly published message.
//
// The caller is responsible for invoking the returned function to unsubscribe
// itself from the PubSub.
func (ps *PubSub) Subscribe(sub Subscriber) func() {
ps.mu.Lock()
defer ps.mu.Unlock()

if ps.stopped {
return func() {}
}

ps.watchers[watcher] = true
ps.subscribers[sub] = true

if ps.msg != nil {
msg := ps.msg
ps.cs.Schedule(func(context.Context) {
ps.mu.Lock()
defer ps.mu.Unlock()
watcher.OnChange(msg)
sub.OnMessage(msg)
})
}

return func() {
ps.mu.Lock()
defer ps.mu.Unlock()
delete(ps.watchers, watcher)
delete(ps.subscribers, sub)
}
}

// Publish updates the msg of the entity being tracked, and
// invokes the Publish callback of all registered watchers.
// Publish publishes the provided message to the PubSub, and invokes
// callbacks registered by subscribers asynchronously.
func (ps *PubSub) Publish(msg interface{}) {
ps.mu.Lock()
defer ps.mu.Unlock()
Expand All @@ -106,21 106,18 @@ func (ps *PubSub) Publish(msg interface{}) {
}

ps.msg = msg

for watcher := range ps.watchers {
// Prevent the watcher which is passed to the closure function
// from being changed while this loop.
w := watcher
for sub := range ps.subscribers {
s := sub
ps.cs.Schedule(func(context.Context) {
ps.mu.Lock()
defer ps.mu.Unlock()
w.OnChange(msg)
s.OnMessage(msg)
})
}
}

// Stop shuts down the PubSub and releases any resources allocated by it.
// It is guaranteed that no Watcher callbacks would be invoked once this
// It is guaranteed that no subscriber callbacks would be invoked once this
// method returns.
func (ps *PubSub) Stop() {
ps.mu.Lock()
Expand Down
64 changes: 32 additions & 32 deletions internal/grpcsync/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 26,17 @@ import (
"github.com/google/go-cmp/cmp"
)

type mockWatcher struct {
type mockSubscriber struct {
easwars marked this conversation as resolved.
Show resolved Hide resolved
mu sync.Mutex
msgs []string
wg *sync.WaitGroup
easwars marked this conversation as resolved.
Show resolved Hide resolved
}

func (mw *mockWatcher) OnChange(target interface{}) {
mw.mu.Lock()
defer mw.mu.Unlock()
mw.msgs = append(mw.msgs, target.(string))
mw.wg.Done()
func (ms *mockSubscriber) OnMessage(msg interface{}) {
ms.mu.Lock()
defer ms.mu.Unlock()
ms.msgs = append(ms.msgs, msg.(string))
ms.wg.Done()
}

easwars marked this conversation as resolved.
Show resolved Hide resolved
func (s) TestPubSub(t *testing.T) {
easwars marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -45,75 45,75 @@ func (s) TestPubSub(t *testing.T) {

wg := sync.WaitGroup{}
expectedMsg := []string{}
mw := &mockWatcher{
ms := &mockSubscriber{
msgs: []string{},
wg: &wg,
}
pubsub.Subscribe(mw)
if diff := cmp.Diff(mw.msgs, expectedMsg); diff != "" {
t.Errorf("Difference between mw.msgs and expectedMsg for initial situation. diff(-want, got):\n%s", diff)
pubsub.Subscribe(ms)
if diff := cmp.Diff(ms.msgs, expectedMsg); diff != "" {
t.Errorf("Difference between ms.msgs and expectedMsg for initial situation. diff(-want, got):\n%s", diff)
}

easwars marked this conversation as resolved.
Show resolved Hide resolved
// Update the target and verify that Publish is called again.
pubsub.Publish("set")
wg.Add(1)
wg.Wait()
expectedMsg = append(expectedMsg, "set")
if diff := cmp.Diff(mw.msgs, expectedMsg); diff != "" {
t.Errorf("Difference between mw.msgs and expectedMsg for updated watcher callback. diff(-want, got):\n%s", diff)
if diff := cmp.Diff(ms.msgs, expectedMsg); diff != "" {
t.Errorf("Difference between ms.msgs and expectedMsg for updated subscriber callback. diff(-want, got):\n%s", diff)
}

easwars marked this conversation as resolved.
Show resolved Hide resolved
// Add another watcher and verify that its Publish is called with the current target.
expectedMsg2 := []string{}
mw2 := &mockWatcher{
ms2 := &mockSubscriber{
msgs: []string{},
wg: &wg,
}
cancelFunc2 := pubsub.Subscribe(mw2)
cancelFunc2 := pubsub.Subscribe(ms2)
wg.Add(1)
wg.Wait()
expectedMsg2 = append(expectedMsg2, "set")
if diff := cmp.Diff(mw.msgs, expectedMsg); diff != "" {
t.Errorf("Difference between mw.msgs and expectedMsg after adding second watcher. diff(-want, got):\n%s", diff)
if diff := cmp.Diff(ms.msgs, expectedMsg); diff != "" {
t.Errorf("Difference between ms.msgs and expectedMsg after adding second subscriber. diff(-want, got):\n%s", diff)
}
if diff := cmp.Diff(mw2.msgs, expectedMsg2); diff != "" {
t.Errorf("Difference between mw2.targets and expectedMsg2 after adding second watcher. diff(-want, got):\n%s", diff)
if diff := cmp.Diff(ms2.msgs, expectedMsg2); diff != "" {
t.Errorf("Difference between ms2.msgs and expectedMsg2 after adding second subscriber. diff(-want, got):\n%s", diff)
}

// Update the target again and verify that both watchers receive the update.
// Update the msg again and verify that both subscribers receive the update.
pubsub.Publish("set2")
wg.Add(2)
wg.Wait()
expectedMsg = append(expectedMsg, "set2")
expectedMsg2 = append(expectedMsg2, "set2")
if diff := cmp.Diff(mw.msgs, expectedMsg); diff != "" {
t.Errorf("Difference between mw.msgs and expectedMsg after sending message to both watchers. diff(-want, got):\n%s", diff)
if diff := cmp.Diff(ms.msgs, expectedMsg); diff != "" {
t.Errorf("Difference between ms.msgs and expectedMsg after sending message to both watchers. diff(-want, got):\n%s", diff)
}
if diff := cmp.Diff(mw2.msgs, expectedMsg2); diff != "" {
t.Errorf("Difference between mw2.targets and expectedMsg2 after sending message to both watchers. diff(-want, got):\n%s", diff)
if diff := cmp.Diff(ms2.msgs, expectedMsg2); diff != "" {
t.Errorf("Difference between ms2.msgs and expectedMsg2 after sending message to both watchers. diff(-want, got):\n%s", diff)
}

// Remove the second watcher and verify that its callback is no longer received.
// Remove the second subscriber and verify that its callback is no longer received.
cancelFunc2()
pubsub.Publish("set3")
wg.Add(1)
wg.Wait()
expectedMsg = append(expectedMsg, "set3")
if diff := cmp.Diff(mw.msgs, expectedMsg); diff != "" {
t.Errorf("Difference between mw.msgs and expectedMsg after removing second watcher. diff(-want, got):\n%s", diff)
if diff := cmp.Diff(ms.msgs, expectedMsg); diff != "" {
t.Errorf("Difference between ms.msgs and expectedMsg after removing second subscriber. diff(-want, got):\n%s", diff)
}
if diff := cmp.Diff(mw2.msgs, expectedMsg2); diff != "" {
t.Errorf("Difference between mw2.targets and expectedMsg2 after removing second watcher. diff(-want, got):\n%s", diff)
if diff := cmp.Diff(ms2.msgs, expectedMsg2); diff != "" {
t.Errorf("Difference between ms2.msgs and expectedMsg2 after removing second subscriber. diff(-want, got):\n%s", diff)
}

// Stop the pubsub and verify that no more callbacks are received.
pubsub.Stop()
pubsub.Publish("set4")
time.Sleep(10 * time.Millisecond)
if diff := cmp.Diff(mw.msgs, expectedMsg); diff != "" {
t.Errorf("Difference between mw.msgs and expectedMsg after stopping pubsub. diff(-want, got):\n%s", diff)
if diff := cmp.Diff(ms.msgs, expectedMsg); diff != "" {
t.Errorf("Difference between ms.msgs and expectedMsg after stopping pubsub. diff(-want, got):\n%s", diff)
}
if diff := cmp.Diff(mw2.msgs, expectedMsg2); diff != "" {
t.Errorf("Difference between mw2.targets and expectedMsg2 after stopping pubsub. diff(-want, got):\n%s", diff)
if diff := cmp.Diff(ms2.msgs, expectedMsg2); diff != "" {
t.Errorf("Difference between ms2.msgs and expectedMsg2 after stopping pubsub. diff(-want, got):\n%s", diff)
}
}