Pg executor #84
Conversation
@mattrasmus this would be very helpful for us. What is the approval mechanism here?
Thanks for the great work on the alternative to an AWS executor @gabriel-v!
I am evaluating a number of pipeline/workflow solutions right now, and redun is at the top of the list; the only catch is that there is no self-hosted executor solution.
In the hope of getting this feature upstream, I've provided some code review. I also took the liberty of rebasing it onto master and resolving the conflicts here (@gabriel-v feel free to pull my changes into this PR so we have one place to work on this together).
@mattrasmus what else do we need to get this upstream?
conn = psycopg2.connect(**opt)
cur = conn.cursor()
try:
    yield cur
finally:
    cur.close()
    conn.close()
Could this be:
with psycopg2.connect(**opt) as conn:
    with conn.cursor() as cursor:
        yield cursor
?
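One caveat (my note, not from the thread): in psycopg2, with conn: only scopes a transaction; it commits or rolls back on exit but does not close the connection. A sketch that also guarantees the close, assuming the same opt dict and wrapping it as a context manager (the name get_cursor is hypothetical):

import contextlib

import psycopg2


@contextlib.contextmanager
def get_cursor(**opt):  # hypothetical name; mirrors the snippet above
    # closing() guarantees conn.close(); "with conn" alone would not.
    with contextlib.closing(psycopg2.connect(**opt)) as conn:
        with conn:  # transaction scope: commit on success, rollback on error
            with conn.cursor() as cur:  # cursor is closed on block exit
                yield cur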
redun/executors/postgres.py (outdated)
conn.close()


def run_worker_single(cur: psycopg2.cursor, queue: str, result_queue: Optional[str] = None) -> int:
The docstring says that this function executes a batch, but looking at the code it seems to execute a single job (or no job at all). So maybe fix the docstring?
Also, maybe change the return value to a boolean?
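For illustration, the boolean-returning shape might look like this. This is only a sketch: the queue table layout (id and payload columns), the dequeue query, and handle_payload are all assumptions, not the PR's actual code.

from typing import Optional


def run_worker_single(cur, queue: str, result_queue: Optional[str] = None) -> bool:
    """Try to execute one job from the queue.

    Returns True if a job was dequeued and executed, False if the queue was empty.
    """
    # Standard Postgres queue pattern: claim at most one row, skipping locked ones.
    cur.execute(
        f"DELETE FROM {queue} WHERE id = ("
        f" SELECT id FROM {queue} ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 1"
        f") RETURNING payload;"
    )
    row = cur.fetchone()
    if row is None:
        return False
    handle_payload(row[0], result_queue)  # hypothetical dispatch helper
    return True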
    max_tasks: int = 1000,
    max_time: int = 3600,
Docstring for the parameters is missing.
cur.execute("COMMIT;") | ||
|
||
|
||
def run_worker_until_empty( |
I would propose a name change here, since it could easily be misunderstood as "run the worker until the worker is empty". Of course we mean the queue that goes empty here, but even that is not true when the queue is larger than 1k items or it takes more than 3600 seconds to run the jobs. What about just run_worker?
    queue: str,
    result_queue: Optional[str] = None,
    max_tasks: int = 1000,
    max_time: int = 3600,
What about naming this max_seconds instead? Won't hurt to clarify the units.
Also: max_... is misleading. We can easily exceed this time limit when we have a task that takes very long. The more verbose but correct name would be start_tasks_until_this_many_seconds_elapsed. Otherwise we would have to actively abort tasks, right?
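To make the start-side semantics concrete, here is a minimal sketch of the loop shape under discussion, adopting the run_worker name and max_seconds parameter suggested above (run_worker_single is the single-job helper from earlier; the rest is assumed):

import time


def run_worker(cur, queue, result_queue=None, max_tasks=1000, max_seconds=3600):
    """Run jobs until the queue is empty or a start-side limit is reached."""
    started = time.monotonic()
    for _ in range(max_tasks):
        # The limit is only checked between jobs, so a single long-running
        # job can overshoot max_seconds; nothing is aborted mid-task.
        if time.monotonic() - started >= max_seconds:
            break
        if not run_worker_single(cur, queue, result_queue):
            break  # queue was empty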
obj: Dict[str, object] = {"func": func}
obj["args"] = args
obj["kw"] = kw
return encode_obj(obj)
Why not just:
return encode_obj({"func": func, "args": args, "kw": kw})
redun/executors/postgres.py (outdated)
    Args:
        cur (Cursor): Database cursor we use to run LISTEN/UNLISTEN
        queue: table queue name
        timeout (int, optional): Max seconds to wait. Defaults to 60.
Maybe document the randomization of the timeout here? Also, the code below waits up to 1.5 * timeout seconds.
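For context, the usual psycopg2 LISTEN wait loop with jitter looks roughly like this. This is a sketch under assumptions: the jitter range [1.0, 1.5) is inferred from the "up to 1.5 * timeout" observation above, and wait_for_notify is a hypothetical name.

import random
import select

import psycopg2
import psycopg2.extensions


def wait_for_notify(conn, channel: str, timeout: int = 60):
    """Block until a NOTIFY arrives on channel or a jittered timeout expires."""
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    with conn.cursor() as cur:
        cur.execute(f"LISTEN {channel};")
    # Randomizing the wait spreads out worker wake-ups so they do not all
    # poll the queue at the same moment.
    deadline = timeout * random.uniform(1.0, 1.5)
    if select.select([conn], [], [], deadline) == ([], [], []):
        return []  # timed out with no notification
    conn.poll()
    notifies = conn.notifies[:]
    del conn.notifies[:]
    return notifies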
"""Inserts some payloads into the queue, then notifies any listeners of | ||
that queue. | ||
|
||
**WARNING**: This function requires the caller to run |
I think we can change this function so it calls "commit" on its own. It is used in only two places:
- In line 431 in submit_task, where "commit" is called right after anyway.
- In line 179 in run_worker_single. There it might look like we could accumulate multiple task submissions, since we call it in a loop, but in practice (due to LIMIT 1 in line 164) we only execute the loop body once.
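A sketch of that change, assuming the function is the insert-and-notify helper quoted above (the name insert_and_notify and the single payload column are assumptions, and the cur.execute("COMMIT;") style follows the PR's own snippets):

def insert_and_notify(cur, queue: str, payloads) -> None:
    """Insert payloads into the queue table, NOTIFY listeners, and commit.

    Committing here means callers no longer need their own COMMIT.
    """
    for payload in payloads:
        # Hypothetical schema: a single "payload" column.
        cur.execute(f"INSERT INTO {queue} (payload) VALUES (%s);", (payload,))
    cur.execute(f"NOTIFY {queue};")
    cur.execute("COMMIT;")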
def exec_task(
    job_id: int,
    module_name: str,
    task_fullname: str,
    args: Tuple,
    kwargs: dict,
    **extra,
) -> Any:
    """
    Execute a task in the worker process.
    """
    # stolen from local_executor.py
    load_task_module(module_name, task_fullname)
    task = get_task_registry().get(task_fullname)
    return task.func(*args, **kwargs)


def exec_script_task(
    job_id: int,
    module_name: str,
    task_fullname: str,
    args: Tuple,
    kwargs: dict,
    **extra,
) -> bytes:
    """
    Execute a script task from the task registry.
    """
    # stolen from local_executor.py
    load_task_module(module_name, task_fullname)
    task = get_task_registry().get(task_fullname)
    command = get_task_command(task, args, kwargs)
    return exec_script(command)
Instead of "stealing" from local_executor.py
(which is just "redun/executors/local.py" I guess?) why not import it?
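i.e., something like the following, assuming these helpers are defined at module level in redun/executors/local.py:

from redun.executors.local import exec_script_task, exec_task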
return dict(
    (
        (k, v)
        for (k, v) in config.items()
        if k in ["dbname", "user", "password", "host", "port", "dsn"]
    )
)
Maybe a tiny bit more concise and easier to understand?
return {k: config[k] for k in config.keys() & set(["dbname", "user", "password", "host", "port", "dsn"])}
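As a side note (mine, not from the thread): dict key views already support set operators, so the set() wrapper can be dropped: config.keys() & {"dbname", "user", "password", "host", "port", "dsn"}.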
hi @ernestum Thanks for the review. I'm no longer working on this experiment; it was a nice starting point for understanding how this works. I ended up working on a new framework using different opinions, requirements, and backends. Feel free to take over the code if you want to further pursue this feature.
All right. Will do. Thanks for the huge initial chunk of work! May I ask what other framework you ended up using @gabriel-v?
Did a custom reinterpretation of this repo's DAG definition API using async Python and ScyllaDB; we're seeing some great throughput out of it. I'll leave a comment here when we publish it, if you're interested.
Yes, that would be great!
Meanwhile, you can look at https://github.com/temporalio/samples-python; we're evaluating it and it's a very strong contender.
closes #78
If you want to accept this, I guess this would be the minimum left to do