Skip to content

Commit

Permalink
perf(fibers): rewrite FIFOBuffer as ring buffer
Browse files Browse the repository at this point in the history
- use old impl as basis for LIFOBuffer only
- update other buffer types to use new ring buffer impl
- add min. capacity assertion in ctors
  • Loading branch information
postspectacular committed Aug 10, 2023
1 parent 146d7be commit ebac714
Showing 1 changed file with 54 additions and 14 deletions.
68 changes: 54 additions & 14 deletions packages/fibers/src/csp.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { isNumber } from "@thi.ng/checks";
import { assert } from "@thi.ng/errors/assert";
import type { FiberOpts, IReadWriteBuffer } from "./api.js";
import { Fiber, fiber } from "./fiber.js";

Expand Down Expand Up @@ -169,34 +170,48 @@ export const channel = <T>(
opts?: Partial<FiberOpts>
) => new Channel<T>(buffer, opts);

/**
* First-in, first-out ring buffer implementation for use with {@link Channel}.
*/
export class FIFOBuffer<T> implements IReadWriteBuffer<T> {
protected buf: T[] = [];
protected buf: (T | undefined)[];
protected rpos = 0;
protected wpos = 0;

constructor(public cap = 1) {}
constructor(cap = 1) {
assert(cap >= 1, `capacity must be >= 1`);
this.buf = new Array(cap + 1);
}

clear() {
this.buf.length = 0;
this.buf.fill(undefined);
}

readable() {
return this.buf.length > 0;
return this.rpos !== this.wpos;
}

read() {
return this.buf.shift();
const { buf, rpos } = this;
const val = buf[rpos]!;
buf[rpos] = undefined;
this.rpos = (rpos + 1) % buf.length;
return val;
}

writable() {
return this.buf.length < this.cap;
return (this.wpos + 1) % this.buf.length !== this.rpos;
}

write(x: T) {
this.buf.push(x);
const { buf, wpos } = this;
buf[wpos] = x;
this.wpos = (wpos + 1) % buf.length;
}
}

/**
* Returns a {@link FIFOBuffer} with given capacity for use with
* Returns a {@link FIFOBuffer} ring buffer with given capacity for use with
* {@link channel}.
*
* @remarks
Expand All @@ -211,9 +226,31 @@ export class FIFOBuffer<T> implements IReadWriteBuffer<T> {
*/
export const fifo = <T>(cap: number) => new FIFOBuffer<T>(cap);

export class LIFOBuffer<T> extends FIFOBuffer<T> {
read(): T | undefined {
return this.buf.pop();
export class LIFOBuffer<T> implements IReadWriteBuffer<T> {
protected buf: T[] = [];

constructor(protected cap = 1) {
assert(cap >= 1, `capacity must be >= 1`);
}

clear() {
this.buf.length = 0;
}

readable() {
return this.buf.length > 0;
}

read() {
return this.buf.pop()!;
}

writable() {
return this.buf.length < this.cap;
}

write(x: T) {
this.buf.push(x);
}
}

Expand All @@ -236,7 +273,11 @@ export class SlidingBuffer<T> extends FIFOBuffer<T> {
}

write(x: T) {
if (this.buf.length >= this.cap) this.buf.shift();
if (!super.writable()) {
const { buf, rpos } = this;
buf[rpos] = undefined;
this.rpos = (rpos + 1) % buf.length;
}
super.write(x);
}
}
Expand All @@ -262,8 +303,7 @@ export class DroppingBuffer<T> extends FIFOBuffer<T> {
}

write(x: T) {
if (this.buf.length >= this.cap) return;
super.write(x);
if (super.writable()) super.write(x);
}
}

Expand Down

0 comments on commit ebac714

Please sign in to comment.