Skip to content

Commit

Permalink
Extract samplingSender and use it for gRPC (#637)
Browse files Browse the repository at this point in the history
  • Loading branch information
camdencheek authored Aug 11, 2023
1 parent d572353 commit 956d775
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 40 deletions.
7 changes: 6 additions & 1 deletion grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,13 @@ func (s *Server) StreamSearch(req *v1.SearchRequest, ss v1.WebserverService_Stre
onMatch := stream.SenderFunc(func(res *zoekt.SearchResult) {
ss.Send(res.ToProto())
})
sampler := stream.NewSamplingSender(onMatch)

return s.streamer.StreamSearch(ss.Context(), q, zoekt.SearchOptionsFromProto(req.GetOpts()), onMatch)
err = s.streamer.StreamSearch(ss.Context(), q, zoekt.SearchOptionsFromProto(req.GetOpts()), sampler)
if err == nil {
sampler.Flush()
}
return err
}

func (s *Server) List(ctx context.Context, req *v1.ListRequest) (*v1.ListResponse, error) {
Expand Down
101 changes: 62 additions & 39 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,55 +76,20 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}()

agg := zoekt.SearchResult{}
aggCount := 0

send := func(zsr *zoekt.SearchResult) {

err := eventWriter.event(eventMatches, zsr)
if err != nil {
_ = eventWriter.event(eventError, err)
return
}
}

err = h.Searcher.StreamSearch(ctx, args.Q, args.Opts, SenderFunc(func(event *zoekt.SearchResult) {
// We don't want to send events over the wire if they don't contain file
// matches. Hence, in case we didn't find any results, we aggregate the stats
// and send them out in regular intervals.
if len(event.Files) == 0 {
aggCount

agg.Stats.Add(event.Stats)
agg.Progress = event.Progress

if aggCount%100 == 0 && !agg.Stats.Zero() {
send(&agg)
agg = zoekt.SearchResult{}
}

return
}
sampler := NewSamplingSender(SenderFunc(send))

// If we have aggregate stats, we merge them with the new event before sending
// it. We drop agg.Progress, because we assume that event.Progress reflects the
// latest status.
if !agg.Stats.Zero() {
event.Stats.Add(agg.Stats)
agg = zoekt.SearchResult{}
}
err = h.Searcher.StreamSearch(ctx, args.Q, args.Opts, sampler)

send(event)
}))

if err == nil && !agg.Stats.Zero() {
send(&zoekt.SearchResult{
Stats: agg.Stats,
Progress: zoekt.Progress{
Priority: math.Inf(-1),
MaxPendingPriority: math.Inf(-1),
},
})
if err == nil {
sampler.Flush()
}

if err != nil {
Expand Down Expand Up @@ -184,3 +149,61 @@ func registerGob() {
})
rpc.RegisterGob()
}

// NewSamplingSender wraps next in a sender that aggregates stats-only
// events (events with no file matches) and forwards them only
// periodically, to avoid sending many empty stats events over the wire.
// Callers should invoke Flush once the stream completes successfully.
func NewSamplingSender(next zoekt.Sender) *samplingSender {
	s := new(samplingSender)
	s.next = next
	return s
}

// samplingSender aggregates stats-only events (those without file
// matches) and forwards an aggregate to next only on every 100th such
// event; events that do contain file matches pass through immediately
// with any pending aggregate merged in. Not safe for concurrent use —
// presumably Send is called from a single goroutine (TODO confirm).
type samplingSender struct {
	// next receives the (possibly aggregated) events.
	next zoekt.Sender
	// agg accumulates stats and the latest progress from events that
	// carried no file matches and have not been forwarded yet.
	agg zoekt.SearchResult
	// aggCount counts stats-only events seen; forwarding happens when
	// it reaches a multiple of 100.
	aggCount int
}

// Send forwards event to the underlying sender, sampling stats-only
// events: an event with no file matches is folded into the aggregate
// and only every 100th such aggregate is forwarded. An event that does
// contain file matches is forwarded immediately, carrying any pending
// aggregated stats with it. Call Flush after the stream ends to push
// out whatever remains.
func (s *samplingSender) Send(event *zoekt.SearchResult) {
	// We don't want to send events over the wire if they don't contain file
	// matches. Hence, in case we didn't find any results, we aggregate the stats
	// and send them out in regular intervals.
	if len(event.Files) == 0 {
		// BUG FIX: the increment operator was missing here, so aggCount
		// never advanced and stats-only events were never forwarded.
		s.aggCount++

		s.agg.Stats.Add(event.Stats)
		s.agg.Progress = event.Progress

		if s.aggCount%100 == 0 && !s.agg.Stats.Zero() {
			s.next.Send(&s.agg)
			s.agg = zoekt.SearchResult{}
		}

		return
	}

	// If we have aggregate stats, we merge them with the new event before sending
	// it. We drop agg.Progress, because we assume that event.Progress reflects the
	// latest status.
	if !s.agg.Stats.Zero() {
		event.Stats.Add(s.agg.Stats)
		s.agg = zoekt.SearchResult{}
	}

	s.next.Send(event)
}

// Flush sends any aggregated stats that we haven't sent yet. The
// flushed event carries -Inf priorities so it is recognizable as a
// final, lowest-priority progress update.
func (s *samplingSender) Flush() {
	if s.agg.Stats.Zero() {
		return
	}
	final := zoekt.SearchResult{
		Stats: s.agg.Stats,
		Progress: zoekt.Progress{
			Priority:           math.Inf(-1),
			MaxPendingPriority: math.Inf(-1),
		},
	}
	s.next.Send(&final)
}
65 changes: 65 additions & 0 deletions stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,68 @@ func (a adapter) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.Search
sender.Send(sr)
return nil
}

// TestSamplingStream verifies the samplingSender contract: events with
// file matches are forwarded immediately, stats-only events are
// forwarded only on every 100th occurrence, and Flush emits any
// remaining aggregated stats.
func TestSamplingStream(t *testing.T) {
	nonZeroStats := zoekt.Stats{
		ContentBytesLoaded: 10,
	}
	filesEvent := &zoekt.SearchResult{
		Files: make([]zoekt.FileMatch, 10),
		Stats: nonZeroStats,
	}
	// fileEvents returns n events that each contain file matches.
	fileEvents := func(n int) []*zoekt.SearchResult {
		res := make([]*zoekt.SearchResult, n)
		// BUG FIX: loop increment was missing, making this loop never
		// terminate (and the statement list invalid Go).
		for i := 0; i < n; i++ {
			res[i] = filesEvent
		}
		return res
	}
	statsEvent := &zoekt.SearchResult{
		Stats: nonZeroStats,
	}
	// statsEvents returns n stats-only events (no file matches).
	statsEvents := func(n int) []*zoekt.SearchResult {
		res := make([]*zoekt.SearchResult, n)
		for i := 0; i < n; i++ {
			res[i] = statsEvent
		}
		return res
	}
	cases := []struct {
		events           []*zoekt.SearchResult
		beforeFlushCount int
		afterFlushCount  int
	}{
		// These test cases assume that the sampler only forwards
		// every 100th stats-only event. In case the sampling logic
		// changes, these tests are not valuable.
		{nil, 0, 0},
		{fileEvents(1), 1, 1},
		{fileEvents(2), 2, 2},
		{fileEvents(200), 200, 200},
		{append(fileEvents(1), statsEvents(1)...), 1, 2},
		{append(fileEvents(1), statsEvents(2)...), 1, 2},
		{append(fileEvents(1), statsEvents(99)...), 1, 2},
		{append(fileEvents(1), statsEvents(100)...), 2, 2},
		{statsEvents(500), 5, 5},
		{statsEvents(501), 5, 6},
	}

	for _, tc := range cases {
		count := 0
		ss := NewSamplingSender(SenderFunc(func(*zoekt.SearchResult) {
			// BUG FIX: this must increment, not assign 1, so the
			// counter reflects the number of forwarded events.
			count++
		}))

		for _, event := range tc.events {
			ss.Send(event)
		}
		if count != tc.beforeFlushCount {
			t.Fatalf("before flush: expected %d events, got %d", tc.beforeFlushCount, count)
		}
		ss.Flush()

		if count != tc.afterFlushCount {
			t.Fatalf("after flush: expected %d events, got %d", tc.afterFlushCount, count)
		}
	}
}

0 comments on commit 956d775

Please sign in to comment.