feat: rearchitect rebaser to support more scale #4638
    if has_roots {
        loop {
            let mut ctx_clone = ctx.clone();
            ctx_clone.update_snapshot_to_visibility().await?;
            if !ctx_clone
                .workspace_snapshot()?
                .has_dependent_value_roots()
                .await?
            {
                break;
            }
            tokio::time::sleep(Duration::from_millis(25)).await;
        }
    }
Normally I'd be wary of the potential infinite loop, but this is a test helper, so looks great to me!
    fn call(&mut self, req: jetstream::Message) -> Self::Future {
        let parts = req.into_parts();
        let head = Arc::new(parts.0.clone());
        let message = match <jetstream::Message as MessageHead>::from_parts(parts.0, parts.1) {
            Ok(message) => message,
            Err(err) => unreachable!(
                "NATS Jetstream message from parts should succeed, this is a bug!; error={:?}",
                err
            ),
        };

        let info = Arc::new(Info::from(
            // TODO(fnichol): the middleware here is infallible, but this call could, in theory
            // error. There's probably a better alternative here...
            message.info().expect("failed to parse message info"),
        ));
These look like they'd be hit by logic errors caught by local dev and tests. I'm okay with panics here.
I may be able to deal with this differently
    // There is one more optional future here which is confirmation from the NATS server that
    // our publish was acked. However, the task stream will drop new messages that are
    // duplicates and this returns an error on the "ack future". Instead, we'll keep this as
    // fire and forget.
    self.context.publish(tasks_subject, vec![].into()).await?;
Looks good to me.
    // TODO(fnichol): hrm, is this *really* true that we've written to the change set. I mean,
    // yes but until a dvu has finished this is an incomplete view?
    let mut event = WsEvent::change_set_written(&ctx, change_set_id.into()).await?;
    event.set_workspace_pk(workspace_id.into());
    event.set_change_set_id(Some(change_set_id.into()));
    event.publish_immediately(&ctx).await?;
Smells like a "fuck it we ball, reload that UI" trigger. I'm okay with it for now, but you're right... not necessarily going to be true. Good to call out!
    ) -> push::OrderedConfig {
        push::OrderedConfig {
I saw this yesterday and am a bit surprised it comes from `push` and not `pull`... just an interesting note lol
Yes, it was suggested to us that a push consumer would be okay to use here (maybe slightly preferred in our use case). My understanding is that under the covers it uses a core NATS subject, fires all messages that you're subscribed to there, and sets the client up to subscribe to that single core NATS subject (so old-school NATS firehose style). Kind of neat!
      debug!(
    -     "to_rebase_address: {}, rebase_batch_address: {}",
    -     to_rebase_workspace_snapshot_address, message.payload.rebase_batch_address
    +     to_rebase_workspace_snapshot_address = %to_rebase_workspace_snapshot_address,
    +     updates_address = %request.updates_address,
      );
Thank you! These fields will be easier to index in Jaeger.
This change resolves an issue where customizing the `OnSuccess` or `OnFailure` callbacks would result in a compiler error. Whoops! Signed-off-by: Fletcher Nichol <[email protected]>
This fix came up while trying to use `tokio-stream`'s `.timeout()` combinator which is not `Unpin`. Signed-off-by: Fletcher Nichol <[email protected]>
This new middleware works when processing Jetstream messages and is useful in situations where `ack`'ing messages isn't possible or appropriate. The callback for `OnSuccess` and `OnFailure` have access to the message's `Head` metadata as well as its `Info` metadata. Signed-off-by: Fletcher Nichol <[email protected]>
Re-approved!
A rather large change which changes how a Rebaser server consumes
requests for change sets and when it performs related "dependent values
update" (aka "DVU") runs.
General NATS Jetstream Architecture
A Rebaser server uses 2 NATS Jetstream streams to track its work:

- `REBASER_REQUESTS`: as before, all requests are consumed from this
  stream and can be thought of as a work queue. In this implementation,
  the stream is a limits-based stream, as the Jetstream consumer model
  is different. As before, a request for workspace `$wk_id` and change
  set `$cs_id` is published on `rebaser.requests.$wk_id.$cs_id`.
- `REBASER_TASKS`: this is a classic Jetstream work queue stream, but
  where there is only one message per subject and where each message
  represents an exclusive task for a single Rebaser to spin up and run.
  In this case, there are only "process" tasks in which a Rebaser will
  consume the enqueued requests for a change set and run DVUs in serial
  (that is, only one at a time and one after the other as needed). When
  a request message is sent to the `REBASER_REQUESTS` stream, another
  body-less message is sent to this tasks stream. These messages are
  published on `rebaser.tasks.$wk_id.$cs_id.process`.

Tasks Stream
On the tasks stream, a single Jetstream consumer is set up to share work
across multiple Rebasers as clients. When a Rebaser starts to process a
task message, it continuously sends `AckKind::Progress` messages to keep
the message from being ack'd or redelivered. If the task encounters an
error, it can return a `Result::Err(_)` from its Naxum handler, which
will trigger an `AckKind::Nack(None)` message, causing an immediate
redelivery of the message to another Rebaser.

If a task needs to be interrupted due to a graceful shutdown of a Rebaser
server, the handler will return an `Error::Interupted(_)` error, which
ensures that the message is nack'd and will be redelivered. If a task
can be cleanly completed, the handler will return a `Result::Ok(_)`,
which triggers an `AckKind::Ack`, causing the message to be deleted from
the tasks stream so that it will not be run again.
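
The outcome-to-ack mapping described for the tasks stream can be sketched as a small pure function. This is only an illustration: `AckKind`, `TaskError`, and `final_ack` below are hypothetical stand-ins declared locally, with variant names taken from the description above rather than from the NATS client or the Naxum crate.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the ack kinds named in the description.
/// The real type lives in the NATS client library.
#[derive(Debug, PartialEq)]
enum AckKind {
    /// Task finished: the message is deleted from the work queue stream.
    Ack,
    /// Task failed or was interrupted: redeliver, optionally after a delay.
    Nack(Option<Duration>),
    /// Task still running: keep the message from being redelivered.
    Progress,
}

/// Hypothetical error type for a "process" task handler.
#[derive(Debug)]
enum TaskError {
    /// The server is shutting down gracefully.
    Interrupted(String),
    /// The task itself failed.
    Failed(String),
}

/// Picks the final ack for a task message from the handler's result.
/// Both error cases nack immediately so another Rebaser can pick the task up.
fn final_ack(result: &Result<(), TaskError>) -> AckKind {
    match result {
        Ok(()) => AckKind::Ack,
        Err(TaskError::Interrupted(_)) | Err(TaskError::Failed(_)) => AckKind::Nack(None),
    }
}
```

`AckKind::Progress` never appears as a final outcome in this sketch; per the description it is sent repeatedly while the task is still running.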
Requests Stream
When a "process" task is running, a dedicated NATS consumer is created
to exclusively process all requests for a change set in serial (that is,
one at a time). This consumer is known as an "ordered consumer" and is
push-based (rather than the default pull-based consumers). An ordered
consumer is much lighter weight and ephemeral as far as a NATS cluster
is concerned and thus should reduce the stress on NATS when many change
sets are created/active over a short period of time.
This ordered consumer is set up with a timeout that detects when no
message has entered the subject (or no message has been pulled into the
Naxum app) within a period. When this "quiescence" period is seen, it
triggers a specific graceful shutdown of the "process" task where its
exit state is to return `Result::Ok(_)` and ack the task message. In
this way, change sets which become inactive (that is, no Rebaser request
messages) are spun down to conserve resources and allow the Rebaser to
focus only on "active" change sets.
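
The quiescence idea can be illustrated with a standard-library channel in place of the ordered consumer. This is a sketch under assumptions, not the Rebaser's code: the real implementation is async and consumes a NATS stream, and the names `Exit` and `process_until_quiet` are made up for this example.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Why the request-processing loop stopped (hypothetical type).
#[derive(Debug, PartialEq)]
enum Exit {
    /// No request arrived within the quiescent period: shut down cleanly.
    Quiescent,
    /// The sending side went away.
    Closed,
}

/// Handles requests one at a time until none arrives within
/// `quiescent_period`. Returns how many requests were handled and why
/// the loop ended.
fn process_until_quiet(
    requests: &Receiver<String>,
    quiescent_period: Duration,
) -> (usize, Exit) {
    let mut handled = 0;
    loop {
        match requests.recv_timeout(quiescent_period) {
            // A real handler would perform the rebase here.
            Ok(_request) => handled += 1,
            // Nothing arrived in time: treat the change set as inactive.
            Err(RecvTimeoutError::Timeout) => return (handled, Exit::Quiescent),
            Err(RecvTimeoutError::Disconnected) => return (handled, Exit::Closed),
        }
    }
}
```

In terms of the description above, an `Exit::Quiescent` result corresponds to the "process" task returning `Result::Ok(_)` so that the task message is ack'd.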
Using an ordered consumer means that we can no longer use a message
`ack` to delete a processed message (this is a trick of a work queue
stream). Therefore, when a request has been successfully processed, the
message is deleted from the stream using its sequence number. A new
Naxum middleware called `PostProcess` provides us a way to handle this
delete in an `OnSuccess` callback. In the `OnFailure` case we simply
don't delete the message, which means the next message is still the first
and only message to process.
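
The delete-on-success behavior can be modelled with an in-memory map keyed by sequence number. Everything here is a hypothetical stand-in (`RequestsStream`, `peek_next`, `delete`, `process_next`); it does not use the actual `PostProcess` middleware or any NATS API, and only shows why a failed request stays at the front of the stream.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory stand-in for the requests stream:
/// message bodies keyed by their stream sequence number.
struct RequestsStream {
    messages: BTreeMap<u64, String>,
}

impl RequestsStream {
    /// The next message to process is the lowest remaining sequence.
    fn peek_next(&self) -> Option<(u64, &str)> {
        self.messages
            .iter()
            .next()
            .map(|(seq, body)| (*seq, body.as_str()))
    }

    /// Stand-in for deleting a message from the stream by sequence number.
    fn delete(&mut self, sequence: u64) -> bool {
        self.messages.remove(&sequence).is_some()
    }
}

/// Runs `handler` on the next message, then applies the post-process step:
/// delete the message on success, leave it in place on failure.
/// Returns `None` when the stream is empty.
fn process_next<F>(stream: &mut RequestsStream, handler: F) -> Option<Result<u64, String>>
where
    F: FnOnce(&str) -> Result<(), String>,
{
    let (sequence, body) = stream.peek_next()?;
    let result = handler(body);
    Some(match result {
        Ok(()) => {
            // "OnSuccess": remove the processed message by its sequence.
            stream.delete(sequence);
            Ok(sequence)
        }
        // "OnFailure": nothing is deleted, so the same message is next again.
        Err(err) => Err(err),
    })
}
```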
Another tracked Tokio task, called the `SerialDvuTask`, runs alongside
this request-consuming task. It waits for a `tokio::sync::Notify` to
fire, which is triggered by the request-consuming Tokio task. If, during
the run of a DVU, another request is processed that requires another DVU
run, then the `Notify` will be re-enabled. That way, when the
`SerialDvuTask` loop comes back to check, the `Notify` will be set, thus
triggering "yet one more" DVU run.

Other Structural Changes
Among other changes, some of note are:
- The publisher of a Rebaser request sends it directly to the
  `REBASER_REQUESTS` stream. Due to its current RPC (i.e.
  request-then-await-response) communication pattern, setting up the
  Rebaser in this way makes it operate more like Pinga and Veritech.
  Future work is likely going to bring back the concept of "activities"
  that flow for interested parties to follow; however, prior to this
  commit, the only users of the activity stream were callers to the
  Rebaser and the Rebaser server itself.
- A first class client for the Rebaser is provided in
  `lib/rebaser-client` which abstracts the NATS communication and the
  on-the-wire request formatting details, ideally allowing us to evolve
  the Rebaser's messaging internally where possible.
- The start of an API messaging framework is provided largely in the
  `lib/rebaser-core` crate in the `api_types` module. Several Rust
  traits are provided that, when implemented, give the user a versioned
  message that can be upgraded and understands its serialization format
  independently of versioning. This should allow us to evolve the
  Rebaser's request and response messages with both the client and server
  able to detect unsupported message versions without having to
  deserialize the message payload itself. Big props to @jhelwig's work
  on version graph snapshots for the inspiration.
- Rebaser request and response messages are transmitted with several
  metadata headers in the NATS message including:
  - `X-CONTENT-TYPE`: describes the serialization format in a content
    type/MIME header compatible string. Current values are
    `application/json` and `application/cbor` with `application/cbor`
    the current default.
  - `X-MESSAGE-TYPE`: describes the message type, typically the primary
    Rust name. Current values are `EnqueueUpdatesRequest` and
    `EnqueueUpdatesResponse`.
  - `X-MESSAGE-VERSION`: a positive integer, where a greater number
    value implies a newer version. It is likely that code that would
    understand version 5 of a message may be able to understand version
    4, 3, 2, 1 but not 6 as an example.
  - `NATS-MSG-ID`: this is a standard NATS header which can be used for
    message de-duplication and is populated with a random
    at-request-time `Ulid` value, managed via a `RequestId` type
    (required for all API data types).