migration controller: add a secondary queue to keep the main one manageable #11616
base: main
Conversation
Skipping CI for Draft Pull Request.
/test all
[APPROVALNOTIFIER] This PR is NOT APPROVED. Approval is needed from an approver in each of the affected files; the full list of commands accepted by this bot can be found here.
@@ -222,19 +224,23 @@ func (c *MigrationController) runWorker() {
 }
 
 func (c *MigrationController) Execute() bool {
-	key, quit := c.Queue.Get()
+	queue := &c.Queue
+	if c.Queue.Len() == 0 && c.pendingQueue.Len() > 0 {
So this prioritizes the main queue, but it can cause multiple workers to get stuck on the pendingQueue.
Maybe we don't want to do that. Maybe a simple stack that pops when a migration finishes would be sufficient.
Edit: a queue is required in order to be able to cycle candidates.
Having 2 queues is scary, I agree, but we need a way to keep the main queue short, or we'll keep starving active migrations when we have a lot in pending.
So far, this is the only thing I could come up with that doesn't fundamentally change how we do controllers...
Keep in mind this is a draft; I opened it mostly to see how many tests would blow up (none, it seems!). I have not yet committed to this approach.
I'm not sure what you mean by "simple stack that pops when a migration is finished", is that describing current behavior or is it a new idea to fix the issue at hand?
Thanks!
I'm not sure what you mean by "simple stack that pops when a migration is finished", is that describing current behavior or is it a new idea to fix the issue at hand?

Yes, I am thinking about a possible solution.
I think it comes down to this: we need a way to put migrations to sleep and wake them up once a migration finishes. WDYT?
If by "sleep" you mean "not re-enqueue them" then yes, anything that keeps them out of the queue will do.
We could keep that secondary queue and move one item from it to the main queue when a migration finishes, however that scares me more than the current state of this PR...
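For reference, a minimal sketch of that "promote one pending key per finished migration" alternative. The function and parameter names are made up for illustration; this is not code from the PR:

package migration

import (
	"sync"

	"k8s.io/client-go/util/workqueue"
)

// promoteOnePending is hypothetical: it moves a single key from a locked
// pending list into the main work queue, and would be called whenever a
// migration reaches a final phase.
func promoteOnePending(main workqueue.RateLimitingInterface, pending *[]string, lock *sync.Mutex) {
	lock.Lock()
	defer lock.Unlock()
	if len(*pending) == 0 {
		return
	}
	main.AddRateLimited((*pending)[0])
	*pending = (*pending)[1:]
}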
	if c.Queue.Len() == 0 && c.pendingQueue.Len() > 0 {
		queue = &c.pendingQueue
	}
	key, quit := (*queue).Get()
Do we actually need (*queue)?
Surprisingly yes, not sure why; I thought Go would dereference it for you as usual...
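As an aside (not part of the original thread): a pointer to an interface has an empty method set, so Go will not call Get through it without an explicit dereference; the automatic shorthand only applies to pointers to concrete types. A standalone toy example, with a made-up Queue interface standing in for the real workqueue type:

package main

import "fmt"

// Queue is a stand-in for workqueue.RateLimitingInterface.
type Queue interface {
	Get() (item string, shutdown bool)
}

type fifo struct{ items []string }

func (f *fifo) Get() (string, bool) {
	if len(f.items) == 0 {
		return "", true
	}
	item := f.items[0]
	f.items = f.items[1:]
	return item, false
}

func main() {
	var q Queue = &fifo{items: []string{"default/mig-a"}}
	qp := &q // qp has type *Queue, a pointer to an interface value

	// qp.Get() would not compile: *Queue has an empty method set,
	// so the pointer must be dereferenced explicitly.
	key, quit := (*qp).Get()
	fmt.Println(key, quit) // default/mig-a false
}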
Can you elaborate on the "each every second" part?

Just a bad way of explaining these 3 lines:
for i := 0; i < threadiness; i++ {
	go wait.Until(c.runWorker, time.Second, stopCh)
}

That piece of code means that if a worker ever exits (which should not happen), it will be restarted a second later.

🤦 I missed the for loop... That changes a lot of things; it also means that as long as the queue is not empty, the 3 threads are all running in a hot loop!
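To make the "hot loop" point concrete, here is a self-contained toy reproduction of that worker pattern. toyController and the key name are invented for illustration; only the wait.Until shape comes from the quoted code:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

// toyController reproduces the pattern discussed above: execute() blocks on
// Get() when the queue is empty, so the loop in runWorker only spins while
// keys keep getting (re-)enqueued.
type toyController struct {
	queue workqueue.RateLimitingInterface
}

func (c *toyController) execute() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)
	fmt.Println("processing", key)
	return true
}

func (c *toyController) runWorker() {
	for c.execute() {
	}
}

func main() {
	c := &toyController{queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())}
	stopCh := make(chan struct{})
	// Same shape as the 3 lines quoted above: wait.Until only restarts the
	// worker a second after it returns, it does not throttle the inner loop.
	for i := 0; i < 3; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	c.queue.Add("default/mig-a")
	time.Sleep(100 * time.Millisecond)
	c.queue.ShutDown()
	close(stopCh)
}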
I went looking for a field estimate of how often every migration is seen by the migration controller and found that, when starting X migrations, each migration is seen every X/100 minutes (e.g. with 1000 migrations started, each one only gets a controller pass roughly every 10 minutes).
With a single queue, that X/100 number is the same for running migrations and pending ones... The solution proposed by this PR effectively eliminates the problem.
/test pull-kubevirt-e2e-k8s-1.29-sig-compute-migrations
Thank you @jean-edouard! Some comments below.
I had fun proposing an alternative solution for the implementation of the pending queue:
if we create a struct that manages the lock, we can safely add, flush and read without having to handle the lock at every call site.
type pendingQueue struct {
	queue     []string
	queueLock *sync.Mutex
}

func newPendingQueue() *pendingQueue {
	return &pendingQueue{
		queue:     []string{},
		queueLock: &sync.Mutex{},
	}
}

func (pq *pendingQueue) GetQueue() []string {
	pq.queueLock.Lock()
	defer pq.queueLock.Unlock()
	return pq.queue
}

func (pq *pendingQueue) FlushTo(queue workqueue.RateLimitingInterface) {
	pq.queueLock.Lock()
	defer pq.queueLock.Unlock()
	for _, key := range pq.queue {
		queue.AddRateLimited(key)
	}
	pq.queue = []string{}
}

func (pq *pendingQueue) Add(item string) {
	pq.queueLock.Lock()
	defer pq.queueLock.Unlock()
	pq.queue = append(pq.queue, item)
}
In this case the MigrationController will have:

type MigrationController struct {
	templateService services.TemplateService
	clientset       kubecli.KubevirtClient
	Queue           workqueue.RateLimitingInterface
	pendingQueue    *pendingQueue

and the addition and flush become:

c.pendingQueue.Add(key)
c.pendingQueue.FlushTo(c.Queue)

Also, in the tests we can read the pending queue without taking care of the lock:

Expect(controller.pendingQueue.GetQueue()).To(Equal([]string{"default/testmigrationpending"}))
WDYT?
Regarding the VMI addition to the queue, I think you are right: given how we use it, we don't need to add it to the queue.
Thank you!
func (c *MigrationController) flushPendingQueue(oldPhase, newPhase virtv1.VirtualMachineInstanceMigrationPhase) {
	if newPhase != oldPhase && (newPhase == virtv1.MigrationFailed || newPhase == virtv1.MigrationSucceeded) {
		c.pendingQueueLock.Lock()
		for _, key := range c.pendingQueue {
			c.Queue.AddRateLimited(key)
		}
		c.pendingQueue = []string{}
		c.pendingQueueLock.Unlock()
	}
}
Just an alternative solution, feel free to choose:
func (c *MigrationController) flushPendingQueue(oldVMIM, newVMIM *virtv1.VirtualMachineInstanceMigration) {
	if oldVMIM.Status.Phase == newVMIM.Status.Phase || !newVMIM.IsFinal() {
		return
	}
	c.pendingQueueLock.Lock()
	for _, key := range c.pendingQueue {
		c.Queue.AddRateLimited(key)
	}
	c.pendingQueue = []string{}
	c.pendingQueueLock.Unlock()
}
@@ -504,6 +521,8 @@ func (c *MigrationController) updateStatus(migration *virtv1.VirtualMachineInsta
 	controller.SetVMIMigrationPhaseTransitionTimestamp(migration, migrationCopy)
 	controller.SetSourcePod(migrationCopy, vmi, c.podIndexer)
 
+	c.flushPendingQueue(migration.Status.Phase, migrationCopy.Status.Phase)
Should we perform this after the UpdateStatus call?
I mean, if the status update fails, there is the possibility that the migrations we have enqueued will be pushed back to the pending queue. No?
err := controller.migrationIndexer.Add(migration)
Expect(err).To(Not(HaveOccurred()))
nit:
Expect(controller.migrationIndexer.Add(migration)).To(Succeed())
What I wrote matches what the other functions do.
We could change them all, but it might be a bit out of scope for this PR.
Also, I actually find the 2-line version to be more readable somehow...
I don't feel strongly about it though, let me know if you do!
No strong opinion. It was just a nit, maybe for the future. I'm fine with leaving it like this.
By("Executing the controller and expecting the pending migration in the secondary queue")
controller.Execute()
Expect(controller.pendingQueue).To(Equal([]string{"default/testmigrationpending"}))
don't we need to lock before reading?
I don't think so. The only thing that touches pendingQueue is Execute(), which we run manually in our own thread.
There's no controller loop running, and each test has its own controller.
The only concern could be the informer, but it doesn't touch pendingQueue (and has no reason to trigger right now anyway).
Or am I missing something?
Yes, you are right. It was more of a theoretical comment than anything else :)
Force-pushed from 19f0dbe to c65fa38.
Looks good from my side.
Just a question below. Thanks
@@ -332,7 +335,7 @@ func (c *MigrationController) execute(key string) error {
 		return err
 	}
 
-	needsSync := c.podExpectations.SatisfiedExpectations(key) && vmiExists
+	needsSync := c.podExpectations.SatisfiedExpectations(key)
Just for confirmation: Can you explain this?
We return early just above when vmiExists is false.
Thank you @jean-edouard, and sorry for chiming in late!
A few questions:
- I wonder if we can get to a situation where migrations are "forgotten" in the pending queue. For example, consider the following (with ParallelOutboundMigrationsPerNode set to 1 to simplify the example):
  - Migration A is running.
  - Migration B is created.
  - Migration A finishes successfully.
  - Migration B is enqueued to the pending queue.
  Perhaps we should, just in case, periodically check whether the main queue is empty and, if it is, flush the pending queue? (A rough sketch of this idea follows below.)
- Perhaps it's worth mentioning in the PR description that "pending migrations" in this context only means migrations beyond the limits defined in ParallelOutboundMigrationsPerNode or ParallelOutboundMigrationsPerCluster. That is, migrations that are pending for other reasons (i.e. there's already a migration taking place) do not count as pending in this context. It wasn't obvious to me.
- Would it be valuable to test it with a functional test?
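For illustration, a hedged sketch of that periodic safety flush. The function name, signature and interval are all assumptions; flushPending could be the pendingQueue.FlushTo helper proposed earlier in this thread:

package migration

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

// startPendingQueueJanitor is hypothetical: every interval, if the main queue
// is idle, call flushPending so that nothing can stay forgotten in the
// pending queue.
func startPendingQueueJanitor(main workqueue.RateLimitingInterface, flushPending func(), interval time.Duration, stopCh <-chan struct{}) {
	go wait.Until(func() {
		if main.Len() == 0 {
			flushPending()
		}
	}, interval, stopCh)
}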
@@ -516,6 +528,8 @@ func (c *MigrationController) updateStatus(migration *virtv1.VirtualMachineInsta
 		}
 	}
 
+	c.flushPendingQueue(migration, migrationCopy)
Let's say that there are X running migrations (on the main queue) and Y pending migrations, and that Y is much bigger than X.
In this case, IIUC, we'd return to the state where the main queue is blown up with pending migrations. They will, in turn, be re-enqueued to the pending queue, but perhaps we can limit the number of migrations we flush into the main queue in a better way.
For example, how about we enqueue ParallelOutboundMigrationsPerNode of them, or even double that, just to ensure we don't blow up the main queue again?
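A possible shape for that bounded flush, sketched against the pendingQueue helper proposed earlier in this thread (FlushN and the exact field layout are assumptions, not code from the PR):

package migration

import (
	"sync"

	"k8s.io/client-go/util/workqueue"
)

// pendingQueue mirrors the helper proposed earlier in this thread.
type pendingQueue struct {
	queue     []string
	queueLock sync.Mutex
}

// FlushN is hypothetical: it promotes at most n keys to the main queue and
// keeps the rest pending, so a huge backlog cannot flood the main queue again.
func (pq *pendingQueue) FlushN(main workqueue.RateLimitingInterface, n int) {
	pq.queueLock.Lock()
	defer pq.queueLock.Unlock()
	if n > len(pq.queue) {
		n = len(pq.queue)
	}
	for _, key := range pq.queue[:n] {
		main.AddRateLimited(key)
	}
	pq.queue = append([]string{}, pq.queue[n:]...)
}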
Yes, that was a compromise we made to avoid forgotten migrations... And it was still not enough, as you found!
Note that today we just keep re-enqueuing everything always, so this is still an improvement!
Maybe the added flush-pending-on-empty-queue will allow for partial flushes, I need to think about it!
@@ -2162,6 +2161,59 @@ var _ = Describe("Migration watcher", func() {
 			expectTargetPodWithSELinuxLevel(vmi.Namespace, vmi.UID, migration.UID, "")
 		})
 	})
 
+	Context("Secondary queue", func() {
Nice!
I wonder if it'd be valuable to create another test with multiple pending migrations that are expected to be flushed, just to cover this case as well (or maybe I'm being over-paranoid here?)
func (pq *PendingQueue) GetQueue() []string {
	pq.queueLock.Lock()
	defer pq.queueLock.Unlock()
	return pq.queue
}
You're handing out the original slice here rather than a copy, so the lock is redundant, or you should switch to returning a copy.
In addition, maybe it's just me, but I find the function name a bit confusing, since "queue" usually means a queue object (like RateLimitingInterface), although here it's a string slice; but that's an implementation detail.
WDYT about:
-func (pq *PendingQueue) GetQueue() []string {
-	pq.queueLock.Lock()
-	defer pq.queueLock.Unlock()
-	return pq.queue
-}
+func (pq *PendingQueue) GetKeys() []string {
+	pq.queueLock.Lock()
+	defer pq.queueLock.Unlock()
+	return slices.Clone(pq.queue)
+}
I almost made the same comment to Federico when he suggested this, but decided to test it in the Go playground, and the returned slice does appear to be a copy, unaffected by changes to the original slice.
Maybe slices are passed as "references" when calling a function, but copied on return?
Either way, as discussed, this function is only used in unit tests, where the lock is not necessary.
https://play.golang.com/p/wl2VYExfZSg
Are you sure? :)
Code:
package main

import "fmt"

type obj struct {
	s []string
}

func getObj() obj {
	return obj{
		s: []string{"one", "two"},
	}
}

func (o *obj) getSlice() []string {
	return o.s
}

func main() {
	// Initiate an object with a slice
	o := getObj()
	// Return the slice without copying it
	s := o.getSlice()
	// Change a slice element
	s[0] = "ten"
	// Print the object's internal slice
	fmt.Println(o.getSlice())
}
Output:
[ten two]
This started as just an experiment, it hasn't been in a reviewable state until recently!
You're right... It's like the 4th redesign of this, and it still doesn't cover all cases!
Very good point, will do, thanks.
I didn't write any (to avoid the headache of keeping migrations in the pending/running phase), but seeing what you found in 1., yes, we should probably add a functional test!
Force-pushed from c65fa38 to bd7cbb9.
New changes are detected. LGTM label has been removed.
Force-pushed from bd7cbb9 to 5801732.
Stale issues rot after 30d of inactivity. If this issue is safe to close now please do so with /lifecycle rotten
Force-pushed from 5801732 to 51fea1a.
…gable

When starting a large number of migrations, most of them will stay pending for a long time. Since pending migrations keep consuming controller execution cycles just to get re-enqueued, there may not be enough cycles left for active migrations to move forward in time.
This commit introduces a second queue dedicated to the pending migrations. This second queue will only be consumed when a migration finishes.

Signed-off-by: Jed Lejosne <[email protected]>
Force-pushed from 51fea1a to 943a57c.
hey @jean-edouard!
Sure yeah! It's been a tricky one...
What this PR does
When starting a large number of migrations, most of them will stay pending for a long time. Since pending migrations keep consuming controller execution cycles just to get re-enqueued, there may not be enough cycles left for active migrations to move forward in time.
This PR introduces a second queue dedicated to the pending migrations. This second queue is only consumed when the main one is empty. That's a bit harsh, and in extreme scenarios it could lead to fewer than 5 migrations running in parallel, but it's better than spinlocking virt-controller on pointless re-enqueues.
Fixes #
Why we need it and why it was done in this way
The following tradeoffs were made:
The following alternatives were considered:
Links to places where the discussion took place:
Special notes for your reviewer
Checklist
This checklist is not enforcing, but it's a reminder of items that could be relevant to every PR.
Approvers are expected to review this list.
Release note