Most sinks in Vector collect events into a buffer and send them in batches, in order to improve efficiency and increase transmission rates. However, the current batching implementations suffer from two significant problems:
- Batching sinks allow configuring either the maximum number of events or the maximum size of the batch (in bytes), but not both. Worse, this choice is hard-coded into each sink, so which limit is available is determined by Vector rather than by configuration.
- For sinks that limit batch sizes in bytes, a buffer is considered ready to send when its size reaches or exceeds the given size limit. If that limit is close to the maximum allowed request size for a remote service, this will routinely produce oversized requests whenever the flow rate is high enough to avoid triggering a batch timeout.
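To make the second problem concrete, here is a minimal sketch of the current insert-then-check flow; the `flush_sizes` helper and the byte counts are illustrative only, not Vector code:

```rust
// Models the current "insert first, then check the limit" flow: the
// event that crosses the threshold is already in the buffer, so the
// flushed batch can exceed the configured limit.
fn flush_sizes(events: &[&[u8]], limit: usize) -> Vec<usize> {
    let mut buffer: Vec<u8> = Vec::new();
    let mut sent = Vec::new();
    for event in events {
        // The event is inserted unconditionally...
        buffer.extend_from_slice(event);
        // ...and only afterwards is the buffer checked for fullness.
        if buffer.len() >= limit {
            sent.push(buffer.len());
            buffer.clear();
        }
    }
    sent
}
```

With a 10-byte limit and 4-byte events, the single flushed batch here is 12 bytes, i.e. larger than the limit the remote service would enforce.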
For sinks that use the `BatchSink` framework (`PartitionBatchSink` is similar), events move along the following path:
- Each event is optionally encoded into a more suitable format for buffering (using `with_flat_map` on the batch sink, producing the `Batch::Input` type).
- The encoded item is pushed into a buffer (in `BatchSink::start_send`).
- When that buffer is full, as determined by being at or above the configured batch size, it is collected into a batch (in `BatchSink::poll_complete`).
- The batch is turned into a request (in `Service::call`), which does the final encoding and serialization.
There exist the following buffer types:
- `Buffer` offers (gzip) compression, but requires the events to be serialized before insertion, and that serialization must include a terminator. Events are stored and output as one large `Vec<u8>`, which allows for easy computation of both sizes.
- `JsonArrayBuffer` internally serializes events into "raw" JSON, which may then be added to another structure after output. Events are stored as `serde_json::value::RawValue`, which is a newtype over a boxed string, allowing a running total of both sizes to be maintained.
- `MetricBuffer` stores metrics in their structured form, without encoding. Since the target serialization is unknown, this buffer cannot know the total byte size.
- `PartitionBuffer<T, K>` is a meta-buffer that partitions events into multiple buffers of type `T`.
- `Vec<T>` can only count events, not bytes, as it is agnostic to the size of `T`.
We want to get to the point where as many batching sinks as possible can be configured with either a byte limit or an event limit, or both. Additionally, the batch limits must be inviolable, so that no requests are ever sent that exceed size limits. To accomplish this the following is necessary:
The split configuration types for bytes and events will be rolled into a single configuration combining both attributes. Batch sizes will be limited by whichever maximum is reached first.
The `max_size` element is retained for backwards compatibility, and each sink will use it as a default for the maximum bytes or events, depending on that sink's previous configuration mode. If both `max_size` and either of `max_bytes` or `max_events` are set, Vector will produce a configuration error indicating a conflict in the configuration.
```rust
struct BatchConfig {
    max_bytes: Option<usize>,
    max_events: Option<usize>,
    max_size: Option<usize>,
    timeout_secs: Option<u64>,
}
```
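One way the compatibility rules above could be enforced is sketched below; the `resolve_bytes` method, the `BatchError` type, and the exact defaulting behavior are assumptions for illustration, not Vector's actual implementation. A sink whose previous configuration mode was byte-based would treat the legacy `max_size` as a byte limit:

```rust
#[derive(Debug, PartialEq)]
enum BatchError {
    // Legacy `max_size` was set alongside the new fields.
    ConflictingSizes,
}

#[derive(Default)]
struct BatchConfig {
    max_bytes: Option<usize>,
    max_events: Option<usize>,
    max_size: Option<usize>,
    timeout_secs: Option<u64>,
}

impl BatchConfig {
    // For a byte-limited sink: prefer the new `max_bytes`, fall back to
    // the legacy `max_size`, then to the sink's built-in default, and
    // error if old and new fields are both present.
    fn resolve_bytes(&self, default_bytes: usize) -> Result<usize, BatchError> {
        match (self.max_size, self.max_bytes.or(self.max_events)) {
            (Some(_), Some(_)) => Err(BatchError::ConflictingSizes),
            (Some(size), None) => Ok(size),
            (None, _) => Ok(self.max_bytes.unwrap_or(default_bytes)),
        }
    }
}
```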
The current batch buffers always allow events to be inserted, and then check separately if the buffer is "big enough" to be sent. This will be changed to a new signature that indicates whether the insertion would push the buffer over its size limits.
```rust
#[must_use]
enum PushResult<E> {
    Ok,
    Overflow(E),
}

trait Batch {
    type Input;

    fn push(&mut self, event: Self::Input) -> PushResult<Self::Input>;
}
```
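As a concrete sketch of these semantics, a hypothetical event-count-only buffer (here called `VecBuffer`, matching the type proposed later) might implement `push` like this; the details are illustrative, not the final implementation:

```rust
#[must_use]
#[derive(Debug, PartialEq)]
enum PushResult<E> {
    Ok,
    Overflow(E),
}

struct VecBuffer<T> {
    items: Vec<T>,
    max_events: usize,
}

impl<T> VecBuffer<T> {
    fn new(max_events: usize) -> Self {
        VecBuffer { items: Vec::new(), max_events }
    }

    // Refuse the item instead of growing past the limit, handing it
    // back so the caller can flush the buffer and retry. The buffer
    // itself never exceeds its configured size.
    fn push(&mut self, item: T) -> PushResult<T> {
        if self.items.len() >= self.max_events {
            PushResult::Overflow(item)
        } else {
            self.items.push(item);
            PushResult::Ok
        }
    }
}
```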
This will require all the buffers to know their maximum size. This will not be encoded into the trait directly, as each buffer will implement its own creation method to accommodate additional creation parameters, such as the compression mode or partition key.
Does it make sense to move conversion of input events into the intermediate type `Input` into the buffer types instead of using a map on the sink? This would mean something like a `TryInto` trait, implemented differently for each sink, or parameterizing the buffer with something the way `RetryLogic` works for the sink trait.
Similarly, does it make sense to move serialization of output events into the trait? Is this possible for `MetricBuffer`? If this could be done for all buffers, it would guarantee availability of the byte size limit across the board.
The desired goal can be accomplished incrementally as follows:
- Enhance all the buffers to store their maximum size. This will require replacing the plain `Vec<T>` buffer with a new `VecBuffer<T>` type.
- Move the "is full" logic from `BatchSink` into `trait Batch`.
- Modify `Batch::push` to return an indicator that the item would overflow the buffer.
- Modify the sending logic in `impl Sink for BatchSink` to use failure from the new push method to cause a new request to be generated, making `start_send` exit without consuming the new event.
- Merge the separate `BatchBytesConfig` and `BatchEventsConfig` into a unified `BatchConfig`. Buffers that do not support byte size limits (like `MetricBuffer`) would error on creation if they were configured with one.
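The interaction between the overflow-aware `push` and the sending logic can be sketched as follows; `Buffer`, `send`, and the request handling here are simplified stand-ins for the real `Sink` implementation, not Vector code:

```rust
#[must_use]
enum PushResult<E> {
    Ok,
    Overflow(E),
}

struct Buffer {
    items: Vec<u8>,
    max: usize,
}

impl Buffer {
    fn push(&mut self, event: u8) -> PushResult<u8> {
        if self.items.len() >= self.max {
            PushResult::Overflow(event)
        } else {
            self.items.push(event);
            PushResult::Ok
        }
    }
}

// start_send-style driver: on overflow, the full buffer becomes a
// request and the rejected event is retried against the now-empty
// buffer, so no event is lost and no batch exceeds its limit.
fn send(buffer: &mut Buffer, event: u8, requests: &mut Vec<Vec<u8>>) {
    if let PushResult::Overflow(event) = buffer.push(event) {
        requests.push(std::mem::take(&mut buffer.items));
        // Cannot overflow again on an empty buffer (limit >= 1).
        let _ = buffer.push(event);
    }
}
```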