// Copyright 2015 Fredrik Lidström. All rights reserved.
// Use of this source code is governed by the standard MIT License (MIT)
// that can be found in the LICENSE file.

// Package bytearray provides a fixed-size-chunk []byte array with slab allocation.
// ByteArrays use next-chunk linking, which provides full reader/writer compatibility
// at any size you need. Chunk allocation is automatic but deallocation is manual,
// which means that every ByteArray must be released when it is no longer used.
// The slab and chunk sizes are globally configurable and must be set before any
// chunks are allocated.
package bytearray
import (
"errors"
"fmt"
"io"
"os"
"runtime"
"sync"
"time"
)
//*******************************************************************************//
// public //
//*******************************************************************************//
const (
SEEK_SET int = os.SEEK_SET // seek relative to the origin of the array
SEEK_CUR int = os.SEEK_CUR // seek relative to the current offset
SEEK_END int = os.SEEK_END // seek relative to the end of the array
)
// ChunkSize defaults to 2048 bytes (2KiB) chunks. Use Setup function to change.
var ChunkSize int = 2048
// SlabSize defaults to 2048 chunks (4MiB memory with default ChunkSize). Use Setup function to change.
var SlabSize int = ChunkSize * 2048
// MaxSlabs defaults to 4096 slabs (16GiB memory with default SlabSize and ChunkSize). Use Setup function to change.
var MaxSlabs int = 4096 + 1 // +1 because slab 0 is never used (or allocated), it is reserved for emptyLocation pointers
// Setup configures the ByteArray global sizes. IMPORTANT: Setup can only be called
// before any chunks have been allocated, or it will panic.
func Setup(chunkSize int, slabSize int, maxSlabs int) {
if allocatedSlabs > 0 {
panic(errors.New("bytearray setup called with slabs already allocated"))
}
ChunkSize = chunkSize
SlabSize = slabSize
MaxSlabs = maxSlabs + 1 // +1 because slab 0 is never used (or allocated), it is reserved for emptyLocation pointers
slabs = make([]*byteSlab, MaxSlabs) // Fixed array size because we do not want it to move around in memory, ever!
}
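// Usage sketch (not part of the original source): a minimal end-to-end use of the
// package, assuming the exported API in this file. Setup must run before the first
// chunk is grabbed, and every ByteArray must be released afterwards.
//
//	bytearray.Setup(4096, 4096*1024, 1024) // 4KiB chunks, 4MiB slabs, max 1024 slabs
//	var b bytearray.ByteArray
//	b.Write([]byte("hello")) // allocates chunks on demand
//	buf := make([]byte, 5)
//	b.Read(buf)  // reads back "hello" from the read position (0)
//	b.Release()  // mandatory manual deallocation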
var autoGCstop chan struct{}
var autoGCwg sync.WaitGroup
// EnableAutoGC enables the automatic GC goroutine.
// releaseInterval is how often the GC is run (in seconds).
// maxAge is the number of seconds since a slab was last touched before it can be freed.
func EnableAutoGC(releaseInterval int, maxAge int) {
if autoGCstop == nil {
autoGCstop = make(chan struct{})
autoGCwg.Add(1)
go func() {
defer autoGCwg.Done()
for { // ever
select {
case <-time.After(time.Duration(releaseInterval) * time.Second):
GC(maxAge)
case <-autoGCstop:
return // a plain break would only exit the select, never the loop
}
}
}()
}
}
// DisableAutoGC disables the automatic GC goroutine
func DisableAutoGC() {
if autoGCstop != nil { // closing a nil channel would panic if auto GC was never enabled
close(autoGCstop)
autoGCwg.Wait()
autoGCstop = nil
}
}
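// Usage sketch (not part of the original source): the auto-GC goroutine is typically
// started once for the lifetime of the process and stopped on shutdown.
//
//	bytearray.EnableAutoGC(60, 300) // check every 60s, free slabs idle for 300s or more
//	defer bytearray.DisableAutoGC()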
// GC is not really a garbage collector, more of a slab releaser for slabs that have not been used in a while.
// maxAge is the number of seconds since a slab was last touched before it can be freed.
func GC(maxAge int) {
memoryMutex.Lock()
defer memoryMutex.Unlock()
if allocatedSlabs > 0 {
for s := range slabs {
//if slabs[s] != nil {
// fmt.Printf("slab %d, free %d, total %d, touched %.2f sec\n", s, slabs[s].free, len(slabs[s].next), time.Since(slabs[s].touched).Seconds())
//}
if slabs[s] != nil && slabs[s].free == len(slabs[s].next) && time.Since(slabs[s].touched).Seconds() >= float64(maxAge) {
deallocateSlab(uint16(s))
runtime.GC()
}
}
}
}
// Stats returns the current allocation statistics
func Stats() (AllocatedSlabs int64, GrabbedChunks int64, ReleasedChunks int64, MemoryAllocated int64, MemoryInUse int64) {
memoryMutex.Lock()
defer memoryMutex.Unlock()
AllocatedSlabs = int64(allocatedSlabs)
GrabbedChunks = grabbedChunks
ReleasedChunks = releasedChunks
MemoryInUse = (GrabbedChunks - ReleasedChunks) * int64(ChunkSize)
MemoryAllocated = AllocatedSlabs * int64(SlabSize)
return
}
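// Usage sketch (not part of the original source): Stats can be polled to watch
// slab and chunk usage, for example:
//
//	slabs, grabbed, released, allocated, inUse := bytearray.Stats()
//	fmt.Printf("slabs=%d grabbed=%d released=%d allocated=%dB in-use=%dB\n",
//		slabs, grabbed, released, allocated, inUse)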
// ChunkQuantize takes a size and rounds it up to a whole number of chunks (this can be used to calculate used memory)
func ChunkQuantize(size int64) int64 {
return int64(ChunkSize) + (size/int64(ChunkSize))*int64(ChunkSize)
}
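// Worked example (follows directly from the formula above): with the default
// ChunkSize of 2048, ChunkQuantize(5000) = 2048 + (5000/2048)*2048 = 2048 + 4096
// = 6144, i.e. three chunks. Note that exact multiples also gain a full extra
// chunk: ChunkQuantize(4096) = 2048 + 4096 = 6144.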
//*******************************************************************************//
// ByteArray //
//*******************************************************************************//
// ByteArray read and write operations are not concurrency safe; however, the underlying
// slab structures are, so multiple ByteArrays can be used at the same time.
type ByteArray struct {
rootChunk uint32 // first chunk of the array
rootOffset int // internal offset of the data, used when splitting arrays
usedBytes int // total of used bytes
writePos position // current writer position
readPos position // current reader position
}
// WritePosition returns the current write position
func (b ByteArray) WritePosition() int {
return b.writePos.current
}
// ReadPosition returns the current read position
func (b ByteArray) ReadPosition() int {
return b.readPos.current
}
// Len returns the current length of the ByteArray
func (b ByteArray) Len() int {
return b.usedBytes
}
// Release will release all chunks associated with the ByteArray
func (b *ByteArray) Release() {
b.Truncate(0)
if b.rootChunk != emptyLocation {
releaseChunk(b.rootChunk)
b.rootChunk = emptyLocation
}
}
// Split splits the byte array into a new ByteArray at the specified offset; the old byte array is truncated at the split offset
func (b *ByteArray) Split(offset int) (newArray ByteArray) {
if offset > b.usedBytes {
panic("ASSERT")
}
if b.rootChunk == emptyLocation {
panic("ASSERT")
}
if offset == 0 {
newArray.rootChunk = b.rootChunk
b.rootChunk = emptyLocation
} else if offset == b.usedBytes {
// optimization because no copy or rootOffset is needed
} else if (b.rootOffset+offset)%ChunkSize > 0 { // Split inside a chunk
var splitPosition position
splitPosition = b.seek(splitPosition, offset, SEEK_SET)
splitSlice := getSlice(splitPosition)
newArray.rootOffset = splitPosition.chunkPos
newArray.prepare()
newSlice := newArray.WriteSlice()
// Duplicate the split block
copy(newSlice, splitSlice)
setNextLocation(newArray.rootChunk, getNextLocation(splitPosition.chunk))
setNextLocation(splitPosition.chunk, emptyLocation)
} else {
var splitPosition position
splitPosition = b.seek(splitPosition, offset-1, SEEK_SET) // -1 just to get the index of the previous chunk
newArray.rootChunk = getNextLocation(splitPosition.chunk)
setNextLocation(splitPosition.chunk, emptyLocation)
}
newArray.usedBytes = b.usedBytes - offset
newArray.writePos = newArray.seek(newArray.writePos, 0, SEEK_END)
if newArray.readPos.current != 0 {
panic("ASSERT!")
}
//newArray.readPos = newArray.seek(newArray.readPos, 0, SEEK_SET)
b.Truncate(offset)
return
}
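// Usage sketch (not part of the original source): splitting at offset 4 leaves
// the first 4 bytes in b and moves the rest into the returned tail array; both
// arrays must be released independently.
//
//	var b bytearray.ByteArray
//	b.Write([]byte("headtail"))
//	tail := b.Split(4) // b now holds "head", tail holds "tail"
//	defer b.Release()
//	defer tail.Release()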
// Truncate sets the length; it also expands the length in case offset > usedBytes
func (b *ByteArray) Truncate(offset int) int {
var p position
p = b.seek(p, offset, SEEK_SET)
for next := getNextLocation(p.chunk); next != emptyLocation; {
releaseMe := next
next = getNextLocation(next)
releaseChunk(releaseMe)
}
setNextLocation(p.chunk, emptyLocation)
b.usedBytes = p.current
if b.readPos.current > b.usedBytes {
b.readPos = b.seek(b.readPos, 0, SEEK_END)
}
if b.writePos.current > b.usedBytes {
b.writePos = b.seek(b.writePos, 0, SEEK_END)
}
return b.usedBytes
}
// WriteSeek will allocate and expand bounds if needed
func (b *ByteArray) WriteSeek(offset int, whence int) int {
b.writePos = b.seek(b.writePos, offset, whence)
return b.writePos.current
}
// ReadSeek will check bounds and return EOF error if seeking outside
func (b *ByteArray) ReadSeek(offset int, whence int) (absolute int, err error) {
switch whence {
case SEEK_SET:
absolute = offset
case SEEK_CUR:
absolute = b.readPos.current + offset
case SEEK_END:
absolute = b.usedBytes - offset
}
if absolute < 0 {
absolute = 0
err = io.EOF
}
if absolute > b.usedBytes {
absolute = b.usedBytes
err = io.EOF
}
b.readPos = b.seek(b.readPos, absolute, SEEK_SET)
return b.readPos.current, err
}
// ReadSlice returns a byte slice chunk for the current read position (it does not advance read position)
func (b *ByteArray) ReadSlice() ([]byte, error) {
if b.readPos.current >= b.usedBytes {
return nil, io.EOF
}
slice := getSlice(b.readPos)
if len(slice) > b.usedBytes-b.readPos.current {
return slice[:b.usedBytes-b.readPos.current], nil
} else {
return slice, nil
}
}
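// Usage sketch (not part of the original source): ReadSlice allows zero-copy
// consumption chunk by chunk, with the caller advancing the read position itself
// (process below is a hypothetical consumer):
//
//	for {
//		slice, err := b.ReadSlice()
//		if err == io.EOF {
//			break
//		}
//		process(slice)
//		b.ReadSeek(len(slice), bytearray.SEEK_CUR)
//	}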
// Read from the byte array into a buffer and advance the current read position
func (b *ByteArray) Read(p []byte) (n int, err error) {
for n = 0; n < len(p); {
var slice []byte
slice, err = b.ReadSlice()
if slice != nil {
read := copy(p[n:], slice)
b.readPos = b.seek(b.readPos, read, SEEK_CUR)
n += read
} else {
break
}
}
if n < len(p) {
err = io.EOF
}
return n, err
}
// WriteTo writes all ByteArray data (from the current read position) to a writer interface
func (b *ByteArray) WriteTo(w io.Writer) (n int64, err error) {
for b.readPos.current < b.usedBytes {
slice, er := b.ReadSlice()
if slice != nil {
read, err := w.Write(slice)
b.readPos = b.seek(b.readPos, read, SEEK_CUR)
n += int64(read)
if err != nil {
return n, err
}
} else {
if er != io.EOF {
err = er
}
break
}
}
return n, err
}
// WriteSlice returns a byte slice chunk for the current write position (it does not advance write position)
func (b *ByteArray) WriteSlice() []byte {
b.prepare()
return getSlice(b.writePos)
}
// Write to the byte array from a buffer and advance the current write position
func (b *ByteArray) Write(p []byte) (n int, err error) {
for n = 0; n < len(p); {
var slice []byte
slice = b.WriteSlice()
if slice == nil {
panic("ASSERT")
}
written := copy(slice, p[n:])
b.writePos = b.seek(b.writePos, written, SEEK_CUR)
n += written
}
return n, err
}
// ReadFrom reads from the Reader until EOF and fills up the ByteArray (at the current write position)
func (b *ByteArray) ReadFrom(r io.Reader) (n int64, err error) {
for {
slice := b.WriteSlice()
if slice == nil {
panic("ASSERT")
}
written, er := r.Read(slice)
b.writePos = b.seek(b.writePos, written, SEEK_CUR)
n += int64(written)
if er != nil {
if er != io.EOF {
err = er
}
break
}
}
return n, err
}
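// Usage sketch (not part of the original source): ByteArray implements io.Reader,
// io.Writer, io.ReaderFrom and io.WriterTo, so it can buffer a whole stream in
// slab-backed memory (f below is any io.Reader, e.g. an *os.File):
//
//	var b bytearray.ByteArray
//	defer b.Release()
//	b.ReadFrom(f)        // fill from the reader at the current write position
//	b.WriteTo(os.Stdout) // drain from the current read position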
// internal function to prepare a ByteArray for writing
func (b *ByteArray) prepare() {
if b.rootChunk == emptyLocation {
b.rootChunk = grabChunk()
b.readPos = b.seek(b.readPos, 0, SEEK_SET)
b.writePos = b.seek(b.writePos, 0, SEEK_SET)
}
}
// internal seek function (does not limit on size, it will allocate and grow)
func (b *ByteArray) seek(was position, offset int, whence int) (now position) {
switch whence {
case SEEK_SET:
now.current = offset
case SEEK_CUR:
now.current = was.current + offset
case SEEK_END:
now.current = b.usedBytes - offset
}
now.chunkPos = (now.current + b.rootOffset) % ChunkSize
now.chunkIX = (now.current + b.rootOffset) / ChunkSize
if was.chunkIX == 0 || now.chunkIX < was.chunkIX { // Chunks are only linked forward, so in reverse we need to restart
was.chunkIX = 0
b.prepare()
was.chunk = b.rootChunk
}
now.chunk = was.chunk
for was.chunkIX < now.chunkIX {
if now.chunk == emptyLocation {
panic("ASSERT")
}
if getNextLocation(now.chunk) == emptyLocation {
now.chunk = appendChunk(now.chunk)
} else {
now.chunk = getNextLocation(now.chunk)
}
was.chunkIX++
}
if now.current > b.usedBytes {
b.usedBytes = now.current
}
return now
}
//*******************************************************************************//
// internals //
//*******************************************************************************//
type position struct {
current int
chunkIX int
chunkPos int
chunk uint32
}
type byteSlab struct {
memory []byte
next []uint32
used []bool // Only used for ASSERT checking, should be removed
free int
touched time.Time
}
const emptyLocation uint32 = 0 // Location is empty
var slabs []*byteSlab = make([]*byteSlab, MaxSlabs)
var freeChunk uint32 = emptyLocation
var allocatedSlabs uint32
var grabbedChunks int64
var releasedChunks int64
var memoryMutex sync.Mutex
type byteChunkLocation uint32 // upper 16bit is slabIndex, lower 16bit is chunkIndex
func getChunkLocation(chunk uint32) (slabIndex, chunkIndex uint16) {
return uint16(chunk >> 16), uint16(chunk)
}
func setChunkLocation(slabIndex, chunkIndex uint16) uint32 {
return uint32(slabIndex)<<16 | uint32(chunkIndex)
}
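// Example: slabIndex 3 and chunkIndex 5 pack to 0x00030005, and
// getChunkLocation(0x00030005) returns (3, 5). Slab 0 is never allocated,
// so a zero value always means emptyLocation.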
func getNextLocation(chunk uint32) uint32 {
if chunk == emptyLocation {
return emptyLocation
} else {
return slabs[(chunk>>16)&0xffff].next[(chunk & 0xffff)]
}
}
func setNextLocation(chunk uint32, next uint32) {
if chunk == emptyLocation {
panic("ASSERT!")
}
slabs[(chunk>>16)&0xffff].next[(chunk & 0xffff)] = next
}
// getSlice gets a byte slice for a chunk position
func getSlice(p position) []byte {
s, i := getChunkLocation(p.chunk)
bufStart := int(i)*ChunkSize + p.chunkPos
bufLen := ChunkSize - p.chunkPos
return slabs[s].memory[bufStart : bufStart+bufLen]
}
// appendChunk adds a chunk to the chain after the "after" chunk
func appendChunk(after uint32) (newChunk uint32) {
if getNextLocation(after) != emptyLocation {
panic("ASSERT!")
}
newChunk = grabChunk()
setNextLocation(newChunk, getNextLocation(after))
setNextLocation(after, newChunk)
return newChunk
}
func allocateSlab() (ix uint16) {
slab := &byteSlab{
memory: make([]byte, SlabSize),
next: make([]uint32, SlabSize/ChunkSize),
used: make([]bool, SlabSize/ChunkSize), // Only used for ASSERT checking, should be removed
}
ix = 1
for ; slabs[ix] != nil; ix++ {
}
slabs[ix] = slab
allocatedSlabs++
for i := range slab.next {
slabs[ix].used[i] = false
slabs[ix].free++
release := setChunkLocation(uint16(ix), uint16(i))
setNextLocation(release, freeChunk)
freeChunk = release
}
return ix
}
func deallocateSlab(ix uint16) {
for i := range slabs[ix].used {
if slabs[ix].used[i] {
panic("ASSERT: Deallocate on a slab that has a chunk in use")
}
}
this, last := freeChunk, emptyLocation
freeChunk = emptyLocation
// Walk the free-chain and relink it without the chunks that belong to slab ix.
// The loop deliberately runs one extra iteration with this == emptyLocation so
// that the final setNextLocation(last, this) terminates the rebuilt chain.
for ; this != emptyLocation || last != emptyLocation; this = getNextLocation(this) {
s, _ := getChunkLocation(this)
if s != ix {
if last != emptyLocation {
setNextLocation(last, this)
}
last = this
if freeChunk == emptyLocation {
freeChunk = this
}
} else {
slabs[ix].free--
}
}
if slabs[ix].free > 0 {
panic("ASSERT: Unable to remove all chunks from free-chain")
}
slabs[ix] = nil
allocatedSlabs--
}
// grab a free chunk
func grabChunk() uint32 {
memoryMutex.Lock()
defer memoryMutex.Unlock()
if freeChunk == emptyLocation {
allocateSlab()
}
grabbed := freeChunk
freeChunk = getNextLocation(grabbed)
grabbedChunks++
s, i := getChunkLocation(grabbed)
if slabs[s].used[i] {
panic(fmt.Sprintf("ASSERT: Grabbing chunk already in use %x", grabbed))
}
slabs[s].used[i] = true
slabs[s].free--
slabs[s].touched = time.Now()
setNextLocation(grabbed, emptyLocation)
return grabbed
}
func releaseChunk(release uint32) {
memoryMutex.Lock()
defer memoryMutex.Unlock()
s, i := getChunkLocation(release)
if !slabs[s].used[i] {
panic(fmt.Sprintf("ASSERT: Releasing chunk not in use %x", release))
}
slabs[s].used[i] = false
slabs[s].free++
slabs[s].touched = time.Now()
setNextLocation(release, freeChunk)
freeChunk = release
releasedChunks++
}