Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/grpcsync: Provide an internal-only pub-sub type API #6167

Merged
merged 17 commits into from
Jun 30, 2023

Conversation

my4-dev
Copy link
Contributor

@my4-dev my4-dev commented Apr 1, 2023

This PR adds an internal-only generic pub-sub type API.

Please refer to this conversation in #6036 .

RELEASE NOTES: none

@easwars easwars self-assigned this Apr 3, 2023
@easwars easwars added the Type: Feature New features or improvements in behavior label Apr 3, 2023
@easwars easwars added this to the 1.55 Release milestone Apr 3, 2023
internal/genericpubsub/genericpubsub.go Outdated Show resolved Hide resolved
internal/genericpubsub/genericpubsub.go Outdated Show resolved Hide resolved
@easwars
Copy link
Contributor

easwars commented Apr 3, 2023

@my4-dev : Thanks for doing this.

@my4-dev
Copy link
Contributor Author

my4-dev commented Apr 6, 2023

I've finished incorporating our ideas.
Please take a look at this. Thank you.

internal/grpcsync/pubsub.go Outdated Show resolved Hide resolved
internal/grpcsync/pubsub.go Outdated Show resolved Hide resolved
internal/grpcsync/pubsub.go Outdated Show resolved Hide resolved
internal/grpcsync/pubsub.go Outdated Show resolved Hide resolved
internal/grpcsync/pubsub.go Outdated Show resolved Hide resolved
internal/grpcsync/pubsub.go Outdated Show resolved Hide resolved
my4-dev and others added 3 commits April 8, 2023 18:09
@my4-dev
Copy link
Contributor Author

my4-dev commented Apr 10, 2023

@easwars : Thank you for your suggestion. I've corrected.

Copy link
Contributor

@easwars easwars left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the code is quite close to what I had in mind initially. I'll start reviewing the tests soon.

internal/grpcsync/pubsub.go Outdated Show resolved Hide resolved
internal/grpcsync/pubsub.go Outdated Show resolved Hide resolved
internal/grpcsync/pubsub.go Outdated Show resolved Hide resolved
internal/grpcsync/pubsub_test.go Outdated Show resolved Hide resolved
internal/grpcsync/pubsub.go Outdated Show resolved Hide resolved
internal/grpcsync/pubsub_test.go Outdated Show resolved Hide resolved
internal/grpcsync/pubsub_test.go Outdated Show resolved Hide resolved
@dfawley
Copy link
Member

dfawley commented May 31, 2023

@arvindbr8 is out of town until when @easwars gets back so reassigning to @easwars.

@dfawley dfawley modified the milestones: 1.56 Release, 1.57 Release Jun 2, 2023
Copy link
Contributor

@easwars easwars left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Just some minor nits this time around.

for i := 0; i < numPublished; i {
select {
case <-ts1.onMsgCh:
case <-time.After(defaultTestShortTimeout):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one should probably use the defaultTestTimeout instead.

// Register another subscriber and ensure that it receives the last published message.
ts2 := newTestSubscriber(numPublished)
pubsub.Subscribe(ts2)
wantMsgs2 := []int{numPublished - 1}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be wantMsgs2 := wantMsgs1[len(wantMsgs1)-1:]

for i := 0; i < numPublished; i {
select {
case <-ts1.onMsgCh:
case <-time.After(defaultTestShortTimeout):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be defaultTestTimeout and the same applies to the next goroutine. We generally use defaultTestShortTimeout only in places where we are waiting for events which we expect to not happen in the test. For example, if we are not expecting a message to arrive from the server, we block for this short timeout and if we receive a message from the server in that timeframe, then we declare an error.

@my4-dev
Copy link
Contributor Author

my4-dev commented Jun 27, 2023

@easwars : Thank you for approving this PR! I've fixed pubsub_test according to your advice.

@easwars
Copy link
Contributor

easwars commented Jun 27, 2023

@my4-dev : Thanks for taking care of all the comments.

@arvindbr8 : Could you please be the second reviewer on this one. Thanks.

@easwars easwars assigned arvindbr8 and unassigned my4-dev Jun 27, 2023
Copy link
Member

@arvindbr8 arvindbr8 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few comments. Could you also please add comments to the test cases. It would really help with readability for other users.

}()

wg.Wait()
if isTimeout {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we fail later in the case of a timeout? I believe its better to t.Fatal whenever the reader goroutine times out and fail fast.

Consider the worst case where onMessage() callback is never invoked. We would have to wait for 10 * 10 ms for the test to fail, where as it could have failed in the first run.

Let me know if there was a different consideration for why we need the isTimeout boolean to check this.


Alternatively, you could use a t.Errorf() when a reader loop times out and then do this after the wg.Wait()

if t.Failed() {
	t.FailNow()
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whatever we choose, there are trade-offs.

t.Fatal
If we invoke t.Fatal() in the goroutine which is called by the main goroutine, the main isn't cancelled.
So, we can't use this in sub goroutine.

t.Errof & t.Failed()
It is more like Golang than using boolean flag. But many error messages would be displayed if there is a timeout in loops and it would make logs difficult to read.

Could you please share your opinion about that🙇‍♂️

go func() {
for i := 0; i < numPublished; i {
select {
case <-ts1.onMsgCh:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, we are trying to check the subscriber receieves the messages in the order they are published. And we know the message that is published in each iteration (which is i := range numPublished). Why dont we just check for the message received here and fail if its not the one that is expected?

that way we can nix wantMsg1 and gotMsg1 variables and also fail faster.

I usually like the idea of always failing fast as possible in test assertions. Here lets say that the first message itself came out of order, we would have to wait until all the messages are received to fail the test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, the one sub goroutine is used to call Publish() and the other is used to check to see if ts1.OnMessage() function was called the correct number of times.
Both are processed asynchronously.

Also, if we tried to fail faster, it would be more complicated than before.
(I haven't come up with the idea to do yet.)

t.Fatalf("Received messages is %v, want %v", gotMsgs1, wantMsgs1)
}

// Register another subscriber and ensure that it receives the last published message.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

receives the last published message

-- which we know in this case if numPublished - 1 right? If you go by my comment above, we could also think about nixing wantMsgs2 right?

select {
case <-ts1.onMsgCh:
case <-time.After(defaultTestTimeout):
errCh <- fmt.Errorf("")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since errCh is initialized with size 1, Wouldn't this be a blocking write if there are multiple timeouts since the channel is only read after wg.Wait()?

I would recommend not using an errCh but instead use t.Error() or t.Fatal() in this case? using t.Error might help you check if both subscribers failed in this case.

Copy link
Contributor Author

@my4-dev my4-dev Jun 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this be a blocking write if there are multiple timeouts since the channel is only read after wg.Wait()?

This is correct! So we have to fix this problem.

But, as I said the above comment, I'm hesitating using t.Errorf() or t.Fatalf() .

}
}

func (s) TestPubSub_PublishMsgs_BeforeRegisterSub(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not already tests in the previous test case here? Maybe remove that check from the previous test case if you would like to have separate test cases for both

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They have different preconditions.
Previous one is tested by calling pubsub.Subscribe() at the status that some test subsribers have already added to pubsub.
The other one is tested at the status that no subscriber has added to pubsub.

Considering above, should we remove either?

Copy link
Member

@arvindbr8 arvindbr8 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the test cases are WAI. I'm approving it and taking my comments as an action item.

@arvindbr8 arvindbr8 merged commit 51042db into grpc:master Jun 30, 2023
10 checks passed
@my4-dev my4-dev deleted the generic-pub-sub-api branch July 1, 2023 06:26
@github-actions github-actions bot locked as resolved and limited conversation to collaborators Dec 29, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Type: Feature New features or improvements in behavior
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants