Pg executor #84

Open · gabriel-v wants to merge 2 commits into main

Conversation


@gabriel-v commented Sep 7, 2023

closes #78

If you want to accept this, I guess this would be the minimum left to do:

  • add CLI subcommand to start executor worker process
  • add & fix type annotations
  • add instructions & docs sections for running
  • test internals of executor (send/receive to/from queue)
  • test a bunch of different / missing settings in the ini file
  • test executor with workflows

@gregjohnso

@mattrasmus this would be very helpful for us. What is the approval mechanism here?


@ernestum left a comment


Thanks for the great work on this alternative to the AWS executor, @gabriel-v!

I am evaluating a number of pipeline/workflow solutions right now, and redun is at the top of the list; the only catch is that there is no self-hosted executor solution.

In the hope of getting this feature upstream, I've done some code review. I also took the liberty of rebasing it onto master and resolving the conflicts here (@gabriel-v, feel free to pull my changes into this PR so we have one place to work on this together).

@mattrasmus what else do we need to get this upstream?

Comment on lines 123 to 129
conn = psycopg2.connect(**opt)
cur = conn.cursor()
try:
    yield cur
finally:
    cur.close()
    conn.close()


Could this be:

with psycopg2.connect(**opt) as conn:
    with conn.cursor() as cursor:
        yield cursor

?
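
One caveat, going by psycopg2's documented semantics: with conn: only wraps a transaction (commit on success, rollback on error) and does not close the connection, so an exact equivalent still needs an explicit close. A minimal sketch, with get_cursor as a hypothetical name:

from contextlib import closing, contextmanager

import psycopg2


@contextmanager
def get_cursor(**opt):
    # closing() guarantees conn.close() on exit; in psycopg2 the inner
    # `with conn:` only commits or rolls back the transaction.
    with closing(psycopg2.connect(**opt)) as conn:
        with conn, conn.cursor() as cur:
            yield cur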

conn.close()


def run_worker_single(cur: psycopg2.cursor, queue: str, result_queue: Optional[str] = None) -> int:


The docstring says that this function executes a batch, but looking at the code it seems to execute a single job (or no job at all). So maybe fix the docstring?
Also, maybe change the return value to a boolean?
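
For reference, a hypothetical reshaping along those lines (the docstring wording is illustrative):

from typing import Optional

import psycopg2.extensions


def run_worker_single(
    cur: psycopg2.extensions.cursor,  # the concrete cursor class in psycopg2
    queue: str,
    result_queue: Optional[str] = None,
) -> bool:
    """Try to execute a single job from the queue.

    Returns True if a job was executed, False if the queue was empty.
    """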

Comment on lines 234 to 235
max_tasks: int = 1000,
max_time: int = 3600,


Docstring for the parameters is missing.

cur.execute("COMMIT;")


def run_worker_until_empty(


I would propose a name change here, since it could easily be misunderstood as "run the worker until the worker is empty". Of course we mean that the queue goes empty here, but even that is not true when the queue holds more than 1k items or the jobs take more than 3600 seconds to run. What about just run_worker?

queue: str,
result_queue: Optional[str] = None,
max_tasks: int = 1000,
max_time: int = 3600,


What about naming this max_seconds instead? It won't hurt to clarify the units.

Also: max_... is misleading. We can easily exceed this time limit when we have a task that takes very long. The more verbose but correct name would be start_tasks_until_this_many_seconds_elapsed. Otherwise we would have to actively abort tasks, right?
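
To make those semantics concrete, a minimal sketch of the loop shape being described (assumed structure, not the PR's actual code):

import time


def run_worker(cur, queue, result_queue=None, max_tasks=1000, max_seconds=3600):
    start = time.monotonic()
    done = 0
    # Tasks are only *started* while within the budget; a long task that
    # begins just before the deadline will still run past max_seconds.
    while done < max_tasks and time.monotonic() - start < max_seconds:
        done += run_worker_single(cur, queue, result_queue)
    return done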

Comment on lines 300 to 303
obj: Dict[str, object] = {"func": func}
obj["args"] = args
obj["kw"] = kw
return encode_obj(obj)


Why not just:

return encode_obj({"func": func, "args": args, "kw": kw})

Args:
    cur (Cursor): Database cursor we use to run LISTEN/UNLISTEN
    queue: table queue name
    timeout (int, optional): Max seconds to wait. Defaults to 60.


Maybe document the randomization of the timeout here? Also, the code below waits up to 1.5 * timeout seconds.
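
For example, a sketch of what that documentation could say, in code form (the exact randomization range is an assumption based on the 1.5 * timeout observation above):

import random
import select


def wait_for_notify(conn, timeout: int = 60) -> None:
    # Assumption: the wait is randomized up to 1.5 * timeout seconds so
    # that idle workers do not all wake up at the same moment.
    wait = timeout * random.uniform(1.0, 1.5)
    if select.select([conn], [], [], wait) != ([], [], []):
        conn.poll()           # ingest pending notifications
        del conn.notifies[:]  # drain the notification list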

"""Inserts some payloads into the queue, then notifies any listeners of
that queue.

**WARNING**: This function requires the caller to run


I think we can change this function so it calls "commit" on its own (sketch below). It is used in only two places:

  1. In line 431 in submit_task, where "commit" is called anyway right after.
  2. In line 179 in run_worker_single. There it might look like we could accumulate multiple task submissions, since we call it in a loop, but in practice (due to LIMIT 1 in line 164) we only execute the loop body once.
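
A sketch of that change (helper name hypothetical; it relies on the fact that PostgreSQL only delivers NOTIFY to listeners once the surrounding transaction commits):

def insert_and_notify(cur, queue: str, payloads: list) -> None:
    # Committing here makes the helper self-contained: the NOTIFY below
    # is only delivered to listeners after this COMMIT goes through.
    cur.executemany(
        f"INSERT INTO {queue} (payload) VALUES (%s)",
        [(p,) for p in payloads],
    )
    cur.execute(f"NOTIFY {queue};")
    cur.execute("COMMIT;")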

Comment on lines 447 to 479
def exec_task(
    job_id: int,
    module_name: str,
    task_fullname: str,
    args: Tuple,
    kwargs: dict,
    **extra,
) -> Any:
    """
    Execute a task in the worker process.
    """
    # stolen from local_executor.py
    load_task_module(module_name, task_fullname)
    task = get_task_registry().get(task_fullname)
    return task.func(*args, **kwargs)


def exec_script_task(
    job_id: int,
    module_name: str,
    task_fullname: str,
    args: Tuple,
    kwargs: dict,
    **extra,
) -> bytes:
    """
    Execute a script task from the task registry.
    """
    # stolen from local_executor.py
    load_task_module(module_name, task_fullname)
    task = get_task_registry().get(task_fullname)
    command = get_task_command(task, args, kwargs)
    return exec_script(command)


Instead of "stealing" from local_executor.py (which is just "redun/executors/local.py" I guess?) why not import it?

Comment on lines 504 to 510
return dict(
    (
        (k, v)
        for (k, v) in config.items()
        if k in ["dbname", "user", "password", "host", "port", "dsn"]
    )
)


Maybe a tiny bit more concise and easier to understand?

return {k: config[k] for k in config.keys() & set(["dbname", "user", "password", "host", "port", "dsn"])}
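
This works because dict key views support set operations, e.g.:

>>> {"dbname": "redun", "extra": 1}.keys() & {"dbname", "user"}
{'dbname'}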

@gabriel-v (Author)

Hi @ernestum,

Thanks for the review. I'm no longer working on this experiment; it was a nice starting point for understanding how this works. I ended up working on a new framework with different opinions, requirements, and backends.

Feel free to take over the code if you want to pursue this feature further.

@ernestum

All right. Will do. Thanks for the huge initial chunk of work!

May I ask what other framework you ended up using, @gabriel-v?

@gabriel-v (Author)

Did a custom reinterpretation of this repo's DAG definition API using async Python and ScyllaDB; seeing some great throughput out of it. Will leave a comment here when we publish it, if you're interested.

@ernestum

> Did a custom reinterpretation of this repo's DAG definition API using async Python and ScyllaDB; seeing some great throughput out of it. Will leave a comment here when we publish it, if you're interested.

Yes that would be great!

@gabriel-v (Author)

Meanwhile, you can look at https://github.com/temporalio/samples-python

We're evaluating it, and it's a very strong contender.

Successfully merging this pull request may close these issues:

  • PostgreSQL Queue Executor (Distributed executor without Amazon/k8s) (#78)