diff --git a/packages/fibers/src/csp.ts b/packages/fibers/src/csp.ts index c4f5a9df7e..48c5158dcf 100644 --- a/packages/fibers/src/csp.ts +++ b/packages/fibers/src/csp.ts @@ -1,4 +1,5 @@ import { isNumber } from "@thi.ng/checks"; +import { assert } from "@thi.ng/errors/assert"; import type { FiberOpts, IReadWriteBuffer } from "./api.js"; import { Fiber, fiber } from "./fiber.js"; @@ -169,34 +170,48 @@ export const channel = ( opts?: Partial ) => new Channel(buffer, opts); +/** + * First-in, first-out ring buffer implementation for use with {@link Channel}. + */ export class FIFOBuffer implements IReadWriteBuffer { - protected buf: T[] = []; + protected buf: (T | undefined)[]; + protected rpos = 0; + protected wpos = 0; - constructor(public cap = 1) {} + constructor(cap = 1) { + assert(cap >= 1, `capacity must be >= 1`); + this.buf = new Array(cap + 1); + } clear() { - this.buf.length = 0; + this.buf.fill(undefined); } readable() { - return this.buf.length > 0; + return this.rpos !== this.wpos; } read() { - return this.buf.shift(); + const { buf, rpos } = this; + const val = buf[rpos]!; + buf[rpos] = undefined; + this.rpos = (rpos + 1) % buf.length; + return val; } writable() { - return this.buf.length < this.cap; + return (this.wpos + 1) % this.buf.length !== this.rpos; } write(x: T) { - this.buf.push(x); + const { buf, wpos } = this; + buf[wpos] = x; + this.wpos = (wpos + 1) % buf.length; } } /** - * Returns a {@link FIFOBuffer} with given capacity for use with + * Returns a {@link FIFOBuffer} ring buffer with given capacity for use with * {@link channel}. * * @remarks @@ -211,9 +226,31 @@ export class FIFOBuffer implements IReadWriteBuffer { */ export const fifo = (cap: number) => new FIFOBuffer(cap); -export class LIFOBuffer extends FIFOBuffer { - read(): T | undefined { - return this.buf.pop(); +export class LIFOBuffer implements IReadWriteBuffer { + protected buf: T[] = []; + + constructor(protected cap = 1) { + assert(cap >= 1, `capacity must be >= 1`); + } + + clear() { + this.buf.length = 0; + } + + readable() { + return this.buf.length > 0; + } + + read() { + return this.buf.pop()!; + } + + writable() { + return this.buf.length < this.cap; + } + + write(x: T) { + this.buf.push(x); } } @@ -236,7 +273,11 @@ export class SlidingBuffer extends FIFOBuffer { } write(x: T) { - if (this.buf.length >= this.cap) this.buf.shift(); + if (!super.writable()) { + const { buf, rpos } = this; + buf[rpos] = undefined; + this.rpos = (rpos + 1) % buf.length; + } super.write(x); } } @@ -262,8 +303,7 @@ export class DroppingBuffer extends FIFOBuffer { } write(x: T) { - if (this.buf.length >= this.cap) return; - super.write(x); + if (super.writable()) super.write(x); } }