Asynchronous (concurrent-safe) predicates #325
The `os` library already provides an asynchronous `set_timeout/3` predicate:

```prolog
:- use_module(library(os)).

top :-
    set_timeout(0, (sleep(3000), write(a)), _),
    set_timeout(0, (sleep(2000), write(b)), _),
    set_timeout(0, (sleep(1000), write(c)), _).
```

```prolog
?- top.
true.
```

(prints `cba` as the timers fire.) If you don't need to collect the answers, this predicate should suffice. But, for what you ask, it is a good idea to add a dedicated concurrency library.
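For readers more familiar with JavaScript, `set_timeout/3` behaves like the host's `setTimeout`: all timers are scheduled up front and fire independently, so the shortest sleep completes first. A minimal plain-JS sketch of the same schedule (the delays are illustrative, scaled down from the Prolog example):

```javascript
// Three timers scheduled at once; the shortest delay fires first,
// so the output order is the reverse of the scheduling order.
const out = [];
setTimeout(() => out.push('a'), 300);
setTimeout(() => out.push('b'), 200);
setTimeout(() => out.push('c'), 100);
setTimeout(() => console.log(out.join('')), 400); // → cba
```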
One thing I tried was to make my custom module predicate return a promise identifier, which could be passed to an `await/2` predicate:

```prolog
% This `findall` call forces evaluation of all the `suggested_package_range`
% answers before the execution blocks on the subsequent `await`
findall(Promise, suggested_package_range(Cwd, Name, Range, Promise), PromiseList),
% Then we turn the result list back into individual answers
member(Promise, PromiseList),
% And finally we can await each of them individually (they are all running
% in the background, so blocking the execution isn't a problem)
await(Promise, Latest).
```

It'd be interesting if there was a meta-predicate abstracting this pattern.
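The findall-then-await pattern works for the same reason it does with plain Promises: once every operation has been started, awaiting them one by one costs roughly the slowest operation, not the sum. A hedged plain-JS sketch of that idea (`delay` is a stand-in for the real registry query; delays are illustrative):

```javascript
// Start every "request" first, then await them one by one.
const delay = (ms, value) => new Promise(resolve => setTimeout(() => resolve(value), ms));

async function main() {
  const t0 = Date.now();
  // All three promises start immediately (analogous to the findall/3 pass).
  const promises = [delay(300, 'a'), delay(200, 'b'), delay(100, 'c')];
  // Awaiting each one in turn (analogous to member/2 + await/2); since they
  // are already running, the total time is ~300ms, not ~600ms.
  const results = [];
  for (const p of promises) results.push(await p);
  console.log(results.join(''), Date.now() - t0 < 550 ? '(parallel)' : '(sequential)');
  return results.join('');
}
main();
```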
I just added the basic predicates for the library:

Example: http://tau-prolog.org/sandbox/I6q4S1lY

```prolog
:- use_module(library(os)).
:- use_module(library(concurrent)).

top(X) :-
    future(X, (sleep(3000), X = a), F1),
    future(X, (sleep(1000), X = b), F2),
    future(X, (sleep(1000), X = c), F3),
    get_time(T0),
    future_all([F1,F2,F3], F),
    await(F, X),
    get_time(T1),
    Time is T1 - T0,
    write(time(Time)).
```

```prolog
?- top(X).
X = [a,b,c].
```

(prints `time(3003.0)`.) If the predicate returns a future, your query becomes:

```prolog
?- findall(Future, suggested_package_range(Cwd, Name, Range, Future), FutureList),
   future_all(FutureList, Future),
   await(Future, Latest).
```
Thanks for looking into that! I tried it with Yarn; it's a nice start. Here are my observations:

In terms of performance, I must be doing something wrong. To give you an idea, my last experiment (yarnpkg/berry#4566) involved creating a special operator (…). On the other hand, using (…).

As evidence that the execution didn't start in parallel, we can see that the naive command (where we don't parallelize anything and just immediately await all futures) takes just as much time. It's almost as if the (…).
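For reference, here is how the same difference shows up with plain Promises: creating each promise only when you are about to await it serializes the work, while creating them all up front parallelizes it. A small timing sketch (the 100ms delays are illustrative):

```javascript
const delay = (ms, v) => new Promise(res => setTimeout(() => res(v), ms));

async function sequential() {
  const t0 = Date.now();
  // Each promise is only created after the previous one resolved,
  // so the delays add up (~300ms total here).
  for (const v of ['a', 'b', 'c']) await delay(100, v);
  return Date.now() - t0;
}

async function parallel() {
  const t0 = Date.now();
  // All promises are created up front; total time ≈ the slowest one (~100ms).
  await Promise.all(['a', 'b', 'c'].map(v => delay(100, v)));
  return Date.now() - t0;
}

async function main() {
  console.log((await sequential()) > (await parallel())); // → true
}
main();
```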
Oh, I think I misunderstood your predicates. If `suggested_package_range/4` returns the future itself, you can write:

```prolog
?- findall(Future, (
       workspace_has_dependency(Cwd, Ident, Range, _),
       suggested_package_range(Cwd, Ident, Range, Future)
   ), FutureList),
   future_all(FutureList, FutureAll),
   await(FutureAll, Latest).
```

Furthermore, you don't need to directly return a future from JavaScript. Given a `suggested_package_range/4` implementation along these lines:

```typescript
(thread, point, atom) => {
    const [workspaceCwdAtom, packageIdentAtom, packageRangeAtom, suggestedRangeVar] = atom.args;
    if (!isAtom(workspaceCwdAtom) || !isAtom(packageIdentAtom) || !isAtom(packageRangeAtom) || !isVariable(suggestedRangeVar)) {
        thread.throw_error(pl.error.instantiation(atom.indicator));
        return undefined;
    }
    const promise = Promise.resolve().then(async () => {
        const project = getProject(thread);
        const workspace = project.getWorkspaceByCwd(workspaceCwdAtom.id as any);
        const cache = await getCache(thread);
        const ident = structUtils.parseIdent(packageIdentAtom.id);
        const range = packageRangeAtom.id;
        let updated: Descriptor | null;
        try {
            updated = await suggestUtils.fetchDescriptorFrom(ident, range, {
                project,
                cache,
                workspace,
            });
        } catch {
            updated = null;
        }
        return updated?.range;
    });
    promise.then(result => {
        thread.prepend([new pl.type.State(
            point.goal.replace(new pl.type.Term(`=`, [suggestedRangeVar, new pl.type.Term(String(result))])),
            point.substitution,
            point,
        )]);
        thread.again();
    }, error => {
        thread.throw_error(new pl.type.Term(String(error)));
        thread.again();
    });
    return true;
}
```

you can wrap the call with `future/3` from Prolog:

```prolog
?- findall(Future, (
       workspace_has_dependency(Cwd, Ident, Range, _),
       future(X, suggested_package_range(Cwd, Ident, Range, X), Future)
   ), FutureList),
   future_all(FutureList, FutureAll),
   await(FutureAll, Latest).
```
Fixed.
I will look at it.
If the value of the property is a JavaScript promise, `get_prop/2` returns it as a future.

Example (from JavaScript):

```javascript
var p = new Promise((resolve, reject) => setTimeout(() => resolve("done"), 10000));
var q = new Promise((resolve, reject) => setTimeout(() => reject("error"), 10000));
```

```prolog
?- get_prop(p, FutureP), await(FutureP, Value).
FutureP = <future>(done), Value = done. % after x seconds (depending on when p is defined)

?- get_prop(q, FutureQ), await(FutureQ, Value).
% uncaught exception: error
```

Example (to JavaScript):

```javascript
session.query("future(X, (sleep(10000), X = done), F).");
session.answer(answer => answer.lookup("F").toJavaScript().then(x => console.log(x))); // done (after 10 seconds)
```
Oh, I didn't realize the (…). However, moving it inside (…). I think I'd need a predicate that would store the answers into an opaque future object, then another that would await this future and unpack the answers 🤔
You can add a template term to `future/3` so the bindings survive:

```prolog
?- findall(Future, (
       workspace_has_dependency(Cwd, Ident, Range, _),
       future(package(Cwd, Ident, Range, X), suggested_package_range(Cwd, Ident, Range, X), Future)
   ), FutureList),
   future_all(FutureList, FutureAll),
   await(FutureAll, Latest).
```

Now, `Latest` unifies with the list of `package(Cwd, Ident, Range, X)` terms.
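The template-term trick has a direct analogue with plain Promises: have each promise resolve to a value that carries its own context. A sketch under that assumption (`fetchLatest` is a hypothetical stand-in for the real registry query; the dependency list is made up):

```javascript
// Each future resolves to an object carrying its own context,
// mirroring the package(Cwd, Ident, Range, X) template term above.
// fetchLatest is a hypothetical stand-in for the registry query.
const fetchLatest = (ident) => Promise.resolve(`${ident}@latest`);

async function main() {
  const deps = [
    { ident: 'lodash', range: '^4.0.0' },
    { ident: 'react', range: '^18.0.0' },
  ];
  const results = await Promise.all(deps.map(async dep => ({
    ...dep,                                // keep the context of this answer
    latest: await fetchLatest(dep.ident),  // the asynchronously computed part
  })));
  console.log(JSON.stringify(results));
  return results;
}
main();
```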
In Yarn, we offer a way to inspect information about the project dependencies via Prolog. At the moment this process is completely synchronous, but I'm considering adding some new predicates, one of which lets you obtain the latest release available for a given dependency range, which requires querying the registry. While it's reasonably fast in parallel (~10s for all dependencies in an average project), it's much slower when done sequentially (~2 minutes).

Do you see a way we could make it possible to parallelize some parts of the execution? Perhaps by having a "fork point" predicate or syntax from which the interpreter would compute all the possible answers up to that point, fork them, continue the execution in each individual thread, and coalesce all those result sets into a single one?
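For what it's worth, the "fork point" described here corresponds closely to a map + `Promise.all` + flatten in the host language: enumerate the answers found so far, run the continuation for each one concurrently, and merge the per-answer result sets. A minimal sketch (`forkPoint` and the continuation are illustrative names, not an existing API):

```javascript
// Emulating a "fork point": run the expensive continuation for every
// answer concurrently, then flatten the per-answer result sets together.
async function forkPoint(answers, continuation) {
  const perAnswer = await Promise.all(answers.map(continuation));
  return perAnswer.flat();
}

// Usage: each answer yields zero or more extended answers.
forkPoint([1, 2, 3], async n => [n, n * 10]).then(all => console.log(all));
// → [ 1, 10, 2, 20, 3, 30 ]
```

`Promise.all` preserves the input order, so the merged result set is deterministic even though the continuations run concurrently.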