
feat: rearchitect rebaser to support more scale #4638

Merged

merged 11 commits into main
Sep 22, 2024

Conversation

fnichol
Contributor

@fnichol fnichol commented Sep 19, 2024

A rather large change to how a Rebaser server consumes requests for
change sets and when it performs related "dependent values update"
(aka "DVU") runs.

General NATS Jetstream Architecture

A Rebaser server uses 2 NATS Jetstream streams to track its work:

  • REBASER_REQUESTS: as before, all requests are consumed from this
    stream, which can be thought of as a work queue. In this
    implementation the stream is a limits-based stream, as the Jetstream
    consumer model is different. As before, a request for workspace
    $wk_id and change set $cs_id is published on
    rebaser.requests.$wk_id.$cs_id.
  • REBASER_TASKS: this is a classic Jetstream work queue stream, but
    where there is only one message per subject and where each message
    represents an exclusive task for a single Rebaser to spin up and run.
    In this case, there are only "process" tasks in which a Rebaser will
    consume the enqueued requests for a change set and run DVUs in serial
    (that is, only one at a time and one after the other as needed). When
    a request message is sent to the REBASER_REQUESTS stream, another
    body-less message is sent to this tasks stream. These messages are
    published on rebaser.tasks.$wk_id.$cs_id.process.
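The two subject layouts above can be sketched as plain string builders (a minimal sketch; the helper names and shortened identifiers are illustrative, not from the codebase):

```rust
/// Illustrative helpers for the two subject layouts described above.
/// (Function names are hypothetical; the real code may differ.)
fn request_subject(wk_id: &str, cs_id: &str) -> String {
    format!("rebaser.requests.{wk_id}.{cs_id}")
}

fn process_task_subject(wk_id: &str, cs_id: &str) -> String {
    format!("rebaser.tasks.{wk_id}.{cs_id}.process")
}

fn main() {
    // Publishing one request produces a payload message on the requests
    // stream and a body-less companion message on the tasks stream.
    let (wk_id, cs_id) = ("01H9WK", "01H9CS");
    assert_eq!(
        request_subject(wk_id, cs_id),
        "rebaser.requests.01H9WK.01H9CS"
    );
    assert_eq!(
        process_task_subject(wk_id, cs_id),
        "rebaser.tasks.01H9WK.01H9CS.process"
    );
    println!("subjects ok");
}
```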

Tasks Stream

On the tasks stream, a single Jetstream consumer is set up to share work
across multiple Rebasers as clients. When a Rebaser starts to process a
task message, it continuously sends AckKind::Progress messages to keep
the message from being ack'd or redelivered. If the task encounters an
error, it can return a Result::Err(_) from its Naxum handler, which
will trigger an AckKind::Nack(None) message, causing an immediate
redelivery of the message to another Rebaser.

If a task needs to be interrupted due to a graceful shutdown of a Rebaser
server, the handler will return an Error::Interrupted(_) error which
ensures that the message is nack'd and will be redelivered. If a task
completes cleanly, the handler will return a Result::Ok(_),
which triggers an AckKind::Ack, causing the message to be deleted from
the tasks stream so that it will not be run again.
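The handler-result-to-ack mapping can be sketched as a small function (a simplified model; AckKind mirrors the async-nats variants, but the types and mapping here are illustrative, not the actual Naxum handler):

```rust
use std::time::Duration;

/// Simplified stand-in for the NATS ack kinds used by the tasks stream.
#[derive(Debug, PartialEq)]
enum AckKind {
    Ack,                     // task done; delete from the work queue stream
    Nack(Option<Duration>),  // redeliver (immediately when delay is None)
}

/// Illustrative task error type; `Interrupted` models a graceful shutdown.
#[derive(Debug)]
enum TaskError {
    Interrupted,
    Other(String),
}

fn ack_for(result: Result<(), TaskError>) -> AckKind {
    match result {
        // Clean completion: ack, so the message is deleted and never re-run.
        Ok(()) => AckKind::Ack,
        // Any error, including interruption, nacks for immediate
        // redelivery to another Rebaser.
        Err(_) => AckKind::Nack(None),
    }
}

fn main() {
    assert_eq!(ack_for(Ok(())), AckKind::Ack);
    assert_eq!(ack_for(Err(TaskError::Interrupted)), AckKind::Nack(None));
    println!("ack mapping ok");
}
```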

Requests Stream

When a "process" task is running, a dedicated NATS consumer is created
to exclusively process all requests for a change set in serial (that is,
one at a time). This consumer is known as an "ordered consumer" and is
push-based (rather than the default pull-based consumers). An ordered
consumer is much lighter weight and ephemeral as far as a NATS cluster
is concerned and thus should reduce the stress on NATS when many change
sets are created/active over a short period of time.

This ordered consumer is set up with a timeout that detects when no
message has entered the subject (or no message has been pulled into the
Naxum app) within a period. When this "quiescence" period is seen, this
triggers a specific graceful shutdown of the "process" task where its
exit state is to return Result::Ok(_) and ack the task message. In
this way, change sets which become inactive (that is, no Rebaser request
message) are spun down to conserve resources and allow the Rebaser to
focus only on "active" change sets.
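The quiescence shutdown can be sketched with a receive timeout standing in for the ordered consumer's idle detection (a minimal model using a std channel instead of NATS; all names are illustrative):

```rust
use std::sync::mpsc;
use std::time::Duration;

/// Sketch of the quiescence check: if no request arrives within the
/// period, the "process" task shuts down cleanly (Ok) so its task
/// message gets ack'd. A channel stands in for the ordered consumer.
fn run_until_quiescent(
    rx: mpsc::Receiver<String>,
    quiescence: Duration,
) -> Result<usize, String> {
    let mut processed = 0;
    loop {
        match rx.recv_timeout(quiescence) {
            // A request arrived within the period; process and keep going.
            Ok(_request) => processed += 1,
            // No traffic for the whole period: graceful, ack-the-task exit.
            Err(mpsc::RecvTimeoutError::Timeout) => return Ok(processed),
            Err(mpsc::RecvTimeoutError::Disconnected) => return Ok(processed),
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send("req-1".to_string()).unwrap();
    tx.send("req-2".to_string()).unwrap();
    drop(tx); // no more traffic; the consumer spins down
    let processed = run_until_quiescent(rx, Duration::from_millis(50)).unwrap();
    assert_eq!(processed, 2);
    println!("quiescent after {processed} requests");
}
```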

Using an ordered consumer means that we can no longer use a message
ack to delete a processed message (a trick of a work queue
stream). Therefore, when a request has been successfully processed, the
message is deleted from the stream using its sequence number. A new
Naxum middleware called PostProcess provides a way to handle this
delete in an OnSuccess callback. In the OnFailure case we simply
don't delete the message, which means the next message is still the first
and only message to process.
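The success/failure behavior can be modeled with a map keyed by sequence number (an illustrative sketch; the Stream type below is a stand-in, not the Naxum or NATS API):

```rust
use std::collections::BTreeMap;

// Stand-in for the requests stream: sequence number -> payload.
struct Stream {
    messages: BTreeMap<u64, String>,
}

impl Stream {
    // Ordered consumers can't ack-to-delete, so on success the processed
    // request is deleted from the stream explicitly by sequence number.
    fn on_success(&mut self, sequence: u64) {
        self.messages.remove(&sequence);
    }

    // On failure the message is left in place, so it remains the first
    // (and only) message to process on the next attempt.
    fn on_failure(&mut self, _sequence: u64) {}
}

fn main() {
    let mut stream = Stream {
        messages: BTreeMap::from([(1, "req-a".to_string()), (2, "req-b".to_string())]),
    };
    stream.on_success(1);
    assert_eq!(stream.messages.keys().next(), Some(&2)); // head advanced
    stream.on_failure(2);
    assert!(stream.messages.contains_key(&2)); // failed message stays put
    println!("post-process ok");
}
```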

Another tracked Tokio task, called the SerialDvuTask, runs alongside
this request-consuming task. It waits for a tokio::sync::Notify to
fire, which is triggered by the request-consuming Tokio task. If,
during the run of a DVU, another request is processed that requires
another DVU run, then the Notify will be re-enabled. That way, when
the SerialDvuTask loop comes back to check, the Notify will be set,
triggering "yet one more" DVU run.
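The re-arming behavior can be sketched synchronously with an AtomicBool in place of tokio::sync::Notify (a minimal model; the function and names are illustrative, not the real SerialDvuTask):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Sketch of the SerialDvuTask loop: requests set a "DVU pending" flag,
/// and the loop keeps running DVUs until no new request re-armed it.
fn drain_dvu_runs(pending: &AtomicBool, mut run_dvu: impl FnMut()) -> usize {
    let mut runs = 0;
    // swap(false) atomically consumes the notification; if a request
    // re-arms the flag while a DVU is running, we loop one more time.
    while pending.swap(false, Ordering::SeqCst) {
        run_dvu();
        runs += 1;
    }
    runs
}

fn main() {
    let pending = AtomicBool::new(true); // a request already queued a DVU run
    let mut first = true;
    let runs = drain_dvu_runs(&pending, || {
        // Simulate a request landing mid-DVU that requires one more run.
        if first {
            pending.store(true, Ordering::SeqCst);
            first = false;
        }
    });
    // The first run plus the "yet one more" run triggered during it.
    assert_eq!(runs, 2);
    println!("dvu runs: {runs}");
}
```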

Other Structural Changes

Among other changes, some of note are:

  • The publisher of a Rebaser request sends it directly to the
    REBASER_REQUESTS stream. Due to its current RPC (i.e.
    request-then-await-response) communication pattern, setting up the
    Rebaser in this way makes it operate more like Pinga and Veritech.
    Future work is likely to bring back the concept of an "activities"
    flow for interested parties to follow; however, prior to this
    commit, the only users of the activity stream were callers to the
    Rebaser and the Rebaser server itself.
  • A first class client for the Rebaser is provided in
    lib/rebaser-client which abstracts the NATS communication and the
    on-the-wire request formatting details, ideally allowing us to evolve
    the Rebaser's messaging internally where possible.
  • The start of an API messaging framework is provided largely in the
    lib/rebaser-core crate in the api_types module. Several Rust
    traits are provided that, when implemented, give the user a versioned
    message that can be upgraded and that understands its serialization
    format independently of versioning. This should allow us to evolve
    the Rebaser's request and response messages with both the client and
    server able to detect unsupported message versions without having to
    deserialize the message payload itself. Big props to @jhelwig's work
    on version graph snapshots for the inspiration.
  • Rebaser request and response messages are transmitted with several
    metadata headers in the NATS message, including:
    • X-CONTENT-TYPE: describes the serialization format in a content
      type/MIME header compatible string. Current values are
      application/json and application/cbor with application/cbor
      the current default.
    • X-MESSAGE-TYPE: describes the message type, typically the primary
      Rust name. Current values are EnqueueUpdatesRequest and
      EnqueueUpdatesResponse.
    • X-MESSAGE-VERSION: a positive integer, where a greater value
      implies a newer version. For example, code that understands
      version 5 of a message may also be able to understand versions
      4, 3, 2, and 1, but not 6.
    • NATS-MSG-ID: this is a standard NATS header which can be used for
      message de-duplication and is populated with a random
      at-request-time Ulid value, managed via a RequestId type
      (required for all API data types).
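The header scheme above can be sketched as a plain map plus a version gate (an illustrative sketch; header names come from the text, but the map construction and compatibility check are assumptions, not the real api_types implementation):

```rust
use std::collections::HashMap;

/// Build the metadata headers described above for an outgoing message.
/// (Illustrative only; the real code attaches these to a NATS message.)
fn headers_for(message_type: &str, version: u32, msg_id: &str) -> HashMap<String, String> {
    HashMap::from([
        // application/cbor is the current default serialization format.
        ("X-CONTENT-TYPE".to_string(), "application/cbor".to_string()),
        ("X-MESSAGE-TYPE".to_string(), message_type.to_string()),
        ("X-MESSAGE-VERSION".to_string(), version.to_string()),
        // Populated with a random at-request-time Ulid in the real code.
        ("NATS-MSG-ID".to_string(), msg_id.to_string()),
    ])
}

/// A receiver that understands up to `max_supported` can reject newer
/// versions from the header alone, without deserializing the payload.
fn supports(max_supported: u32, message_version: u32) -> bool {
    message_version <= max_supported
}

fn main() {
    let headers = headers_for("EnqueueUpdatesRequest", 1, "01J8EXAMPLEULID");
    assert_eq!(headers["X-MESSAGE-TYPE"], "EnqueueUpdatesRequest");
    assert!(supports(5, 4));  // older versions remain readable
    assert!(!supports(5, 6)); // newer-than-supported is rejected up front
    println!("headers ok");
}
```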

@fnichol
Contributor Author

fnichol commented Sep 19, 2024

/try

@fnichol
Contributor Author

fnichol commented Sep 21, 2024

/try

Contributor

@nickgerace nickgerace left a comment

Looks great to me! I like the naxum-spawned-by-naxum work and it's easy to follow. Clever use of the tasks stream to act, in effect, as held locks on change sets.

Comment on lines 192 to 205
if has_roots {
    loop {
        let mut ctx_clone = ctx.clone();
        ctx_clone.update_snapshot_to_visibility().await?;
        if !ctx_clone
            .workspace_snapshot()?
            .has_dependent_value_roots()
            .await?
        {
            break;
        }
        tokio::time::sleep(Duration::from_millis(25)).await;
    }
}
Contributor

Normally I'd be wary about the potential infinite loop, but this is a test helper, so looks great to me!

lib/dal/src/context.rs (resolved)
Comment on lines 52 to 67
fn call(&mut self, req: jetstream::Message) -> Self::Future {
    let parts = req.into_parts();
    let head = Arc::new(parts.0.clone());
    let message = match <jetstream::Message as MessageHead>::from_parts(parts.0, parts.1) {
        Ok(message) => message,
        Err(err) => unreachable!(
            "NATS Jetstream message from parts should succeed, this is a bug!; error={:?}",
            err
        ),
    };

    let info = Arc::new(Info::from(
        // TODO(fnichol): the middleware here is infallible, but this call could, in theory
        // error. There's probably a better alternative here...
        message.info().expect("failed to parse message info"),
    ));
Contributor

These look like they'd be hit by logic errors caught by local dev and tests. I'm okay with panics here.

Contributor Author

I may be able to deal with this differently

lib/rebaser-client/src/lib.rs (resolved)
Comment on lines 195 to 199
// There is one more optional future here which is confirmation from the NATS server that
// our publish was acked. However, the task stream will drop new messages that are
// duplicates and this returns an error on the "ack future". Instead, we'll keep this as
// fire and forget.
self.context.publish(tasks_subject, vec![].into()).await?;
Contributor

Looks good to me.

Comment on lines 366 to 371
// TODO(fnichol): hrm, is this *really* true that we've written to the change set. I mean,
// yes but until a dvu has finished this is an incomplete view?
let mut event = WsEvent::change_set_written(&ctx, change_set_id.into()).await?;
event.set_workspace_pk(workspace_id.into());
event.set_change_set_id(Some(change_set_id.into()));
event.publish_immediately(&ctx).await?;
Contributor

Smells like a "fuck it we ball, reload that UI" trigger. I'm okay with it for now, but you're right... not necessarily going to be true. Good to call out!

Comment on lines 282 to 322
) -> push::OrderedConfig {
push::OrderedConfig {
Contributor

I saw this yesterday and am a bit surprised it comes from push and not pull... just an interesting note lol

Contributor Author

Yes, it was suggested to us that a push consumer would be okay to use here (maybe even slightly preferred for our use case). My understanding is that under the covers it uses a core NATS subject: the server fires all the messages you're subscribed to at that subject, and the client is set up to subscribe to that single core NATS subject (old-school NATS firehose style). Kind of neat!

lib/rebaser-server/src/lib.rs (outdated, resolved)
lib/rebaser-server/src/middleware.rs (outdated, resolved)
Comment on lines 81 to 84
debug!(
    to_rebase_workspace_snapshot_address = %to_rebase_workspace_snapshot_address,
    updates_address = %request.updates_address,
);
Contributor

Thank you! These fields will be easier to index in Jaeger.

@fnichol fnichol force-pushed the fnichol/rebaser-scale-moar branch 2 times, most recently from 582e2a7 to 17dd07e on September 21, 2024 at 22:43
This change resolves an issue where customizing the `OnSuccess` or
`OnFailure` callbacks would result in a compiler error. Whoops!

Signed-off-by: Fletcher Nichol <[email protected]>
This fix came up while trying to use `tokio-stream`'s `.timeout()`
combinator which is not `Unpin`.

Signed-off-by: Fletcher Nichol <[email protected]>
This new middleware works when processing Jetstream messages and is
useful in situations where `ack`'ing messages isn't possible or
appropriate. The callbacks for `OnSuccess` and `OnFailure` have access to
the message's `Head` metadata as well as its `Info` metadata.

Signed-off-by: Fletcher Nichol <[email protected]>
@fnichol fnichol force-pushed the fnichol/rebaser-scale-moar branch 2 times, most recently from 011afed to 8c1cd8b on September 22, 2024 at 01:08
@fnichol
Contributor Author

fnichol commented Sep 22, 2024

/try

fnichol and others added 2 commits September 21, 2024 19:15
@fnichol
Contributor Author

fnichol commented Sep 22, 2024

/try

@fnichol fnichol marked this pull request as ready for review September 22, 2024 01:51
Contributor

@nickgerace nickgerace left a comment

Re-approved!

@fnichol fnichol added this pull request to the merge queue Sep 22, 2024
Merged via the queue into main with commit 5bb689d Sep 22, 2024
40 checks passed
@fnichol fnichol deleted the fnichol/rebaser-scale-moar branch September 22, 2024 22:04