diff --git a/_tools/check_docs.ts b/_tools/check_docs.ts
index b3ad49f23019..1307a8817f86 100644
--- a/_tools/check_docs.ts
+++ b/_tools/check_docs.ts
@@ -40,6 +40,7 @@ const ENTRY_POINTS = [
   "../jsonc/mod.ts",
   "../media_types/mod.ts",
   "../semver/mod.ts",
+  "../streams/mod.ts",
   "../text/mod.ts",
   "../ulid/mod.ts",
   "../webgpu/mod.ts",
diff --git a/streams/buffer.ts b/streams/buffer.ts
index 0a47c73b9b20..75d410ec9433 100644
--- a/streams/buffer.ts
+++ b/streams/buffer.ts
@@ -7,7 +7,21 @@ import { copy } from "@std/bytes/copy";
 const MAX_SIZE = 2 ** 32 - 2;
 const DEFAULT_CHUNK_SIZE = 16_640;
 
-/** A variable-sized buffer of bytes with `read()` and `write()` methods.
+/** Options for {@linkcode Buffer.bytes}. */
+export interface BufferBytesOptions {
+  /**
+   * If true, {@linkcode Buffer.bytes} will return a copy of the buffered data.
+   *
+   * If false, it will return a slice of the buffer's data.
+   *
+   * @default {true}
+   */
+  copy?: boolean;
+}
+
+/**
+ * A variable-sized buffer of bytes with `readable` and `writable` getters that
+ * let you work with the {@link https://developer.mozilla.org/en-US/docs/Web/API/Streams_API | Web Streams API}.
  *
  * Buffer is almost always used with some I/O like files and sockets. It allows
  * one to buffer up a download from a socket. Buffer grows and shrinks as
@@ -20,7 +34,38 @@ const DEFAULT_CHUNK_SIZE = 16_640;
  * ArrayBuffer is a fixed memory allocation. Buffer is implemented on top of
  * ArrayBuffer.
  *
- * Based on {@link https://golang.org/pkg/bytes/#Buffer | Go Buffer}. */
+ * Based on {@link https://golang.org/pkg/bytes/#Buffer | Go Buffer}.
+ *
+ * @example Buffer input bytes and convert them to a string
+ * ```ts
+ * import { Buffer } from "@std/streams/buffer";
+ * import { toText } from "@std/streams/to-text";
+ * import { assert } from "@std/assert/assert";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * // Create a new buffer
+ * const buf = new Buffer();
+ * assertEquals(buf.capacity, 0);
+ * assertEquals(buf.length, 0);
+ *
+ * // Dummy input stream
+ * const inputStream = ReadableStream.from([
+ *   "hello, ",
+ *   "world",
+ *   "!",
+ * ]);
+ *
+ * // Pipe the input stream to the buffer
+ * await inputStream.pipeThrough(new TextEncoderStream()).pipeTo(buf.writable);
+ * assert(buf.capacity > 0);
+ * assert(buf.length > 0);
+ *
+ * // Convert the buffered bytes to a string
+ * const result = await toText(buf.readable);
+ * assertEquals(result, "hello, world!");
+ * assert(buf.empty());
+ * ```
+ */
 export class Buffer {
   #buf: Uint8Array; // contents are the bytes buf[off : len(buf)]
   #off = 0; // read at buf[off], write at buf[buf.byteLength]
@@ -42,7 +87,19 @@ export class Buffer {
       autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
     });
 
-  /** Getter returning the instance's {@linkcode ReadableStream}. */
+  /**
+   * Getter returning the instance's {@linkcode ReadableStream}.
+   *
+   * @returns A `ReadableStream` of the buffer.
+   *
+   * @example Read the content out of the buffer to stdout
+   * ```ts
+   * import { Buffer } from "@std/streams/buffer";
+   *
+   * const buf = new Buffer();
+   * await buf.readable.pipeTo(Deno.stdout.writable);
+   * ```
+   */
   get readable(): ReadableStream<Uint8Array> {
     return this.#readable;
   }
@@ -54,41 +111,203 @@ export class Buffer {
     },
   });
 
-  /** Getter returning the instance's {@linkcode WritableStream}. */
+  /**
+   * Getter returning the instance's {@linkcode WritableStream}.
+   *
+   * @returns A `WritableStream` of the buffer.
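+   *
+   * @example Write a chunk to the buffer through a writer (a minimal sketch;
+   * any `Uint8Array` chunk will do here)
+   * ```ts
+   * import { Buffer } from "@std/streams/buffer";
+   * import { assertEquals } from "@std/assert/assert-equals";
+   *
+   * const buf = new Buffer();
+   * const writer = buf.writable.getWriter();
+   * // Write one illustrative chunk, then release the lock on the stream.
+   * await writer.write(new Uint8Array([1, 2, 3]));
+   * writer.releaseLock();
+   * assertEquals(buf.length, 3);
+   * ```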
+ * + * @example Write the data from stdin to the buffer + * ```ts + * import { Buffer } from "@std/streams/buffer"; + * + * const buf = new Buffer(); + * await Deno.stdin.readable.pipeTo(buf.writable); + * ``` + */ get writable(): WritableStream { return this.#writable; } - /** Constructs a new instance. */ + /** + * Constructs a new instance. + * + * @param ab An optional buffer to use as the initial buffer. + * + * @example No initial buffer provided + * ```ts + * import { Buffer } from "@std/streams/buffer"; + * + * const buf = new Buffer(); + * ``` + * + * @example With a pre-allocated buffer + * ```ts + * import { Buffer } from "@std/streams/buffer"; + * + * const arrayBuffer = new ArrayBuffer(8); + * const buf = new Buffer(arrayBuffer); + * ``` + * + * @example From Uint8Array + * ```ts + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([0, 1, 2]); + * const buf = new Buffer(array.buffer); + * ``` + */ constructor(ab?: ArrayBufferLike | ArrayLike) { this.#buf = ab === undefined ? new Uint8Array(0) : new Uint8Array(ab); } - /** Returns a slice holding the unread portion of the buffer. + /** + * Returns a slice holding the unread portion of the buffer. * * The slice is valid for use only until the next buffer modification (that - * is, only until the next call to a method like `read()`, `write()`, - * `reset()`, or `truncate()`). If `options.copy` is false the slice aliases - * the buffer content at least until the next buffer modification, so - * immediate changes to the slice will affect the result of future reads. + * is, only until the next call to a method that mutates or consumes the + * buffer, like reading data out via `readable`, `reset()`, or `truncate()`). + * + * If `options.copy` is false the slice aliases the buffer content at least + * until the next buffer modification, so immediate changes to the slice will + * affect the result of future reads. If `options` is not provided, + * `options.copy` defaults to `true`. + * + * @param options Options for the bytes method. + * @returns A copy or a slice of the buffer. 
+ * + * @example Copy the buffer + * ```ts + * import { assertEquals } from "@std/assert/assert-equals"; + * import { assertNotEquals } from "@std/assert/assert-not-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([0, 1, 2]); + * const buf = new Buffer(array.buffer); + * const copied = buf.bytes(); + * assertEquals(copied.length, array.length); + * + * // Modify an element in the original array + * array[1] = 99; + * assertEquals(copied[0], array[0]); + * // The copied buffer is not affected by the modification + * assertNotEquals(copied[1], array[1]); + * assertEquals(copied[2], array[2]); + * ``` + * + * @example Get a slice to the buffer + * ```ts + * import { assertEquals } from "@std/assert/assert-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([0, 1, 2]); + * const buf = new Buffer(array.buffer); + * const slice = buf.bytes({ copy: false }); + * assertEquals(slice.length, array.length); + * + * // Modify an element in the original array + * array[1] = 99; + * assertEquals(slice[0], array[0]); + * // The slice _is_ affected by the modification + * assertEquals(slice[1], array[1]); + * assertEquals(slice[2], array[2]); + * ``` */ - bytes(options = { copy: true }): Uint8Array { + bytes(options: BufferBytesOptions = { copy: true }): Uint8Array { if (options.copy === false) return this.#buf.subarray(this.#off); return this.#buf.slice(this.#off); } - /** Returns whether the unread portion of the buffer is empty. */ + /** + * Returns whether the unread portion of the buffer is empty. + * + * @returns Whether the buffer is empty. + * + * @example Empty buffer + * ```ts + * import { assert } from "@std/assert/assert"; + * import { Buffer } from "@std/streams/buffer"; + * + * const buf = new Buffer(); + * assert(buf.empty()); + * ``` + * + * @example Non-empty buffer + * ```ts + * import { assert } from "@std/assert/assert"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([42]); + * const buf = new Buffer(array.buffer); + * assert(!buf.empty()); + * ``` + * + * @example Non-empty, but the content was already read + * ```ts + * import { assert } from "@std/assert/assert"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([42]); + * const buf = new Buffer(array.buffer); + * assert(!buf.empty()); + * // Read the content out of the buffer + * await buf.readable.pipeTo(Deno.stdout.writable); + * // The buffer is now empty + * assert(buf.empty()); + * ``` + */ empty(): boolean { return this.#buf.byteLength <= this.#off; } - /** A read only number of bytes of the unread portion of the buffer. */ + /** + * A read only number of bytes of the unread portion of the buffer. + * + * @returns The number of bytes in the unread portion of the buffer. 
+ * + * @example Basic usage + * ```ts + * import { assertEquals } from "@std/assert/assert-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([0, 1, 2]); + * const buf = new Buffer(array.buffer); + * assertEquals(buf.length, 3); + * ``` + * + * @example Length becomes 0 after the content is read + * ```ts + * import { assertEquals } from "@std/assert/assert-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([42]); + * const buf = new Buffer(array.buffer); + * assertEquals(buf.length, 1); + * // Read the content out of the buffer + * await buf.readable.pipeTo(Deno.stdout.writable); + * // The length is now 0 + * assertEquals(buf.length, 0); + * ``` + */ get length(): number { return this.#buf.byteLength - this.#off; } - /** The read only capacity of the buffer's underlying byte slice, that is, - * the total space allocated for the buffer's data. */ + /** + * The read only capacity of the buffer's underlying byte slice, that is, + * the total space allocated for the buffer's data. + * + * @returns The number of allocated bytes for the buffer. + * + * @example Basic usage + * ```ts + * import { assertEquals } from "@std/assert/assert-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const arrayBuffer = new ArrayBuffer(256); + * const buf = new Buffer(arrayBuffer); + * assertEquals(buf.capacity, 256); + * ``` + */ get capacity(): number { return this.#buf.buffer.byteLength; } @@ -97,6 +316,22 @@ export class Buffer { * Discards all but the first `n` unread bytes from the buffer but * continues to use the same allocated storage. It throws if `n` is * negative or greater than the length of the buffer. + * + * @param n The number of bytes to keep. + * + * @example Basic usage + * ```ts + * import { assertEquals } from "@std/assert/assert-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([0, 1, 2]); + * const buf = new Buffer(array.buffer); + * assertEquals(buf.bytes(), array); + * + * // Discard all but the first 2 bytes + * buf.truncate(2); + * assertEquals(buf.bytes(), array.slice(0, 2)); + * ``` */ truncate(n: number): void { if (n === 0) { @@ -109,7 +344,23 @@ export class Buffer { this.#reslice(this.#off + n); } - /** Resets to an empty buffer. */ + /** + * Resets to an empty buffer. + * + * @example Basic usage + * ```ts + * import { assert } from "@std/assert/assert"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([0, 1, 2]); + * const buf = new Buffer(array.buffer); + * assert(!buf.empty()); + * + * // Reset + * buf.reset(); + * assert(buf.empty()); + * ``` + */ reset() { this.#reslice(0); this.#off = 0; @@ -161,13 +412,30 @@ export class Buffer { return m; } - /** Grows the buffer's capacity, if necessary, to guarantee space for + /** + * Grows the buffer's capacity, if necessary, to guarantee space for * another `n` bytes. After `.grow(n)`, at least `n` bytes can be written to * the buffer without another allocation. If `n` is negative, `.grow()` will * throw. If the buffer can't grow it will throw an error. * + * @param n The number of bytes to grow the buffer by. + * * Based on Go Lang's - * {@link https://golang.org/pkg/bytes/#Buffer.Grow | Buffer.Grow}. */ + * {@link https://golang.org/pkg/bytes/#Buffer.Grow | Buffer.Grow}. 
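+   *
+   * @example Growing by a negative amount throws (a minimal sketch of the
+   * error case; the message comes from the check in `grow()` below)
+   * ```ts
+   * import { assertThrows } from "@std/assert/assert-throws";
+   * import { Buffer } from "@std/streams/buffer";
+   *
+   * const buf = new Buffer();
+   * assertThrows(() => buf.grow(-1), Error, "Buffer.grow: negative count");
+   * ```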
+ * + * @example Basic usage + * ```ts + * import { assert } from "@std/assert/assert"; + * import { assertEquals } from "@std/assert/assert-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const buf = new Buffer(); + * assertEquals(buf.capacity, 0); + * + * buf.grow(200); + * assert(buf.capacity >= 200); + * ``` + */ grow(n: number) { if (n < 0) { throw Error("Buffer.grow: negative count"); diff --git a/streams/byte_slice_stream.ts b/streams/byte_slice_stream.ts index 52e46290dda9..c3af61beeb25 100644 --- a/streams/byte_slice_stream.ts +++ b/streams/byte_slice_stream.ts @@ -7,20 +7,59 @@ import { assert } from "@std/assert/assert"; * A transform stream that only transforms from the zero-indexed `start` and * `end` bytes (both inclusive). * - * @example + * @example Basic usage * ```ts * import { ByteSliceStream } from "@std/streams/byte-slice-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from([ + * new Uint8Array([0, 1]), + * new Uint8Array([2, 3, 4]), + * ]); + * const slicedStream = stream.pipeThrough(new ByteSliceStream(1, 3)); + * + * assertEquals( + * await Array.fromAsync(slicedStream), + * [new Uint8Array([1]), new Uint8Array([2, 3])] + * ); + * ``` + * + * @example Get a range of bytes from a fetch response body + * ```ts + * import { ByteSliceStream } from "@std/streams/byte-slice-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; * * const response = await fetch("https://example.com"); * const rangedStream = response.body! * .pipeThrough(new ByteSliceStream(3, 8)); + * const collected = await Array.fromAsync(rangedStream); + * assertEquals(collected[0]?.length, 6); * ``` */ export class ByteSliceStream extends TransformStream { #offsetStart = 0; #offsetEnd = 0; - /** Constructs a new instance. */ + /** + * Constructs a new instance. + * + * @param start The zero-indexed byte index to start reading from. + * @param end The zero-indexed byte index to stop reading at. Inclusive. + * + * @example No parameters + * ```ts + * import { ByteSliceStream } from "@std/streams/byte-slice-stream"; + * + * const byteSliceStream = new ByteSliceStream(); + * ``` + * + * @example start = 4, end = 11 + * ```ts + * import { ByteSliceStream } from "@std/streams/byte-slice-stream"; + * + * const byteSliceStream = new ByteSliceStream(4, 11); + * ``` + */ constructor(start = 0, end: number = Infinity) { super({ start: () => { diff --git a/streams/concat_readable_streams.ts b/streams/concat_readable_streams.ts index a49d4befea3d..25d227c4f3e2 100644 --- a/streams/concat_readable_streams.ts +++ b/streams/concat_readable_streams.ts @@ -6,9 +6,9 @@ * * Cancelling the resulting stream will cancel all the input streams. * - * @typeParam T Type of the chunks in the streams. - * - * @param streams An iterable of `ReadableStream`s. + * @typeParam T The type of the chunks in the streams. + * @param streams An iterable of `ReadableStream`s to concat. + * @returns A `ReadableStream` that will emit the concatenated chunks. * * @example Usage * ```ts diff --git a/streams/delimiter_stream.ts b/streams/delimiter_stream.ts index 96aa9fcd8b51..1f3c5afa6d08 100644 --- a/streams/delimiter_stream.ts +++ b/streams/delimiter_stream.ts @@ -16,36 +16,50 @@ export type DelimiterDisposition = /** Options for {@linkcode DelimiterStream}. */ export interface DelimiterStreamOptions { - /** Disposition of the delimiter. */ + /** + * Disposition of the delimiter. 
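+   *
+   * Possible dispositions are `"discard"` (drop the delimiter), `"suffix"`
+   * (keep it at the end of a chunk), and `"prefix"` (keep it at the start of
+   * the following chunk).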
+ * + * @default {"discard"} + */ disposition?: DelimiterDisposition; } /** * Divide a stream into chunks delimited by a given byte sequence. * + * If you are working with a stream of `string`, consider using {@linkcode TextDelimiterStream}. + * * @example * Divide a CSV stream by commas, discarding the commas: * ```ts * import { DelimiterStream } from "@std/streams/delimiter-stream"; - * const res = await fetch("https://example.com/data.csv"); - * const parts = res.body! + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const inputStream = ReadableStream.from(["foo,bar", ",baz"]); + * + * const transformed = inputStream.pipeThrough(new TextEncoderStream()) * .pipeThrough(new DelimiterStream(new TextEncoder().encode(","))) * .pipeThrough(new TextDecoderStream()); + * + * assertEquals(await Array.fromAsync(transformed), ["foo", "bar", "baz"]); * ``` * * @example - * Divide a stream after semi-colons, keeping the semi-colons in the output: + * Divide a stream after semi-colons, keeping the semicolons in the output: * ```ts * import { DelimiterStream } from "@std/streams/delimiter-stream"; - * const res = await fetch("https://example.com/file.js"); - * const parts = res.body! + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const inputStream = ReadableStream.from(["foo;", "bar;baz", ";"]); + * + * const transformed = inputStream.pipeThrough(new TextEncoderStream()) * .pipeThrough( - * new DelimiterStream( - * new TextEncoder().encode(";"), - * { disposition: "suffix" }, - * ) - * ) - * .pipeThrough(new TextDecoderStream()); + * new DelimiterStream(new TextEncoder().encode(";"), { + * disposition: "suffix", + * }), + * ).pipeThrough(new TextDecoderStream()); + * + * assertEquals(await Array.fromAsync(transformed), ["foo;", "bar;", "baz;"]); * ``` */ export class DelimiterStream extends TransformStream { @@ -55,10 +69,31 @@ export class DelimiterStream extends TransformStream { #delimLPS: Uint8Array | null; #disp: DelimiterDisposition; - /** Constructs a new instance. */ + /** + * Constructs a new instance. + * + * @param delimiter A delimiter to split the stream by. + * @param options Options for the delimiter stream. + * + * @example comma as a delimiter + * ```ts + * import { DelimiterStream } from "@std/streams/delimiter-stream"; + * + * const delimiterStream = new DelimiterStream(new TextEncoder().encode(",")); + * ``` + * + * @example semicolon as a delimiter, and disposition set to `"suffix"` + * ```ts + * import { DelimiterStream } from "@std/streams/delimiter-stream"; + * + * const delimiterStream = new DelimiterStream(new TextEncoder().encode(";"), { + * disposition: "suffix", + * }); + * ``` + */ constructor( delimiter: Uint8Array, - options?: DelimiterStreamOptions, + options: DelimiterStreamOptions = { disposition: "discard" }, ) { super({ transform: (chunk, controller) => @@ -70,7 +105,7 @@ export class DelimiterStream extends TransformStream { this.#delimiter = delimiter; this.#delimLPS = delimiter.length > 1 ? createLPS(delimiter) : null; - this.#disp = options?.disposition ?? "discard"; + this.#disp = options.disposition ?? "discard"; } #handle( diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index 32331db13b16..800392dabc6a 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -2,19 +2,83 @@ // This module is browser compatible. 
 /**
- * Merge multiple streams into a single one, taking order into account, and each stream
- * will wait for a chunk to enqueue before the next stream can append another chunk.
- * If a stream ends before other ones, the others will be cancelled.
+ * Merge multiple streams into a single one, taking order into account, and each
+ * stream will wait for a chunk to enqueue before the next stream can append
+ * another chunk.
 *
- * @example
+ * If a stream ends before the others, the remaining streams will be cancelled
+ * after the last chunk of that stream is read, as the examples below
+ * illustrate. If you want to continue reading the other streams even after one
+ * of them ends, use {@linkcode zipReadableStreams}.
+ *
+ * @typeParam T The type of the chunks in the input streams.
+ * @param streams The input streams to zip together.
+ * @returns A `ReadableStream` that will emit the zipped chunks.
+ *
+ * @example Zip 2 streams with the same length
 * ```ts
 * import { earlyZipReadableStreams } from "@std/streams/early-zip-readable-streams";
+ * import { assertEquals } from "@std/assert/assert-equals";
 *
 * const stream1 = ReadableStream.from(["1", "2", "3"]);
 * const stream2 = ReadableStream.from(["a", "b", "c"]);
 * const zippedStream = earlyZipReadableStreams(stream1, stream2);
 *
- * await Array.fromAsync(zippedStream); // ["1", "a", "2", "b", "3", "c"];
+ * assertEquals(
+ *   await Array.fromAsync(zippedStream),
+ *   ["1", "a", "2", "b", "3", "c"],
+ * );
+ * ```
+ *
+ * @example Zip 2 streams with different lengths (first one is shorter)
+ * ```ts
+ * import { earlyZipReadableStreams } from "@std/streams/early-zip-readable-streams";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * const stream1 = ReadableStream.from(["1", "2"]);
+ * const stream2 = ReadableStream.from(["a", "b", "c", "d"]);
+ * const zippedStream = earlyZipReadableStreams(stream1, stream2);
+ *
+ * // The first stream ends before the second one. When the first stream ends,
+ * // the second one is cancelled and no more data is read or added to the
+ * // zipped stream.
+ * assertEquals(
+ *   await Array.fromAsync(zippedStream),
+ *   ["1", "a", "2", "b"],
+ * );
+ * ```
+ *
+ * @example Zip 2 streams with different lengths (first one is longer)
+ * ```ts
+ * import { earlyZipReadableStreams } from "@std/streams/early-zip-readable-streams";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * const stream1 = ReadableStream.from(["1", "2", "3", "4"]);
+ * const stream2 = ReadableStream.from(["a", "b"]);
+ * const zippedStream = earlyZipReadableStreams(stream1, stream2);
+ *
+ * // The second stream ends before the first one. When the second stream ends,
+ * // the first one is cancelled, but the chunk "3" has already been read, so it
+ * // is added to the zipped stream.
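+ * // ("4" is never read, since the first stream is cancelled before it is
+ * // pulled again.)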
+ * assertEquals( + * await Array.fromAsync(zippedStream), + * ["1", "a", "2", "b", "3"], + * ); + * ``` + * + * @example Zip 3 streams + * ```ts + * import { earlyZipReadableStreams } from "@std/streams/early-zip-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream1 = ReadableStream.from(["1"]); + * const stream2 = ReadableStream.from(["a", "b"]); + * const stream3 = ReadableStream.from(["A", "B", "C"]); + * const zippedStream = earlyZipReadableStreams(stream1, stream2, stream3); + * + * assertEquals( + * await Array.fromAsync(zippedStream), + * ["1", "a", "A"], + * ); * ``` */ export function earlyZipReadableStreams( diff --git a/streams/early_zip_readable_streams_test.ts b/streams/early_zip_readable_streams_test.ts index 914e7ca2fcab..b6d4529ffe9e 100644 --- a/streams/early_zip_readable_streams_test.ts +++ b/streams/early_zip_readable_streams_test.ts @@ -39,3 +39,24 @@ Deno.test("earlyZipReadableStreams() handles long first", async () => { "d", ]); }); + +Deno.test("earlyZipReadableStreams() can zip three streams", async () => { + const textStream = ReadableStream.from(["a", "b", "c", "d", "e"]); + const textStream2 = ReadableStream.from(["1", "2", "3"]); + const textStream3 = ReadableStream.from(["x", "y"]); + + const buf = await Array.fromAsync( + earlyZipReadableStreams(textStream, textStream2, textStream3), + ); + + assertEquals(buf, [ + "a", + "1", + "x", + "b", + "2", + "y", + "c", + "3", + ]); +}); diff --git a/streams/iterate_reader.ts b/streams/iterate_reader.ts index d1db22eb4572..cd51bd617e1c 100644 --- a/streams/iterate_reader.ts +++ b/streams/iterate_reader.ts @@ -10,26 +10,27 @@ import type { Reader, ReaderSync } from "@std/io/types"; export type { Reader, ReaderSync }; /** - * Turns a {@linkcode Reader}, `r`, into an async iterator. + * Turns a {@linkcode https://jsr.io/@std/io/doc/types/~/Reader | Reader}, `r`, into an async iterator. * - * @example + * @param r A reader to turn into an async iterator. + * @param options Options for the iterateReader function. + * @returns An async iterator that yields Uint8Array. + * + * @example Convert a `Deno.FsFile` into an async iterator and iterate over it * ```ts * import { iterateReader } from "@std/streams/iterate-reader"; * - * using f = await Deno.open("/etc/passwd"); + * using f = await Deno.open("./README.md"); * for await (const chunk of iterateReader(f)) { * console.log(chunk); * } * ``` * - * Second argument can be used to tune size of a buffer. - * Default size of the buffer is 32kB. - * - * @example + * @example Specify a buffer size of 1MiB * ```ts * import { iterateReader } from "@std/streams/iterate-reader"; * - * using f = await Deno.open("/etc/passwd"); + * using f = await Deno.open("./README.md"); * const it = iterateReader(f, { * bufSize: 1024 * 1024 * }); @@ -39,7 +40,7 @@ export type { Reader, ReaderSync }; * ``` * * @deprecated This will be removed in 1.0.0. Import from - * {@linkhttps://jsr.io/@std/io | @std/io} instead. + * {@link https://jsr.io/@std/io | @std/io} instead. */ export function iterateReader( r: Reader, @@ -51,24 +52,27 @@ export function iterateReader( } /** - * Turns a {@linkcode ReaderSync}, `r`, into an iterator. + * Turns a {@linkcode https://jsr.io/@std/io/doc/types/~/ReaderSync | ReaderSync}, `r`, into an iterator. + * + * @param r A reader to turn into an iterator. + * @param options Options for the iterateReaderSync function. + * @returns An iterator that yields Uint8Array. 
 *
+ * @example Convert a `Deno.FsFile` into an iterator and iterate over it
 * ```ts
 * import { iterateReaderSync } from "@std/streams/iterate-reader";
 *
- * using f = Deno.openSync("/etc/passwd");
+ * using f = Deno.openSync("./README.md");
 * for (const chunk of iterateReaderSync(f)) {
 *   console.log(chunk);
 * }
 * ```
 *
- * Second argument can be used to tune size of a buffer.
- * Default size of the buffer is 32kB.
- *
+ * @example Specify a buffer size of 1MiB
 * ```ts
 * import { iterateReaderSync } from "@std/streams/iterate-reader";
-
- * using f = await Deno.open("/etc/passwd");
+ *
+ * using f = Deno.openSync("./README.md");
 * const iter = iterateReaderSync(f, {
 *   bufSize: 1024 * 1024
 * });
diff --git a/streams/limited_bytes_transform_stream.ts b/streams/limited_bytes_transform_stream.ts
index 96048f61be63..a8f6823ec534 100644
--- a/streams/limited_bytes_transform_stream.ts
+++ b/streams/limited_bytes_transform_stream.ts
@@ -1,29 +1,121 @@
 // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
 // This module is browser compatible.
 
+/** Options for {@linkcode LimitedBytesTransformStream}. */
+export interface LimitedBytesTransformStreamOptions {
+  /**
+   * If true, a {@linkcode RangeError} is thrown when queueing the current chunk
+   * would exceed the specified size limit.
+   *
+   * @default {false}
+   */
+  error?: boolean;
+}
+
 /**
- * A {@linkcode TransformStream} that will only read & enqueue `size` amount of
- * bytes. This operation is chunk based and not BYOB based, and as such will
- * read more than needed.
+ * A {@linkcode TransformStream} that will only read & enqueue chunks until the
+ * total amount of enqueued data exceeds `size`. The chunk that would push the
+ * total over the limit is NOT enqueued; at that point the stream is terminated,
+ * or, if `options.error` is set to `true`, a {@linkcode RangeError} is thrown
+ * instead.
+ *
+ * @example `size` is equal to the total byte length of the chunks
+ * ```ts
+ * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * const stream = ReadableStream.from(["1234", "5678"]);
+ * const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough(
+ *   new LimitedBytesTransformStream(8),
+ * ).pipeThrough(new TextDecoderStream());
+ *
+ * assertEquals(
+ *   await Array.fromAsync(transformed),
+ *   ["1234", "5678"],
+ * );
+ * ```
+ *
+ * @example `size` is less than the total byte length of the chunks, and at the
+ * boundary of the chunks
+ * ```ts
+ * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * const stream = ReadableStream.from(["1234", "5678"]);
+ * const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough(
+ *   // `4` is the boundary of the chunks
+ *   new LimitedBytesTransformStream(4),
+ * ).pipeThrough(new TextDecoderStream());
+ *
+ * assertEquals(
+ *   await Array.fromAsync(transformed),
+ *   // The first chunk was read, but the second chunk was not
+ *   ["1234"],
+ * );
+ * ```
+ *
+ * @example `size` is less than the total byte length of the chunks, and not at
+ * the boundary of the chunks
+ * ```ts
+ * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream";
+ * import { assertEquals } from "@std/assert/assert-equals";
 *
- * If `options.error` is set, then instead of terminating the stream,
- * an error will be thrown.
+ * const stream = ReadableStream.from(["1234", "5678"]); + * const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough( + * // `5` is not the boundary of the chunks + * new LimitedBytesTransformStream(5), + * ).pipeThrough(new TextDecoderStream()); * - * @example + * assertEquals( + * await Array.fromAsync(transformed), + * // The second chunk was not read because it would exceed the specified size + * ["1234"], + * ); + * ``` + * + * @example error: true * ```ts * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream"; + * import { assertRejects } from "@std/assert/assert-rejects"; + * + * const stream = ReadableStream.from(["1234", "5678"]); + * const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough( + * new LimitedBytesTransformStream(5, { error: true }), + * ).pipeThrough(new TextDecoderStream()); * - * const res = await fetch("https://example.com"); - * const parts = res.body! - * .pipeThrough(new LimitedBytesTransformStream(512 * 1024)); + * await assertRejects(async () => { + * await Array.fromAsync(transformed); + * }, RangeError); * ``` */ export class LimitedBytesTransformStream extends TransformStream { #read = 0; - /** Constructs a new instance. */ - constructor(size: number, options: { error?: boolean } = {}) { + /** + * Constructs a new instance. + * + * @param size A size limit in bytes. + * @param options Options for the stream. + * + * @example size = 42 + * ```ts + * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream"; + * + * const limitedBytesTransformStream = new LimitedBytesTransformStream(42); + * ``` + * + * @example size = 42, error = true + * ```ts + * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream"; + * + * const limitedBytesTransformStream = new LimitedBytesTransformStream(42, { error: true }); + * ``` + */ + constructor( + size: number, + options: LimitedBytesTransformStreamOptions = { error: false }, + ) { super({ transform: (chunk, controller) => { if ((this.#read + chunk.byteLength) > size) { diff --git a/streams/limited_bytes_transform_stream_test.ts b/streams/limited_bytes_transform_stream_test.ts index 74042288c0a9..339a42e29ec9 100644 --- a/streams/limited_bytes_transform_stream_test.ts +++ b/streams/limited_bytes_transform_stream_test.ts @@ -3,7 +3,24 @@ import { assertEquals, assertRejects } from "@std/assert"; import { LimitedBytesTransformStream } from "./limited_bytes_transform_stream.ts"; -Deno.test("LimitedBytesTransformStream", async function () { +Deno.test("LimitedBytesTransformStream - specified size is the boundary of the chunks", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(6)); + + const chunks = await Array.fromAsync(r); + assertEquals(chunks, [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + ]); +}); + +Deno.test("LimitedBytesTransformStream - specified size is not the boundary of the chunks", async function () { const r = ReadableStream.from([ new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6]), @@ -14,10 +31,95 @@ Deno.test("LimitedBytesTransformStream", async function () { ]).pipeThrough(new LimitedBytesTransformStream(7)); const chunks = await Array.fromAsync(r); - assertEquals(chunks.length, 2); + assertEquals(chunks, [ + new 
Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + ]); +}); + +Deno.test("LimitedBytesTransformStream - specified size is 0", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(0)); + + const chunks = await Array.fromAsync(r); + assertEquals(chunks.length, 0); +}); + +Deno.test("LimitedBytesTransformStream - specified size is equal to the total size of the chunks", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(18)); + + const chunks = await Array.fromAsync(r); + assertEquals(chunks, [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]); +}); + +Deno.test("LimitedBytesTransformStream - specified size is greater than the total size of the chunks", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(19)); + + const chunks = await Array.fromAsync(r); + assertEquals(chunks, [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]); }); -Deno.test("LimitedBytesTransformStream handles error", async function () { +Deno.test("LimitedBytesTransformStream - error is set to true and the specified size is 0", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(0, { error: true })); + + await assertRejects(async () => await Array.fromAsync(r), RangeError); +}); + +Deno.test("LimitedBytesTransformStream - error is set to true and the specified size is the boundary of the chunks", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(6, { error: true })); + + await assertRejects(async () => await Array.fromAsync(r), RangeError); +}); + +Deno.test("LimitedBytesTransformStream - error is set to true and the specified size is not the boundary of the chunks", async function () { const r = ReadableStream.from([ new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6]), @@ -29,3 +131,45 @@ Deno.test("LimitedBytesTransformStream handles error", async function () { await assertRejects(async () => await Array.fromAsync(r), RangeError); }); + +Deno.test("LimitedBytesTransformStream - error is set to true and specified size is equal to the total size of the chunks", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 
9]),
+    new Uint8Array([10, 11, 12]),
+    new Uint8Array([13, 14, 15]),
+    new Uint8Array([16, 17, 18]),
+  ]).pipeThrough(new LimitedBytesTransformStream(18, { error: true }));
+
+  const chunks = await Array.fromAsync(r);
+  assertEquals(chunks, [
+    new Uint8Array([1, 2, 3]),
+    new Uint8Array([4, 5, 6]),
+    new Uint8Array([7, 8, 9]),
+    new Uint8Array([10, 11, 12]),
+    new Uint8Array([13, 14, 15]),
+    new Uint8Array([16, 17, 18]),
+  ]);
+});
+
+Deno.test("LimitedBytesTransformStream - error is set to true and specified size is greater than the total size of the chunks", async function () {
+  const r = ReadableStream.from([
+    new Uint8Array([1, 2, 3]),
+    new Uint8Array([4, 5, 6]),
+    new Uint8Array([7, 8, 9]),
+    new Uint8Array([10, 11, 12]),
+    new Uint8Array([13, 14, 15]),
+    new Uint8Array([16, 17, 18]),
+  ]).pipeThrough(new LimitedBytesTransformStream(19, { error: true }));
+
+  const chunks = await Array.fromAsync(r);
+  assertEquals(chunks, [
+    new Uint8Array([1, 2, 3]),
+    new Uint8Array([4, 5, 6]),
+    new Uint8Array([7, 8, 9]),
+    new Uint8Array([10, 11, 12]),
+    new Uint8Array([13, 14, 15]),
+    new Uint8Array([16, 17, 18]),
+  ]);
+});
diff --git a/streams/limited_transform_stream.ts b/streams/limited_transform_stream.ts
index ce07f52c5f1e..3a337c94a8ae 100644
--- a/streams/limited_transform_stream.ts
+++ b/streams/limited_transform_stream.ts
@@ -1,25 +1,103 @@
 // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
 // This module is browser compatible.
 
+/** Options for {@linkcode LimitedTransformStream}. */
+export interface LimitedTransformStreamOptions {
+  /**
+   * If true, a {@linkcode RangeError} is thrown when the total number of
+   * enqueued chunks is about to exceed the specified limit.
+   *
+   * @default {false}
+   */
+  error?: boolean;
+}
+
 /**
  * A {@linkcode TransformStream} that will only read & enqueue `size` amount of
  * chunks.
  *
  * If `options.error` is set, then instead of terminating the stream,
- * an error will be thrown.
+ * a {@linkcode RangeError} will be thrown when the total number of enqueued
+ * chunks is about to exceed the specified size.
+ *
+ * @typeParam T The type of the chunks in the stream.
+ * + * @example `size` is equal to the total number of chunks + * ```ts + * import { LimitedTransformStream } from "@std/streams/limited-transform-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from(["1234", "5678"]); + * const transformed = stream.pipeThrough( + * new LimitedTransformStream(2), + * ); + * + * // All chunks were read + * assertEquals( + * await Array.fromAsync(transformed), + * ["1234", "5678"], + * ); + * ``` * - * @example + * @example `size` is less than the total number of chunks * ```ts * import { LimitedTransformStream } from "@std/streams/limited-transform-stream"; - * const res = await fetch("https://example.com"); - * const parts = res.body!.pipeThrough(new LimitedTransformStream(50)); + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from(["1234", "5678"]); + * const transformed = stream.pipeThrough( + * new LimitedTransformStream(1), + * ); + * + * // Only the first chunk was read + * assertEquals( + * await Array.fromAsync(transformed), + * ["1234"], + * ); + * ``` + * + * @example error: true + * ```ts + * import { LimitedTransformStream } from "@std/streams/limited-transform-stream"; + * import { assertRejects } from "@std/assert/assert-rejects"; + * + * const stream = ReadableStream.from(["1234", "5678"]); + * const transformed = stream.pipeThrough( + * new LimitedTransformStream(1, { error: true }), + * ); + * + * await assertRejects(async () => { + * await Array.fromAsync(transformed); + * }, RangeError); * ``` */ export class LimitedTransformStream extends TransformStream { #read = 0; - /** Constructs a new instance. */ - constructor(size: number, options: { error?: boolean } = {}) { + /** + * Constructs a new instance. + * + * @param size The maximum number of chunks to read. + * @param options Options for the stream. + * + * @example size = 42 + * ```ts + * import { LimitedTransformStream } from "@std/streams/limited-transform-stream"; + * + * const limitedTransformStream = new LimitedTransformStream(42); + * ``` + * + * @example size = 42, error = true + * ```ts + * import { LimitedTransformStream } from "@std/streams/limited-transform-stream"; + * + * const limitedTransformStream = new LimitedTransformStream(42, { error: true }); + * ``` + */ + constructor( + size: number, + options: LimitedTransformStreamOptions = { error: false }, + ) { super({ transform: (chunk, controller) => { if ((this.#read + 1) > size) { diff --git a/streams/limited_transform_stream_test.ts b/streams/limited_transform_stream_test.ts index da4975baa329..b37c780a36c7 100644 --- a/streams/limited_transform_stream_test.ts +++ b/streams/limited_transform_stream_test.ts @@ -14,7 +14,11 @@ Deno.test("LimitedTransformStream", async function () { ]).pipeThrough(new LimitedTransformStream(3)); const chunks = await Array.fromAsync(r); - assertEquals(chunks.length, 3); + assertEquals(chunks, [ + "foo", + "foo", + "foo", + ]); }); Deno.test("LimitedTransformStream handles error", async function () { diff --git a/streams/merge_readable_streams.ts b/streams/merge_readable_streams.ts index 3c2be530875e..b1ed79c52e78 100644 --- a/streams/merge_readable_streams.ts +++ b/streams/merge_readable_streams.ts @@ -5,15 +5,35 @@ * If a stream ends before other ones, the other will continue adding data, * and the finished one will not add any more data. * - * @example + * @typeparam T The type of the chunks in the input/output streams. 
+ * @param streams An iterable of `ReadableStream`s to merge. + * @returns A `ReadableStream` that will emit the merged chunks. + * + * @example Merge 2 streams + * ```ts + * import { mergeReadableStreams } from "@std/streams/merge-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream1 = ReadableStream.from([1, 2]); + * const stream2 = ReadableStream.from([3, 4, 5]); + * + * const mergedStream = mergeReadableStreams(stream1, stream2); + * const merged = await Array.fromAsync(mergedStream); + * assertEquals(merged.toSorted(), [1, 2, 3, 4, 5]); + * ``` + * + * @example Merge 3 streams * ```ts * import { mergeReadableStreams } from "@std/streams/merge-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; * - * const stream1 = ReadableStream.from(["1", "2", "3"]); - * const stream2 = ReadableStream.from(["a", "b", "c"]); + * const stream1 = ReadableStream.from([1, 2]); + * const stream2 = ReadableStream.from([3, 4, 5]); + * const stream3 = ReadableStream.from([6]); * - * // ["2", "c", "a", "b", "3", "1"] - * await Array.fromAsync(mergeReadableStreams(stream1, stream2)); + * const mergedStream = mergeReadableStreams(stream1, stream2, stream3); + * const merged = await Array.fromAsync(mergedStream); + * assertEquals(merged.toSorted(), [1, 2, 3, 4, 5, 6]); * ``` */ export function mergeReadableStreams( diff --git a/streams/readable_stream_from_reader.ts b/streams/readable_stream_from_reader.ts index 212758e5d3d7..581fe67235ce 100644 --- a/streams/readable_stream_from_reader.ts +++ b/streams/readable_stream_from_reader.ts @@ -8,7 +8,7 @@ export type { Closer }; /** * Options for {@linkcode readableStreamFromReader}. * - * @deprecated This will be removed in 1.0.0. Use {@linkcode toReadableStream} instead. + * @deprecated This will be removed in 1.0.0. Use {@linkcode https://jsr.io/@std/io/doc/~/toReadableStream | toReadableStream} instead. */ export interface ReadableStreamFromReaderOptions { /** If the `reader` is also a `Closer`, automatically close the `reader` @@ -28,22 +28,25 @@ export interface ReadableStreamFromReaderOptions { /** * Create a {@linkcode ReadableStream} of {@linkcode Uint8Array}s from a - * {@linkcode Reader}. + * {@linkcode https://jsr.io/@std/io/doc/types/~/Reader | Reader}. * * When the pull algorithm is called on the stream, a chunk from the reader * will be read. When `null` is returned from the reader, the stream will be - * closed along with the reader (if it is also a `Closer`). + * closed along with the reader (if it is also a {@linkcode https://jsr.io/@std/io/doc/types/~/Closer | Closer}). * - * An example converting a `Deno.FsFile` into a readable stream: + * @param reader A reader to convert into a `ReadableStream`. + * @param options Options for the `readableStreamFromReader` function. + * @returns A `ReadableStream` of `Uint8Array`s. * + * @example Convert a `Deno.FsFile` into a readable stream: * ```ts * import { readableStreamFromReader } from "@std/streams/readable-stream-from-reader"; * - * const file = await Deno.open("./file.txt", { read: true }); + * using file = await Deno.open("./README.md", { read: true }); * const fileStream = readableStreamFromReader(file); * ``` * - * @deprecated This will be removed in 1.0.0. Use {@linkcode toReadableStream} instead. + * @deprecated This will be removed in 1.0.0. Use {@linkcode https://jsr.io/@std/io/doc/~/toReadableStream | toReadableStream} instead. 
*/ export function readableStreamFromReader( reader: Reader | (Reader & Closer), diff --git a/streams/reader_from_iterable.ts b/streams/reader_from_iterable.ts index 8108db17aa8b..ae498b5bd9d6 100644 --- a/streams/reader_from_iterable.ts +++ b/streams/reader_from_iterable.ts @@ -6,21 +6,28 @@ import { writeAll } from "@std/io/write-all"; import type { Reader } from "@std/io/types"; /** - * Create a {@linkcode Reader} from an iterable of {@linkcode Uint8Array}s. + * Create a {@linkcode https://jsr.io/@std/io/doc/types/~/Reader | Reader} from an iterable of {@linkcode Uint8Array}s. * + * @param iterable An iterable or async iterable of `Uint8Array`s to convert into a `Reader`. + * @returns A `Reader` that reads from the iterable. + * + * @example Write `Deno.build` information to the blackhole 3 times every second * ```ts * import { readerFromIterable } from "@std/streams/reader-from-iterable"; * import { copy } from "@std/io/copy"; + * import { delay } from "@std/async/delay"; + * import { devNull } from "node:os"; * - * const file = await Deno.open("build.txt", { write: true }); * const reader = readerFromIterable((async function* () { - * while (true) { - * await new Promise((r) => setTimeout(r, 1000)); + * for (let i = 0; i < 3; i++) { + * await delay(1000); * const message = `data: ${JSON.stringify(Deno.build)}\n\n`; * yield new TextEncoder().encode(message); * } * })()); - * await copy(reader, file); + * + * using blackhole = await Deno.open(devNull, { write: true }); + * await copy(reader, blackhole); * ``` * * @deprecated This will be removed in 1.0.0. Use {@linkcode ReadableStream.from} instead. diff --git a/streams/reader_from_stream_reader.ts b/streams/reader_from_stream_reader.ts index 972f6299a02f..960a831107e1 100644 --- a/streams/reader_from_stream_reader.ts +++ b/streams/reader_from_stream_reader.ts @@ -5,18 +5,22 @@ import { readerFromStreamReader as _readerFromStreamReader } from "@std/io/reade import type { Reader } from "@std/io/types"; /** - * Create a {@linkcode Reader} from a {@linkcode ReadableStreamDefaultReader}. + * Create a {@linkcode https://jsr.io/@std/io/doc/types/~/Reader | Reader} from a {@linkcode ReadableStreamDefaultReader}. * - * @example + * @param streamReader A `ReadableStreamDefaultReader` to convert into a `Reader`. + * @returns A `Reader` that reads from the `streamReader`. + * + * @example Copy the response body of a fetch request to the blackhole * ```ts * import { copy } from "@std/io/copy"; * import { readerFromStreamReader } from "@std/streams/reader-from-stream-reader"; + * import { devNull } from "node:os"; * * const res = await fetch("https://deno.land"); - * using file = await Deno.open("./deno.land.html", { create: true, write: true }); + * using blackhole = await Deno.open(devNull, { write: true }); * * const reader = readerFromStreamReader(res.body!.getReader()); - * await copy(reader, file); + * await copy(reader, blackhole); * ``` * * @deprecated This will be removed in 1.0.0. Import from diff --git a/streams/text_delimiter_stream.ts b/streams/text_delimiter_stream.ts index 7f0a3d58e06a..b36a7a2d722f 100644 --- a/streams/text_delimiter_stream.ts +++ b/streams/text_delimiter_stream.ts @@ -9,15 +9,49 @@ import type { } from "./delimiter_stream.ts"; /** - * Transform a stream into a stream where each chunk is divided by a given delimiter. + * Transform a stream `string` into a stream where each chunk is divided by a + * given delimiter. 
* - * @example + * If you are working with a stream of `Uint8Array`, consider using {@linkcode DelimiterStream}. + * + * If you want to split by a newline, consider using {@linkcode TextLineStream}. + * + * @example Comma-separated values * ```ts * import { TextDelimiterStream } from "@std/streams/text-delimiter-stream"; - * const res = await fetch("https://example.com"); - * const parts = res.body! - * .pipeThrough(new TextDecoderStream()) - * .pipeThrough(new TextDelimiterStream("foo")); + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from([ + * "alice,20,", + * ",US,", + * ]); + * + * const valueStream = stream.pipeThrough(new TextDelimiterStream(",")); + * + * assertEquals( + * await Array.fromAsync(valueStream), + * ["alice", "20", "", "US", ""], + * ); + * ``` + * + * @example Semicolon-separated values with suffix disposition + * ```ts + * import { TextDelimiterStream } from "@std/streams/text-delimiter-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from([ + * "const a = 42;;let b =", + * " true;", + * ]); + * + * const valueStream = stream.pipeThrough( + * new TextDelimiterStream(";", { disposition: "suffix" }), + * ); + * + * assertEquals( + * await Array.fromAsync(valueStream), + * ["const a = 42;", ";", "let b = true;", ""], + * ); * ``` */ export class TextDelimiterStream extends TransformStream { @@ -28,8 +62,32 @@ export class TextDelimiterStream extends TransformStream { #delimLPS: Uint8Array; #disp: DelimiterDisposition; - /** Constructs a new instance. */ - constructor(delimiter: string, options?: DelimiterStreamOptions) { + /** + * Constructs a new instance. + * + * @param delimiter A delimiter to split the stream by. + * @param options Options for the stream. + * + * @example comma as a delimiter + * ```ts + * import { TextDelimiterStream } from "@std/streams/text-delimiter-stream"; + * + * const delimiterStream = new TextDelimiterStream(","); + * ``` + * + * @example semicolon as a delimiter, and disposition set to `"suffix"` + * ```ts + * import { TextDelimiterStream } from "@std/streams/text-delimiter-stream"; + * + * const delimiterStream = new TextDelimiterStream(",", { + * disposition: "suffix", + * }); + * ``` + */ + constructor( + delimiter: string, + options: DelimiterStreamOptions = { disposition: "discard" }, + ) { super({ transform: (chunk, controller) => { this.#handle(chunk, controller); @@ -41,7 +99,7 @@ export class TextDelimiterStream extends TransformStream { this.#delimiter = delimiter; this.#delimLPS = createLPS(new TextEncoder().encode(delimiter)); - this.#disp = options?.disposition ?? "discard"; + this.#disp = options.disposition ?? 
"discard"; } #handle( diff --git a/streams/text_delimiter_stream_test.ts b/streams/text_delimiter_stream_test.ts index 90e912b8caef..3951de38adae 100644 --- a/streams/text_delimiter_stream_test.ts +++ b/streams/text_delimiter_stream_test.ts @@ -77,3 +77,21 @@ Deno.test("TextDelimiterStream handles prefix", async () => { await testTransformStream(delimStream, inputs, outputs); }); + +Deno.test("TextDelimiterStream handles JSONL with an empty line in the middle and trailing newline", async () => { + const delimStream = new TextDelimiterStream("\n"); + + const inputs = [ + '{"a": 1}\n', + '\n{"a', + '": 2, "b": true}\n', + ]; + const outputs = [ + '{"a": 1}', + "", + '{"a": 2, "b": true}', + "", + ]; + + await testTransformStream(delimStream, inputs, outputs); +}); diff --git a/streams/text_line_stream.ts b/streams/text_line_stream.ts index f20add6ad2a5..f68328d99c43 100644 --- a/streams/text_line_stream.ts +++ b/streams/text_line_stream.ts @@ -15,20 +15,79 @@ export interface TextLineStreamOptions { * Transform a stream into a stream where each chunk is divided by a newline, * be it `\n` or `\r\n`. `\r` can be enabled via the `allowCR` option. * - * @example + * If you want to split by a custom delimiter, consider using {@linkcode TextDelimiterStream}. + * + * @example JSON Lines + * ```ts + * import { TextLineStream } from "@std/streams/text-line-stream"; + * import { toTransformStream } from "@std/streams/to-transform-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from([ + * '{"name": "Alice", "age": ', + * '30}\n{"name": "Bob", "age"', + * ": 25}\n", + * ]); + * + * type Person = { name: string; age: number }; + * + * // Split the stream by newline and parse each line as a JSON object + * const jsonStream = stream.pipeThrough(new TextLineStream()) + * .pipeThrough(toTransformStream(async function* (src) { + * for await (const chunk of src) { + * if (chunk.trim().length === 0) { + * continue; + * } + * yield JSON.parse(chunk) as Person; + * } + * })); + * + * assertEquals( + * await Array.fromAsync(jsonStream), + * [{ "name": "Alice", "age": 30 }, { "name": "Bob", "age": 25 }], + * ); + * ``` + * + * @example allowCR: true * ```ts * import { TextLineStream } from "@std/streams/text-line-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; * - * const res = await fetch("https://example.com"); - * const lines = res.body! - * .pipeThrough(new TextDecoderStream()) - * .pipeThrough(new TextLineStream()); + * const stream = ReadableStream.from([ + * "CR\rLF", + * "\nCRLF\r\ndone", + * ]); + * + * const lineStream = stream.pipeThrough(new TextLineStream({ allowCR: true })); + * + * assertEquals( + * await Array.fromAsync(lineStream), + * ["CR", "LF", "CRLF", "done"], + * ); * ``` */ export class TextLineStream extends TransformStream { #currentLine = ""; - /** Constructs a new instance. */ + /** + * Constructs a new instance. + * + * @param options Options for the stream. 
+ * + * @example No parameters + * ```ts + * import { TextLineStream } from "@std/streams/text-line-stream"; + * + * const textLineStream = new TextLineStream(); + * ``` + * + * @example allowCR = true + * ```ts + * import { TextLineStream } from "@std/streams/text-line-stream"; + * + * const textLineStream = new TextLineStream({ allowCR: true }); + * ``` + */ constructor(options: TextLineStreamOptions = { allowCR: false }) { super({ transform: (chars, controller) => { diff --git a/streams/to_array_buffer.ts b/streams/to_array_buffer.ts index f60aae3a8882..292be918345d 100644 --- a/streams/to_array_buffer.ts +++ b/streams/to_array_buffer.ts @@ -5,17 +5,22 @@ import { concat } from "@std/bytes/concat"; /** * Converts a {@linkcode ReadableStream} of {@linkcode Uint8Array}s to an - * {@linkcode ArrayBuffer}. Works the same as{@linkcode Response.arrayBuffer}. + * {@linkcode ArrayBuffer}. Works the same as {@linkcode Response.arrayBuffer}. * - * @example + * @param readableStream A `ReadableStream` of `Uint8Array`s to convert into an `ArrayBuffer`. + * @returns A promise that resolves with the `ArrayBuffer` containing all the data from the stream. + * + * @example Basic usage * ```ts * import { toArrayBuffer } from "@std/streams/to-array-buffer"; + * import { assertEquals } from "@std/assert/assert-equals"; * * const stream = ReadableStream.from([ * new Uint8Array([1, 2]), - * new Uint8Array([3, 4]), + * new Uint8Array([3, 4, 5]), * ]); - * await toArrayBuffer(stream); // ArrayBuffer { [Uint8Contents]: <01 02 03 04>, byteLength: 4 } + * const buf = await toArrayBuffer(stream); + * assertEquals(buf.byteLength, 5); * ``` */ export async function toArrayBuffer( diff --git a/streams/to_blob.ts b/streams/to_blob.ts index a9f38da0f0a4..a467102d73e3 100644 --- a/streams/to_blob.ts +++ b/streams/to_blob.ts @@ -5,12 +5,20 @@ * Converts a {@linkcode ReadableStream} of {@linkcode Uint8Array}s to a * {@linkcode Blob}. Works the same as {@linkcode Response.blob}. * - * @example + * @param stream A `ReadableStream` of `Uint8Array`s to convert into a `Blob`. + * @returns A `Promise` that resolves to the `Blob`. + * + * @example Basic usage * ```ts * import { toBlob } from "@std/streams/to-blob"; + * import { assertEquals } from "@std/assert/assert-equals"; * - * const stream = ReadableStream.from([new Uint8Array(1), new Uint8Array(2)]); - * await toBlob(stream); // Blob { size: 3, type: "" } + * const stream = ReadableStream.from([ + * new Uint8Array([1, 2]), + * new Uint8Array([3, 4, 5]), + * ]); + * const blob = await toBlob(stream); + * assertEquals(blob.size, 5); * ``` */ export async function toBlob( diff --git a/streams/to_json.ts b/streams/to_json.ts index 2365e1fb9699..4269b2b1a8fd 100644 --- a/streams/to_json.ts +++ b/streams/to_json.ts @@ -8,12 +8,21 @@ import { toText } from "./to_text.ts"; * {@linkcode Uint8Array}s to an object. Works the same as * {@linkcode Response.json}. * - * @example + * @param readableStream A `ReadableStream` whose chunks compose a JSON. + * @returns A promise that resolves to the parsed JSON. 
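+ *
+ * @example Malformed input rejects (a sketch of the error case; like
+ * {@linkcode Response.json}, invalid JSON rejects with a `SyntaxError`)
+ * ```ts
+ * import { toJson } from "@std/streams/to-json";
+ * import { assertRejects } from "@std/assert/assert-rejects";
+ *
+ * const stream = ReadableStream.from(["[1, 2, 3"]);
+ * await assertRejects(() => toJson(stream), SyntaxError);
+ * ```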
+ *
+ * @example Basic usage
 * ```ts
 * import { toJson } from "@std/streams/to-json";
+ * import { assertEquals } from "@std/assert/assert-equals";
 *
- * const stream = ReadableStream.from([JSON.stringify({ hello: "world" })]);
- * await toJson(stream); // { hello: "world" }
+ * const stream = ReadableStream.from([
+ *   "[1, true",
+ *   ', [], {}, "hello',
+ *   '", null]',
+ * ]);
+ * const json = await toJson(stream);
+ * assertEquals(json, [1, true, [], {}, "hello", null]);
 * ```
 */
 export function toJson(
diff --git a/streams/to_text.ts b/streams/to_text.ts
index 414690123580..c0f1a74033c5 100644
--- a/streams/to_text.ts
+++ b/streams/to_text.ts
@@ -7,12 +7,16 @@ const textDecoder = new TextDecoder();
 /**
- * Converts a {@linkcode ReadableSteam} of strings or {@linkcode Uint8Array}s
+ * Converts a {@linkcode ReadableStream} of strings or {@linkcode Uint8Array}s
 * to a single string. Works the same as {@linkcode Response.text}.
 *
- * @example
+ * @param readableStream A `ReadableStream` to convert into a `string`.
+ * @returns A `Promise` that resolves to the `string`.
+ *
+ * @example Basic usage
 * ```ts
 * import { toText } from "@std/streams/to-text";
+ * import { assertEquals } from "@std/assert/assert-equals";
 *
 * const stream = ReadableStream.from(["Hello, ", "world!"]);
- * await toText(stream); // "Hello, world!"
+ * assertEquals(await toText(stream), "Hello, world!");
 * ```
 */
 export async function toText(
diff --git a/streams/to_transform_stream.ts b/streams/to_transform_stream.ts
index 4c1c4ee35830..7cdfea8b4d88 100644
--- a/streams/to_transform_stream.ts
+++ b/streams/to_transform_stream.ts
@@ -4,26 +4,61 @@
 /**
 * Convert the generator function into a {@linkcode TransformStream}.
 *
- * @example
+ * @typeparam I The type of the chunks in the source stream.
+ * @typeparam O The type of the chunks in the transformed stream.
+ * @param transformer A function to transform.
+ * @param writableStrategy An object that optionally defines a queuing strategy for the writable side of the stream.
+ * @param readableStrategy An object that optionally defines a queuing strategy for the readable side of the stream.
+ * @returns A {@linkcode TransformStream} that transforms the source stream as defined by the provided transformer.
+ *
+ * @example Build a transform stream that multiplies each value by 100
 * ```ts
 * import { toTransformStream } from "@std/streams/to-transform-stream";
+ * import { assertEquals } from "@std/assert/assert-equals";
 *
- * const readable = ReadableStream.from([0, 1, 2])
+ * const stream = ReadableStream.from([0, 1, 2])
 *   .pipeThrough(toTransformStream(async function* (src) {
 *     for await (const chunk of src) {
 *       yield chunk * 100;
 *     }
 *   }));
 *
- * for await (const chunk of readable) {
- *   console.log(chunk);
- * }
- * // output: 0, 100, 200
+ * assertEquals(
+ *   await Array.fromAsync(stream),
+ *   [0, 100, 200],
+ * );
 * ```
 *
- * @param transformer A function to transform.
- * @param writableStrategy An object that optionally defines a queuing strategy for the stream.
- * @param readableStrategy An object that optionally defines a queuing strategy for the stream.
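+ * @example Setting queuing strategies (a sketch; the `highWaterMark` values
+ * here are illustrative, not defaults)
+ * ```ts
+ * import { toTransformStream } from "@std/streams/to-transform-stream";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * // Buffer at most one chunk at a time on each side of the transform.
+ * const upperCase = toTransformStream(
+ *   async function* (src: ReadableStream<string>) {
+ *     for await (const chunk of src) {
+ *       yield chunk.toUpperCase();
+ *     }
+ *   },
+ *   new CountQueuingStrategy({ highWaterMark: 1 }),
+ *   new CountQueuingStrategy({ highWaterMark: 1 }),
+ * );
+ *
+ * const stream = ReadableStream.from(["a", "b"]).pipeThrough(upperCase);
+ * assertEquals(await Array.fromAsync(stream), ["A", "B"]);
+ * ```
+ *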
+ * @example JSON Lines
+ * ```ts
+ * import { TextLineStream } from "@std/streams/text-line-stream";
+ * import { toTransformStream } from "@std/streams/to-transform-stream";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * const stream = ReadableStream.from([
+ *   '{"name": "Alice", "age": ',
+ *   '30}\n{"name": "Bob", "age"',
+ *   ": 25}\n",
+ * ]);
+ *
+ * type Person = { name: string; age: number };
+ *
+ * // Split the stream by newline and parse each line as a JSON object
+ * const jsonStream = stream.pipeThrough(new TextLineStream())
+ *   .pipeThrough(toTransformStream(async function* (src) {
+ *     for await (const chunk of src) {
+ *       if (chunk.trim().length === 0) {
+ *         continue;
+ *       }
+ *       yield JSON.parse(chunk) as Person;
+ *     }
+ *   }));
+ *
+ * assertEquals(
+ *   await Array.fromAsync(jsonStream),
+ *   [{ "name": "Alice", "age": 30 }, { "name": "Bob", "age": 25 }],
+ * );
+ * ```
 */
 export function toTransformStream<I, O>(
   transformer: (src: ReadableStream<I>) => Iterable<O> | AsyncIterable<O>,
diff --git a/streams/writable_stream_from_writer.ts b/streams/writable_stream_from_writer.ts
index 7965cafab313..52df58ea38ff 100644
--- a/streams/writable_stream_from_writer.ts
+++ b/streams/writable_stream_from_writer.ts
@@ -7,7 +7,7 @@ import { toWritableStream } from "@std/io/to-writable-stream";
 /**
 * Options for {@linkcode writableStreamFromWriter}.
 *
- * @deprecated This will be removed in 1.0.0. Use {@linkcode toWritableStream} instead.
+ * @deprecated This will be removed in 1.0.0. Use {@linkcode https://jsr.io/@std/io/doc/~/toWritableStream | toWritableStream} instead.
 */
 export interface WritableStreamFromWriterOptions {
   /**
@@ -20,9 +20,24 @@ export interface WritableStreamFromWriterOptions {
 }
 
 /**
- * Create a {@linkcode WritableStream} from a {@linkcode Writer}.
+ * Create a {@linkcode WritableStream} from a {@linkcode https://jsr.io/@std/io/doc/types/~/Writer | Writer}.
 *
- * @deprecated This will be removed in 1.0.0. Use {@linkcode toWritableStream} instead.
+ * @param writer A `Writer` to convert into a `WritableStream`.
+ * @param options Options for the `writableStreamFromWriter` function.
+ * @returns A `WritableStream` of `Uint8Array`s.
+ *
+ * @example Convert `Deno.stdout` into a writable stream
+ * ```ts
+ * // Note that `Deno.stdout` already exposes a writable stream via
+ * // `Deno.stdout.writable`; this conversion is shown for demonstration
+ * // purposes only and is not recommended in practice.
+ *
+ * import { writableStreamFromWriter } from "@std/streams/writable-stream-from-writer";
+ *
+ * const stdoutStream = writableStreamFromWriter(Deno.stdout);
+ * ```
+ *
+ * @deprecated This will be removed in 1.0.0. Use {@linkcode https://jsr.io/@std/io/doc/~/toWritableStream | toWritableStream} instead.
 */
 export function writableStreamFromWriter(
   writer: Writer,
diff --git a/streams/writer_from_stream_writer.ts b/streams/writer_from_stream_writer.ts
index 7b36b47fb81f..7e9c875dc316 100644
--- a/streams/writer_from_stream_writer.ts
+++ b/streams/writer_from_stream_writer.ts
@@ -3,15 +3,20 @@
 import type { Writer } from "@std/io/types";
 
+export type { Writer };
+
 /**
- * Create a {@linkcode Writer} from a {@linkcode WritableStreamDefaultWriter}.
+ * Create a {@linkcode https://jsr.io/@std/io/doc/types/~/Writer | Writer} from a {@linkcode WritableStreamDefaultWriter}.
+ *
+ * @param streamWriter A `WritableStreamDefaultWriter` to convert into a `Writer`.
+ * @returns A `Writer` that writes to the `WritableStreamDefaultWriter`.
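+ *
+ * @example Collect written bytes in memory (a minimal sketch; the array-backed
+ * `WritableStream` sink is illustrative)
+ * ```ts
+ * import { writerFromStreamWriter } from "@std/streams/writer-from-stream-writer";
+ *
+ * const chunks: Uint8Array[] = [];
+ * const writableStream = new WritableStream({
+ *   write(chunk) {
+ *     chunks.push(chunk);
+ *   },
+ * });
+ *
+ * const writer = writerFromStreamWriter(writableStream.getWriter());
+ * await writer.write(new TextEncoder().encode("Hi"));
+ * ```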
 *
- * @example
+ * @example Read from a file and write to stdout using a writable stream
 * ```ts
 * import { copy } from "@std/io/copy";
 * import { writerFromStreamWriter } from "@std/streams/writer-from-stream-writer";
 *
- * using file = await Deno.open("./deno.land.html", { read: true });
+ * using file = await Deno.open("./README.md", { read: true });
 *
 * const writableStream = new WritableStream({
 *   write(chunk): void {
diff --git a/streams/zip_readable_streams.ts b/streams/zip_readable_streams.ts
index 76d547fffa6c..0fc890aebb38 100644
--- a/streams/zip_readable_streams.ts
+++ b/streams/zip_readable_streams.ts
@@ -4,19 +4,75 @@
 /**
 * Merge multiple streams into a single one, taking order into account, and
 * each stream will wait for a chunk to enqueue before the next stream can
- * append another chunk. If a stream ends before other ones, the others will
- * continue adding data in order, and the finished one will not add any more
- * data.
+ * append another chunk.
 *
- * @example
+ * If a stream ends before the others, the rest will continue adding data in
+ * order, and the finished one will not add any more data. If you want to cancel
+ * the other streams when one of them ends, use {@linkcode earlyZipReadableStreams}.
+ *
+ * @typeparam T The type of the chunks in the input/output streams.
+ * @param streams The streams to merge.
+ * @returns A `ReadableStream` that will emit the zipped chunks.
+ *
+ * @example Zip 2 streams with the same length
 * ```ts
 * import { zipReadableStreams } from "@std/streams/zip-readable-streams";
+ * import { assertEquals } from "@std/assert/assert-equals";
 *
 * const stream1 = ReadableStream.from(["1", "2", "3"]);
 * const stream2 = ReadableStream.from(["a", "b", "c"]);
 * const zippedStream = zipReadableStreams(stream1, stream2);
 *
- * await Array.fromAsync(zippedStream); // ["1", "a", "2", "b", "3", "c"];
+ * assertEquals(
+ *   await Array.fromAsync(zippedStream),
+ *   ["1", "a", "2", "b", "3", "c"],
+ * );
+ * ```
+ *
+ * @example Zip 2 streams with different lengths (first one is shorter)
+ * ```ts
+ * import { zipReadableStreams } from "@std/streams/zip-readable-streams";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * const stream1 = ReadableStream.from(["1", "2"]);
+ * const stream2 = ReadableStream.from(["a", "b", "c", "d"]);
+ * const zippedStream = zipReadableStreams(stream1, stream2);
+ *
+ * assertEquals(
+ *   await Array.fromAsync(zippedStream),
+ *   ["1", "a", "2", "b", "c", "d"],
+ * );
+ * ```
+ *
+ * @example Zip 2 streams with different lengths (first one is longer)
+ * ```ts
+ * import { zipReadableStreams } from "@std/streams/zip-readable-streams";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * const stream1 = ReadableStream.from(["1", "2", "3", "4"]);
+ * const stream2 = ReadableStream.from(["a", "b"]);
+ * const zippedStream = zipReadableStreams(stream1, stream2);
+ *
+ * assertEquals(
+ *   await Array.fromAsync(zippedStream),
+ *   ["1", "a", "2", "b", "3", "4"],
+ * );
+ * ```
+ *
+ * @example Zip 3 streams
+ * ```ts
+ * import { zipReadableStreams } from "@std/streams/zip-readable-streams";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * const stream1 = ReadableStream.from(["1"]);
+ * const stream2 = ReadableStream.from(["a", "b"]);
+ * const stream3 = ReadableStream.from(["A", "B", "C"]);
+ * const zippedStream = zipReadableStreams(stream1, stream2, stream3);
+ *
+ * assertEquals(
+ *   await Array.fromAsync(zippedStream),
+ *   ["1", "a", "A", "b", "B", "C"],
+ * );
 * ```
 */
 export function zipReadableStreams<T>(