Skip to content

Commit

Permalink
added initial concurrent library (#325)
Browse files Browse the repository at this point in the history
  • Loading branch information
jariazavalverde committed Jun 22, 2022
1 parent 95d011d commit 933d92f
Show file tree
Hide file tree
Showing 2 changed files with 378 additions and 0 deletions.
39 changes: 39 additions & 0 deletions lib/concurrent.pl
Original file line number Diff line number Diff line change
@@ -0,0 1,39 @@
:- module(concurrent, [future_all/2]).

%! future_all( Futures, -All)
%
% Returns a single Future that resolves to a list of the results of the
% input futures.
% This is not the current implementation in the compiled version of the
% concurrent library, since it cannot detect the premature failure of a
% future.

/*
:- use_module(library(os)).
:- use_module(library(concurrent)).
top(X) :-
future(a, sleep(2000), F1),
future(b, throw(b), F2),
future(c, sleep(1000), F3),
future_all([F1,F2,F3], F),
await(F, X),
write(Time).
?- top(X).
uncaught exception: b
% The Prolog implementation reports the error after 2 seconds.
% The implementation in the compiled version reports the failure immediately.
*/

future_all([], All) :-
future([], true, All).
future_all([F|Fs], All) :-
future([X|Xs],
( await(F, X),
future_all(Fs, Rest),
await(Rest, Xs)
), All).
339 changes: 339 additions & 0 deletions modules/concurrent.js
Original file line number Diff line number Diff line change
@@ -0,0 1,339 @@
var pl;
(function(pl) {

var FUTURE_PENDING = 0, FUTURE_FULFILLED = 1, FUTURE_FAILED = 2, FUTURE_REJECTED = 3;

var predicates = function() {

return {

// future/3
"future/3": function(thread, point, atom) {
var resolve = atom.args[0], goal = atom.args[1], var_future = atom.args[2];
if(!pl.type.is_variable(var_future)) {
thread.throw_error(pl.error.instantiation(atom.indicator));
} else if(!pl.type.is_callable(goal)) {
thread.throw_error(pl.error.type("callable", goal, atom.indicator));
} else {
var future = new pl.type.Future();
var nthread = new pl.type.Thread(thread.session);
var template = thread.next_free_variable();
thread.session.renamed_variables = {};
var future_goal = new pl.type.Term(",", [new pl.type.Term("call", [goal.rename(thread)]), new pl.type.Term("=", [template, resolve.rename(thread)])]);
nthread.add_goal(future_goal);
var handlers = {
success: function(answer) {
future.done(answer.links[template.id], FUTURE_FULFILLED);
},
error: function(error) {
future.done(error.args[0], FUTURE_REJECTED);
},
fail: function() {
future.done(null, FUTURE_FAILED);
},
limit: function() {
nthread.answer(handlers);
}
};
nthread.answer(handlers);
thread.prepend([new pl.type.State(
point.goal.replace(new pl.type.Term("=", [
var_future,
future
])),
point.substitution,
point
)]);
}
},

// await/2
"await/2": function(thread, point, atom) {
var future = atom.args[0], value = atom.args[1];
if(pl.type.is_variable(future)) {
thread.throw_error(pl.error.instantiation(atom.indicator));
} else if(!pl.type.is_future_object(future)) {
thread.throw_error(pl.error.type("future", future, atom.indicator));
} else {
future.then(
function(answer) {
thread.prepend([new pl.type.State(
point.goal.replace(new pl.type.Term("=", [
value,
answer
])),
point.substitution,
point
)]);
thread.again();
},
function(error) {
thread.throw_error(error);
thread.again();
},
function() {
thread.again();
}
);
return true;
}
},

// future_all/2
"future_all/2": function(thread, point, atom) {
var futures = atom.args[0], all = atom.args[1];
if(pl.type.is_variable(futures) || !pl.type.is_variable(all)) {
thread.throw_error(pl.error.instantiation(atom.indicator));
} else if(!pl.type.is_list(futures)) {
thread.throw_error(pl.error.type("list", futures, atom.indicator));
} else {
var arr_futures = [];
var pointer = futures;
while(pl.type.is_term(pointer) && pointer.indicator === "./2") {
var head_future = pointer.args[0];
if(pl.type.is_variable(head_future)) {
thread.throw_error(pl.error.instantiation(atom.indicator));
return;
} else if(!pl.type.is_future_object(head_future)) {
thread.throw_error(pl.error.type("future", head_future, atom.indicator));
return;
}
arr_futures.push(head_future);
pointer = pointer.args[1];
}
if(pl.type.is_variable(pointer)) {
thread.throw_error(pl.error.instantiation(atom.indicator));
return;
} else if(!pl.type.is_empty_list(pointer)) {
thread.throw_error(pl.error.type("list", futures, atom.indicator));
return;
}
var future = new pl.type.Future();
future.expected = arr_futures.length;
var templates = [];
for(var i = 0; i < arr_futures.length; i ) {
arr_futures[i].then(
function(answer) {
templates.push(answer);
future.expected--;
if(future.state === FUTURE_PENDING && future.expected === 0) {
var list = new pl.type.Term("[]", []);
for(var j = templates.length-1; j >= 0; j--)
list = new pl.type.Term(".", [templates[j], list]);
future.done(list, FUTURE_FULFILLED);
}
},
function(error) {
future.done(error, FUTURE_REJECTED);
},
function() {
future.done(null, FUTURE_FAILED);
}
);
}
thread.prepend([new pl.type.State(
point.goal.replace(new pl.type.Term("=", [
all,
future
])),
point.substitution,
point
)]);
}
},

// future_any/2
"future_any/2": function(thread, point, atom) {
var futures = atom.args[0], any = atom.args[1];
if(pl.type.is_variable(futures) || !pl.type.is_variable(any)) {
thread.throw_error(pl.error.instantiation(atom.indicator));
} else if(!pl.type.is_list(futures)) {
thread.throw_error(pl.error.type("list", futures, atom.indicator));
} else {
var arr_futures = [];
var pointer = futures;
while(pl.type.is_term(pointer) && pointer.indicator === "./2") {
var head_future = pointer.args[0];
if(pl.type.is_variable(head_future)) {
thread.throw_error(pl.error.instantiation(atom.indicator));
return;
} else if(!pl.type.is_future_object(head_future)) {
thread.throw_error(pl.error.type("future", head_future, atom.indicator));
return;
}
arr_futures.push(head_future);
pointer = pointer.args[1];
}
if(pl.type.is_variable(pointer)) {
thread.throw_error(pl.error.instantiation(atom.indicator));
return;
} else if(!pl.type.is_empty_list(pointer)) {
thread.throw_error(pl.error.type("list", futures, atom.indicator));
return;
}
var future = new pl.type.Future();
future.expected = arr_futures.length;
var templates = [];
for(var i = 0; i < arr_futures.length; i ) {
arr_futures[i].then(
function(answer) {
future.expected--;
if(future.state === FUTURE_PENDING) {
future.done(answer, FUTURE_FULFILLED);
}
},
function(error) {
future.expected--;
future.done(error, FUTURE_REJECTED);
},
function() {
future.expected--;
if(future.expected === 0)
future.done(null, FUTURE_FAILED);
}
);
}
thread.prepend([new pl.type.State(
point.goal.replace(new pl.type.Term("=", [
any,
future
])),
point.substitution,
point
)]);
}
}

};

};

var exports = ["future/3", "await/2", "future_all/2", "future_any/2"];

// Is a Future object
pl.type.is_future_object = function(obj) {
return obj instanceof pl.type.Future;
};

// Ordering relation
pl.type.order.push(pl.type.Future);

// DOM Prolog object
pl.type.Future = function() {
this.value = null;
this.state = FUTURE_PENDING;
this.tasks = [];
};

pl.type.Future.prototype.done = function(value, state) {
this.value = value;
this.state = state;
if(state === FUTURE_FULFILLED) {
while(this.tasks.length > 0) {
var task = this.tasks.shift();
task.resolve(this.value);
}
} else if(state === FUTURE_REJECTED) {
while(this.tasks.length > 0) {
var task = this.tasks.shift();
task.reject(this.value);
}
} else if(state === FUTURE_FAILED) {
while(this.tasks.length > 0) {
var task = this.tasks.shift();
task.fail();
}
}
}

pl.type.Future.prototype.then = function(resolve, reject, fail) {
if(this.state === FUTURE_FULFILLED)
resolve(this.value);
else if(this.state === FUTURE_REJECTED)
reject(this.value);
else if(this.state === FUTURE_FAILED)
fail();
else
this.tasks.push({
resolve: resolve,
reject: reject,
fail: fail
});
};

// toString
pl.type.Future.prototype.toString = function(options) {
if(this.value !== null)
return "<future>(" this.value.toString(options) ")";
return "<future>(pending)";
};

// clone
pl.type.Future.prototype.clone = function() {
var p = new pl.type.Future();
p.state = this.state;
p.value = this.value;
};

// equals
pl.type.Future.prototype.equals = function(obj) {
return obj === this;
};

// rename
pl.type.Future.prototype.rename = function( _ ) {
return this;
};

// get variables
pl.type.Future.prototype.variables = function() {
return [];
};

// apply substitutions
pl.type.Future.prototype.apply = function( _ ) {
return this;
};

// unify
pl.type.Future.prototype.unify = function(obj, _) {
if(obj === this)
return new pl.type.Substitution();
return null;
};

// interpret
pl.type.Future.prototype.interpret = function( indicator ) {
return pl.error.instantiation( indicator );
};

// compare
pl.type.Future.prototype.compare = function( obj ) {
if(this === obj) {
return 0;
} else if(this < obj) {
return -1;
} else {
return 1;
}
};

var options = function() {
return {
meta_predicates: {
// future(?, 0, -)
"future/3": new pl.type.Term("future", [new pl.type.Term("?"), new pl.type.Num(0, false), new pl.type.Term("-")])
}
};
};

if(typeof module !== 'undefined') {
module.exports = function(p) {
pl = p;
new pl.type.Module("concurrent", predicates(), exports, options());
};
} else {
new pl.type.Module("concurrent", predicates(), exports, options());
}

})(pl);

0 comments on commit 933d92f

Please sign in to comment.