Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix:Autograding] prioritize autograding jobs #10592

Merged
merged 6 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion autograder/autograder/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 193,31 @@ def __init__(self, config: Config, workers: List[Worker]):
def _assign_jobs(self, jobs: List[Job]):
idle_workers = [worker for worker in self.workers if worker.is_idle()]

jobs.sort(key=lambda j: os.stat(j.path).st_ctime_ns)
# sort jobs by priority
# 1. VCS gradeable that has not yet been checked out
# 2. Interactive / non-regrade job
# 3. Time entering queue
# 4. Path name
jobs.sort(key=lambda j:
(
not ("vcs_checkout" in j.queue_obj and
j.queue_obj["vcs_checkout"] and
not ("checkout_total_size" in j.queue_obj)),
"regrade" in j.queue_obj and j.queue_obj["regrade"],
j.queue_obj['queue_time'],
j.path
),
reverse=False
)

# for testing / debugging
print("JOBS QUEUE count=" + str(len(jobs)))
position = 0
for j in jobs:
position += 1
regrade = "regrade" in j.queue_obj and j.queue_obj["regrade"]
qt = j.queue_obj['queue_time']
print("JOB " + str(position) + " " + str(regrade) + " " + str(qt) + " " + j.path)

for job in jobs:
if len(idle_workers) == 0:
Expand Down
49 changes: 13 additions & 36 deletions autograder/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 70,15 @@
CONFIG = None


def generate_queue_file(name: str, *, required_capabilities: str):
def generate_queue_file(name: str, *, required_capabilities: str, queue_time: str):
    """Drop a JSON queue entry named *name* into the TO_BE_GRADED directory.

    The entry records the worker capabilities the job requires and the
    time the job entered the queue.
    """
    entry = {
        'required_capabilities': required_capabilities,
        'queue_time': queue_time,
    }
    target = os.path.join(TO_BE_GRADED, name)
    with open(target, 'w') as handle:
        json.dump(entry, handle)


def generate_broken_queue_file(name: str):
    """Create a queue file whose contents cannot be parsed as JSON."""
    destination = os.path.join(TO_BE_GRADED, name)
    with open(destination, 'w') as handle:
        handle.write("This isn't a valid JSON file.\n")


class TestScheduler(unittest.TestCase):
@classmethod
def tearDownClass(cls):
Expand Down Expand Up @@ -183,7 179,7 @@ def test_fcfs_simple(self, MockProcess: mock.Mock):
scheduler = FCFSScheduler(CONFIG, [worker])

# Place a dummy queue file in the queue
generate_queue_file("test", required_capabilities='default')
generate_queue_file("test", required_capabilities='default', queue_time='')

# Invoke the scheduler's update mechanism
scheduler.update_and_schedule()
Expand All @@ -209,7 205,7 @@ def test_fcfs_multiworker(self, MockProcess: mock.Mock):

scheduler = FCFSScheduler(CONFIG, workers)

generate_queue_file("test", required_capabilities='default')
generate_queue_file("test", required_capabilities='default', queue_time='')

scheduler.update_and_schedule()

Expand All @@ -234,9 230,9 @@ def test_fcfs_capacity(self, MockProcess: mock.Mock):

scheduler = FCFSScheduler(CONFIG, [worker])

generate_queue_file("first", required_capabilities='default')
generate_queue_file("first", required_capabilities='default', queue_time='')
time.sleep(0.1) # Kinda ugly, but helps avoid temporal aliasing by the OS
generate_queue_file("second", required_capabilities='default')
generate_queue_file("second", required_capabilities='default', queue_time='')

scheduler.update_and_schedule()

Expand Down Expand Up @@ -277,11 273,11 @@ def test_fcfs_capacity_multi(self, MockProcess: mock.Mock):

scheduler = FCFSScheduler(CONFIG, workers)

generate_queue_file("first", required_capabilities='default')
generate_queue_file("first", required_capabilities='default', queue_time='')
time.sleep(0.1)
generate_queue_file("second", required_capabilities='default')
generate_queue_file("second", required_capabilities='default', queue_time='')
time.sleep(0.1)
generate_queue_file("third", required_capabilities='default')
generate_queue_file("third", required_capabilities='default', queue_time='')

scheduler.update_and_schedule()

Expand Down Expand Up @@ -310,11 306,11 @@ def test_fcfs_constraints(self, MockProcess: mock.Mock):

scheduler = FCFSScheduler(CONFIG, workers)

generate_queue_file("first", required_capabilities='zero')
generate_queue_file("second", required_capabilities='one')
generate_queue_file("third", required_capabilities='two')
generate_queue_file("first", required_capabilities='zero', queue_time='')
generate_queue_file("second", required_capabilities='one', queue_time='')
generate_queue_file("third", required_capabilities='two', queue_time='')
time.sleep(0.1)
generate_queue_file("fourth", required_capabilities='zero')
generate_queue_file("fourth", required_capabilities='zero', queue_time='')

scheduler.update_and_schedule()

Expand All @@ -325,22 321,3 @@ def test_fcfs_constraints(self, MockProcess: mock.Mock):
all_worker_files = sum([os.listdir(worker.folder) for worker in workers], [])
self.assertNotIn("fourth", all_worker_files)

@mock.patch('multiprocessing.Process')
def test_fcfs_ignore_invalid_queue_file(self, MockProcess: mock.Mock):
    """Test that the scheduler properly ignores invalid queue files."""
    # The worker process is mocked; report it alive so the scheduler
    # treats the worker as available for job assignment.
    worker_proc = MockProcess()
    worker_proc.is_alive = mock.MagicMock(return_value=True)

    worker = Worker(CONFIG, 'worker_0', WORKER_PROPERTIES['worker_0'], worker_proc)

    scheduler = FCFSScheduler(CONFIG, [worker])

    # Write an unparseable queue file first, then — after a short delay to
    # keep file timestamps distinct — a valid one.
    generate_broken_queue_file("first")
    time.sleep(0.1)
    generate_queue_file("second", required_capabilities='default')

    scheduler.update_and_schedule()

    # Only the valid job should have been handed to the worker; the broken
    # file must be skipped rather than crash the scheduler.
    worker_files = os.listdir(worker.folder)
    self.assertNotIn("first", worker_files)
    self.assertIn("second", worker_files)
Loading