Skip to content

Commit

Permalink
feat(fibers): add CSP primitives
Browse files Browse the repository at this point in the history
- add fiber-based Channel class
- add various buffer implementations
  - fifo
  - lifo
  - sliding
  - dropping
  • Loading branch information
postspectacular committed Aug 9, 2023
1 parent 52836a8 commit d8fa8ce
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 1 deletion.
3 changes: 3 additions & 0 deletions packages/fibers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@
"./api": {
"default": "./api.js"
},
"./csp": {
"default": "./csp.js"
},
"./fiber": {
"default": "./fiber.js"
},
Expand Down
12 changes: 11 additions & 1 deletion packages/fibers/src/api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Fn, Fn2, IIDGen } from "@thi.ng/api";
import type { Fn, Fn2, IClear, IIDGen } from "@thi.ng/api";
import type { ILogger } from "@thi.ng/logger";
import type { Fiber } from "./fiber.js";

Expand Down Expand Up @@ -89,3 +89,13 @@ export type FiberEventType =
| typeof EVENT_FIBER_CANCELED
| typeof EVENT_FIBER_ERROR
| "*";

export interface IReadBuffer<T> {
readable(): boolean;
read(): T | undefined;
}

export interface IReadWriteBuffer<T> extends IReadBuffer<T>, IClear {
writable(): boolean;
write(x: T): void;
}
283 changes: 283 additions & 0 deletions packages/fibers/src/csp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
import { isNumber } from "@thi.ng/checks";
import type { FiberOpts, IReadWriteBuffer } from "./api.js";
import { Fiber, fiber } from "./fiber.js";

const STATE_OPEN = 0;
const STATE_CLOSING = 1;
const STATE_CLOSED = 2;

type CSPState = typeof STATE_OPEN | typeof STATE_CLOSING | typeof STATE_CLOSED;

/**
* Fiber-based CSP channel implementation, supporting any
* {@link IReadWriteBuffer} implementation to customize read/write behaviors
* (and ordering). By default uses a single value {@link fifo} buffer impl.
*/
export class Channel<T> {
protected buffer: IReadWriteBuffer<T>;
protected state: CSPState = STATE_OPEN;

constructor(
buffer: IReadWriteBuffer<T> | number = 1,
public opts?: Partial<FiberOpts>
) {
this.buffer = isNumber(buffer) ? new FIFOBuffer(buffer) : buffer;
}

/**
* Returns a new fiber which attempts to read a value from the channel and
* "blocks" until that value is available. Unless the channel has meanwhile
* been closed, the fiber returns the read value (otherwise: `undefined`).
*
* @remarks
* Depending on chosen back buffer behavior/implementation and
* {@link Channel.close}, read requests might still be successful whilst the
* channel is closing and there're still buffered values.
*
* @example
* ```ts
* const val = yield* chan.read();
* ```
*/
read() {
const $this = this;
return fiber<T | undefined>(function* (ctx: Fiber) {
while ($this.readable()) {
// wait until channel is readable
if ($this.buffer.readable()) {
const val = $this.buffer.read();
ctx.logger?.debug("read", val);
return val;
} else if ($this.state === STATE_CLOSING) {
return;
}
yield;
}
}, this.opts);
}

/**
* Returns a new fiber which attempts to write the given `value` to the
* channel and "blocks" until channel is writable (which depends on the
* channel's buffer implementation).
*
* @remarks
* Once the channel has been closed (or still is closing, see
* {@link Channel.close}), all write requests will be silently ignored.
*
* @example
* ```ts
* yield* chan.write(23);
* ```
*/
write(val: T) {
const $this = this;
return fiber(function* (ctx: Fiber) {
while ($this.writable()) {
// wait until channel is writable
if ($this.buffer.writable()) {
ctx.logger?.debug("write", val);
$this.buffer.write(val);
return;
}
yield;
}
}, this.opts);
}

/**
* Returns new fiber which closes the channel. By default this op will defer
* the full closing until all buffered values have been read (by another
* fiber), however any writes will already become unavailable/ignored even
* at this stage (also see {@link Channel.write}). If `wait=false`, the
* channel will be closed immediately, the backing buffered cleared and any
* in-flight reads or writes will be canceled.
*
* @param wait
*/
close(wait = true) {
const $this = this;
return fiber(function* (ctx: Fiber) {
if ($this.state >= STATE_CLOSING) return;
if (wait) {
ctx.logger?.debug("waiting to close...");
$this.state = STATE_CLOSING;
while ($this.buffer.readable()) yield;
}
$this.state = STATE_CLOSED;
$this.buffer.clear();
ctx.logger?.debug("channel closed");
}, this.opts);
}

/**
* Returns true if the channel is principally readable (i.e. not yet
* closed), however there might not be any values available yet and reads
* might block.
*/
readable() {
return this.state <= STATE_CLOSING;
}

/**
* Returns true if the channel is principally writable (i.e. not closing or
* closed), however depending on buffer behavior the channel might not yet
* accept new values and writes might block.
*/
writable() {
return this.state === STATE_OPEN;
}

/**
* Returns true if the channel is fully closed and no further reads or
* writes are possible.
*/
closed() {
return this.state === STATE_CLOSED;
}
}

/**
* Functional syntax sugar for {@link Channel} ctor, using provided backing
* buffer and shared options for all fibers returned by the channel for its
* various operations.
*
* @remarks
* If `buffer` is given as number, a {@link fifo} buffer of given capacity will
* be used.
*
* @example
* ```ts
* // create unbuffered channel with single value capacity
* const chan = channel();
*
* // create channel with a fixed buffer capacity of 3 values
* const chan2 = channel(3);
*
* // create channel with a sliding window buffer and custom ID & logger
* const chan3 = channel(
* sliding(3),
* { id: "main", logger: new ConsoleLogger("chan") }
* );
* ```
*
* @param buffer
* @param opts
*/
export const channel = <T>(
buffer?: IReadWriteBuffer<T> | number,
opts?: Partial<FiberOpts>
) => new Channel<T>(buffer, opts);

export class FIFOBuffer<T> implements IReadWriteBuffer<T> {
protected buf: T[] = [];

constructor(public cap = 1) {}

clear() {
this.buf.length = 0;
}

readable() {
return this.buf.length > 0;
}

read() {
return this.buf.shift();
}

writable() {
return this.buf.length < this.cap;
}

write(x: T) {
this.buf.push(x);
}
}

/**
* Returns a {@link FIFOBuffer} with given capacity for use with
* {@link channel}.
*
* @remarks
* With this implementation, writes to the channel will only start blocking once
* the buffer's capacity is reached, otherwise complete immediately. Likewise,
* channel reads are non-blocking whilst there're more buffered values
* available. Reads will only block if the buffer is empty.
*
* Also see {@link lifo}.
*
* @param cap
*/
export const fifo = <T>(cap: number) => new FIFOBuffer<T>(cap);

export class LIFOBuffer<T> extends FIFOBuffer<T> {
read(): T | undefined {
return this.buf.pop();
}
}

/**
* Returns a {@link LIFOBuffer} with given capacity for use with
* {@link channel}.
*
* @remarks
* Read/write behavior is mostly the same as with {@link fifo}, with the
* important difference, that (as the name indicates), the last value written
* will be the first value read (i.e. stack behavior).
*
* @param cap
*/
export const lifo = <T>(cap: number) => new LIFOBuffer<T>(cap);

export class SlidingBuffer<T> extends FIFOBuffer<T> {
writable() {
return true;
}

write(x: T) {
if (this.buf.length >= this.cap) this.buf.shift();
super.write(x);
}
}

/**
* Returns a {@link SlidingBuffer} with given capacity for use with
* {@link channel}.
*
* @remarks
* With this implementation, writes to the channel are **never** blocking! Once
* the buffer's capacity is reached, a new write will first expunge the oldest
* buffered value. Read behavior is the same as for {@link fifo}.
*
* Also see {@link dropping} for alternative behavior.
*
* @param cap
*/
export const sliding = <T>(cap: number) => new SlidingBuffer<T>(cap);

export class DroppingBuffer<T> extends FIFOBuffer<T> {
writable() {
return true;
}

write(x: T) {
if (this.buf.length >= this.cap) return;
super.write(x);
}
}

/**
* Returns a {@link DroppingBuffer} with given capacity for use with
* {@link channel}.
*
* @remarks
* With this implementation, writes to the channel are **never** blocking!
* Whilst the buffer's capacity is reached, new writes will be silently ignored.
* Read behavior is the same as for {@link fifo}.
*
* Also see {@link sliding} for alternative behavior.
*
* @param cap
*/
export const dropping = <T>(cap: number) => new DroppingBuffer<T>(cap);
1 change: 1 addition & 0 deletions packages/fibers/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./api.js";
export * from "./csp.js";
export * from "./fiber.js";
export * from "./ops.js";

0 comments on commit d8fa8ce

Please sign in to comment.