Skip to content

Commit

Permalink
[dbnode] Optimize snapshotting which can speedup bootstrapping consid…
Browse files Browse the repository at this point in the history
…erably (m3db#4093)
  • Loading branch information
robskillington authored Apr 4, 2022
1 parent 866fa71 commit ef226b2
Show file tree
Hide file tree
Showing 24 changed files with 493 additions and 139 deletions.
14 changes: 14 additions & 0 deletions src/dbnode/encoding/encoding_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/dbnode/encoding/m3tsz/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ func (enc *encoder) LastAnnotationChecksum() (uint64, error) {
return enc.tsEncoderState.PrevAnnotationChecksum, nil
}

// Empty reports whether the encoder's underlying stream (enc.os) is empty,
// as determined by that stream's own Empty method.
func (enc *encoder) Empty() bool {
return enc.os.Empty()
}

// Len returns the length of the final data stream that would be generated
// by a call to Stream().
func (enc *encoder) Len() int {
Expand Down
8 changes: 8 additions & 0 deletions src/dbnode/encoding/m3tsz/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,14 @@ func TestEncoderLenReturnsFinalStreamLength(t *testing.T) {
})
}

func TestEncoderEmpty(t *testing.T) {
testMultiplePasses(t, multiplePassesTest{
postEncodeAll: func(enc *encoder, numDatapointsEncoded int) {
assert.Equal(t, numDatapointsEncoded == 0, enc.Empty())
},
})
}

type testFinalizer struct {
t *testing.T
numActiveStreams *atomic.Int32
Expand Down
3 changes: 3 additions & 0 deletions src/dbnode/encoding/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,19 @@ func NewNullEncoder() Encoder {
// Encode ignores the datapoint, time unit and annotation and always returns nil.
func (e *nullEncoder) Encode(dp ts.Datapoint, timeUnit xtime.Unit, annotation ts.Annotation) error {
return nil
}

// Stream never produces a stream: it always returns a nil reader and false.
func (e *nullEncoder) Stream(ctx context.Context) (xio.SegmentReader, bool) {
return nil, false
}
// NumEncoded always returns 0.
func (e *nullEncoder) NumEncoded() int { return 0 }
// LastEncoded is not supported: it always returns a zero-value datapoint and
// a "not implemented" error.
func (e *nullEncoder) LastEncoded() (ts.Datapoint, error) {
return ts.Datapoint{}, fmt.Errorf("not implemented")
}

// LastAnnotationChecksum is not supported: it always returns 0 and a
// "not implemented" error.
func (e *nullEncoder) LastAnnotationChecksum() (uint64, error) {
return 0, fmt.Errorf("not implemented")
}
// Empty always returns true.
func (e *nullEncoder) Empty() bool { return true }
// Len always returns 0.
func (e *nullEncoder) Len() int { return 0 }
// Seal marks the encoder as sealed by setting its sealed flag to true.
func (e *nullEncoder) Seal() { e.sealed = true }
// Reset is a no-op; all arguments are ignored.
func (e *nullEncoder) Reset(xtime.UnixNano, int, namespace.SchemaDescr) {}
Expand Down
7 changes: 6 additions & 1 deletion src/dbnode/encoding/proto/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (enc *Encoder) LastEncoded() (ts.Datapoint, error) {
return enc.lastEncodedDP, nil
}

// LastAnnotation returns the checksum of the last encoded annotation (which contain the bytes
// LastAnnotationChecksum returns the checksum of the last encoded annotation (which contains the bytes
// used for ProtoBuf data).
func (enc *Encoder) LastAnnotationChecksum() (uint64, error) {
if enc.numEncoded == 0 {
Expand All @@ -312,6 +312,11 @@ func (enc *Encoder) Len() int {
return enc.stream.Len()
}

// Empty reports whether the encoder's underlying stream (enc.stream) is
// empty, as determined by that stream's own Empty method.
func (enc *Encoder) Empty() bool {
return enc.stream.Empty()
}

// Stats returns EncoderStats which contain statistics about the encoders compression
// ratio.
func (enc *Encoder) Stats() EncoderStats {
Expand Down
3 changes: 3 additions & 0 deletions src/dbnode/encoding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ type Encoder interface {
// an error is returned.
LastAnnotationChecksum() (uint64, error)

// Empty returns true when encoder is considered empty.
Empty() bool

// Len returns the length of the encoded stream as returned by a call to Stream().
Len() int

Expand Down
7 changes: 7 additions & 0 deletions src/dbnode/storage/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ func (b *dbBlock) LastReadTime() xtime.UnixNano {
return xtime.UnixNano(atomic.LoadInt64(&b.lastReadUnixNanos))
}

// Empty reports whether the block's length is zero. The length is read while
// holding the block's read lock.
func (b *dbBlock) Empty() bool {
	b.RLock()
	defer b.RUnlock()

	return b.length == 0
}

func (b *dbBlock) Len() int {
b.RLock()
length := b.length
Expand Down
14 changes: 14 additions & 0 deletions src/dbnode/storage/block/block_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/dbnode/storage/block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ type DatabaseBlock interface {
// LastReadTime returns the last read time of the block.
LastReadTime() xtime.UnixNano

// Empty returns true iff block is empty.
Empty() bool

// Len returns the block length.
Len() int

Expand Down
23 changes: 9 additions & 14 deletions src/dbnode/storage/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ import (
"go.uber.org/zap"
)

var (
errFlushOperationsInProgress = errors.New("flush operations already in progress")
)
var errFlushOperationsInProgress = errors.New("flush operations already in progress")

type flushManagerState int

Expand Down Expand Up @@ -224,17 +222,14 @@ func (m *flushManager) dataSnapshot(
if len(snapshotBlockStarts) > maxBlocksSnapshottedByNamespace {
maxBlocksSnapshottedByNamespace = len(snapshotBlockStarts)
}
for _, snapshotBlockStart := range snapshotBlockStarts {
err := ns.Snapshot(
snapshotBlockStart, startTime, snapshotPersist)

if err != nil {
detailedErr := fmt.Errorf(
"namespace %s failed to snapshot data for blockStart %s: %v",
ns.ID().String(), snapshotBlockStart.String(), err)
multiErr = multiErr.Add(detailedErr)
continue
}

err := ns.Snapshot(snapshotBlockStarts, startTime, snapshotPersist)
if err != nil {
detailedErr := fmt.Errorf(
"namespace %s failed to snapshot data for some blocks: %w",
ns.ID().String(), err)
multiErr = multiErr.Add(detailedErr)
continue
}
}
m.metrics.maxBlocksSnapshottedByNamespace.Update(float64(maxBlocksSnapshottedByNamespace))
Expand Down
11 changes: 7 additions & 4 deletions src/dbnode/storage/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,12 +573,15 @@ func TestFlushManagerFlushSnapshot(t *testing.T) {
ns.EXPECT().NeedsFlush(st, st).Return(false, nil)
}

snapshotEnd := now.Add(bufferFuture).Truncate(blockSize)
var (
snapshotEnd = now.Add(bufferFuture).Truncate(blockSize)
snapshotBlocks []xtime.UnixNano
)
num = numIntervals(start, snapshotEnd, blockSize)
for i := 0; i < num; i++ {
st := start.Add(time.Duration(i) * blockSize)
ns.EXPECT().Snapshot(st, now, gomock.Any())
for i := num - 1; i >= 0; i-- {
snapshotBlocks = append(snapshotBlocks, start.Add(time.Duration(i)*blockSize))
}
ns.EXPECT().Snapshot(snapshotBlocks, now, gomock.Any())
}

require.NoError(t, fm.Flush(now))
Expand Down
28 changes: 17 additions & 11 deletions src/dbnode/storage/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,7 @@ func (n *dbNamespace) FlushIndex(flush persist.IndexFlush) error {
}

func (n *dbNamespace) Snapshot(
blockStart xtime.UnixNano,
blockStarts []xtime.UnixNano,
snapshotTime xtime.UnixNano,
snapshotPersist persist.SnapshotPreparer,
) error {
Expand Down Expand Up @@ -1374,21 +1374,27 @@ func (n *dbNamespace) Snapshot(
seriesPersist int
multiErr xerrors.MultiError
)

for _, shard := range n.OwnedShards() {
log := n.log.With(zap.Uint32("shard", shard.ID()))
if !shard.IsBootstrapped() {
n.log.
With(zap.Uint32("shard", shard.ID())).
Debug("skipping snapshot due to shard not bootstrapped yet")
log.Debug("skipping snapshot due to shard not bootstrapped yet")
continue
}
result, err := shard.Snapshot(blockStart, snapshotTime, snapshotPersist, nsCtx)
if err != nil {
detailedErr := fmt.Errorf("shard %d failed to snapshot: %v", shard.ID(), err)
multiErr = multiErr.Add(detailedErr)
// Continue with remaining shards
snapshotBlockStarts := shard.FilterBlocksNeedSnapshot(blockStarts)
if len(snapshotBlockStarts) == 0 {
log.Debug("skipping shard snapshot since no blocks need it")
continue
}
for _, blockStart := range snapshotBlockStarts {
snapshotResult, err := shard.Snapshot(blockStart, snapshotTime, snapshotPersist, nsCtx)
if err != nil {
detailedErr := fmt.Errorf("shard %d failed to snapshot %v block: %w", shard.ID(), blockStart, err)
multiErr = multiErr.Add(detailedErr)
continue
}
seriesPersist += snapshotResult.SeriesPersist
}

seriesPersist += result.SeriesPersist
}

n.metrics.snapshotSeriesPersist.Inc(int64(seriesPersist))
Expand Down
83 changes: 81 additions & 2 deletions src/dbnode/storage/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ func TestNamespaceSnapshotNotBootstrapped(t *testing.T) {

blockSize := ns.Options().RetentionOptions().BlockSize()
blockStart := xtime.Now().Truncate(blockSize)
require.Equal(t, errNamespaceNotBootstrapped, ns.Snapshot(blockStart, blockStart, nil))
require.Equal(t, errNamespaceNotBootstrapped, ns.Snapshot([]xtime.UnixNano{blockStart}, blockStart, nil))
}

func TestNamespaceSnapshotAllShardsSuccess(t *testing.T) {
Expand Down Expand Up @@ -763,6 +763,74 @@ func TestNamespaceSnapshotShardSkipNotBootstrapped(t *testing.T) {
require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults))
}

// TestNamespaceSnapshotShardBlockFiltered verifies that the namespace only
// snapshots the blocks a shard reports as needing a snapshot: two distinct
// block starts are offered, the shard's filter keeps just one of them, and
// shard.Snapshot is expected for that single block only.
func TestNamespaceSnapshotShardBlockFiltered(t *testing.T) {
	var (
		ctrl       = xtest.NewController(t)
		ctx        = context.NewBackground()
		now        = xtime.Now()
		ns, closer = newTestNamespaceWithIDOpts(t, defaultTestNs1ID,
			namespace.NewOptions().SetSnapshotEnabled(true))
	)
	defer func() {
		ctrl.Finish()
		ctx.Close()
		closer()
	}()

	var (
		blockSize = ns.Options().RetentionOptions().BlockSize()
		block1    = now.Truncate(blockSize)
		// block2 must be a different block start than block1; otherwise the
		// input would hold the same block twice and the test could not tell
		// a filtered block apart from a snapshotted one.
		block2         = block1.Add(blockSize)
		blocks         = []xtime.UnixNano{block2, block1}
		filteredBlocks = []xtime.UnixNano{block1}
	)

	ns.bootstrapState = Bootstrapped
	ns.nowFn = func() time.Time { return now.ToTime() }
	shard := newTestShard(ctrl)
	ns.shards[shard.ID()] = shard

	// The shard filters block2 out, so only block1 may be snapshotted.
	shard.EXPECT().FilterBlocksNeedSnapshot(blocks).Return(filteredBlocks)
	shard.EXPECT().
		Snapshot(block1, now, gomock.Any(), gomock.Any()).
		Return(ShardSnapshotResult{}, nil)

	err := ns.Snapshot(blocks, now, nil)
	require.NoError(t, err)
}

// TestNamespaceSnapshotShardBlockAllShardsFiltered verifies that when the
// shard's filter returns no blocks, the namespace snapshot succeeds without
// calling shard.Snapshot (no Snapshot expectation is registered on the mock).
func TestNamespaceSnapshotShardBlockAllShardsFiltered(t *testing.T) {
	var (
		ctrl       = xtest.NewController(t)
		ctx        = context.NewBackground()
		now        = xtime.Now()
		ns, closer = newTestNamespaceWithIDOpts(t, defaultTestNs1ID,
			namespace.NewOptions().SetSnapshotEnabled(true))
	)
	defer func() {
		ctrl.Finish()
		ctx.Close()
		closer()
	}()

	var (
		blockSize = ns.Options().RetentionOptions().BlockSize()
		blocks    = []xtime.UnixNano{now.Truncate(blockSize)}
	)

	ns.bootstrapState = Bootstrapped
	ns.nowFn = func() time.Time { return now.ToTime() }
	shard := newTestShard(ctrl)
	ns.shards[shard.ID()] = shard

	// Every offered block is filtered out by the shard.
	shard.EXPECT().FilterBlocksNeedSnapshot(blocks).Return([]xtime.UnixNano{})

	err := ns.Snapshot(blocks, now, nil)
	require.NoError(t, err)
}

func testSnapshotWithShardSnapshotErrs(
t *testing.T,
shardMethodResults []snapshotTestCase,
Expand Down Expand Up @@ -796,6 +864,9 @@ func testSnapshotWithShardSnapshotErrs(
if !tc.isBootstrapped {
continue
}
shard.EXPECT().
FilterBlocksNeedSnapshot([]xtime.UnixNano{blockStart}).
Return([]xtime.UnixNano{blockStart})
if tc.expectSnapshot {
shard.EXPECT().
Snapshot(blockStart, now, gomock.Any(), gomock.Any()).
Expand All @@ -805,7 +876,7 @@ func testSnapshotWithShardSnapshotErrs(
shardBootstrapStates[shardID] = tc.shardBootstrapStateBeforeTick
}

return ns.Snapshot(blockStart, now, nil)
return ns.Snapshot([]xtime.UnixNano{blockStart}, now, nil)
}

func TestNamespaceTruncate(t *testing.T) {
Expand Down Expand Up @@ -1780,3 +1851,11 @@ func contains(c []uint32, x uint32) bool {
}
return false
}

// newTestShard returns a mock shard that reports the ID of the first entry in
// testShardIDs any number of times and expects a single IsBootstrapped call,
// which returns true.
func newTestShard(ctrl *gomock.Controller) *MockdatabaseShard {
	var (
		id    = testShardIDs[0].ID()
		shard = NewMockdatabaseShard(ctrl)
	)

	shard.EXPECT().IsBootstrapped().Return(true)
	shard.EXPECT().ID().Return(id).AnyTimes()

	return shard
}
Loading

0 comments on commit ef226b2

Please sign in to comment.