diff options
| author | Xe Iaso <me@xeiaso.net> | 2023-06-01 09:21:04 -0400 |
|---|---|---|
| committer | Xe Iaso <me@xeiaso.net> | 2023-06-01 09:21:04 -0400 |
| commit | ebaae2d18751ed41a379c8dbe277117e1741519e (patch) | |
| tree | 886a5d7e76d8b5c43d983793a3a36c6ca86e90cc /internal | |
| parent | b7f6ae4f45a2d43cc7dde92db414a273914aa410 (diff) | |
| download | x-ebaae2d18751ed41a379c8dbe277117e1741519e.tar.xz x-ebaae2d18751ed41a379c8dbe277117e1741519e.zip | |
internal/bundler: add bundler package that uses Go generics
Signed-off-by: Xe Iaso <me@xeiaso.net>
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/bundler/bundler.go | 401 | ||||
| -rw-r--r-- | internal/bundler/bundler_test.go | 36 |
2 files changed, 437 insertions, 0 deletions
diff --git a/internal/bundler/bundler.go b/internal/bundler/bundler.go new file mode 100644 index 0000000..7c623b5 --- /dev/null +++ b/internal/bundler/bundler.go @@ -0,0 +1,401 @@ +// Copyright 2016 Google LLC. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package bundler supports bundling (batching) of items. Bundling amortizes an +// action with fixed costs over multiple items. For example, if an API provides +// an RPC that accepts a list of items as input, but clients would prefer +// adding items one at a time, then a Bundler can accept individual items from +// the client and bundle many of them into a single RPC. +// +// This package is experimental and subject to change without notice. +package bundler + +import ( + "context" + "errors" + "sync" + "time" + + "golang.org/x/sync/semaphore" +) + +type mode int + +const ( + DefaultDelayThreshold = time.Second + DefaultBundleCountThreshold = 10 + DefaultBundleByteThreshold = 1e6 // 1M + DefaultBufferedByteLimit = 1e9 // 1G +) + +const ( + none mode = iota + add + addWait +) + +var ( + // ErrOverflow indicates that Bundler's stored bytes exceeds its BufferedByteLimit. + ErrOverflow = errors.New("bundler reached buffered byte limit") + + // ErrOversizedItem indicates that an item's size exceeds the maximum bundle size. + ErrOversizedItem = errors.New("item size exceeds bundle byte limit") + + // errMixedMethods indicates that mutually exclusive methods has been + // called subsequently. + errMixedMethods = errors.New("calls to Add and AddWait cannot be mixed") +) + +// A Bundler collects items added to it into a bundle until the bundle +// exceeds a given size, then calls a user-provided function to handle the +// bundle. +// +// The exported fields are only safe to modify prior to the first call to Add +// or AddWait. +type Bundler[T any] struct { + // Starting from the time that the first message is added to a bundle, once + // this delay has passed, handle the bundle. The default is DefaultDelayThreshold. + DelayThreshold time.Duration + + // Once a bundle has this many items, handle the bundle. Since only one + // item at a time is added to a bundle, no bundle will exceed this + // threshold, so it also serves as a limit. The default is + // DefaultBundleCountThreshold. + BundleCountThreshold int + + // Once the number of bytes in current bundle reaches this threshold, handle + // the bundle. The default is DefaultBundleByteThreshold. This triggers handling, + // but does not cap the total size of a bundle. + BundleByteThreshold int + + // The maximum size of a bundle, in bytes. Zero means unlimited. + BundleByteLimit int + + // The maximum number of bytes that the Bundler will keep in memory before + // returning ErrOverflow. The default is DefaultBufferedByteLimit. + BufferedByteLimit int + + // The maximum number of handler invocations that can be running at once. + // The default is 1. + HandlerLimit int + + handler func([]T) // called to handle a bundle + + mu sync.Mutex // guards access to fields below + flushTimer *time.Timer // implements DelayThreshold + handlerCount int // # of bundles currently being handled (i.e. handler is invoked on them) + sem *semaphore.Weighted // enforces BufferedByteLimit + semOnce sync.Once // guards semaphore initialization + // The current bundle we're adding items to. Not yet in the queue. + // Appended to the queue once the flushTimer fires or the bundle + // thresholds/limits are reached. If curBundle is nil and tail is + // not, we first try to add items to tail. Once tail is full or handled, + // we create a new curBundle for the incoming item. + curBundle *bundle[T] + // The next bundle in the queue to be handled. Nil if the queue is + // empty. + head *bundle[T] + // The last bundle in the queue to be handled. Nil if the queue is + // empty. If curBundle is nil and tail isn't, we attempt to add new + // items to the tail until if becomes full or has been passed to the + // handler. + tail *bundle[T] + curFlush *sync.WaitGroup // counts outstanding bundles since last flush + prevFlush chan bool // signal used to wait for prior flush + + // The first call to Add or AddWait, mode will be add or addWait respectively. + // If there wasn't call yet then mode is none. + mode mode + // TODO: consider alternative queue implementation for head/tail bundle. see: + // https://code-review.googlesource.com/c/google-api-go-client/+/47991/4/support/bundler/bundler.go#74 +} + +// A bundle is a group of items that were added individually and will be passed +// to a handler as a slice. +type bundle[T any] struct { + items []T // slice of T + size int // size in bytes of all items + next *bundle[T] // bundles are handled in order as a linked list queue + flush *sync.WaitGroup // the counter that tracks flush completion +} + +// add appends item to this bundle and increments the total size. It requires +// that b.mu is locked. +func (bu *bundle[T]) add(item T, size int) { + bu.items = append(bu.items, item) + bu.size += size +} + +// New creates a new Bundler. +// +// handler is a function that will be called on each bundle. If itemExample is +// of type T, the argument to handler is of type []T. handler is always called +// sequentially for each bundle, and never in parallel. +// +// Configure the Bundler by setting its thresholds and limits before calling +// any of its methods. +func New[T any](handler func([]T)) *Bundler[T] { + b := &Bundler[T]{ + DelayThreshold: DefaultDelayThreshold, + BundleCountThreshold: DefaultBundleCountThreshold, + BundleByteThreshold: DefaultBundleByteThreshold, + BufferedByteLimit: DefaultBufferedByteLimit, + HandlerLimit: 1, + + handler: handler, + curFlush: &sync.WaitGroup{}, + } + return b +} + +func (b *Bundler[T]) initSemaphores() { + // Create the semaphores lazily, because the user may set limits + // after NewBundler. + b.semOnce.Do(func() { + b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit)) + }) +} + +// enqueueCurBundle moves curBundle to the end of the queue. The bundle may be +// handled immediately if we are below HandlerLimit. It requires that b.mu is +// locked. +func (b *Bundler[T]) enqueueCurBundle() { + // We don't require callers to check if there is a pending bundle. It + // may have already been appended to the queue. If so, return early. + if b.curBundle == nil { + return + } + // If we are below the HandlerLimit, the queue must be empty. Handle + // immediately with a new goroutine. + if b.handlerCount < b.HandlerLimit { + b.handlerCount++ + go b.handle(b.curBundle) + } else if b.tail != nil { + // There are bundles on the queue, so append to the end + b.tail.next = b.curBundle + b.tail = b.curBundle + } else { + // The queue is empty, so initialize the queue + b.head = b.curBundle + b.tail = b.curBundle + } + b.curBundle = nil + if b.flushTimer != nil { + b.flushTimer.Stop() + b.flushTimer = nil + } +} + +// setMode sets the state of Bundler's mode. If mode was defined before +// and passed state is different from it then return an error. +func (b *Bundler[T]) setMode(m mode) error { + b.mu.Lock() + defer b.mu.Unlock() + if b.mode == m || b.mode == none { + b.mode = m + return nil + } + return errMixedMethods +} + +// canFit returns true if bu can fit an additional item of size bytes based +// on the limits of Bundler b. +func (b *Bundler[T]) canFit(bu *bundle[T], size int) bool { + return (b.BundleByteLimit <= 0 || bu.size+size <= b.BundleByteLimit) && + (b.BundleCountThreshold <= 0 || len(bu.items) < b.BundleCountThreshold) +} + +// Add adds item to the current bundle. It marks the bundle for handling and +// starts a new one if any of the thresholds or limits are exceeded. +// The type of item must be assignable to the itemExample parameter of the NewBundler +// method, otherwise there will be a panic. +// +// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then +// the item can never be handled. Add returns ErrOversizedItem in this case. +// +// If adding the item would exceed the maximum memory allowed +// (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for +// memory, Add returns ErrOverflow. +// +// Add never blocks. +func (b *Bundler[T]) Add(item T, size int) error { + if err := b.setMode(add); err != nil { + return err + } + // If this item exceeds the maximum size of a bundle, + // we can never send it. + if b.BundleByteLimit > 0 && size > b.BundleByteLimit { + return ErrOversizedItem + } + + // If adding this item would exceed our allotted memory + // footprint, we can't accept it. + // (TryAcquire also returns false if anything is waiting on the semaphore, + // so calls to Add and AddWait shouldn't be mixed.) + b.initSemaphores() + if !b.sem.TryAcquire(int64(size)) { + return ErrOverflow + } + + b.mu.Lock() + defer b.mu.Unlock() + return b.add(item, size) +} + +// add adds item to the tail of the bundle queue or curBundle depending on space +// and nil-ness (see inline comments). It marks curBundle for handling (by +// appending it to the queue) if any of the thresholds or limits are exceeded. +// curBundle is lazily initialized. It requires that b.mu is locked. +func (b *Bundler[T]) add(item T, size int) error { + // If we don't have a curBundle, see if we can add to the queue tail. + if b.tail != nil && b.curBundle == nil && b.canFit(b.tail, size) { + b.tail.add(item, size) + return nil + } + + // If we can't fit in the existing curBundle, move it onto the queue. + if b.curBundle != nil && !b.canFit(b.curBundle, size) { + b.enqueueCurBundle() + } + + // Create a curBundle if we don't have one. + if b.curBundle == nil { + b.curFlush.Add(1) + b.curBundle = &bundle[T]{ + items: []T{}, + flush: b.curFlush, + } + } + + // Add the item. + b.curBundle.add(item, size) + + // If curBundle is ready for handling, move it to the queue. + if b.curBundle.size >= b.BundleByteThreshold || + len(b.curBundle.items) == b.BundleCountThreshold { + b.enqueueCurBundle() + } + + // If we created a new bundle and it wasn't immediately handled, set a timer + if b.curBundle != nil && b.flushTimer == nil { + b.flushTimer = time.AfterFunc(b.DelayThreshold, b.tryHandleBundles) + } + + return nil +} + +// tryHandleBundles is the timer callback that handles or queues any current +// bundle after DelayThreshold time, even if the bundle isn't completely full. +func (b *Bundler[T]) tryHandleBundles() { + b.mu.Lock() + b.enqueueCurBundle() + b.mu.Unlock() +} + +// next returns the next bundle that is ready for handling and removes it from +// the internal queue. It requires that b.mu is locked. +func (b *Bundler[T]) next() *bundle[T] { + if b.head == nil { + return nil + } + out := b.head + b.head = b.head.next + if b.head == nil { + b.tail = nil + } + out.next = nil + return out +} + +// handle calls the user-specified handler on the given bundle. handle is +// intended to be run as a goroutine. After the handler returns, we update the +// byte total. handle continues processing additional bundles that are ready. +// If no more bundles are ready, the handler count is decremented and the +// goroutine ends. +func (b *Bundler[T]) handle(bu *bundle[T]) { + for bu != nil { + b.handler(bu.items) + bu = b.postHandle(bu) + } +} + +func (b *Bundler[T]) postHandle(bu *bundle[T]) *bundle[T] { + b.mu.Lock() + defer b.mu.Unlock() + + b.sem.Release(int64(bu.size)) + bu.flush.Done() + + bu = b.next() + if bu == nil { + b.handlerCount-- + } + return bu +} + +// AddWait adds item to the current bundle. It marks the bundle for handling and +// starts a new one if any of the thresholds or limits are exceeded. +// +// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then +// the item can never be handled. AddWait returns ErrOversizedItem in this case. +// +// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit), +// AddWait blocks until space is available or ctx is done. +// +// Calls to Add and AddWait should not be mixed on the same Bundler. +func (b *Bundler[T]) AddWait(ctx context.Context, item T, size int) error { + if err := b.setMode(addWait); err != nil { + return err + } + // If this item exceeds the maximum size of a bundle, + // we can never send it. + if b.BundleByteLimit > 0 && size > b.BundleByteLimit { + return ErrOversizedItem + } + // If adding this item would exceed our allotted memory footprint, block + // until space is available. The semaphore is FIFO, so there will be no + // starvation. + b.initSemaphores() + if err := b.sem.Acquire(ctx, int64(size)); err != nil { + return err + } + + b.mu.Lock() + defer b.mu.Unlock() + return b.add(item, size) +} + +// Flush invokes the handler for all remaining items in the Bundler and waits +// for it to return. +func (b *Bundler[T]) Flush() { + b.mu.Lock() + + // If a curBundle is pending, move it to the queue. + b.enqueueCurBundle() + + // Store a pointer to the WaitGroup that counts outstanding bundles + // in the current flush and create a new one to track the next flush. + wg := b.curFlush + b.curFlush = &sync.WaitGroup{} + + // Flush must wait for all prior, outstanding flushes to complete. + // We use a channel to communicate completion between each flush in + // the sequence. + prev := b.prevFlush + next := make(chan bool) + b.prevFlush = next + + b.mu.Unlock() + + // Wait until the previous flush is finished. + if prev != nil { + <-prev + } + + // Wait until this flush is finished. + wg.Wait() + + // Allow the next flush to finish. + close(next) +} diff --git a/internal/bundler/bundler_test.go b/internal/bundler/bundler_test.go new file mode 100644 index 0000000..9a269da --- /dev/null +++ b/internal/bundler/bundler_test.go @@ -0,0 +1,36 @@ +package bundler + +import ( + "testing" +) + +func TestBundler(t *testing.T) { + input := []int{1, 2, 3, 4} + done := false + b := New[int](func(data []int) { + if len(data) != 4 { + t.Errorf("Wanted len(data) == %d, got: %d", len(input), len(data)) + } + + sum := 0 + const wantSum = 10 + for _, i := range data { + sum += i + } + + if sum != wantSum { + t.Errorf("wanted sum of inputs to be %d, got: %d", wantSum, sum) + } + done = true + }) + + for _, i := range input { + b.Add(i, 1) + } + + b.Flush() + + if !done { + t.Fatal("function wasn't called") + } +} |
