Skip to content

Commit

Permalink
base: add mechanism to poll futures to completion
Browse files Browse the repository at this point in the history
This CL adds the ability to "spawn" Futures and Streams on a
TaskRunner i.e. poll them to completion by interfacing with the
FileDescriptorWatch APIs of TaskRunner.

Spawning a future returns a handle which, when destroyed, cancels the
Future/Stream. This allows propagation of cancellation across thread
boundaries.

Change-Id: Ie2915acbbbe539d8d5b2d2ffc8c7599160a2bf93
  • Loading branch information
LalitMaganti committed Mar 24, 2023
1 parent 9edf3c1 commit 5654079
Show file tree
Hide file tree
Showing 6 changed files with 411 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Android.bp
Original file line number Diff line number Diff line change
Expand Up @@ -8222,6 8222,7 @@ filegroup {
filegroup {
name: "perfetto_src_base_threading_threading",
srcs: [
"src/base/threading/spawn.cc",
"src/base/threading/stream_combinators.cc",
"src/base/threading/thread_pool.cc",
],
Expand All @@ -8233,6 8234,7 @@ filegroup {
srcs: [
"src/base/threading/channel_unittest.cc",
"src/base/threading/future_unittest.cc",
"src/base/threading/spawn_unittest.cc",
"src/base/threading/stream_unittest.cc",
"src/base/threading/thread_pool_unittest.cc",
"src/base/threading/util_unittest.cc",
Expand Down
1 change: 1 addition & 0 deletions include/perfetto/ext/base/threading/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 20,7 @@ source_set("threading") {
"future.h",
"future_combinators.h",
"poll.h",
"spawn.h",
"stream.h",
"stream_combinators.h",
"thread_pool.h",
Expand Down
144 changes: 144 additions & 0 deletions include/perfetto/ext/base/threading/spawn.h
Original file line number Diff line number Diff line change
@@ -0,0 1,144 @@
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
#define INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

#include "perfetto/base/compiler.h"
#include "perfetto/base/flat_set.h"
#include "perfetto/base/platform_handle.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/event_fd.h"
#include "perfetto/ext/base/flat_hash_map.h"
#include "perfetto/ext/base/optional.h"
#include "perfetto/ext/base/thread_checker.h"
#include "perfetto/ext/base/threading/channel.h"
#include "perfetto/ext/base/threading/future.h"
#include "perfetto/ext/base/threading/poll.h"
#include "perfetto/ext/base/threading/stream.h"
#include "perfetto/ext/base/threading/stream_combinators.h"
#include "perfetto/ext/base/threading/util.h"
#include "perfetto/ext/base/uuid.h"
#include "perfetto/ext/base/weak_ptr.h"

namespace perfetto {
namespace base {

class PolledFuture;

// A RAII object which tracks the polling of a Future.
//
// When this object is dropped, the backing Future will be cancelled as
// soon as possible. In practice, the cancellation happens on the TaskRunner
// thread so there can be some delay.
class SpawnHandle {
public:
SpawnHandle(TaskRunner* task_runner, std::function<Future<FVoid>()> fn);
~SpawnHandle();

private:
SpawnHandle(const SpawnHandle&) = delete;
SpawnHandle& operator=(const SpawnHandle&) = delete;

TaskRunner* task_runner_ = nullptr;
std::shared_ptr<std::unique_ptr<PolledFuture>> polled_future_;
};

// Specialization of SpawnHandle used by Futures/Streams which return T.
//
// Values of T are returned through a Channel<T> which allows reading these
// values on a different thread to where the polling happens.
template <typename T>
class ResultSpawnHandle {
public:
ResultSpawnHandle(TaskRunner* task_runner,
std::shared_ptr<Channel<T>> channel,
std::function<Future<FVoid>()> fn)
: handle_(task_runner, std::move(fn)), channel_(std::move(channel)) {}

Channel<T>* channel() const { return channel_.get(); }

private:
SpawnHandle handle_;
std::shared_ptr<Channel<T>> channel_;
};

// "Spawns" a Future<FVoid> on the given TaskRunner and returns an RAII
// SpawnHandle which can be used to cancel the spawn.
//
// Spawning a Future means to poll it to completion. In Perfetto, this is done
// by using a TaskRunner object to track FD readiness and polling the Future
// when progress can be made.
//
// The returned SpawnHandle should be stashed as it is responsible for the
// lifetime of the pollling. If the SpawnHandle is dropped, the Future is
// cancelled and dropped ASAP (this happens on the TaskRunner thread so there
// can be some delay).
PERFETTO_WARN_UNUSED_RESULT inline SpawnHandle SpawnFuture(
TaskRunner* task_runner,
std::function<Future<FVoid>()> fn) {
return SpawnHandle(task_runner, std::move(fn));
}

// Variant of |SpawnFuture| for a Stream<T> allowing returning items of T.
//
// See ResultSpawnHandle for how elements from the stream can be consumed.
template <typename T>
PERFETTO_WARN_UNUSED_RESULT inline ResultSpawnHandle<T> SpawnResultStream(
TaskRunner* task_runner,
std::function<Stream<T>()> fn) {
class AllVoidCollector : public Collector<FVoid, FVoid> {
public:
Optional<FVoid> OnNext(FVoid) override { return nullopt; }
FVoid OnDone() override { return FVoid(); }
};
auto channel = std::make_shared<Channel<T>>(4);
return ResultSpawnHandle<T>(
task_runner, channel, [c = channel, fn = std::move(fn)]() {
return fn()
.MapFuture([c](T value) {
return WriteChannelFuture(c.get(), std::move(value));
})
.Concat(OnDestroyStream<FVoid>([c]() { c->Close(); }))
.Collect(std::unique_ptr<Collector<FVoid, FVoid>>(
new AllVoidCollector()));
});
}

// Variant of |SpawnFuture| for a Future<T> allowing returning items of T.
//
// See ResultSpawnHandle for how elements from the future can be consumed.
template <typename T>
PERFETTO_WARN_UNUSED_RESULT inline ResultSpawnHandle<T> SpawnResultFuture(
TaskRunner* task_runner,
std::function<Future<T>()> fn) {
return SpawnResultStream<T>(task_runner, [fn = std::move(fn)]() {
return StreamFromFuture(std::move(fn()));
});
}

} // namespace base
} // namespace perfetto

#endif // INCLUDE_PERFETTO_EXT_BASE_THREADING_SPAWN_H_
8 changes: 7 additions & 1 deletion src/base/threading/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 15,13 @@
import("../../../gn/test.gni")

source_set("threading") {
deps = [ "../../../gn:default_deps" ]
deps = [
"..:base",
"../../../gn:default_deps",
]
public_deps = [ "../../../include/perfetto/ext/base/threading" ]
sources = [
"spawn.cc",
"stream_combinators.cc",
"thread_pool.cc",
]
Expand All @@ -28,12 32,14 @@ perfetto_unittest_source_set("unittests") {
deps = [
":threading",
"..:base",
"..:test_support",
"../../../gn:default_deps",
"../../../gn:gtest_and_gmock",
]
sources = [
"channel_unittest.cc",
"future_unittest.cc",
"spawn_unittest.cc",
"stream_unittest.cc",
"thread_pool_unittest.cc",
"util_unittest.cc",
Expand Down
124 changes: 124 additions & 0 deletions src/base/threading/spawn.cc
Original file line number Diff line number Diff line change
@@ -0,0 1,124 @@
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "perfetto/ext/base/threading/spawn.h"

#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/optional.h"
#include "perfetto/ext/base/thread_checker.h"
#include "perfetto/ext/base/threading/future.h"
#include "perfetto/ext/base/threading/poll.h"
#include "perfetto/ext/base/threading/stream.h"

namespace perfetto {
namespace base {

// Represents a future which is being polled to completion. Owned by
// SpawnHandle.
class PolledFuture {
public:
explicit PolledFuture(TaskRunner* task_runner, Future<FVoid> future)
: task_runner_(task_runner), future_(std::move(future)) {
PERFETTO_DCHECK(task_runner_->RunsTasksOnCurrentThread());
PollUntilFinish();
}

~PolledFuture() {
PERFETTO_DCHECK_THREAD(thread_checker);
ClearFutureAndWatches(interested_);
}

private:
PolledFuture(PolledFuture&&) = delete;
PolledFuture& operator=(PolledFuture&&) = delete;

void PollUntilFinish() {
PERFETTO_DCHECK(task_runner_->RunsTasksOnCurrentThread());

auto pre_poll_interested = std::move(interested_);
interested_.clear();

FuturePollResult<FVoid> res = future_->Poll(&context_);
if (!res.IsPending()) {
ClearFutureAndWatches(pre_poll_interested);
return;
}

for (PlatformHandle fd : SetDifference(pre_poll_interested, interested_)) {
task_runner_->RemoveFileDescriptorWatch(fd);
}

auto weak_this = weak_ptr_factory_.GetWeakPtr();
for (PlatformHandle fd : SetDifference(interested_, pre_poll_interested)) {
task_runner_->AddFileDescriptorWatch(fd, [weak_this, fd]() {
if (!weak_this) {
return;
}
weak_this->ready_ = {fd};
weak_this->PollUntilFinish();
});
}
}

void ClearFutureAndWatches(FlatSet<PlatformHandle> interested) {
future_ = nullopt;
for (PlatformHandle fd : interested) {
task_runner_->RemoveFileDescriptorWatch(fd);
}
interested_.clear();
ready_.clear();
}

static std::vector<PlatformHandle> SetDifference(
const FlatSet<PlatformHandle>& f,
const FlatSet<PlatformHandle>& s) {
std::vector<PlatformHandle> out(f.size());
auto it = std::set_difference(f.begin(), f.end(), s.begin(), s.end(),
out.begin());
out.resize(static_cast<size_t>(std::distance(out.begin(), it)));
return out;
}

TaskRunner* const task_runner_ = nullptr;

Optional<Future<FVoid>> future_;
FlatSet<PlatformHandle> interested_;
FlatSet<PlatformHandle> ready_;
PollContext context_{&interested_, &ready_};

PERFETTO_THREAD_CHECKER(thread_checker)

// Keep this last.
WeakPtrFactory<PolledFuture> weak_ptr_factory_{this};
};

SpawnHandle::SpawnHandle(TaskRunner* task_runner,
std::function<Future<FVoid>()> fn)
: task_runner_(task_runner),
polled_future_(std::make_shared<std::unique_ptr<PolledFuture>>()) {
task_runner->PostTask(
[t = task_runner, fn = std::move(fn), p = polled_future_]() mutable {
p->reset(new PolledFuture(t, fn()));
});
}

SpawnHandle::~SpawnHandle() {
task_runner_->PostTask(
[f = std::move(polled_future_)]() mutable { f.reset(); });
}

} // namespace base
} // namespace perfetto
Loading

0 comments on commit 5654079

Please sign in to comment.