Skip to content

Commit

Permalink
work on polling
Browse files Browse the repository at this point in the history
introduce the ability to hand back control during blocking poll to userland

fixes #140
  • Loading branch information
krakjoe committed May 18, 2024
1 parent 9160324 commit dbd75c2
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 12 deletions.
49 changes: 41 additions & 8 deletions src/events.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 41,7 @@ static zend_object* php_parallel_events_create(zend_class_entry *type) {
events->blocking = 1;

ZVAL_UNDEF(&events->input);
ZVAL_UNDEF(&events->blocker);

return &events->std;
}
Expand Down Expand Up @@ -79,6 80,10 @@ static void php_parallel_events_destroy(zend_object *zo) {
zval_ptr_dtor(&events->input);
}

if (!Z_ISUNDEF(events->blocker)) {
zval_ptr_dtor(&events->blocker);
}

zend_hash_destroy(&events->targets);

zend_object_std_dtor(zo);
Expand Down Expand Up @@ -218,6 223,33 @@ PHP_METHOD(Events, setBlocking)
events->blocking = blocking;
}

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(php_parallel_events_set_blocker_arginfo, 0, 1, IS_VOID, 0)
ZEND_ARG_CALLABLE_INFO(0, blocker, 0)
ZEND_END_ARG_INFO()

PHP_METHOD(Events, setBlocker)
{
php_parallel_events_t *events = php_parallel_events_from(getThis());
zval *blocker = NULL;

ZEND_PARSE_PARAMETERS_START(1, 1)
Z_PARAM_ZVAL(blocker)
ZEND_PARSE_PARAMETERS_END();

if (!events->blocking) {
php_parallel_exception_ex(
php_parallel_events_error_ce,
"cannot set blocker for non-blocking event loop");
return;
}

if (!Z_ISUNDEF(events->blocker)) {
zval_ptr_dtor(&events->blocker);
}

ZVAL_COPY(&events->blocker, blocker);
}

ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(php_parallel_events_poll_arginfo, 0, 0, \\parallel\\Events\\Event, 1)
ZEND_END_ARG_INFO()

Expand Down Expand Up @@ -247,14 279,15 @@ PHP_METHOD(Events, count)
}

zend_function_entry php_parallel_events_methods[] = {
PHP_ME(Events, setInput, php_parallel_events_set_input_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, addChannel, php_parallel_events_add_channel_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, addFuture, php_parallel_events_add_future_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, remove, php_parallel_events_remove_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, setBlocking, php_parallel_events_set_blocking_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, setTimeout, php_parallel_events_set_timeout_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, poll, php_parallel_events_poll_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, count, php_parallel_events_count_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, setInput, php_parallel_events_set_input_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, addChannel, php_parallel_events_add_channel_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, addFuture, php_parallel_events_add_future_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, remove, php_parallel_events_remove_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, setBlocking, php_parallel_events_set_blocking_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, setBlocker, php_parallel_events_set_blocker_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, setTimeout, php_parallel_events_set_timeout_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, poll, php_parallel_events_poll_arginfo, ZEND_ACC_PUBLIC)
PHP_ME(Events, count, php_parallel_events_count_arginfo, ZEND_ACC_PUBLIC)
PHP_FE_END
};

Expand Down
1 change: 1 addition & 0 deletions src/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 23,7 @@ typedef struct _php_parallel_events_t {
HashTable targets;
zend_long timeout;
zend_bool blocking;
zval blocker;
zend_object std;
} php_parallel_events_t;

Expand Down
36 changes: 32 additions & 4 deletions src/poll.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 28,11 @@

typedef struct _php_parallel_events_poll_t {
struct timeval stop;
struct {
zend_fcall_info fci;
zend_fcall_info_cache fcc;
zval ival;
} block;
} php_parallel_events_poll_t;

static zend_always_inline zend_bool php_parallel_events_poll_init(php_parallel_events_poll_t *poll, php_parallel_events_t *events) {
Expand All @@ -44,6 49,14 @@ static zend_always_inline zend_bool php_parallel_events_poll_init(php_parallel_e
/* return 0 ? */
}

if (!Z_ISUNDEF(events->blocker)) {
zend_fcall_info_init(&events->blocker, 0,
&poll->block.fci, &poll->block.fcc, NULL, NULL);
poll->block.fci.retval = &poll->block.ival;
} else {
memset(&poll->block, 0, sizeof(poll->block));
}

return 1;
}

Expand Down Expand Up @@ -287,12 300,27 @@ void php_parallel_events_poll(php_parallel_events_t *events, zval *retval) {
goto _php_parallel_events_poll_null;
}

if (php_parallel_events_poll_timeout(&poll, events)) {
return;
if (poll.block.fci.size) {
zend_call_function(
&poll.block.fci, &poll.block.fcc);

if (zend_is_true(&poll.block.ival)) {
zval_ptr_dtor(
&poll.block.ival);

goto _php_parallel_events_poll_null;
}

zval_ptr_dtor(
&poll.block.ival);
} else {
if ((try % 10) == 0) {
usleep(1);
}
}

if ((try % 10) == 0) {
usleep(1);
if (php_parallel_events_poll_timeout(&poll, events)) {
return;
}

continue;
Expand Down
31 changes: 31 additions & 0 deletions tests/events/wait/007.phpt
Original file line number Diff line number Diff line change
@@ -0,0 1,31 @@
--TEST--
Check Events blocker
--SKIPIF--
<?php
if (!extension_loaded('parallel')) {
echo 'skip';
}
?>
--FILE--
<?php
use \parallel\Events;
use \parallel\Channel;

$events = new Events();
$events->addChannel(Channel::make("buffer"));
$events->setBlocker(function(){
echo
"BLOCKER\n";
/* interrupt loop */
return true;
});

if ($events->poll() === null && count($events)) {
echo "OK";
}
?>
--EXPECT--
BLOCKER
OK


0 comments on commit dbd75c2

Please sign in to comment.