roachprod: avoid syncing all the time #137956

Draft · wants to merge 1 commit into base: master
2 changes: 1 addition & 1 deletion pkg/cmd/roachprod/cli/commands.go
@@ -602,7 +602,7 @@ if the user would like to update the keys on the remote hosts.

Args: cobra.ExactArgs(1),
Run: wrap(func(cmd *cobra.Command, args []string) (retErr error) {
return roachprod.SetupSSH(context.Background(), config.Logger, args[0])
return roachprod.SetupSSH(context.Background(), config.Logger, args[0], true /* sync */)
}),
}
}
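The only behavioral change here is the new boolean: the interactive `setup-ssh` command keeps forcing a full cloud sync, while callers that already hold the cluster (such as `Create` and `Grow` later in this diff) pass `false` and read from the local cache instead. A minimal caller-side sketch, not part of the diff, assuming only the `SetupSSH` signature introduced above:

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachprod"
	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
)

// setupSSHFor is a hypothetical helper, not part of the PR: it passes
// sync=true when the caller has no fresh view of the cluster, and sync=false
// when the cluster was just created or grown and is already in the local cache.
func setupSSHFor(ctx context.Context, l *logger.Logger, cluster string, alreadyCached bool) error {
	return roachprod.SetupSSH(ctx, l, cluster, !alreadyCached /* sync */)
}
```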
84 changes: 75 additions & 9 deletions pkg/roachprod/cloud/cluster_cloud.go
@@ -12,6 +12,7 @@ import (
"regexp"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"

@@ -323,7 +324,14 @@ type ClusterCreateOpts struct {
// an additional "workload node". This node often times does not need the same CPU count as
// the rest of the cluster. i.e. it is overkill for a 3 node 32 CPU cluster to have a 32 CPU
// workload node, but a 50 node 8 CPU cluster might find a 8 CPU workload node inadequate.
func CreateCluster(l *logger.Logger, opts []*ClusterCreateOpts) error {
func CreateCluster(l *logger.Logger, opts []*ClusterCreateOpts) (*Cluster, error) {

c := &Cluster{
Name: opts[0].CreateOpts.ClusterName,
CreatedAt: time.Now(),
Lifetime: opts[0].CreateOpts.Lifetime,
}

// Keep track of the total number of nodes created, as we append all cluster names
// with the node count.
var nodesCreated int
@@ -334,7 +342,7 @@ func CreateCluster(l *logger.Logger, opts []*ClusterCreateOpts) error {
for _, o := range opts {
providerCount := len(o.CreateOpts.VMProviders)
if providerCount == 0 {
return errors.New("no VMProviders configured")
return c, errors.New("no VMProviders configured")
}

// Allocate vm names over the configured providers
@@ -346,14 +354,43 @@ func CreateCluster(l *logger.Logger, opts []*ClusterCreateOpts) error {
p = (p + 1) % providerCount
}

var vms vm.List
var vmsLock sync.Mutex
// Create VMs in parallel across all providers.
// Each provider will return the list of VMs it created, and we append
// them to the cached Cluster.
if err := vm.ProvidersParallel(o.CreateOpts.VMProviders, func(p vm.Provider) error {
return p.Create(l, vmLocations[p.Name()], o.CreateOpts, o.ProviderOptsContainer[p.Name()])
vmList, err := p.Create(l, vmLocations[p.Name()], o.CreateOpts, o.ProviderOptsContainer[p.Name()])
if err != nil {
return err
}
vmsLock.Lock()
defer vmsLock.Unlock()
vms = append(vms, vmList...)
return nil
}); err != nil {
return err
return c, err
}

c.VMs = append(c.VMs, vms...)
}

return nil
// Check that we did create the expected number of VMs
if len(c.VMs) != nodesCreated {
return c, errors.Errorf("Created %d VMs, while %d nodes were expected", len(c.VMs), nodesCreated)
}

// If VMs were created, set the cluster user to the user of the first VM.
// This is the method also used in ListCloud() above
if len(c.VMs) > 0 {
var err error
c.User, err = c.VMs[0].UserName()
if err != nil {
return nil, err
}
}

return c, nil
}

// GrowCluster adds new nodes to an existing cluster.
@@ -374,9 +411,26 @@ func GrowCluster(l *logger.Logger, c *Cluster, numNodes int) error {
c.Name, gce.ProviderName)
}
}
return vm.ForProvider(provider, func(p vm.Provider) error {
return p.Grow(l, c.VMs, c.Name, names)

var vmsLock sync.Mutex
err := vm.ForProvider(provider, func(p vm.Provider) error {
addedVms, err := p.Grow(l, c.VMs, c.Name, names)
if err != nil {
return err
}

// Update the list of VMs in the cluster.
vmsLock.Lock()
defer vmsLock.Unlock()
c.VMs = append(c.VMs, addedVms...)

return nil
})
if err != nil {
return err
}

return nil
}

// ShrinkCluster removes tail nodes from an existing cluster.
@@ -398,9 +452,16 @@ func ShrinkCluster(l *logger.Logger, c *Cluster, numNodes int) error {
// Always delete from the tail.
vmsToDelete := c.VMs[len(c.VMs)-numNodes:]

return vm.ForProvider(provider, func(p vm.Provider) error {
err := vm.ForProvider(provider, func(p vm.Provider) error {
return p.Shrink(l, vmsToDelete, c.Name)
})
if err != nil {
return err
}

// Update the list of VMs in the cluster.
c.VMs = c.VMs[:len(c.VMs)-numNodes]
return nil
}

// DestroyCluster TODO(peter): document
@@ -454,7 +515,12 @@ func DestroyCluster(l *logger.Logger, c *Cluster) error {
func ExtendCluster(l *logger.Logger, c *Cluster, extension time.Duration) error {
// Round new lifetime to nearest second.
newLifetime := (c.Lifetime + extension).Round(time.Second)
return vm.FanOut(c.VMs, func(p vm.Provider, vms vm.List) error {
err := vm.FanOut(c.VMs, func(p vm.Provider, vms vm.List) error {
return p.Extend(l, vms, newLifetime)
})
if err != nil {
return err
}
c.Lifetime = newLifetime
return nil
}
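With these changes `CreateCluster` returns the cluster it just built (name, lifetime, creation time, user, and the VMs each provider reports), and `GrowCluster`/`ShrinkCluster`/`ExtendCluster` keep the in-memory `Cluster` up to date, so callers no longer need a follow-up cloud listing to learn what exists. A hedged usage sketch that relies only on the fields populated in the hunks above:

```go
package example

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachprod/cloud"
	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
)

// createAndReport shows the new call shape: the returned *cloud.Cluster is
// already populated with the created VMs, so the caller can cache or print
// it without another cloud listing.
func createAndReport(l *logger.Logger, opts []*cloud.ClusterCreateOpts) (*cloud.Cluster, error) {
	c, err := cloud.CreateCluster(l, opts)
	if err != nil {
		return nil, err
	}
	fmt.Printf("created %s: %d VMs owned by %s, lifetime %s\n",
		c.Name, len(c.VMs), c.User, c.Lifetime)
	return c, nil
}
```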
6 changes: 6 additions & 0 deletions pkg/roachprod/clusters_cache.go
@@ -113,6 +113,12 @@ func loadCluster(name string) (*cloud.Cluster, error) {
return c, nil
}

// deleteCluster deletes the file in config.ClusterDir for a given cluster name.
func deleteCluster(name string) error {
filename := clusterFilename(name)
return os.Remove(filename)
}

// shouldIgnoreCluster returns true if the cluster references a project that is
// not active. This is relevant if we have a cluster that was cached when
// another project was in use.
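Locally, roachprod keeps one cache file per cluster under `config.ClusterDir`; `deleteCluster` is the missing counterpart to `saveCluster`/`loadCluster`, so `Destroy` can drop the cache entry instead of triggering a re-sync. A self-contained toy version of that mechanism (the `.json` suffix and helper names below are illustrative assumptions, not the repo's actual `clusterFilename`/`saveCluster` code):

```go
package example

import (
	"encoding/json"
	"os"
	"path/filepath"
)

// toyCluster stands in for cloud.Cluster; only the caching mechanism matters here.
type toyCluster struct {
	Name string `json:"name"`
	VMs  int    `json:"vms"`
}

// cacheFile: one file per cluster, keyed by name, inside a cache directory.
// The ".json" suffix is an assumption for this sketch.
func cacheFile(dir, name string) string { return filepath.Join(dir, name+".json") }

func saveToy(dir string, c *toyCluster) error {
	b, err := json.Marshal(c)
	if err != nil {
		return err
	}
	return os.WriteFile(cacheFile(dir, c.Name), b, 0o644)
}

// deleteToy mirrors the new deleteCluster: removing the file is all that is
// needed to evict a cluster from the local cache.
func deleteToy(dir, name string) error {
	return os.Remove(cacheFile(dir, name))
}
```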
59 changes: 44 additions & 15 deletions pkg/roachprod/roachprod.go
@@ -639,17 +639,22 @@ func Reset(l *logger.Logger, clusterName string) error {
}

// SetupSSH sets up the keys and host keys for the vms in the cluster.
func SetupSSH(ctx context.Context, l *logger.Logger, clusterName string) error {
func SetupSSH(ctx context.Context, l *logger.Logger, clusterName string, sync bool) error {
if err := LoadClusters(); err != nil {
return err
}
cld, err := Sync(l, vm.ListOptions{})
if err != nil {
return err
var cloudCluster *cloud.Cluster
if sync {
cld, err := Sync(l, vm.ListOptions{})
if err != nil {
return err
}
cloudCluster, _ = cld.Clusters[clusterName]
} else {
cloudCluster, _ = readSyncedClusters(clusterName)
}

cloudCluster, ok := cld.Clusters[clusterName]
if !ok {
if cloudCluster == nil {
return fmt.Errorf("could not find %s in list of cluster", clusterName)
}

@@ -674,7 +679,7 @@ func SetupSSH(ctx context.Context, l *logger.Logger, clusterName string) error {
return err
}

if err = cloudCluster.PrintDetails(l); err != nil {
if err := cloudCluster.PrintDetails(l); err != nil {
return err
}
// Run ssh-keygen -R serially on each new VM in case an IP address has been recycled
@@ -715,6 +720,8 @@ func Extend(l *logger.Logger, clusterName string, lifetime time.Duration) error
if err := LoadClusters(); err != nil {
return err
}

// We force a sync to ensure cluster was not previously extended
c, err := getClusterFromCloud(l, clusterName)
if err != nil {
return err
@@ -724,8 +731,8 @@ func Extend(l *logger.Logger, clusterName string, lifetime time.Duration) error
return err
}

// Reload the clusters and print details.
c, err = getClusterFromCloud(l, clusterName)
// Save the cluster to the cache and print details.
err = saveCluster(l, c)
if err != nil {
return err
}
@@ -1450,7 +1457,16 @@ func Destroy(
// and let the caller retry.
cld, _ = cloud.ListCloud(l, vm.ListOptions{IncludeEmptyClusters: true})
}
return destroyCluster(ctx, cld, l, name)
err := destroyCluster(ctx, cld, l, name)
if err != nil {
return errors.Wrapf(err, "unable to destroy cluster %s", name)
}

err = deleteCluster(name)
if err != nil {
return errors.Wrapf(err, "unable to delete cluster %s from local cache", name)
}
return nil
}); err != nil {
return err
}
@@ -1641,16 +1657,23 @@ func Create(
}

l.Printf("Creating cluster %s with %d nodes...", clusterName, numNodes)
if createErr := cloud.CreateCluster(l, opts); createErr != nil {
c, createErr := cloud.CreateCluster(l, opts)
if createErr != nil {
return createErr
}

// Save the cluster to the cache.
err := saveCluster(l, c)
if err != nil {
return errors.Wrapf(err, "failed to save cluster %s", clusterName)
}

if config.IsLocalClusterName(clusterName) {
// No need for ssh for local clusters.
return LoadClusters()
}
l.Printf("Created cluster %s; setting up SSH...", clusterName)
return SetupSSH(ctx, l, clusterName)
return SetupSSH(ctx, l, clusterName, false /* sync */)
}

func Grow(
@@ -1676,7 +1699,12 @@ func Grow(
// reload the clusters before returning.
err = LoadClusters()
default:
err = SetupSSH(ctx, l, clusterName)
// Save the cluster to the cache.
err = saveCluster(l, &c.Cluster)
if err != nil {
return err
}
err = SetupSSH(ctx, l, clusterName, false /* sync */)
}
if err != nil {
return err
@@ -1712,8 +1740,9 @@ func Shrink(ctx context.Context, l *logger.Logger, clusterName string, numNodes
// clusters before returning.
return LoadClusters()
}
_, err = Sync(l, vm.ListOptions{})
return err

// Save the cluster to the cache.
return saveCluster(l, &c.Cluster)
}

// GC garbage-collects expired clusters, unused SSH key pairs in AWS, and unused
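The net effect in roachprod.go: `Create`, `Grow`, and `Shrink` now write the updated cluster to the local cache via `saveCluster`, `Destroy` evicts it via `deleteCluster`, and `SetupSSH` only re-lists the cloud when asked to. The lookup it performs is essentially the following, shown as a self-contained paraphrase (both inputs are passed in explicitly here; in the real code they come from `Sync()` and `readSyncedClusters()`):

```go
package example

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachprod/cloud"
)

// resolveCluster paraphrases the new SetupSSH lookup: with sync the cluster
// comes from a freshly listed cloud.Cloud, otherwise from the local cache.
func resolveCluster(
	name string, synced *cloud.Cloud, cached *cloud.Cluster, sync bool,
) (*cloud.Cluster, error) {
	var c *cloud.Cluster
	if sync {
		c = synced.Clusters[name]
	} else {
		c = cached
	}
	if c == nil {
		return nil, fmt.Errorf("could not find %s in list of clusters", name)
	}
	return c, nil
}
```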
1 change: 1 addition & 0 deletions pkg/roachprod/vm/aws/BUILD.bazel
@@ -24,6 +24,7 @@ go_library(
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_spf13_pflag//:pflag",
"@org_golang_x_exp//maps",
"@org_golang_x_sync//errgroup",
"@org_golang_x_time//rate",
],