Actually starting to use the deallocator to clean up services
**- What I did**

This patch actually starts to use the deallocator to clean up services, as
well as service-level resources (as of now, only networks).

**- How I did it**

A previous patch (#2759) introduced a
new component, the deallocator, responsible for cleaning up services and
service-level resources. This patch actually starts making use of that
component.

Since the deallocator used to rely on the reaper deleting the tasks belonging
to services that had been marked for removal, a previous version of this patch
modified the task reaper quite heavily to also keep track of such services
(needed since otherwise the reaper would fail to clean up all of their tasks,
instead keeping some around for history-tracking purposes). However, it soon
became apparent that this was not the best approach:
 * it creates a hidden coupling between the reaper and the deallocator
 * it's also not the best user experience to suddenly remove all of a service's
   task history while shutting it down, for no reason apparent to the user

For these reasons, this patch instead amends the deallocator to also look at
task statuses when keeping track of how many alive tasks remain for a service.
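
In essence (a condensed sketch, assuming `github.com/docker/swarmkit/api` is imported; `aliveTaskCount` is an illustrative name, not a function from this patch, and the real logic lives in `manager/deallocator/deallocator.go` below):

```go
// Illustrative only: counts a service's tasks that are still alive,
// using the same status check as the patch's isTaskStillAlive helper.
// A task counts as alive based purely on its status, so the deallocator
// no longer cares whether the reaper keeps old task records for history.
func aliveTaskCount(tasks []*api.Task) int {
	alive := 0
	for _, task := range tasks {
		if task.Status.State <= api.TaskStateRunning {
			alive++
		}
	}
	return alive
}
```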

**- How to test it**

Updated tests.

**- Description for the changelog**

Services & networks are no longer deleted immediately when a user requests their
deletion; instead, they are deleted when all their tasks are actually shut down.

Signed-off-by: Jean Rouge <[email protected]>
wk8 committed Nov 15, 2018
1 parent bc032e2 commit 2ba1cd9
Showing 15 changed files with 369 additions and 124 deletions.
4 changes: 4 additions & 0 deletions cmd/swarmctl/network/inspect.go
@@ -51,6 +51,10 @@ func printNetworkSummary(network *api.Network) {
common.FprintfIfNotEmpty(w, "ID\t: %s\n", network.ID)
common.FprintfIfNotEmpty(w, "Name\t: %s\n", spec.Annotations.Name)

if network.PendingDelete {
common.FprintfIfNotEmpty(w, "[Network %s marked for removal]\n", spec.Annotations.Name)
}

fmt.Fprintln(w, "Spec:\t")
if len(spec.Annotations.Labels) > 0 {
fmt.Fprintln(w, " Labels:\t")
18 changes: 12 additions & 6 deletions cmd/swarmctl/service/inspect.go
@@ -22,12 +22,17 @@ func printServiceSummary(service *api.Service, running int) {
w := tabwriter.NewWriter(os.Stdout, 8, 8, 8, ' ', 0)
defer w.Flush()

task := service.Spec.Task
spec := service.Spec
common.FprintfIfNotEmpty(w, "ID\t: %s\n", service.ID)
common.FprintfIfNotEmpty(w, "Name\t: %s\n", service.Spec.Annotations.Name)
if len(service.Spec.Annotations.Labels) > 0 {
common.FprintfIfNotEmpty(w, "Name\t: %s\n", spec.Annotations.Name)

if service.PendingDelete {
common.FprintfIfNotEmpty(w, "[Service %s marked for removal]\n", spec.Annotations.Name)
}

if len(spec.Annotations.Labels) > 0 {
fmt.Fprintln(w, "Labels\t")
for k, v := range service.Spec.Annotations.Labels {
for k, v := range spec.Annotations.Labels {
fmt.Fprintf(w, " %s\t: %s\n", k, v)
}
}
@@ -51,7 +56,8 @@ func printServiceSummary(service *api.Service, running int) {

fmt.Fprintln(w, "Template\t")
fmt.Fprintln(w, " Container\t")
ctr := service.Spec.Task.GetContainer()
task := spec.Task
ctr := task.GetContainer()
common.FprintfIfNotEmpty(w, " Image\t: %s\n", ctr.Image)
common.FprintfIfNotEmpty(w, " Command\t: %q\n", strings.Join(ctr.Command, " "))
common.FprintfIfNotEmpty(w, " Args\t: [%s]\n", strings.Join(ctr.Args, ", "))
@@ -90,7 +96,7 @@ func printServiceSummary(service *api.Service, running int) {
printResources(w, res.Limits)
}
}
if len(service.Spec.Task.Networks) > 0 {
if len(spec.Task.Networks) > 0 {
fmt.Fprint(w, " Networks:")
for _, n := range service.Spec.Task.Networks {
fmt.Fprintf(w, " %s", n.Target)
13 changes: 10 additions & 3 deletions manager/controlapi/network.go
@@ -199,8 +199,10 @@ func (s *Server) removeNetwork(id string) error {
return status.Errorf(codes.Internal, "could not find services using network %s: %v", id, err)
}

if len(services) != 0 {
return status.Errorf(codes.FailedPrecondition, "network %s is in use by service %s", id, services[0].ID)
for _, service := range services {
if !service.PendingDelete {
return status.Errorf(codes.FailedPrecondition, "network %s is in use by service %s", id, service.ID)
}
}

tasks, err := store.FindTasks(tx, store.ByReferencedNetworkID(id))
@@ -214,7 +216,12 @@ }
}
}

return store.DeleteNetwork(tx, id)
network := store.GetNetwork(tx, id)
if network == nil {
return status.Errorf(codes.NotFound, "network %s not found", id)
}
network.PendingDelete = true
return store.UpdateNetwork(tx, network)
})
}

21 changes: 20 additions & 1 deletion manager/controlapi/network_test.go
@@ -191,11 +191,30 @@ func TestRemoveNetworkWithAttachedService(t *testing.T) {
assert.NoError(t, err)
assert.NotEqual(t, nr.Network, nil)
assert.NotEqual(t, nr.Network.ID, "")
createServiceInNetwork(t, ts, "name", "image", nr.Network.ID, 1)
createServiceInNetwork(t, ts, "service1", "image", nr.Network.ID, 1)
_, err = ts.Client.RemoveNetwork(context.Background(), &api.RemoveNetworkRequest{NetworkID: nr.Network.ID})
assert.Error(t, err)
}

func TestRemoveNetworkWithAttachedServiceMarkedForRemoval(t *testing.T) {
ts := newTestServer(t)
defer ts.Stop()
nr, err := ts.Client.CreateNetwork(context.Background(), &api.CreateNetworkRequest{
Spec: createNetworkSpec("testnet5"),
})
assert.NoError(t, err)
assert.NotEqual(t, nr.Network, nil)
assert.NotEqual(t, nr.Network.ID, "")
service := createServiceInNetwork(t, ts, "service2", "image", nr.Network.ID, 1)
// then let's delete the service
r, err := ts.Client.RemoveService(context.Background(), &api.RemoveServiceRequest{ServiceID: service.ID})
assert.NoError(t, err)
assert.NotNil(t, r)
// now we should be able to delete the network
_, err = ts.Client.RemoveNetwork(context.Background(), &api.RemoveNetworkRequest{NetworkID: nr.Network.ID})
assert.NoError(t, err)
}

func TestCreateNetworkInvalidDriver(t *testing.T) {
ts := newTestServer(t)
defer ts.Stop()
18 changes: 14 additions & 4 deletions manager/controlapi/service.go
@@ -718,6 +718,7 @@ func (s *Server) GetService(ctx context.Context, request *api.GetServiceRequest)
// - Returns `NotFound` if the Service is not found.
// - Returns `InvalidArgument` if the ServiceSpec is malformed.
// - Returns `Unimplemented` if the ServiceSpec references unimplemented features.
// - Returns `FailedPrecondition` if the Service is marked for removal
// - Returns an error if the update fails.
func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRequest) (*api.UpdateServiceResponse, error) {
if request.ServiceID == "" || request.ServiceVersion == nil {
@@ -751,6 +752,12 @@
return status.Errorf(codes.NotFound, "service %s not found", request.ServiceID)
}

// we couldn't do this any sooner, as we do need to be holding the lock
// when checking for this flag
if service.PendingDelete {
return status.Errorf(codes.FailedPrecondition, "service %s is marked for removal", request.ServiceID)
}

// It's not okay to update Service.Spec.Networks on its own.
// However, if Service.Spec.Task.Networks is also being
// updated, that's okay (for example when migrating from the
@@ -844,12 +851,15 @@ func (s *Server) RemoveService(ctx context.Context, request *api.RemoveServiceRe
}

err := s.store.Update(func(tx store.Tx) error {
return store.DeleteService(tx, request.ServiceID)
service := store.GetService(tx, request.ServiceID)
if service == nil {
return status.Errorf(codes.NotFound, "service %s not found", request.ServiceID)
}
// mark service for removal
service.PendingDelete = true
return store.UpdateService(tx, service)
})
if err != nil {
if err == store.ErrNotExist {
return nil, status.Errorf(codes.NotFound, "service %s not found", request.ServiceID)
}
return nil, err
}
return &api.RemoveServiceResponse{}, nil
13 changes: 13 additions & 0 deletions manager/controlapi/service_test.go
@@ -916,6 +916,19 @@ func TestUpdateService(t *testing.T) {
assert.Equal(t, codes.InvalidArgument, testutils.ErrorCode(err))
}

func TestUpdateServiceMarkedForRemoval(t *testing.T) {
ts := newTestServer(t)
defer ts.Stop()

service := createService(t, ts, "name", "image", 1)
r, err := ts.Client.RemoveService(context.Background(), &api.RemoveServiceRequest{ServiceID: service.ID})
assert.NoError(t, err)
assert.NotNil(t, r)

_, err = ts.Client.UpdateService(context.Background(), &api.UpdateServiceRequest{ServiceID: service.ID, Spec: &service.Spec, ServiceVersion: &service.Meta.Version})
assert.Equal(t, codes.FailedPrecondition, testutils.ErrorCode(err))
}

func TestServiceUpdateRejectNetworkChange(t *testing.T) {
ts := newTestServer(t)
defer ts.Stop()
81 changes: 61 additions & 20 deletions manager/deallocator/deallocator.go
@@ -2,6 +2,7 @@ package deallocator

import (
"context"
"sync"

"github.com/docker/go-events"
"github.com/docker/swarmkit/api"
@@ -32,6 +33,9 @@ import (
type Deallocator struct {
store *store.MemoryStore

// closeOnce ensures that stopChan is closed only once
closeOnce sync.Once

// for services that are shutting down, we keep track of how many
// tasks still exist for them
services map[string]*serviceWithTaskCounts
@@ -94,6 +98,7 @@ func (deallocator *Deallocator) Run(ctx context.Context) error {

return
},
api.EventUpdateTask{},
api.EventDeleteTask{},
api.EventUpdateService{},
api.EventUpdateNetwork{})
@@ -129,7 +134,7 @@ func (deallocator *Deallocator) Run(ctx context.Context) error {
for {
select {
case event := <-eventsChan:
if updated, err := deallocator.processNewEvent(ctx, event); err == nil {
if updated, err := deallocator.handleEvent(ctx, event); err == nil {
deallocator.notifyEventChan(updated)
} else {
log.G(ctx).WithError(err).Errorf("error processing deallocator event %#v", event)
@@ -142,11 +147,16 @@ func (deallocator *Deallocator) Run(ctx context.Context) error {
}
}

// Stop stops the deallocator's routine
// FIXME (jrouge): see the comment on TaskReaper.Stop() and see when to properly stop this
// plus unit test on this!
// Stop stops the deallocator's routine and waits for the main loop to exit.
// Stop can be called in two cases. One when the manager is
// shutting down, and the other when the manager (the leader) is
// becoming a follower. Since these two instances could race with
// each other, we use closeOnce here to ensure that stopChan is
// closed only once, to avoid a panic.
func (deallocator *Deallocator) Stop() {
close(deallocator.stopChan)
deallocator.closeOnce.Do(func() {
close(deallocator.stopChan)
})
<-deallocator.doneChan
}
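
The close-once pattern above is easy to get subtly wrong; here is a minimal standalone sketch (not part of the diff, with hypothetical names) of why `sync.Once` makes concurrent `Stop()` calls safe:

```go
package main

import "sync"

type stopper struct {
	closeOnce sync.Once
	stopChan  chan struct{}
	doneChan  chan struct{}
}

// Stop is safe to call from multiple goroutines: only the first call
// closes stopChan; closing an already-closed channel would panic.
func (s *stopper) Stop() {
	s.closeOnce.Do(func() { close(s.stopChan) })
	<-s.doneChan // wait for the main loop to exit
}

func main() {
	s := &stopper{stopChan: make(chan struct{}), doneChan: make(chan struct{})}
	go func() { // stand-in for the deallocator's main loop
		<-s.stopChan
		close(s.doneChan)
	}()
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // e.g. manager shutdown racing with leadership loss
		wg.Add(1)
		go func() { defer wg.Done(); s.Stop() }()
	}
	wg.Wait()
}
```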

@@ -180,11 +190,21 @@ func (deallocator *Deallocator) processService(ctx context.Context, service *api
// better to clean up resources that shouldn't be cleaned up yet
// than ending up with a service and some resources lost in limbo forever
return true, deallocator.deallocateService(ctx, service)
} else if len(tasks) == 0 {
}

remainingTasks := 0
for _, task := range tasks {
if isTaskStillAlive(task) {
remainingTasks++
}
}

if remainingTasks == 0 {
// no tasks remaining for this service, we can clean it up
return true, deallocator.deallocateService(ctx, service)
}
deallocator.services[service.ID] = &serviceWithTaskCounts{service: service, taskCount: len(tasks)}

deallocator.services[service.ID] = &serviceWithTaskCounts{service: service, taskCount: remainingTasks}
return false, nil
}

@@ -263,24 +283,16 @@ func (deallocator *Deallocator) processNetwork(ctx context.Context, tx store.Tx,
return
}

// Processes new events, and dispatches to the right method depending on what
// Handles new events, and dispatches to the right method depending on what
// type of event it is.
// The boolean part of the return tuple indicates whether anything was actually
// removed from the store
func (deallocator *Deallocator) processNewEvent(ctx context.Context, event events.Event) (bool, error) {
func (deallocator *Deallocator) handleEvent(ctx context.Context, event events.Event) (bool, error) {
switch typedEvent := event.(type) {
case api.EventUpdateTask:
return deallocator.processTaskEvent(ctx, typedEvent.Task, typedEvent.OldTask)
case api.EventDeleteTask:
serviceID := typedEvent.Task.ServiceID

if serviceWithCount, present := deallocator.services[serviceID]; present {
if serviceWithCount.taskCount <= 1 {
delete(deallocator.services, serviceID)
return deallocator.processService(ctx, serviceWithCount.service)
}
serviceWithCount.taskCount--
}

return false, nil
return deallocator.processTaskEvent(ctx, nil, typedEvent.Task)
case api.EventUpdateService:
return deallocator.processService(ctx, typedEvent.Service)
case api.EventUpdateNetwork:
@@ -289,3 +301,32 @@ func (deallocator *Deallocator) processNewEvent(ctx context.Context, event event
return false, nil
}
}

// Common logic for handling task update/delete events
// oldTask is the task object as it was before its update or deletion
// newTask is nil for delete events, and the new object for updates
func (deallocator *Deallocator) processTaskEvent(ctx context.Context, newTask, oldTask *api.Task) (bool, error) {
serviceID := oldTask.ServiceID
serviceWithCount, present := deallocator.services[serviceID]

if present && isTaskStillAlive(oldTask) && (newTask == nil || !isTaskStillAlive(newTask)) {
// this task belongs to a service that's shutting down, and in addition,
// prior to its update or deletion it was still alive, but now it's
// not alive any more, so we decrement the counter of alive tasks for
// this service

if serviceWithCount.taskCount <= 1 {
delete(deallocator.services, serviceID)
return deallocator.processService(ctx, serviceWithCount.service)
}
serviceWithCount.taskCount--
}

return false, nil
}

// simple helper function to distinguish tasks that are still running
// from ones that are done
func isTaskStillAlive(task *api.Task) bool {
return task.Status.State <= api.TaskStateRunning
}
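
The `<=` comparison works because `api.TaskState` is an ordered enum in which every terminal state sorts after `TaskStateRunning`. A quick sanity check (a standalone sketch, assuming swarmkit's standard state constants):

```go
package main

import (
	"fmt"

	"github.com/docker/swarmkit/api"
)

func main() {
	for _, state := range []api.TaskState{
		api.TaskStateNew,      // not yet running: still alive
		api.TaskStateRunning,  // running: still alive
		api.TaskStateShutdown, // terminal: ordered after Running
		api.TaskStateFailed,   // terminal: ordered after Running
	} {
		fmt.Printf("%-22v alive=%v\n", state, state <= api.TaskStateRunning)
	}
}
```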