Skip to content

Commit

Permalink
queue operations context: add context cancelation and record error
Browse files Browse the repository at this point in the history
this addresses an important case where a (preexisting) handler is
wrapped in another handler

canceling the context when a queue operation is performed signals to
wrappers that the handler chain should stop (and also protects against
accidental further processing, i.e. with a kube client)

recording the error on the operations context (which is similar to
context.Context) gives the caller a chance to do something with the
error before returning
  • Loading branch information
ecordell committed Sep 22, 2022
1 parent 71e2069 commit 45eab07
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 12 deletions.
10 changes: 5 additions & 5 deletions component/ensure_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 23,7 @@ type Annotator[T any] interface {
// will create a component object and ensure it has the computed spec.
type EnsureComponentByHash[K KubeObject, A Annotator[A]] struct {
*HashableComponent[K]
ctrls *typedctx.Key[queue.Interface]
ctrls queue.OperationsContext
nn typedctx.MustValueContext[types.NamespacedName]
applyObject func(ctx context.Context, apply A) (K, error)
deleteObject func(ctx context.Context, nn types.NamespacedName) error
Expand All @@ -36,7 36,7 @@ var _ handler.ContextHandler = &EnsureComponentByHash[*corev1.Service, *applycor
func NewEnsureComponentByHash[K KubeObject, A Annotator[A]](
component *HashableComponent[K],
owner typedctx.MustValueContext[types.NamespacedName],
ctrls *typedctx.Key[queue.Interface],
ctrls queue.OperationsContext,
applyObj func(ctx context.Context, apply A) (K, error),
deleteObject func(ctx context.Context, nn types.NamespacedName) error,
newObj func(ctx context.Context) A,
Expand All @@ -57,7 57,7 @@ func (e *EnsureComponentByHash[K, A]) Handle(ctx context.Context) {
newObj := e.newObj(ctx)
hash, err := e.Hash(newObj)
if err != nil {
e.ctrls.MustValue(ctx).RequeueErr(err)
e.ctrls.RequeueErr(ctx, err)
return
}
newObj = newObj.WithAnnotations(map[string]string{e.HashAnnotationKey: hash})
Expand All @@ -80,7 80,7 @@ func (e *EnsureComponentByHash[K, A]) Handle(ctx context.Context) {
// apply if no matching KubeObject in cluster
_, err = e.applyObject(ctx, newObj)
if err != nil {
e.ctrls.MustValue(ctx).RequeueErr(err)
e.ctrls.RequeueErr(ctx, err)
return
}
}
Expand All @@ -92,7 92,7 @@ func (e *EnsureComponentByHash[K, A]) Handle(ctx context.Context) {
Namespace: o.GetNamespace(),
Name: o.GetName(),
}); err != nil {
e.ctrls.MustValue(ctx).RequeueErr(err)
e.ctrls.RequeueErr(ctx, err)
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion component/ensure_component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 144,7 @@ func TestEnsureServiceHandler(t *testing.T) {
}),
hash.NewObjectHash(), hashKey),
ctxOwner,
queueOps.Key,
queueOps,
func(ctx context.Context, apply *applycorev1.ServiceApplyConfiguration) (*corev1.Service, error) {
applyCalled = true
return nil, nil
Expand Down
5 changes: 3 additions & 2 deletions manager/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 32,11 @@ import (
"k8s.io/controller-manager/controller"
controllerhealthz "k8s.io/controller-manager/pkg/healthz"

"github.com/go-logr/logr"

"github.com/authzed/controller-idioms/cachekeys"
"github.com/authzed/controller-idioms/queue"
"github.com/authzed/controller-idioms/typed"
"github.com/go-logr/logr"
)

// SyncFunc is a function called when an event needs processing
Expand Down Expand Up @@ -162,7 163,7 @@ func (c *OwnedResourceController) processNext(ctx context.Context) bool {
c.Queue.AddAfter(key, after)
}

ctx = c.OperationsContext.WithValue(ctx, queue.NewOperations(done, requeue))
ctx = c.OperationsContext.WithValue(ctx, queue.NewOperations(done, requeue, cancel))

c.sync(ctx, *gvr, namespace, name)
done()
Expand Down
33 changes: 31 additions & 2 deletions queue/controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 20,9 @@ import (
"context"
"time"

"github.com/authzed/controller-idioms/typedctx"
"github.com/go-logr/logr"

"github.com/authzed/controller-idioms/typedctx"
)

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
Expand Down Expand Up @@ -58,10 59,15 @@ func (h OperationsContext) RequeueAPIErr(ctx context.Context, err error) {
h.MustValue(ctx).RequeueAPIErr(err)
}

func NewOperations(done func(), requeueAfter func(time.Duration)) *Operations {
func (h OperationsContext) Error(ctx context.Context) error {
return h.MustValue(ctx).Error()
}

func NewOperations(done func(), requeueAfter func(time.Duration), cancel context.CancelFunc) *Operations {
return &Operations{
done: done,
requeueAfter: requeueAfter,
cancel: cancel,
}
}

Expand All @@ -74,32 80,50 @@ type Interface interface {
Requeue()
RequeueErr(err error)
RequeueAPIErr(err error)
Error() error
}

// Operations deals with the current queue key and provides controls for
// requeueing or stopping reconciliation.
type Operations struct {
done func()
requeueAfter func(duration time.Duration)
cancel context.CancelFunc
err error
}

// Done marks the current key as finished. Note that processing should stop
// as soon as possible after calling `Done`, since marking it as done frees the
// queue to potentially process the same key again.
func (c *Operations) Done() {
defer c.cancel()
c.done()
}

// RequeueAfter requeues the current key after duration.
func (c *Operations) RequeueAfter(duration time.Duration) {
defer c.cancel()
c.requeueAfter(duration)
}

// Requeue requeues the current key immediately.
func (c *Operations) Requeue() {
defer c.cancel()
c.requeueAfter(0)
}

// RequeueErr sets err on the object and requeues the current key.
func (c *Operations) RequeueErr(err error) {
defer c.cancel()
c.err = err
c.requeueAfter(0)
}

// RequeueAPIErr checks to see if `err` is a kube api error with retry data.
// If so, it requeues after the wait period, otherwise, it requeues immediately.
func (c *Operations) RequeueAPIErr(err error) {
defer c.cancel()
c.err = err
retry, after := ShouldRetry(err)
if retry && after > 0 {
c.RequeueAfter(after)
Expand All @@ -109,3 133,8 @@ func (c *Operations) RequeueAPIErr(err error) {
}
c.Done()
}

// Error returns the last recorded error, if any
func (c *Operations) Error() error {
return c.err
}
4 changes: 2 additions & 2 deletions queue/controls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 25,7 @@ func ExampleNewOperations() {
queue.Done(key)
}, func(duration time.Duration) {
queue.AddAfter(key, duration)
})
}, cancel)

// typically called from a handler
handler.NewHandlerFromFunc(func(ctx context.Context) {
Expand Down Expand Up @@ -57,7 57,7 @@ func ExampleNewQueueOperationsCtx() {
queue.Done(key)
}, func(duration time.Duration) {
queue.AddAfter(key, duration)
}))
}, cancel))

// queue controls are passed via context
handler.NewHandlerFromFunc(func(ctx context.Context) {
Expand Down
65 changes: 65 additions & 0 deletions queue/fake/zz_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 45eab07

Please sign in to comment.