From fba942aac5a9c6696a234c937f1472476cd81b00 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Thu, 23 May 2024 15:03:18 +0900 Subject: [PATCH 01/34] improve streams/buffer.ts --- streams/buffer.ts | 265 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 248 insertions(+), 17 deletions(-) diff --git a/streams/buffer.ts b/streams/buffer.ts index 0a47c73b9b20..28d2b9124a80 100644 --- a/streams/buffer.ts +++ b/streams/buffer.ts @@ -7,7 +7,9 @@ import { copy } from "@std/bytes/copy"; const MAX_SIZE = 2 ** 32 - 2; const DEFAULT_CHUNK_SIZE = 16_640; -/** A variable-sized buffer of bytes with `read()` and `write()` methods. +/** + * A variable-sized buffer of bytes with `readable` and `writable` getters that + * allows you to work with the {@link https://developer.mozilla.org/en-US/docs/Web/API/Streams_API | Web Streams API}. * * Buffer is almost always used with some I/O like files and sockets. It allows * one to buffer up a download from a socket. Buffer grows and shrinks as @@ -20,7 +22,32 @@ const DEFAULT_CHUNK_SIZE = 16_640; * ArrayBuffer is a fixed memory allocation. Buffer is implemented on top of * ArrayBuffer. * - * Based on {@link https://golang.org/pkg/bytes/#Buffer | Go Buffer}. */ + * Based on {@link https://golang.org/pkg/bytes/#Buffer | Go Buffer}. + * + * @example Copy a file to another file via a buffer + * ```ts + * // File copy can be done in various ways. This example aims to demonstrate + * // how to use Buffer with other ReadableStream and WritableStream. + * + * import { assert } from "@std/assert/assert"; + * import { assertEquals } from "@std/assert/assert-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const buf = new Buffer(); + * assertEquals(buf.capacity, 0); + * assertEquals(buf.length, 0); + * + * using input = await Deno.open("input.txt"); + * using output = await Deno.open("output.txt", { write: true, create: true }); + * + * await input.readable.pipeTo(buf.writable); + * assert(buf.capacity > 0); + * assert(buf.length > 0); + * + * await buf.readable.pipeTo(output.writable); + * assert(buf.empty()); + * ``` + */ export class Buffer { #buf: Uint8Array; // contents are the bytes buf[off : len(buf)] #off = 0; // read at buf[off], write at buf[buf.byteLength] @@ -42,7 +69,17 @@ export class Buffer { autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, }); - /** Getter returning the instance's {@linkcode ReadableStream}. */ + /** + * Getter returning the instance's {@linkcode ReadableStream}. + * + * @example Read the content out of the buffer to stdout + * ```ts + * import { Buffer } from "@std/streams/buffer"; + * + * const buf = new Buffer(); + * await buf.readable.pipeTo(Deno.stdout.writable); + * ``` + */ get readable(): ReadableStream<Uint8Array> { return this.#readable; } @@ -54,41 +91,190 @@ export class Buffer { }, }); - /** Getter returning the instance's {@linkcode WritableStream}. */ + /** + * Getter returning the instance's {@linkcode WritableStream}. + * + * @example Write the data from stdin to the buffer + * ```ts + * import { Buffer } from "@std/streams/buffer"; + * + * const buf = new Buffer(); + * await Deno.stdin.readable.pipeTo(buf.writable); + * ``` + */ get writable(): WritableStream<Uint8Array> { return this.#writable; } - /** Constructs a new instance. */ + /** + * Constructs a new instance.
+ * + * @example No initial buffer provided + * ```ts + * import { Buffer } from "@std/streams/buffer"; + * + * const buf = new Buffer(); + * ``` + * + * @example With a pre-allocated buffer + * ```ts + * import { Buffer } from "@std/streams/buffer"; + * + * const arrayBuffer = new ArrayBuffer(8); + * const buf = new Buffer(arrayBuffer); + * ``` + * + * @example From Uint8Array + * ```ts + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([0, 1, 2]); + * const buf = new Buffer(array.buffer); + * ``` + */ constructor(ab?: ArrayBufferLike | ArrayLike) { this.#buf = ab === undefined ? new Uint8Array(0) : new Uint8Array(ab); } - /** Returns a slice holding the unread portion of the buffer. + /** + * Returns a slice holding the unread portion of the buffer. * * The slice is valid for use only until the next buffer modification (that - * is, only until the next call to a method like `read()`, `write()`, - * `reset()`, or `truncate()`). If `options.copy` is false the slice aliases - * the buffer content at least until the next buffer modification, so - * immediate changes to the slice will affect the result of future reads. + * is, only until the next call to a method that mutates or consumes the + * buffer, like reading data out via `readable`, `reset()`, or `truncate()`). + * + * If `options.copy` is false the slice aliases the buffer content at least + * until the next buffer modification, so immediate changes to the slice will + * affect the result of future reads. If `options` is not provided, + * `options.copy` defaults to `true`. + * + * @example Copy the buffer + * ```ts + * import { assertEquals } from "@std/assert/assert-equals"; + * import { assertNotEquals } from "@std/assert/assert-not-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([0, 1, 2]); + * const buf = new Buffer(array.buffer); + * const copied = buf.bytes(); + * assertEquals(copied.length, array.length); + * + * // Modify an element in the original array + * array[1] = 99; + * assertEquals(copied[0], array[0]); + * // The copied buffer is not affected by the modification + * assertNotEquals(copied[1], array[1]); + * assertEquals(copied[2], array[2]); + * ``` + * + * @example Get a slice to the buffer + * ```ts + * import { assertEquals } from "@std/assert/assert-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([0, 1, 2]); + * const buf = new Buffer(array.buffer); + * const slice = buf.bytes({ copy: false }); + * assertEquals(slice.length, array.length); + * + * // Modify an element in the original array + * array[1] = 99; + * assertEquals(slice[0], array[0]); + * // The slice _is_ affected by the modification + * assertEquals(slice[1], array[1]); + * assertEquals(slice[2], array[2]); + * ``` */ bytes(options = { copy: true }): Uint8Array { if (options.copy === false) return this.#buf.subarray(this.#off); return this.#buf.slice(this.#off); } - /** Returns whether the unread portion of the buffer is empty. */ + /** + * Returns whether the unread portion of the buffer is empty. 
+ * + * @example Empty buffer + * ```ts + * import { assert } from "@std/assert/assert"; + * import { Buffer } from "@std/streams/buffer"; + * + * const buf = new Buffer(); + * assert(buf.empty()); + * ``` + * + * @example Non-empty buffer + * ```ts + * import { assert } from "@std/assert/assert"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([42]); + * const buf = new Buffer(array.buffer); + * assert(!buf.empty()); + * ``` + * + * @example Non-empty, but the content was already read + * ```ts + * import { assert } from "@std/assert/assert"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([42]); + * const buf = new Buffer(array.buffer); + * assert(!buf.empty()); + * // Read the content out of the buffer + * await buf.readable.pipeTo(Deno.stdout.writable); + * // The buffer is now empty + * assert(buf.empty()); + * ``` + */ empty(): boolean { return this.#buf.byteLength <= this.#off; } - /** A read only number of bytes of the unread portion of the buffer. */ + /** + * A read only number of bytes of the unread portion of the buffer. + * + * @example Basic usage + * ```ts + * import { assertEquals } from "@std/assert/assert-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([0, 1, 2]); + * const buf = new Buffer(array.buffer); + * assertEquals(buf.length, 3); + * ``` + * + * @example Length becomes 0 after the content is read + * ```ts + * import { assertEquals } from "@std/assert/assert-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([42]); + * const buf = new Buffer(array.buffer); + * assertEquals(buf.length, 1); + * // Read the content out of the buffer + * await buf.readable.pipeTo(Deno.stdout.writable); + * // The length is now 0 + * assertEquals(buf.length, 0); + * ``` + */ get length(): number { return this.#buf.byteLength - this.#off; } - /** The read only capacity of the buffer's underlying byte slice, that is, - * the total space allocated for the buffer's data. */ + /** + * The read only capacity of the buffer's underlying byte slice, that is, + * the total space allocated for the buffer's data. + * + * @example Basic usage + * ```ts + * import { assertEquals } from "@std/assert/assert-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const arrayBuffer = new ArrayBuffer(256); + * const buf = new Buffer(arrayBuffer); + * assertEquals(buf.capacity, 256); + * ``` + */ get capacity(): number { return this.#buf.buffer.byteLength; } @@ -97,6 +283,20 @@ export class Buffer { * Discards all but the first `n` unread bytes from the buffer but * continues to use the same allocated storage. It throws if `n` is * negative or greater than the length of the buffer. + * + * @example Basic usage + * ```ts + * import { assertEquals } from "@std/assert/assert-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([0, 1, 2]); + * const buf = new Buffer(array.buffer); + * assertEquals(buf.bytes(), array); + * + * // Discard all but the first 2 bytes + * buf.truncate(2); + * assertEquals(buf.bytes(), array.slice(0, 2)); + * ``` */ truncate(n: number): void { if (n === 0) { @@ -109,7 +309,23 @@ export class Buffer { this.#reslice(this.#off + n); } - /** Resets to an empty buffer. */ + /** + * Resets to an empty buffer. 
+ * + * @example Basic usage + * ```ts + * import { assert } from "@std/assert/assert"; + * import { Buffer } from "@std/streams/buffer"; + * + * const array = new Uint8Array([0, 1, 2]); + * const buf = new Buffer(array.buffer); + * assert(!buf.empty()); + * + * // Reset + * buf.reset(); + * assert(buf.empty()); + * ``` + */ reset() { this.#reslice(0); this.#off = 0; @@ -161,13 +377,28 @@ export class Buffer { return m; } - /** Grows the buffer's capacity, if necessary, to guarantee space for + /** + * Grows the buffer's capacity, if necessary, to guarantee space for * another `n` bytes. After `.grow(n)`, at least `n` bytes can be written to * the buffer without another allocation. If `n` is negative, `.grow()` will * throw. If the buffer can't grow it will throw an error. * * Based on Go Lang's - * {@link https://golang.org/pkg/bytes/#Buffer.Grow | Buffer.Grow}. */ + * {@link https://golang.org/pkg/bytes/#Buffer.Grow | Buffer.Grow}. + * + * @example Basic usage + * ```ts + * import { assert } from "@std/assert/assert"; + * import { assertEquals } from "@std/assert/assert-equals"; + * import { Buffer } from "@std/streams/buffer"; + * + * const buf = new Buffer(); + * assertEquals(buf.capacity, 0); + * + * buf.grow(200); + * assert(buf.capacity >= 200); + * ``` + */ grow(n: number) { if (n < 0) { throw Error("Buffer.grow: negative count"); From 65ba402685df4b4ca0bba0bcecf2f934a77671bb Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Thu, 23 May 2024 15:51:22 +0900 Subject: [PATCH 02/34] improve streams/byte_slice_stream.ts --- streams/byte_slice_stream.ts | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/streams/byte_slice_stream.ts b/streams/byte_slice_stream.ts index 52e46290dda9..5b4f0e3e5a60 100644 --- a/streams/byte_slice_stream.ts +++ b/streams/byte_slice_stream.ts @@ -7,13 +7,33 @@ import { assert } from "@std/assert/assert"; * A transform stream that only transforms from the zero-indexed `start` and * `end` bytes (both inclusive). * - * @example + * @example Basic usage * ```ts * import { ByteSliceStream } from "@std/streams/byte-slice-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from([ + * new Uint8Array([0, 1]), + * new Uint8Array([2, 3, 4]), + * ]); + * const slicedStream = stream.pipeThrough(new ByteSliceStream(1, 3)); + * + * assertEquals( + * await Array.fromAsync(slicedStream), + * [new Uint8Array([1]), new Uint8Array([2, 3])] + * ); + * ``` + * + * @example Get a range of bytes from a fetch response body + * ```ts + * import { ByteSliceStream } from "@std/streams/byte-slice-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; * * const response = await fetch("https://example.com"); * const rangedStream = response.body! 
* .pipeThrough(new ByteSliceStream(3, 8)); + * const collected = await Array.fromAsync(rangedStream); + * assertEquals(collected[0].length, 6); * ``` */ export class ByteSliceStream extends TransformStream { From b56c2a6637e93cabd4d95a7cf6ad1454f0603317 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Thu, 23 May 2024 17:18:56 +0900 Subject: [PATCH 03/34] improve streams/delimiter_stream.ts --- streams/delimiter_stream.ts | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/streams/delimiter_stream.ts b/streams/delimiter_stream.ts index 96aa9fcd8b51..82b6f30a98c1 100644 --- a/streams/delimiter_stream.ts +++ b/streams/delimiter_stream.ts @@ -27,25 +27,33 @@ export interface DelimiterStreamOptions { * Divide a CSV stream by commas, discarding the commas: * ```ts * import { DelimiterStream } from "@std/streams/delimiter-stream"; - * const res = await fetch("https://example.com/data.csv"); - * const parts = res.body! + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const inputStream = ReadableStream.from(["foo,bar", ",baz"]); + * + * const transformed = inputStream.pipeThrough(new TextEncoderStream()) * .pipeThrough(new DelimiterStream(new TextEncoder().encode(","))) * .pipeThrough(new TextDecoderStream()); + * + * assertEquals(await Array.fromAsync(transformed), ["foo", "bar", "baz"]); * ``` * * @example * Divide a stream after semi-colons, keeping the semi-colons in the output: * ```ts * import { DelimiterStream } from "@std/streams/delimiter-stream"; - * const res = await fetch("https://example.com/file.js"); - * const parts = res.body! + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const inputStream = ReadableStream.from(["foo;", "bar;ba", "z;"]); + * + * const transformed = inputStream.pipeThrough(new TextEncoderStream()) * .pipeThrough( - * new DelimiterStream( - * new TextEncoder().encode(";"), - * { disposition: "suffix" }, - * ) - * ) - * .pipeThrough(new TextDecoderStream()); + * new DelimiterStream(new TextEncoder().encode(";"), { + * disposition: "suffix", + * }), + * ).pipeThrough(new TextDecoderStream()); + * + * assertEquals(await Array.fromAsync(transformed), ["foo;", "bar;", "baz;"]); * ``` */ export class DelimiterStream extends TransformStream { From a3596c812974b3c81dd3527ce3450ac196a74b51 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Thu, 23 May 2024 19:08:15 +0900 Subject: [PATCH 04/34] improve streams/early_zip_readable_streams.ts --- streams/early_zip_readable_streams.ts | 39 +++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index 32331db13b16..613b613bfb4a 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -6,15 +6,50 @@ * will wait for a chunk to enqueue before the next stream can append another chunk. * If a stream ends before other ones, the others will be cancelled. 
* - * @example + * @example Zip 2 streams with the same length * ```ts * import { earlyZipReadableStreams } from "@std/streams/early-zip-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; * * const stream1 = ReadableStream.from(["1", "2", "3"]); * const stream2 = ReadableStream.from(["a", "b", "c"]); * const zippedStream = earlyZipReadableStreams(stream1, stream2); * - * await Array.fromAsync(zippedStream); // ["1", "a", "2", "b", "3", "c"]; + * assertEquals( + * await Array.fromAsync(zippedStream), + * ["1", "a", "2", "b", "3", "c"], + * ); + * ``` + * + * @example Zip 2 streams with different length + * ```ts + * import { earlyZipReadableStreams } from "@std/streams/early-zip-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream1 = ReadableStream.from(["1", "2"]); + * const stream2 = ReadableStream.from(["a", "b", "c"]); + * const zippedStream = earlyZipReadableStreams(stream1, stream2); + * + * assertEquals( + * await Array.fromAsync(zippedStream), + * ["1", "a", "2", "b"], + * ); + * ``` + * + * @example Zip 3 streams with different length + * ```ts + * import { earlyZipReadableStreams } from "@std/streams/early-zip-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream1 = ReadableStream.from(["1"]); + * const stream2 = ReadableStream.from(["a", "b"]); + * const stream3 = ReadableStream.from(["A", "B", "C"]); + * const zippedStream = earlyZipReadableStreams(stream1, stream2, stream3); + * + * assertEquals( + * await Array.fromAsync(zippedStream), + * ["1", "a", "A"], + * ); * ``` */ export function earlyZipReadableStreams( From 122a38e00adc176b0c970fcde69866943347e08a Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Thu, 23 May 2024 19:17:24 +0900 Subject: [PATCH 05/34] improve streams/iterate_reader.ts --- streams/iterate_reader.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/iterate_reader.ts b/streams/iterate_reader.ts index d1db22eb4572..1799071f3042 100644 --- a/streams/iterate_reader.ts +++ b/streams/iterate_reader.ts @@ -39,7 +39,7 @@ export type { Reader, ReaderSync }; * ``` * * @deprecated This will be removed in 1.0.0. Import from - * {@linkhttps://jsr.io/@std/io | @std/io} instead. + * {@link https://jsr.io/@std/io | @std/io} instead. */ export function iterateReader( r: Reader, From 23fa26a8c0030a9ac4db68de033227ce58a5eb85 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 00:15:23 +0900 Subject: [PATCH 06/34] improve streams/limited_bytes_transform_stream.ts --- streams/limited_bytes_transform_stream.ts | 77 +++++++-- .../limited_bytes_transform_stream_test.ts | 150 +++++++++++++++++- 2 files changed, 215 insertions(+), 12 deletions(-) diff --git a/streams/limited_bytes_transform_stream.ts b/streams/limited_bytes_transform_stream.ts index 96048f61be63..d10c8fe0c689 100644 --- a/streams/limited_bytes_transform_stream.ts +++ b/streams/limited_bytes_transform_stream.ts @@ -2,20 +2,79 @@ // This module is browser compatible. /** - * A {@linkcode TransformStream} that will only read & enqueue `size` amount of - * bytes. This operation is chunk based and not BYOB based, and as such will - * read more than needed. + * A {@linkcode TransformStream} that will only read & enqueue chunks until the + * total amount of enqueued data exceeds `size`.
The last chunk that would + * exceed the limit will NOT be enqueued, in which case a {@linkcode RangeError} + * is thrown when `options.error` is set to true, otherwise the stream is just + * terminated. * - * If `options.error` is set, then instead of terminating the stream, - * an error will be thrown. + * @example `size` is equal to the total byte length of the chunks + * ```ts + * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from(["1234", "5678"]); + * const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough( + * new LimitedBytesTransformStream(8), + * ).pipeThrough(new TextDecoderStream()); + * + * assertEquals( + * await Array.fromAsync(transformed), + * ["1234", "5678"], + * ); + * ``` + * + * @example `size` is less than the total byte length of the chunks, and at the + * boundary of the chunks + * ```ts + * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from(["1234", "5678"]); + * const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough( + * // `4` is the boundary of the chunks + * new LimitedBytesTransformStream(4), + * ).pipeThrough(new TextDecoderStream()); * - * @example + * assertEquals( + * await Array.fromAsync(transformed), + * // The first chunk was read, but the second chunk was not + * ["1234"], + * ); + * ``` + * + * @example `size` is less than the total byte length of the chunks, and not at + * the boundary of the chunks * ```ts * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from(["1234", "5678"]); + * const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough( + * // `5` is not the boundary of the chunks + * new LimitedBytesTransformStream(5), + * ).pipeThrough(new TextDecoderStream()); + * + * assertEquals( + * await Array.fromAsync(transformed), + * // The second chunk was not read because it would exceed the specified size + * ["1234"], + * ); + * ``` + * + * @example error: true + * ```ts + * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream"; + * import { assertRejects } from "@std/assert/assert-rejects"; + * + * const stream = ReadableStream.from(["1234", "5678"]); + * const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough( + * new LimitedBytesTransformStream(5), + * ).pipeThrough(new TextDecoderStream()); * - * const res = await fetch("https://example.com"); - * const parts = res.body! 
- * .pipeThrough(new LimitedBytesTransformStream(512 * 1024)); + * await assertRejects(async () => { + * await Array.fromAsync(transformed); + * }, RangeError); * ``` */ export class LimitedBytesTransformStream diff --git a/streams/limited_bytes_transform_stream_test.ts b/streams/limited_bytes_transform_stream_test.ts index 74042288c0a9..339a42e29ec9 100644 --- a/streams/limited_bytes_transform_stream_test.ts +++ b/streams/limited_bytes_transform_stream_test.ts @@ -3,7 +3,24 @@ import { assertEquals, assertRejects } from "@std/assert"; import { LimitedBytesTransformStream } from "./limited_bytes_transform_stream.ts"; -Deno.test("LimitedBytesTransformStream", async function () { +Deno.test("LimitedBytesTransformStream - specified size is the boundary of the chunks", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(6)); + + const chunks = await Array.fromAsync(r); + assertEquals(chunks, [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + ]); +}); + +Deno.test("LimitedBytesTransformStream - specified size is not the boundary of the chunks", async function () { const r = ReadableStream.from([ new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6]), @@ -14,10 +31,95 @@ Deno.test("LimitedBytesTransformStream", async function () { ]).pipeThrough(new LimitedBytesTransformStream(7)); const chunks = await Array.fromAsync(r); - assertEquals(chunks.length, 2); + assertEquals(chunks, [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + ]); +}); + +Deno.test("LimitedBytesTransformStream - specified size is 0", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(0)); + + const chunks = await Array.fromAsync(r); + assertEquals(chunks.length, 0); +}); + +Deno.test("LimitedBytesTransformStream - specified size is equal to the total size of the chunks", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(18)); + + const chunks = await Array.fromAsync(r); + assertEquals(chunks, [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]); +}); + +Deno.test("LimitedBytesTransformStream - specified size is greater than the total size of the chunks", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(19)); + + const chunks = await Array.fromAsync(r); + assertEquals(chunks, [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]); }); -Deno.test("LimitedBytesTransformStream handles error", async function () { 
+Deno.test("LimitedBytesTransformStream - error is set to true and the specified size is 0", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(0, { error: true })); + + await assertRejects(async () => await Array.fromAsync(r), RangeError); +}); + +Deno.test("LimitedBytesTransformStream - error is set to true and the specified size is the boundary of the chunks", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(6, { error: true })); + + await assertRejects(async () => await Array.fromAsync(r), RangeError); +}); + +Deno.test("LimitedBytesTransformStream - error is set to true and the specified size is not the boundary of the chunks", async function () { const r = ReadableStream.from([ new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6]), @@ -29,3 +131,45 @@ Deno.test("LimitedBytesTransformStream handles error", async function () { await assertRejects(async () => await Array.fromAsync(r), RangeError); }); + +Deno.test("LimitedBytesTransformStream - error is set to true and specified size is equal to the total size of the chunks", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(18, { error: true })); + + const chunks = await Array.fromAsync(r); + assertEquals(chunks, [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]); +}); + +Deno.test("LimitedBytesTransformStream - error is set to true and specified size is greater than the total size of the chunks", async function () { + const r = ReadableStream.from([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]).pipeThrough(new LimitedBytesTransformStream(19, { error: true })); + + const chunks = await Array.fromAsync(r); + assertEquals(chunks, [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + new Uint8Array([7, 8, 9]), + new Uint8Array([10, 11, 12]), + new Uint8Array([13, 14, 15]), + new Uint8Array([16, 17, 18]), + ]); +}); From 5ed8f71818b33e94f73c4fd09eb2f7f157d6c4c6 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 00:45:03 +0900 Subject: [PATCH 07/34] improve streams/limited_transform_stream.ts --- streams/limited_transform_stream.ts | 51 ++++++++++++++++++++++-- streams/limited_transform_stream_test.ts | 6 ++- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/streams/limited_transform_stream.ts b/streams/limited_transform_stream.ts index ce07f52c5f1e..407c253a9bfc 100644 --- a/streams/limited_transform_stream.ts +++ b/streams/limited_transform_stream.ts @@ -6,13 +6,56 @@ * chunks. * * If `options.error` is set, then instead of terminating the stream, - * an error will be thrown. 
+ * a {@linkcode RangeError} will be thrown when the total number of enqueued + * chunks is about to exceed the specified size. * - * @example + * @example `size` is equal to the total number of chunks * ```ts * import { LimitedTransformStream } from "@std/streams/limited-transform-stream"; - * const res = await fetch("https://example.com"); - * const parts = res.body!.pipeThrough(new LimitedTransformStream(50)); + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from(["1234", "5678"]); + * const transformed = stream.pipeThrough( + * new LimitedTransformStream(2), + * ); + * + * // All chunks were read + * assertEquals( + * await Array.fromAsync(transformed), + * ["1234", "5678"], + * ); + * ``` + * + * @example `size` is less than the total number of chunks + * ```ts + * import { LimitedTransformStream } from "@std/streams/limited-transform-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from(["1234", "5678"]); + * const transformed = stream.pipeThrough( + * new LimitedTransformStream(1), + * ); + * + * // Only the first chunk was read + * assertEquals( + * await Array.fromAsync(transformed), + * ["1234"], + * ); + * ``` + * + * @example error: true + * ```ts + * import { LimitedTransformStream } from "@std/streams/limited-transform-stream"; + * import { assertRejects } from "@std/assert/assert-rejects"; + * + * const stream = ReadableStream.from(["1234", "5678"]); + * const transformed = stream.pipeThrough( + * new LimitedTransformStream(1, { error: true }), + * ); + * + * await assertRejects(async () => { + * await Array.fromAsync(transformed); + * }, RangeError); * ``` */ export class LimitedTransformStream<T> extends TransformStream<T, T> { diff --git a/streams/limited_transform_stream_test.ts b/streams/limited_transform_stream_test.ts index da4975baa329..b37c780a36c7 100644 --- a/streams/limited_transform_stream_test.ts +++ b/streams/limited_transform_stream_test.ts @@ -14,7 +14,11 @@ Deno.test("LimitedTransformStream", async function () { ]).pipeThrough(new LimitedTransformStream(3)); const chunks = await Array.fromAsync(r); - assertEquals(chunks.length, 3); + assertEquals(chunks, [ + "foo", + "foo", + "foo", + ]); }); Deno.test("LimitedTransformStream handles error", async function () { From 0bc0ad1d2e6bfbec0b9f74ff72b1a7bbb8c25abd Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 01:00:00 +0900 Subject: [PATCH 08/34] further improve streams/early_zip_readable_streams.ts --- streams/early_zip_readable_streams.ts | 36 ++++++++++++++++++---- streams/early_zip_readable_streams_test.ts | 21 +++++++++++++ 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index 613b613bfb4a..d1c984bd8ad7 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -2,9 +2,12 @@ // This module is browser compatible. /** - * Merge multiple streams into a single one, taking order into account, and each stream - * will wait for a chunk to enqueue before the next stream can append another chunk. - * If a stream ends before other ones, the others will be cancelled. + * Merge multiple streams into a single one, taking order into account, and each + * stream will wait for a chunk to enqueue before the next stream can append + * another chunk. + * If a stream ends before other ones, the others will be cancelled after the + * last chunk of said stream is read.
See the examples below for more + * comprehensible information. * * @example Zip 2 streams with the same length * ```ts * import { earlyZipReadableStreams } from "@std/streams/early-zip-readable-streams"; * import { assertEquals } from "@std/assert/assert-equals"; * * const stream1 = ReadableStream.from(["1", "2", "3"]); * const stream2 = ReadableStream.from(["a", "b", "c"]); * const zippedStream = earlyZipReadableStreams(stream1, stream2); * - * @example Zip 2 streams with different length + * @example Zip 2 streams with different length (first one is shorter) * ```ts * import { earlyZipReadableStreams } from "@std/streams/early-zip-readable-streams"; * import { assertEquals } from "@std/assert/assert-equals"; * * const stream1 = ReadableStream.from(["1", "2"]); - * const stream2 = ReadableStream.from(["a", "b", "c"]); + * const stream2 = ReadableStream.from(["a", "b", "c", "d"]); * const zippedStream = earlyZipReadableStreams(stream1, stream2); * + * // The first stream ends before the second one. When the first stream ends, + * // the second one is cancelled and no more data is read or added to the + * // zipped stream. * assertEquals( * await Array.fromAsync(zippedStream), * ["1", "a", "2", "b"], * ); * ``` * - * @example Zip 3 streams with different length + * @example Zip 2 streams with different length (first one is longer) + * ```ts + * import { earlyZipReadableStreams } from "@std/streams/early-zip-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream1 = ReadableStream.from(["1", "2", "3", "4"]); + * const stream2 = ReadableStream.from(["a", "b"]); + * const zippedStream = earlyZipReadableStreams(stream1, stream2); + * + * // The second stream ends before the first one. When the second stream ends, + * // the first one is cancelled, but the chunk of "3" is already read so it + * // is added to the zipped stream. + * assertEquals( + * await Array.fromAsync(zippedStream), + * ["1", "a", "2", "b", "3"], + * ); + * ``` + * + * @example Zip 3 streams * ```ts * import { earlyZipReadableStreams } from "@std/streams/early-zip-readable-streams"; * import { assertEquals } from "@std/assert/assert-equals"; diff --git a/streams/early_zip_readable_streams_test.ts b/streams/early_zip_readable_streams_test.ts index 914e7ca2fcab..b6d4529ffe9e 100644 --- a/streams/early_zip_readable_streams_test.ts +++ b/streams/early_zip_readable_streams_test.ts @@ -39,3 +39,24 @@ Deno.test("earlyZipReadableStreams() handles long first", async () => { "d", ]); }); + +Deno.test("earlyZipReadableStreams() can zip three streams", async () => { + const textStream = ReadableStream.from(["a", "b", "c", "d", "e"]); + const textStream2 = ReadableStream.from(["1", "2", "3"]); + const textStream3 = ReadableStream.from(["x", "y"]); + + const buf = await Array.fromAsync( + earlyZipReadableStreams(textStream, textStream2, textStream3), + ); + + assertEquals(buf, [ + "a", + "1", + "x", + "b", + "2", + "y", + "c", + "3", + ]); +}); From 7d0e56daea69dc889888374750172d29594dc8d6 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 01:19:50 +0900 Subject: [PATCH 09/34] improve streams/merge_readable_streams.ts --- streams/merge_readable_streams.ts | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/streams/merge_readable_streams.ts b/streams/merge_readable_streams.ts index 3c2be530875e..a4f298f43eba 100644 --- a/streams/merge_readable_streams.ts +++ b/streams/merge_readable_streams.ts @@ -5,15 +5,31 @@ * If a stream ends before other ones, the others will continue adding data, * and the finished one will not add any more data.
* - * @example + * @example Merge 2 streams * ```ts * import { mergeReadableStreams } from "@std/streams/merge-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; * - * const stream1 = ReadableStream.from(["1", "2", "3"]); - * const stream2 = ReadableStream.from(["a", "b", "c"]); + * const stream1 = ReadableStream.from([1, 2]); + * const stream2 = ReadableStream.from([3, 4, 5]); * - * // ["2", "c", "a", "b", "3", "1"] - * await Array.fromAsync(mergeReadableStreams(stream1, stream2)); + * const mergedStream = mergeReadableStreams(stream1, stream2); + * const merged = await Array.fromAsync(mergedStream); + * assertEquals(merged.toSorted(), [1, 2, 3, 4, 5]); + * ``` + * + * @example Merge 3 streams + * ```ts + * import { mergeReadableStreams } from "@std/streams/merge-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream1 = ReadableStream.from([1, 2]); + * const stream2 = ReadableStream.from([3, 4, 5]); + * const stream3 = ReadableStream.from([6]); + * + * const mergedStream = mergeReadableStreams(stream1, stream2, stream3); + * const merged = await Array.fromAsync(mergedStream); + * assertEquals(merged.toSorted(), [1, 2, 3, 4, 5, 6]); * ``` */ export function mergeReadableStreams( From 0e433c3750ec2af12ca7e995f0e55e2113c8415a Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 01:43:05 +0900 Subject: [PATCH 10/34] improve streams/readable_stream_from_reader.ts --- streams/readable_stream_from_reader.ts | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/streams/readable_stream_from_reader.ts b/streams/readable_stream_from_reader.ts index 212758e5d3d7..e0fff9e50c6d 100644 --- a/streams/readable_stream_from_reader.ts +++ b/streams/readable_stream_from_reader.ts @@ -8,7 +8,7 @@ export type { Closer }; /** * Options for {@linkcode readableStreamFromReader}. * - * @deprecated This will be removed in 1.0.0. Use {@linkcode toReadableStream} instead. + * @deprecated This will be removed in 1.0.0. Use {@linkcode https://jsr.io/@std/io/doc/~/toReadableStream | toReadableStream} instead. */ export interface ReadableStreamFromReaderOptions { /** If the `reader` is also a `Closer`, automatically close the `reader` @@ -28,22 +28,21 @@ export interface ReadableStreamFromReaderOptions { /** * Create a {@linkcode ReadableStream} of {@linkcode Uint8Array}s from a - * {@linkcode Reader}. + * {@linkcode https://jsr.io/@std/io/doc/types/~/Reader | Reader}. * * When the pull algorithm is called on the stream, a chunk from the reader * will be read. When `null` is returned from the reader, the stream will be - * closed along with the reader (if it is also a `Closer`). - * - * An example converting a `Deno.FsFile` into a readable stream: + * closed along with the reader (if it is also a {@linkcode https://jsr.io/@std/io/doc/types/~/Closer | Closer}). * + * @example Convert a `Deno.FsFile` into a readable stream: * ```ts * import { readableStreamFromReader } from "@std/streams/readable-stream-from-reader"; * - * const file = await Deno.open("./file.txt", { read: true }); + * using file = await Deno.open("./file.txt", { read: true }); * const fileStream = readableStreamFromReader(file); * ``` * - * @deprecated This will be removed in 1.0.0. Use {@linkcode toReadableStream} instead. + * @deprecated This will be removed in 1.0.0. Use {@linkcode https://jsr.io/@std/io/doc/~/toReadableStream | toReadableStream} instead. 
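+ * + * A minimal migration sketch, assuming the `@std/io/to-readable-stream` specifier behind the jsr link above: + * + * @example Migrate to `toReadableStream` + * ```ts + * import { toReadableStream } from "@std/io/to-readable-stream"; + * + * // Hypothetical file path; any Reader (optionally with a Closer) works here. + * using file = await Deno.open("./file.txt", { read: true }); + * const fileStream = toReadableStream(file); + * ```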
*/ export function readableStreamFromReader( reader: Reader | (Reader & Closer), From 5af61abca0a36046409813bcc7b1a38f69baef69 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 01:49:28 +0900 Subject: [PATCH 11/34] improve streams/reader_from_iterable.ts --- streams/reader_from_iterable.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/streams/reader_from_iterable.ts b/streams/reader_from_iterable.ts index 8108db17aa8b..29a1ca6f8a39 100644 --- a/streams/reader_from_iterable.ts +++ b/streams/reader_from_iterable.ts @@ -6,13 +6,14 @@ import { writeAll } from "@std/io/write-all"; import type { Reader } from "@std/io/types"; /** - * Create a {@linkcode Reader} from an iterable of {@linkcode Uint8Array}s. + * Create a {@linkcode https://jsr.io/@std/io/doc/types/~/Reader | Reader} from an iterable of {@linkcode Uint8Array}s. * + * @example Periodically write `Deno.build` information to `build.txt` * ```ts * import { readerFromIterable } from "@std/streams/reader-from-iterable"; * import { copy } from "@std/io/copy"; * - * const file = await Deno.open("build.txt", { write: true }); + * using file = await Deno.open("build.txt", { write: true }); * const reader = readerFromIterable((async function* () { * while (true) { * await new Promise((r) => setTimeout(r, 1000)); From 9e1f3e28305e79403c40f149e87eb699ede6e243 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 02:03:29 +0900 Subject: [PATCH 12/34] improve streams/reader_from_stream_reader.ts --- streams/reader_from_stream_reader.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/reader_from_stream_reader.ts b/streams/reader_from_stream_reader.ts index 972f6299a02f..c58f47aaa0f7 100644 --- a/streams/reader_from_stream_reader.ts +++ b/streams/reader_from_stream_reader.ts @@ -5,9 +5,9 @@ import { readerFromStreamReader as _readerFromStreamReader } from "@std/io/reade import type { Reader } from "@std/io/types"; /** - * Create a {@linkcode Reader} from a {@linkcode ReadableStreamDefaultReader}. + * Create a {@linkcode https://jsr.io/@std/io/doc/types/~/Reader | Reader} from a {@linkcode ReadableStreamDefaultReader}. * - * @example + * @example Copy the response body of a fetch request to a file * ```ts * import { copy } from "@std/io/copy"; * import { readerFromStreamReader } from "@std/streams/reader-from-stream-reader"; From 06543168718ef5d0b95acfba9b43329f1f857bd0 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 13:34:22 +0900 Subject: [PATCH 13/34] improve streams/text_delimiter_stream.ts --- streams/text_delimiter_stream.ts | 31 +++++++++++++++++++++------ streams/text_delimiter_stream_test.ts | 18 ++++++++++++++++ 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/streams/text_delimiter_stream.ts b/streams/text_delimiter_stream.ts index 7f0a3d58e06a..6df1c24d00d1 100644 --- a/streams/text_delimiter_stream.ts +++ b/streams/text_delimiter_stream.ts @@ -9,15 +9,32 @@ import type { } from "./delimiter_stream.ts"; /** - * Transform a stream into a stream where each chunk is divided by a given delimiter. + * Transform a stream `string` into a stream where each chunk is divided by a + * given delimiter. * - * @example + * @example JSON Lines * ```ts * import { ByteSliceStream } from "@std/streams/byte-slice-stream"; - * const res = await fetch("https://example.com"); - * const parts = res.body!
- * .pipeThrough(new TextDecoderStream()) - * .pipeThrough(new TextDelimiterStream("foo")); + * const stream = ReadableStream.from([ + * '{"name": "Alice", "age": ', + * '30}\n{"name": "Bob", "age"', + * ": 25}\n", + * ]); + * + * // Split the stream by newline and parse each line as a JSON object + * const jsonStream = stream.pipeThrough(new TextDelimiterStream("\n")) + * .pipeThrough(toTransformStream(async function* (src) { + * for await (const chunk of src) { + * if (chunk.trim().length === 0) { + * continue; + * } + * yield JSON.parse(chunk); + * } + * })); + * + * assertEquals( + * await Array.fromAsync(jsonStream), + * [{ "name": "Alice", "age": 30 }, { "name": "Bob", "age": 25 }], + * ); * ``` */ export class TextDelimiterStream extends TransformStream { diff --git a/streams/text_delimiter_stream_test.ts b/streams/text_delimiter_stream_test.ts index 90e912b8caef..3951de38adae 100644 --- a/streams/text_delimiter_stream_test.ts +++ b/streams/text_delimiter_stream_test.ts @@ -77,3 +77,21 @@ Deno.test("TextDelimiterStream handles prefix", async () => { await testTransformStream(delimStream, inputs, outputs); }); + +Deno.test("TextDelimiterStream handles JSONL with an empty line in the middle and trailing newline", async () => { + const delimStream = new TextDelimiterStream("\n"); + + const inputs = [ + '{"a": 1}\n', + '\n{"a', + '": 2, "b": true}\n', + ]; + const outputs = [ + '{"a": 1}', + "", + '{"a": 2, "b": true}', + "", + ]; + + await testTransformStream(delimStream, inputs, outputs); +}); From dc3110a61e061194ae6a9cc58825e066c2b870a0 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 13:45:30 +0900 Subject: [PATCH 14/34] improve streams/text_line_stream.ts --- streams/text_line_stream.ts | 47 +++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/streams/text_line_stream.ts b/streams/text_line_stream.ts index f20add6ad2a5..f6684e2319cf 100644 --- a/streams/text_line_stream.ts +++ b/streams/text_line_stream.ts @@ -15,14 +15,51 @@ export interface TextLineStreamOptions { * Transform a stream into a stream where each chunk is divided by a newline, * be it `\n` or `\r\n`. `\r` can be enabled via the `allowCR` option. * - * @example + * @example JSON Lines * ```ts * import { TextLineStream } from "@std/streams/text-line-stream"; + * import { toTransformStream } from "@std/streams/to-transform-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; * - * const res = await fetch("https://example.com"); - * const lines = res.body! 
- * .pipeThrough(new TextDecoderStream()) - * .pipeThrough(new TextLineStream()); + * const stream = ReadableStream.from([ + * '{"name": "Alice", "age": ', + * '30}\n{"name": "Bob", "age"', + * ": 25}\n", + * ]); + * + * // Split the stream by newline and parse each line as a JSON object + * const jsonStream = stream.pipeThrough(new TextLineStream()) + * .pipeThrough(toTransformStream(async function* (src) { + * for await (const chunk of src) { + * if (chunk.trim().length === 0) { + * continue; + * } + * yield JSON.parse(chunk); + * } + * })); + * + * assertEquals( + * await Array.fromAsync(jsonStream), + * [{ "name": "Alice", "age": 30 }, { "name": "Bob", "age": 25 }], + * ); + * ``` + * + * @example allowCR: true + * ```ts + * import { TextLineStream } from "@std/streams/text-line-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from([ + * "CR\rLF", + * "\nCRLF\r\ndone", + * ]); + * + * const lineStream = stream.pipeThrough(new TextLineStream({ allowCR: true })); + * + * assertEquals( + * await Array.fromAsync(lineStream), + * ["CR", "LF", "CRLF", "done"], + * ); * ``` */ export class TextLineStream extends TransformStream { From c1573d38a30131c73c86c67bd3129ef878b49064 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 14:02:32 +0900 Subject: [PATCH 15/34] improve streams/text_delimiter_stream.ts more --- streams/text_delimiter_stream.ts | 47 +++++++++++++++++++++----------- streams/text_line_stream.ts | 2 ++ 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/streams/text_delimiter_stream.ts b/streams/text_delimiter_stream.ts index 6df1c24d00d1..5618f5bd2484 100644 --- a/streams/text_delimiter_stream.ts +++ b/streams/text_delimiter_stream.ts @@ -12,28 +12,43 @@ import type { * Transform a stream `string` into a stream where each chunk is divided by a * given delimiter. * - * @example JSON Lines + * If you want to split by a newline, consider using {@linkcode TextLineStream}. 
+ * + * @example Comma-separated values * ```ts + * import { TextDelimiterStream } from "@std/streams/text-delimiter-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; + * * const stream = ReadableStream.from([ - * '{"name": "Alice", "age": ', - * '30}\n{"name": "Bob", "age"', - * ": 25}\n", + * "alice,20,", + * ",US,", * ]); * - * // Split the stream by newline and parse each line as a JSON object - * const jsonStream = stream.pipeThrough(new TextDelimiterStream("\n")) - * .pipeThrough(toTransformStream(async function* (src) { - * for await (const chunk of src) { - * if (chunk.trim().length === 0) { - * continue; - * } - * yield JSON.parse(chunk); - * } - * })); + * const valueStream = stream.pipeThrough(new TextDelimiterStream(",")); + * + * assertEquals( + * await Array.fromAsync(valueStream), + * ["alice", "20", "", "US", ""], + * ); + * ``` + * + * @example Semicolon-separated values with suffix disposition + * ```ts + * import { TextDelimiterStream } from "@std/streams/text-delimiter-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from([ + * "const a = 42;;let b =", + * " true;", + * ]); + * + * const valueStream = stream.pipeThrough( + * new TextDelimiterStream(";", { disposition: "suffix" }), + * ); * * assertEquals( - * await Array.fromAsync(jsonStream), - * [{ "name": "Alice", "age": 30 }, { "name": "Bob", "age": 25 }], + * await Array.fromAsync(valueStream), + * ["const a = 42;", ";", "let b = true;", ""], * ); * ``` */ diff --git a/streams/text_line_stream.ts b/streams/text_line_stream.ts index f6684e2319cf..9c898cc7e995 100644 --- a/streams/text_line_stream.ts +++ b/streams/text_line_stream.ts @@ -15,6 +15,8 @@ export interface TextLineStreamOptions { * Transform a stream into a stream where each chunk is divided by a newline, * be it `\n` or `\r\n`. `\r` can be enabled via the `allowCR` option. * + * If you want to split by a custom delimiter, consider using {@linkcode TextDelimiterStream}. + * * @example JSON Lines * ```ts * import { TextLineStream } from "@std/streams/text-line-stream"; From 2cb1dafea42afdea2372c0eb610e3c4e896afb01 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 14:07:46 +0900 Subject: [PATCH 16/34] improve streams/to_array_buffer.ts --- streams/to_array_buffer.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/streams/to_array_buffer.ts b/streams/to_array_buffer.ts index f60aae3a8882..b176f35a4b41 100644 --- a/streams/to_array_buffer.ts +++ b/streams/to_array_buffer.ts @@ -5,17 +5,19 @@ import { concat } from "@std/bytes/concat"; /** * Converts a {@linkcode ReadableStream} of {@linkcode Uint8Array}s to an - * {@linkcode ArrayBuffer}. Works the same as{@linkcode Response.arrayBuffer}. + * {@linkcode ArrayBuffer}. Works the same as {@linkcode Response.arrayBuffer}. 
* - * @example + * @example Basic usage * ```ts * import { toArrayBuffer } from "@std/streams/to-array-buffer"; + * import { assertEquals } from "@std/assert/assert-equals"; * * const stream = ReadableStream.from([ * new Uint8Array([1, 2]), - * new Uint8Array([3, 4]), + * new Uint8Array([3, 4, 5]), * ]); - * await toArrayBuffer(stream); // ArrayBuffer { [Uint8Contents]: <01 02 03 04>, byteLength: 4 } + * const buf = await toArrayBuffer(stream); + * assertEquals(buf.byteLength, 5); * ``` */ export async function toArrayBuffer( From 2fe6c516e79c301ba8cc7f5b74a168bde28b8fe6 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 14:10:50 +0900 Subject: [PATCH 17/34] improve streams/to_blob.ts --- streams/to_blob.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/streams/to_blob.ts b/streams/to_blob.ts index a9f38da0f0a4..342f27965013 100644 --- a/streams/to_blob.ts +++ b/streams/to_blob.ts @@ -5,12 +5,17 @@ * Converts a {@linkcode ReadableStream} of {@linkcode Uint8Array}s to a * {@linkcode Blob}. Works the same as {@linkcode Response.blob}. * - * @example + * @example Basic usage * ```ts * import { toBlob } from "@std/streams/to-blob"; + * import { assertEquals } from "@std/assert/assert-equals"; * - * const stream = ReadableStream.from([new Uint8Array(1), new Uint8Array(2)]); - * await toBlob(stream); // Blob { size: 3, type: "" } + * const stream = ReadableStream.from([ + * new Uint8Array([1, 2]), + * new Uint8Array([3, 4, 5]), + * ]); + * const blob = await toBlob(stream); + * assertEquals(blob.size, 5); * ``` */ export async function toBlob( From 14f9ba09334279fc0f7adca4bcf8179cf2af3544 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 14:17:22 +0900 Subject: [PATCH 18/34] improve streams/to_json.ts --- streams/to_json.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/streams/to_json.ts b/streams/to_json.ts index 2365e1fb9699..798f2dbdd3ed 100644 --- a/streams/to_json.ts +++ b/streams/to_json.ts @@ -8,12 +8,18 @@ import { toText } from "./to_text.ts"; * {@linkcode Uint8Array}s to an object. Works the same as * {@linkcode Response.json}. * - * @example + * @example Basic usage * ```ts * import { toJson } from "@std/streams/to-json"; + * import { assertEquals } from "@std/assert/assert-equals"; * - * const stream = ReadableStream.from([JSON.stringify({ hello: "world" })]); - * await toJson(stream); // { hello: "world" } + * const stream = ReadableStream.from([ + * "[1, true", + * ', [], {}, "hel', + * 'lo", null]', + * ]); + * const json = await toJson(stream); + * assertEquals(json, [1, true, [], {}, "hello", null]); * ``` */ export function toJson( From 72da712b6adf3dce1ac5e424be7aa8b036a1ca34 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 14:18:19 +0900 Subject: [PATCH 19/34] improve streams/to_text.ts --- streams/to_text.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/streams/to_text.ts b/streams/to_text.ts index 414690123580..039a3970214f 100644 --- a/streams/to_text.ts +++ b/streams/to_text.ts @@ -7,12 +7,13 @@ const textDecoder = new TextDecoder(); * Converts a {@linkcode ReadableStream} of strings or {@linkcode Uint8Array}s * to a single string. Works the same as {@linkcode Response.text}.
* - * @example + * @example Basic usage * ```ts * import { toText } from "@std/streams/to-text"; + * import { assertEquals } from "@std/assert/assert-equals"; * * const stream = ReadableStream.from(["Hello, ", "world!"]); - * await toText(stream); // "Hello, world!" + * assertEquals(await toText(stream), "Hello, world!"); * ``` */ export async function toText( From 1c6d378d02dadc5dc7c59eaf6368a9779ac5812d Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 14:26:23 +0900 Subject: [PATCH 20/34] improve streams/text_line_stream.ts more --- streams/text_line_stream.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/streams/text_line_stream.ts b/streams/text_line_stream.ts index 9c898cc7e995..2ebd1f0d5c0d 100644 --- a/streams/text_line_stream.ts +++ b/streams/text_line_stream.ts @@ -29,14 +29,16 @@ export interface TextLineStreamOptions { * ": 25}\n", * ]); * + * type Person = { name: string; age: number }; + * * // Split the stream by newline and parse each line as a JSON object * const jsonStream = stream.pipeThrough(new TextLineStream()) - * .pipeThrough(toTransformStream(async function* (src) { + * .pipeThrough(toTransformStream(async function* (src) { * for await (const chunk of src) { * if (chunk.trim().length === 0) { * continue; * } - * yield JSON.parse(chunk); + * yield JSON.parse(chunk) as Person; * } * })); * From 3acbf105306a0486d196f3c75e0ce6f0b93fc257 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 14:31:56 +0900 Subject: [PATCH 21/34] improve streams/to_transform_stream.ts --- streams/to_transform_stream.ts | 44 +++++++++++++++++++++++++++++----- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/streams/to_transform_stream.ts b/streams/to_transform_stream.ts index 4c1c4ee35830..5e9ba54cc217 100644 --- a/streams/to_transform_stream.ts +++ b/streams/to_transform_stream.ts @@ -4,21 +4,53 @@ /** * Convert the generator function into a {@linkcode TransformStream}. 
* - * @example + * @example Build a transform stream that multiplies each value by 100 * ```ts * import { toTransformStream } from "@std/streams/to-transform-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; * - * const readable = ReadableStream.from([0, 1, 2]) + * const stream = ReadableStream.from([0, 1, 2]) * .pipeThrough(toTransformStream(async function* (src) { * for await (const chunk of src) { * yield chunk * 100; * } * })); * - * for await (const chunk of readable) { - * console.log(chunk); - * } - * // output: 0, 100, 200 + * assertEquals( + * await Array.fromAsync(stream), + * [0, 100, 200], + * ); + * ``` + * + * @example JSON Lines + * ```ts + * import { TextLineStream } from "@std/streams/text-line-stream"; + * import { toTransformStream } from "@std/streams/to-transform-stream"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream = ReadableStream.from([ + * '{"name": "Alice", "age": ', + * '30}\n{"name": "Bob", "age"', + * ": 25}\n", + * ]); + * + * type Person = { name: string; age: number }; + * + * // Split the stream by newline and parse each line as a JSON object + * const jsonStream = stream.pipeThrough(new TextLineStream()) + * .pipeThrough(toTransformStream(async function* (src) { + * for await (const chunk of src) { + * if (chunk.trim().length === 0) { + * continue; + * } + * yield JSON.parse(chunk) as Person; + * } + * })); + * + * assertEquals( + * await Array.fromAsync(jsonStream), + * [{ "name": "Alice", "age": 30 }, { "name": "Bob", "age": 25 }], + * ); * ``` * * @param transformer A function to transform. From 797265ca1914768bf981ef50dfdf85753b343086 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 14:43:12 +0900 Subject: [PATCH 22/34] improve streams/writable_stream_from_writer.ts --- streams/writable_stream_from_writer.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streams/writable_stream_from_writer.ts b/streams/writable_stream_from_writer.ts index 7965cafab313..770ab512d87b 100644 --- a/streams/writable_stream_from_writer.ts +++ b/streams/writable_stream_from_writer.ts @@ -7,7 +7,7 @@ import { toWritableStream } from "@std/io/to-writable-stream"; /** * Options for {@linkcode writableStreamFromWriter}. * - * @deprecated This will be removed in 1.0.0. Use {@linkcode toWritableStream} instead. + * @deprecated This will be removed in 1.0.0. Use {@linkcode https://jsr.io/@std/io/doc/~/toWritableStream | toWritableStream} instead. */ export interface WritableStreamFromWriterOptions { /** @@ -20,9 +20,9 @@ export interface WritableStreamFromWriterOptions { } /** - * Create a {@linkcode WritableStream} from a {@linkcode Writer}. + * Create a {@linkcode WritableStream} from a {@linkcode https://jsr.io/@std/io/doc/types/~/Writer | Writer}. * - * @deprecated This will be removed in 1.0.0. Use {@linkcode toWritableStream} instead. + * @deprecated This will be removed in 1.0.0. Use {@linkcode https://jsr.io/@std/io/doc/~/toWritableStream | toWritableStream} instead. 
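+ * + * A minimal migration sketch, reusing the `@std/io/to-writable-stream` import that already appears at the top of this file: + * + * @example Migrate to `toWritableStream` + * ```ts + * import { toWritableStream } from "@std/io/to-writable-stream"; + * + * // Hypothetical file path; any Writer works here. + * using file = await Deno.open("./file.txt", { write: true }); + * const writableStream = toWritableStream(file); + * ```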
*/ export function writableStreamFromWriter( writer: Writer, From d1ea8557f9fb8b9534165dd6855ce6d795b4ca14 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 14:46:34 +0900 Subject: [PATCH 23/34] improve streams/writer_from_stream_writer.ts --- streams/writer_from_stream_writer.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/writer_from_stream_writer.ts b/streams/writer_from_stream_writer.ts index 7b36b47fb81f..9c25403142f2 100644 --- a/streams/writer_from_stream_writer.ts +++ b/streams/writer_from_stream_writer.ts @@ -4,9 +4,9 @@ import type { Writer } from "@std/io/types"; /** - * Create a {@linkcode Writer} from a {@linkcode WritableStreamDefaultWriter}. + * Create a {@linkcode https://jsr.io/@std/io/doc/types/~/Writer | Writer} from a {@linkcode WritableStreamDefaultWriter}. * - * @example + * @example Read from a file and write to stdout using a writable stream * ```ts * import { copy } from "@std/io/copy"; * import { writerFromStreamWriter } from "@std/streams/writer-from-stream-writer"; From 2d93cedafc46f269dd7e4585e25ba4de177d381a Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 15:13:46 +0900 Subject: [PATCH 24/34] improve streams/zip_readable_streams.ts --- streams/early_zip_readable_streams.ts | 4 +- streams/zip_readable_streams.ts | 62 ++++++++++++++++++++++++--- 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index d1c984bd8ad7..a2e5fe906e13 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -5,9 +5,11 @@ * Merge multiple streams into a single one, taking order into account, and each * stream will wait for a chunk to enqueue before the next stream can append * another chunk. + * * If a stream ends before other ones, the others will be cancelled after the * last chunk of said straem is read. See the examples below for more - * compresensible information. + * compresensible information. If you want to continue reading the other streams + * even after one of them ends, use {@linkcode zipReadableStreams}. * * @example Zip 2 streams with the same length * ```ts diff --git a/streams/zip_readable_streams.ts b/streams/zip_readable_streams.ts index 76d547fffa6c..a41be71c4133 100644 --- a/streams/zip_readable_streams.ts +++ b/streams/zip_readable_streams.ts @@ -4,19 +4,71 @@ /** * Merge multiple streams into a single one, taking order into account, and * each stream will wait for a chunk to enqueue before the next stream can - * append another chunk. If a stream ends before other ones, the others will - * continue adding data in order, and the finished one will not add any more - * data. + * append another chunk. * - * @example + * If a stream ends before other ones, the others will continue adding data in + * order, and the finished one will not add any more data. If you want to cancel + * the other streams when one of them ends, use {@linkcode earlyZipReadableStreams}. 
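+ *
+ * Chunks are not required to be strings; any chunk type zips the same way,
+ * as in this minimal sketch:
+ *
+ * @example Zip 2 streams of numbers (sketch)
+ * ```ts
+ * import { zipReadableStreams } from "@std/streams/zip-readable-streams";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * const evens = ReadableStream.from([0, 2]);
+ * const odds = ReadableStream.from([1, 3]);
+ *
+ * assertEquals(
+ *   await Array.fromAsync(zipReadableStreams(evens, odds)),
+ *   [0, 1, 2, 3],
+ * );
+ * ```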
+ * + * @example Zip 2 streams with the same length * ```ts * import { zipReadableStreams } from "@std/streams/zip-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; * * const stream1 = ReadableStream.from(["1", "2", "3"]); * const stream2 = ReadableStream.from(["a", "b", "c"]); + * const zippedStream = earlyZipReadableStreams(stream1, stream2); + * + * assertEquals( + * await Array.fromAsync(zippedStream), + * ["1", "a", "2", "b", "3", "c"], + * ); + * ``` + * + * @example Zip 2 streams with different length (first one is shorter) + * ```ts + * import { zipReadableStreams } from "@std/streams/zip-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream1 = ReadableStream.from(["1", "2"]); + * const stream2 = ReadableStream.from(["a", "b", "c", "d"]); + * const zippedStream = zipReadableStreams(stream1, stream2); + * + * assertEquals( + * await Array.fromAsync(zippedStream), + * ["1", "a", "2", "b", "c", "d"], + * ); + * ``` + * + * @example Zip 2 streams with different length (first one is longer) + * ```ts + * import { zipReadableStreams } from "@std/streams/zip-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream1 = ReadableStream.from(["1", "2", "3", "4"]); + * const stream2 = ReadableStream.from(["a", "b"]); * const zippedStream = zipReadableStreams(stream1, stream2); * - * await Array.fromAsync(zippedStream); // ["1", "a", "2", "b", "3", "c"]; + * assertEquals( + * await Array.fromAsync(zippedStream), + * ["1", "a", "2", "b", "3", "4"], + * ); + * ``` + * + * @example Zip 3 streams + * ```ts + * import { zipReadableStreams } from "@std/streams/zip-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream1 = ReadableStream.from(["1"]); + * const stream2 = ReadableStream.from(["a", "b"]); + * const stream3 = ReadableStream.from(["A", "B", "C"]); + * const zippedStream = zipReadableStreams(stream1, stream2, stream3); + * + * assertEquals( + * await Array.fromAsync(zippedStream), + * ["1", "a", "A", "b", "B", "C"], + * ); * ``` */ export function zipReadableStreams( From 2be8e5de4883068b63b24fdfdb472981d68b12b9 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 15:34:25 +0900 Subject: [PATCH 25/34] define interfaces for optional params --- streams/buffer.ts | 14 +++++++++++++- streams/delimiter_stream.ts | 10 +++++++--- streams/limited_bytes_transform_stream.ts | 16 +++++++++++++++- streams/limited_transform_stream.ts | 16 +++++++++++++++- streams/text_delimiter_stream.ts | 7 +++++-- 5 files changed, 55 insertions(+), 8 deletions(-) diff --git a/streams/buffer.ts b/streams/buffer.ts index 28d2b9124a80..928a1a31ec46 100644 --- a/streams/buffer.ts +++ b/streams/buffer.ts @@ -7,6 +7,18 @@ import { copy } from "@std/bytes/copy"; const MAX_SIZE = 2 ** 32 - 2; const DEFAULT_CHUNK_SIZE = 16_640; +/** Options for {@linkcode Buffer.bytes}. */ +export interface BufferBytesOptions { + /** + * If true, {@linkcode Buffer.bytes} will return a copy of the buffered data. + * + * If false, it will return a slice to the buffer's data. + * + * @default {true} + */ + copy?: boolean; +} + /** * A variable-sized buffer of bytes with `readable` and `writable` getters that * allows you to work with {@link https://developer.mozilla.org/en-US/docs/Web/API/Streams_API | Web Streams API}. 
@@ -185,7 +197,7 @@ export class Buffer { * assertEquals(slice[2], array[2]); * ``` */ - bytes(options = { copy: true }): Uint8Array { + bytes(options: BufferBytesOptions = { copy: true }): Uint8Array { if (options.copy === false) return this.#buf.subarray(this.#off); return this.#buf.slice(this.#off); } diff --git a/streams/delimiter_stream.ts b/streams/delimiter_stream.ts index 82b6f30a98c1..20a5787a7241 100644 --- a/streams/delimiter_stream.ts +++ b/streams/delimiter_stream.ts @@ -16,7 +16,11 @@ export type DelimiterDisposition = /** Options for {@linkcode DelimiterStream}. */ export interface DelimiterStreamOptions { - /** Disposition of the delimiter. */ + /** + * Disposition of the delimiter. + * + * @default {"discard"} + */ disposition?: DelimiterDisposition; } @@ -66,7 +70,7 @@ export class DelimiterStream extends TransformStream { /** Constructs a new instance. */ constructor( delimiter: Uint8Array, - options?: DelimiterStreamOptions, + options: DelimiterStreamOptions = { disposition: "discard" }, ) { super({ transform: (chunk, controller) => @@ -78,7 +82,7 @@ export class DelimiterStream extends TransformStream { this.#delimiter = delimiter; this.#delimLPS = delimiter.length > 1 ? createLPS(delimiter) : null; - this.#disp = options?.disposition ?? "discard"; + this.#disp = options.disposition ?? "discard"; } #handle( diff --git a/streams/limited_bytes_transform_stream.ts b/streams/limited_bytes_transform_stream.ts index d10c8fe0c689..9fb3735d924f 100644 --- a/streams/limited_bytes_transform_stream.ts +++ b/streams/limited_bytes_transform_stream.ts @@ -1,6 +1,17 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. // This module is browser compatible. +/** Options for {@linkcode LimitedBytesTransformStream}. */ +export interface LimitedBytesTransformStreamOptions { + /** + * If true, a {@linkcode RangeError} is thrown when queueing the current chunk + * would exceed the specified size limit. + * + * @default {false} + */ + error?: boolean; +} + /** * A {@linkcode TransformStream} that will only read & enqueue chunks until the * total amount of enqueued data exceeds `size`. The last chunk that would @@ -82,7 +93,10 @@ export class LimitedBytesTransformStream #read = 0; /** Constructs a new instance. */ - constructor(size: number, options: { error?: boolean } = {}) { + constructor( + size: number, + options: LimitedBytesTransformStreamOptions = { error: false }, + ) { super({ transform: (chunk, controller) => { if ((this.#read + chunk.byteLength) > size) { diff --git a/streams/limited_transform_stream.ts b/streams/limited_transform_stream.ts index 407c253a9bfc..2086290472a5 100644 --- a/streams/limited_transform_stream.ts +++ b/streams/limited_transform_stream.ts @@ -1,6 +1,17 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. // This module is browser compatible. +/** Options for {@linkcode LimitedTransformStream} */ +export interface LimitedTransformStreamOptions { + /** + * If true, a {@linkcode RangeError} is thrown when the total number of + * enqueued chunks is about to exceed the specified limit. + * + * @default {false} + */ + error?: boolean; +} + /** * A {@linkcode TransformStream} that will only read & enqueue `size` amount of * chunks. @@ -62,7 +73,10 @@ export class LimitedTransformStream extends TransformStream { #read = 0; /** Constructs a new instance. 
*/ - constructor(size: number, options: { error?: boolean } = {}) { + constructor( + size: number, + options: LimitedTransformStreamOptions = { error: false }, + ) { super({ transform: (chunk, controller) => { if ((this.#read + 1) > size) { diff --git a/streams/text_delimiter_stream.ts b/streams/text_delimiter_stream.ts index 5618f5bd2484..4171cba9a63e 100644 --- a/streams/text_delimiter_stream.ts +++ b/streams/text_delimiter_stream.ts @@ -61,7 +61,10 @@ export class TextDelimiterStream extends TransformStream { #disp: DelimiterDisposition; /** Constructs a new instance. */ - constructor(delimiter: string, options?: DelimiterStreamOptions) { + constructor( + delimiter: string, + options: DelimiterStreamOptions = { disposition: "discard" }, + ) { super({ transform: (chunk, controller) => { this.#handle(chunk, controller); @@ -73,7 +76,7 @@ export class TextDelimiterStream extends TransformStream { this.#delimiter = delimiter; this.#delimLPS = createLPS(new TextEncoder().encode(delimiter)); - this.#disp = options?.disposition ?? "discard"; + this.#disp = options.disposition ?? "discard"; } #handle( From 5cac4e98559f9cbe409a8e40b4912d05af6d9cea Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 15:55:54 +0900 Subject: [PATCH 26/34] address lint warnings in buffer.ts --- streams/buffer.ts | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/streams/buffer.ts b/streams/buffer.ts index 928a1a31ec46..28e189138d29 100644 --- a/streams/buffer.ts +++ b/streams/buffer.ts @@ -84,6 +84,8 @@ export class Buffer { /** * Getter returning the instance's {@linkcode ReadableStream}. * + * @returns The readable stream of the buffer. + * * @example Read the content out of the buffer to stdout * ```ts * import { Buffer } from "@std/streams/buffer"; @@ -106,6 +108,8 @@ export class Buffer { /** * Getter returning the instance's {@linkcode WritableStream}. * + * @returns The writable stream of the buffer. + * * @example Write the data from stdin to the buffer * ```ts * import { Buffer } from "@std/streams/buffer"; @@ -121,6 +125,8 @@ export class Buffer { /** * Constructs a new instance. * + * @param ab An optional buffer to use as the initial buffer. + * * @example No initial buffer provided * ```ts * import { Buffer } from "@std/streams/buffer"; @@ -160,6 +166,9 @@ export class Buffer { * affect the result of future reads. If `options` is not provided, * `options.copy` defaults to `true`. * + * @param options Options for the bytes method. + * @returns A copy or a slice of the buffer. + * * @example Copy the buffer * ```ts * import { assertEquals } from "@std/assert/assert-equals"; @@ -205,6 +214,8 @@ export class Buffer { /** * Returns whether the unread portion of the buffer is empty. * + * @returns Whether the buffer is empty. + * * @example Empty buffer * ```ts * import { assert } from "@std/assert/assert"; @@ -245,6 +256,8 @@ export class Buffer { /** * A read only number of bytes of the unread portion of the buffer. * + * @returns The number of bytes in the unread portion of the buffer. + * * @example Basic usage * ```ts * import { assertEquals } from "@std/assert/assert-equals"; @@ -277,6 +290,8 @@ export class Buffer { * The read only capacity of the buffer's underlying byte slice, that is, * the total space allocated for the buffer's data. * + * @returns The number of allocated bytes for the buffer. 
+ * * @example Basic usage * ```ts * import { assertEquals } from "@std/assert/assert-equals"; @@ -296,6 +311,8 @@ export class Buffer { * continues to use the same allocated storage. It throws if `n` is * negative or greater than the length of the buffer. * + * @param n The number of bytes to keep. + * * @example Basic usage * ```ts * import { assertEquals } from "@std/assert/assert-equals"; @@ -395,6 +412,8 @@ export class Buffer { * the buffer without another allocation. If `n` is negative, `.grow()` will * throw. If the buffer can't grow it will throw an error. * + * @param n The number of bytes to grow the buffer by. + * * Based on Go Lang's * {@link https://golang.org/pkg/bytes/#Buffer.Grow | Buffer.Grow}. * From abb7dd5c80864b842e060f3c3e5ec235575289cf Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 18:40:47 +0900 Subject: [PATCH 27/34] address lint warnings --- streams/buffer.ts | 28 ++++++++++++++--------- streams/byte_slice_stream.ts | 21 ++++++++++++++++- streams/concat_readable_streams.ts | 6 ++--- streams/delimiter_stream.ts | 27 ++++++++++++++++++++-- streams/early_zip_readable_streams.ts | 3 +++ streams/iterate_reader.ts | 26 ++++++++++++--------- streams/limited_bytes_transform_stream.ts | 23 +++++++++++++++++-- streams/limited_transform_stream.ts | 23 ++++++++++++++++++- streams/merge_readable_streams.ts | 4 ++++ streams/readable_stream_from_reader.ts | 6 ++++- streams/reader_from_iterable.ts | 15 ++++++++---- streams/reader_from_stream_reader.ts | 9 +++++--- streams/text_delimiter_stream.ts | 25 +++++++++++++++++++- streams/text_line_stream.ts | 20 +++++++++++++++- streams/to_array_buffer.ts | 3 +++ streams/to_blob.ts | 3 +++ streams/to_json.ts | 3 +++ streams/to_text.ts | 3 +++ streams/to_transform_stream.ts | 11 +++++---- streams/writable_stream_from_writer.ts | 15 ++++++++++++ streams/writer_from_stream_writer.ts | 7 +++++- streams/zip_readable_streams.ts | 5 +++- 22 files changed, 238 insertions(+), 48 deletions(-) diff --git a/streams/buffer.ts b/streams/buffer.ts index 28e189138d29..75d410ec9433 100644 --- a/streams/buffer.ts +++ b/streams/buffer.ts @@ -36,27 +36,33 @@ export interface BufferBytesOptions { * * Based on {@link https://golang.org/pkg/bytes/#Buffer | Go Buffer}. * - * @example Copy a file to another file via a buffer + * @example Buffer input bytes and convert it to a string * ```ts - * // File copy can be done with various ways. This example aims to demonstrate - * // how to use Buffer with other ReadableStream and WritableStream. 
- * + * import { Buffer } from "@std/streams/buffer"; + * import { toText } from "@std/streams/to-text"; * import { assert } from "@std/assert/assert"; * import { assertEquals } from "@std/assert/assert-equals"; - * import { Buffer } from "@std/streams/buffer"; * + * // Create a new buffer * const buf = new Buffer(); * assertEquals(buf.capacity, 0); * assertEquals(buf.length, 0); * - * using input = await Deno.open("input.txt"); - * using output = await Deno.open("output.txt", { write: true, create: true }); + * // Dummy input stream + * const inputStream = ReadableStream.from([ + * "hello, ", + * "world", + * "!", + * ]); * - * await input.readable.pipeTo(buf.writable); + * // Pipe the input stream to the buffer + * await inputStream.pipeThrough(new TextEncoderStream()).pipeTo(buf.writable); * assert(buf.capacity > 0); * assert(buf.length > 0); * - * await buf.readable.pipeTo(output.writable); + * // Convert the buffered bytes to a string + * const result = await toText(buf.readable); + * assertEquals(result, "hello, world!"); * assert(buf.empty()); * ``` */ @@ -84,7 +90,7 @@ export class Buffer { /** * Getter returning the instance's {@linkcode ReadableStream}. * - * @returns The readable stream of the buffer. + * @returns A `ReadableStream` of the buffer. * * @example Read the content out of the buffer to stdout * ```ts @@ -108,7 +114,7 @@ export class Buffer { /** * Getter returning the instance's {@linkcode WritableStream}. * - * @returns The writable stream of the buffer. + * @returns A `WritableStream` of the buffer. * * @example Write the data from stdin to the buffer * ```ts diff --git a/streams/byte_slice_stream.ts b/streams/byte_slice_stream.ts index 5b4f0e3e5a60..12b279c0d320 100644 --- a/streams/byte_slice_stream.ts +++ b/streams/byte_slice_stream.ts @@ -40,7 +40,26 @@ export class ByteSliceStream extends TransformStream { #offsetStart = 0; #offsetEnd = 0; - /** Constructs a new instance. */ + /** + * Constructs a new instance. + * + * @param start The zero-indexed byte index to start reading from. + * @param end The zero-indexed byte index to stop reading at. Inclusive. + * + * @example No parameters + * ```ts + * import { ByteSliceStream } from "@std/streams/byte-slice-stream"; + * + * const byteSliceStream = new ByteSliceStream(); + * ``` + * + * @example start = 4, end = 11 + * ```ts + * import { ByteSliceStream } from "@std/streams/byte-slice-stream"; + * + * const byteSliceStream = new ByteSliceStream(4, 11); + * ``` + */ constructor(start = 0, end: number = Infinity) { super({ start: () => { diff --git a/streams/concat_readable_streams.ts b/streams/concat_readable_streams.ts index a49d4befea3d..25d227c4f3e2 100644 --- a/streams/concat_readable_streams.ts +++ b/streams/concat_readable_streams.ts @@ -6,9 +6,9 @@ * * Cancelling the resulting stream will cancel all the input streams. * - * @typeParam T Type of the chunks in the streams. - * - * @param streams An iterable of `ReadableStream`s. + * @typeParam T The type of the chunks in the streams. + * @param streams An iterable of `ReadableStream`s to concat. + * @returns A `ReadableStream` that will emit the concatenated chunks. * * @example Usage * ```ts diff --git a/streams/delimiter_stream.ts b/streams/delimiter_stream.ts index 20a5787a7241..2451a13481d0 100644 --- a/streams/delimiter_stream.ts +++ b/streams/delimiter_stream.ts @@ -27,6 +27,8 @@ export interface DelimiterStreamOptions { /** * Divide a stream into chunks delimited by a given byte sequence. 
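+ *
+ * The delimiter may be longer than one byte and may arrive split across
+ * chunks, as in this minimal sketch (the chunk boundaries are arbitrary):
+ *
+ * @example Divide a stream by CRLF (sketch)
+ * ```ts
+ * import { DelimiterStream } from "@std/streams/delimiter-stream";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * const inputStream = ReadableStream.from(["foo\r\nbar\r", "\nbaz"]);
+ *
+ * const transformed = inputStream.pipeThrough(new TextEncoderStream())
+ *   .pipeThrough(new DelimiterStream(new TextEncoder().encode("\r\n")))
+ *   .pipeThrough(new TextDecoderStream());
+ *
+ * assertEquals(await Array.fromAsync(transformed), ["foo", "bar", "baz"]);
+ * ```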
* + * If you are working with a stream of `string`, consider using {@linkcode TextDelimiterStream}. + * * @example * Divide a CSV stream by commas, discarding the commas: * ```ts @@ -43,7 +45,7 @@ export interface DelimiterStreamOptions { * ``` * * @example - * Divide a stream after semi-colons, keeping the semi-colons in the output: + * Divide a stream after semi-colons, keeping the semicolons in the output: * ```ts * import { DelimiterStream } from "@std/streams/delimiter-stream"; * import { assertEquals } from "@std/assert/assert-equals"; @@ -67,7 +69,28 @@ export class DelimiterStream extends TransformStream { #delimLPS: Uint8Array | null; #disp: DelimiterDisposition; - /** Constructs a new instance. */ + /** + * Constructs a new instance. + * + * @param delimiter A delimiter to split the stream by. + * @param options Options for the delimiter stream. + * + * @example comma as a delimiter + * ```ts + * import { DelimiterStream } from "@std/streams/delimiter-stream"; + * + * const delimiterStream = new DelimiterStream(new TextEncoder().encode(",")); + * ``` + * + * @example semicolon as a delimiter, and disposition set to `"suffix"` + * ```ts + * import { DelimiterStream } from "@std/streams/delimiter-stream"; + * + * const delimiterStream = new DelimiterStream(new TextEncoder().encode(";"), { + * disposition: "suffix", + * }); + * ``` + */ constructor( delimiter: Uint8Array, options: DelimiterStreamOptions = { disposition: "discard" }, diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index a2e5fe906e13..48015c27a672 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -11,6 +11,9 @@ * compresensible information. If you want to continue reading the other streams * even after one of them ends, use {@linkcode zipReadableStreams}. * + * @typeparam T The type of the chunks in the input streams. + * @returns A `ReadableStream` that will emit the zipped chunks + * * @example Zip 2 streams with the same length * ```ts * import { earlyZipReadableStreams } from "@std/streams/early-zip-readable-streams"; diff --git a/streams/iterate_reader.ts b/streams/iterate_reader.ts index 1799071f3042..29d78758aea9 100644 --- a/streams/iterate_reader.ts +++ b/streams/iterate_reader.ts @@ -10,9 +10,13 @@ import type { Reader, ReaderSync } from "@std/io/types"; export type { Reader, ReaderSync }; /** - * Turns a {@linkcode Reader}, `r`, into an async iterator. + * Turns a {@linkcode https://jsr.io/@std/io/doc/types/~/Reader | Reader}, `r`, into an async iterator. * - * @example + * @param r A reader to turn into an async iterator. + * @param options Options for the iterateReader function. + * @returns An async iterator that yields Uint8Array. + * + * @example Convert a `Deno.FsFile` into an async iterator and iterate over it * ```ts * import { iterateReader } from "@std/streams/iterate-reader"; * @@ -22,10 +26,7 @@ export type { Reader, ReaderSync }; * } * ``` * - * Second argument can be used to tune size of a buffer. - * Default size of the buffer is 32kB. - * - * @example + * @example Specify a buffer size of 1MiB * ```ts * import { iterateReader } from "@std/streams/iterate-reader"; * @@ -51,8 +52,13 @@ export function iterateReader( } /** - * Turns a {@linkcode ReaderSync}, `r`, into an iterator. + * Turns a {@linkcode https://jsr.io/@std/io/doc/types/~/ReaderSync | ReaderSync}, `r`, into an iterator. + * + * @param r A reader to turn into an iterator. + * @param options Options for the iterateReaderSync function. 
+ * @returns An iterator that yields Uint8Array. * + * @example Convert a `Deno.FsFile` into an iterator and iterate over it * ```ts * import { iterateReaderSync } from "@std/streams/iterate-reader"; * @@ -62,12 +68,10 @@ export function iterateReader( * } * ``` * - * Second argument can be used to tune size of a buffer. - * Default size of the buffer is 32kB. - * + * @example Specify a buffer size of 1MiB * ```ts * import { iterateReaderSync } from "@std/streams/iterate-reader"; - + * * using f = await Deno.open("/etc/passwd"); * const iter = iterateReaderSync(f, { * bufSize: 1024 * 1024 diff --git a/streams/limited_bytes_transform_stream.ts b/streams/limited_bytes_transform_stream.ts index 9fb3735d924f..a8f6823ec534 100644 --- a/streams/limited_bytes_transform_stream.ts +++ b/streams/limited_bytes_transform_stream.ts @@ -80,7 +80,7 @@ export interface LimitedBytesTransformStreamOptions { * * const stream = ReadableStream.from(["1234", "5678"]); * const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough( - * new LimitedBytesTransformStream(5), + * new LimitedBytesTransformStream(5, { error: true }), * ).pipeThrough(new TextDecoderStream()); * * await assertRejects(async () => { @@ -92,7 +92,26 @@ export class LimitedBytesTransformStream extends TransformStream { #read = 0; - /** Constructs a new instance. */ + /** + * Constructs a new instance. + * + * @param size A size limit in bytes. + * @param options Options for the stream. + * + * @example size = 42 + * ```ts + * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream"; + * + * const limitedBytesTransformStream = new LimitedBytesTransformStream(42); + * ``` + * + * @example size = 42, error = true + * ```ts + * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream"; + * + * const limitedBytesTransformStream = new LimitedBytesTransformStream(42, { error: true }); + * ``` + */ constructor( size: number, options: LimitedBytesTransformStreamOptions = { error: false }, diff --git a/streams/limited_transform_stream.ts b/streams/limited_transform_stream.ts index 2086290472a5..3a337c94a8ae 100644 --- a/streams/limited_transform_stream.ts +++ b/streams/limited_transform_stream.ts @@ -20,6 +20,8 @@ export interface LimitedTransformStreamOptions { * a {@linkcode RangeError} will be thrown when the total number of enqueued * chunks is about to exceed the specified size. * + * @typeparam T The type the chunks in the stream. + * * @example `size` is equal to the total number of chunks * ```ts * import { LimitedTransformStream } from "@std/streams/limited-transform-stream"; @@ -72,7 +74,26 @@ export interface LimitedTransformStreamOptions { export class LimitedTransformStream extends TransformStream { #read = 0; - /** Constructs a new instance. */ + /** + * Constructs a new instance. + * + * @param size The maximum number of chunks to read. + * @param options Options for the stream. 
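+   *
+   * @example Truncate an endless stream to 3 chunks (minimal sketch)
+   * ```ts
+   * import { LimitedTransformStream } from "@std/streams/limited-transform-stream";
+   * import { assertEquals } from "@std/assert/assert-equals";
+   *
+   * // An endless counter; the limit makes it finite.
+   * let i = 0;
+   * const counter = new ReadableStream<number>({
+   *   pull(controller) {
+   *     controller.enqueue(i++);
+   *   },
+   * });
+   *
+   * assertEquals(
+   *   await Array.fromAsync(counter.pipeThrough(new LimitedTransformStream(3))),
+   *   [0, 1, 2],
+   * );
+   * ```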
+ * + * @example size = 42 + * ```ts + * import { LimitedTransformStream } from "@std/streams/limited-transform-stream"; + * + * const limitedTransformStream = new LimitedTransformStream(42); + * ``` + * + * @example size = 42, error = true + * ```ts + * import { LimitedTransformStream } from "@std/streams/limited-transform-stream"; + * + * const limitedTransformStream = new LimitedTransformStream(42, { error: true }); + * ``` + */ constructor( size: number, options: LimitedTransformStreamOptions = { error: false }, diff --git a/streams/merge_readable_streams.ts b/streams/merge_readable_streams.ts index a4f298f43eba..b1ed79c52e78 100644 --- a/streams/merge_readable_streams.ts +++ b/streams/merge_readable_streams.ts @@ -5,6 +5,10 @@ * If a stream ends before other ones, the other will continue adding data, * and the finished one will not add any more data. * + * @typeparam T The type of the chunks in the input/output streams. + * @param streams An iterable of `ReadableStream`s to merge. + * @returns A `ReadableStream` that will emit the merged chunks. + * * @example Merge 2 streams * ```ts * import { mergeReadableStreams } from "@std/streams/merge-readable-streams"; diff --git a/streams/readable_stream_from_reader.ts b/streams/readable_stream_from_reader.ts index e0fff9e50c6d..581fe67235ce 100644 --- a/streams/readable_stream_from_reader.ts +++ b/streams/readable_stream_from_reader.ts @@ -34,11 +34,15 @@ export interface ReadableStreamFromReaderOptions { * will be read. When `null` is returned from the reader, the stream will be * closed along with the reader (if it is also a {@linkcode https://jsr.io/@std/io/doc/types/~/Closer | Closer}). * + * @param reader A reader to convert into a `ReadableStream`. + * @param options Options for the `readableStreamFromReader` function. + * @returns A `ReadableStream` of `Uint8Array`s. + * * @example Convert a `Deno.FsFile` into a readable stream: * ```ts * import { readableStreamFromReader } from "@std/streams/readable-stream-from-reader"; * - * using file = await Deno.open("./file.txt", { read: true }); + * using file = await Deno.open("./README.md", { read: true }); * const fileStream = readableStreamFromReader(file); * ``` * diff --git a/streams/reader_from_iterable.ts b/streams/reader_from_iterable.ts index 29a1ca6f8a39..8239b79d4de1 100644 --- a/streams/reader_from_iterable.ts +++ b/streams/reader_from_iterable.ts @@ -8,20 +8,25 @@ import type { Reader } from "@std/io/types"; /** * Create a {@linkcode https://jsr.io/@std/io/doc/types/~/Reader | Reader} from an iterable of {@linkcode Uint8Array}s. * - * @example Periodically write `Deno.build` information to `build.txt` + * @param iterable An iterable or async iterable of `Uint8Array`s to convert into a `Reader`. + * @returns A `Reader` that reads from the iterable. 
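+ *
+ * @example Migrating to {@linkcode ReadableStream.from} (sketch)
+ * ```ts
+ * // A sketch of the replacement named in the deprecation notice below: the
+ * // same iterable of `Uint8Array` chunks becomes a `ReadableStream` directly.
+ * const stream = ReadableStream.from((function* () {
+ *   yield new TextEncoder().encode("hello, ");
+ *   yield new TextEncoder().encode("world!");
+ * })());
+ * ```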
+ * + * @example Write `Deno.build` information to `/dev/null` 3 times every second * ```ts * import { readerFromIterable } from "@std/streams/reader-from-iterable"; * import { copy } from "@std/io/copy"; + * import { delay } from "@std/async/delay"; * - * using file = await Deno.open("build.txt", { write: true }); * const reader = readerFromIterable((async function* () { - * while (true) { - * await new Promise((r) => setTimeout(r, 1000)); + * for (let i = 0; i < 3; i++) { + * await delay(1000); * const message = `data: ${JSON.stringify(Deno.build)}\n\n`; * yield new TextEncoder().encode(message); * } * })()); - * await copy(reader, file); + * + * using blackhole = await Deno.open("/dev/null", { write: true }); + * await copy(reader, blackhole); * ``` * * @deprecated This will be removed in 1.0.0. Use {@linkcode ReadableStream.from} instead. diff --git a/streams/reader_from_stream_reader.ts b/streams/reader_from_stream_reader.ts index c58f47aaa0f7..8bad3daa2f1d 100644 --- a/streams/reader_from_stream_reader.ts +++ b/streams/reader_from_stream_reader.ts @@ -7,16 +7,19 @@ import type { Reader } from "@std/io/types"; /** * Create a {@linkcode https://jsr.io/@std/io/doc/types/~/Reader | Reader} from a {@linkcode ReadableStreamDefaultReader}. * - * @example Copy the response body of a fetch request to a file + * @param streamReader A `ReadableStreamDefaultReader` to convert into a `Reader`. + * @returns A `Reader` that reads from the `streamReader`. + * + * @example Copy the response body of a fetch request to `/dev/null` * ```ts * import { copy } from "@std/io/copy"; * import { readerFromStreamReader } from "@std/streams/reader-from-stream-reader"; * * const res = await fetch("https://deno.land"); - * using file = await Deno.open("./deno.land.html", { create: true, write: true }); + * using blackhole = await Deno.open("/dev/null", { write: true }); * * const reader = readerFromStreamReader(res.body!.getReader()); - * await copy(reader, file); + * await copy(reader, blackhole); * ``` * * @deprecated This will be removed in 1.0.0. Import from diff --git a/streams/text_delimiter_stream.ts b/streams/text_delimiter_stream.ts index 4171cba9a63e..b36a7a2d722f 100644 --- a/streams/text_delimiter_stream.ts +++ b/streams/text_delimiter_stream.ts @@ -12,6 +12,8 @@ import type { * Transform a stream `string` into a stream where each chunk is divided by a * given delimiter. * + * If you are working with a stream of `Uint8Array`, consider using {@linkcode DelimiterStream}. + * * If you want to split by a newline, consider using {@linkcode TextLineStream}. * * @example Comma-separated values @@ -60,7 +62,28 @@ export class TextDelimiterStream extends TransformStream { #delimLPS: Uint8Array; #disp: DelimiterDisposition; - /** Constructs a new instance. */ + /** + * Constructs a new instance. + * + * @param delimiter A delimiter to split the stream by. + * @param options Options for the stream. 
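+   *
+   * @example A delimiter split across chunks still matches (minimal sketch)
+   * ```ts
+   * import { TextDelimiterStream } from "@std/streams/text-delimiter-stream";
+   * import { assertEquals } from "@std/assert/assert-equals";
+   *
+   * // The "||" delimiter straddles the chunk boundary here.
+   * const stream = ReadableStream.from(["a|", "|b"])
+   *   .pipeThrough(new TextDelimiterStream("||"));
+   *
+   * assertEquals(await Array.fromAsync(stream), ["a", "b"]);
+   * ```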
+ * + * @example comma as a delimiter + * ```ts + * import { TextDelimiterStream } from "@std/streams/text-delimiter-stream"; + * + * const delimiterStream = new TextDelimiterStream(","); + * ``` + * + * @example semicolon as a delimiter, and disposition set to `"suffix"` + * ```ts + * import { TextDelimiterStream } from "@std/streams/text-delimiter-stream"; + * + * const delimiterStream = new TextDelimiterStream(",", { + * disposition: "suffix", + * }); + * ``` + */ constructor( delimiter: string, options: DelimiterStreamOptions = { disposition: "discard" }, diff --git a/streams/text_line_stream.ts b/streams/text_line_stream.ts index 2ebd1f0d5c0d..f68328d99c43 100644 --- a/streams/text_line_stream.ts +++ b/streams/text_line_stream.ts @@ -69,7 +69,25 @@ export interface TextLineStreamOptions { export class TextLineStream extends TransformStream { #currentLine = ""; - /** Constructs a new instance. */ + /** + * Constructs a new instance. + * + * @param options Options for the stream. + * + * @example No parameters + * ```ts + * import { TextLineStream } from "@std/streams/text-line-stream"; + * + * const textLineStream = new TextLineStream(); + * ``` + * + * @example allowCR = true + * ```ts + * import { TextLineStream } from "@std/streams/text-line-stream"; + * + * const textLineStream = new TextLineStream({ allowCR: true }); + * ``` + */ constructor(options: TextLineStreamOptions = { allowCR: false }) { super({ transform: (chars, controller) => { diff --git a/streams/to_array_buffer.ts b/streams/to_array_buffer.ts index b176f35a4b41..292be918345d 100644 --- a/streams/to_array_buffer.ts +++ b/streams/to_array_buffer.ts @@ -7,6 +7,9 @@ import { concat } from "@std/bytes/concat"; * Converts a {@linkcode ReadableStream} of {@linkcode Uint8Array}s to an * {@linkcode ArrayBuffer}. Works the same as {@linkcode Response.arrayBuffer}. * + * @param readableStream A `ReadableStream` of `Uint8Array`s to convert into an `ArrayBuffer`. + * @returns A promise that resolves with the `ArrayBuffer` containing all the data from the stream. + * * @example Basic usage * ```ts * import { toArrayBuffer } from "@std/streams/to-array-buffer"; diff --git a/streams/to_blob.ts b/streams/to_blob.ts index 342f27965013..a467102d73e3 100644 --- a/streams/to_blob.ts +++ b/streams/to_blob.ts @@ -5,6 +5,9 @@ * Converts a {@linkcode ReadableStream} of {@linkcode Uint8Array}s to a * {@linkcode Blob}. Works the same as {@linkcode Response.blob}. * + * @param stream A `ReadableStream` of `Uint8Array`s to convert into a `Blob`. + * @returns A `Promise` that resolves to the `Blob`. + * * @example Basic usage * ```ts * import { toBlob } from "@std/streams/to-blob"; diff --git a/streams/to_json.ts b/streams/to_json.ts index 798f2dbdd3ed..bebaa5fa5ba5 100644 --- a/streams/to_json.ts +++ b/streams/to_json.ts @@ -8,6 +8,9 @@ import { toText } from "./to_text.ts"; * {@linkcode Uint8Array}s to an object. Works the same as * {@linkcode Response.json}. * + * @param readableStream A `ReadableStream` whose chunks compose a JSON. + * @returns A promise that resolves to the parsed JSON. + * * @example Basic usage * ```ts * import { toJson } from "@std/streams/to-json"; diff --git a/streams/to_text.ts b/streams/to_text.ts index 039a3970214f..c0f1a74033c5 100644 --- a/streams/to_text.ts +++ b/streams/to_text.ts @@ -7,6 +7,9 @@ const textDecoder = new TextDecoder(); * Converts a {@linkcode ReadableSteam} of strings or {@linkcode Uint8Array}s * to a single string. Works the same as {@linkcode Response.text}. 
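+ *
+ * Byte chunks are decoded as UTF-8, so a `Uint8Array` stream works as well;
+ * a minimal sketch:
+ *
+ * @example Decode a byte stream (sketch)
+ * ```ts
+ * import { toText } from "@std/streams/to-text";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * const byteStream = ReadableStream.from(["Hello, ", "world!"])
+ *   .pipeThrough(new TextEncoderStream());
+ *
+ * assertEquals(await toText(byteStream), "Hello, world!");
+ * ```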
* + * @param readableStream A `ReadableStream` to convert into a `string`. + * @returns A `Promise` that resolves to the `string`. + * * @example Basic usage * ```ts * import { toText } from "@std/streams/to-text"; diff --git a/streams/to_transform_stream.ts b/streams/to_transform_stream.ts index 5e9ba54cc217..7cdfea8b4d88 100644 --- a/streams/to_transform_stream.ts +++ b/streams/to_transform_stream.ts @@ -4,6 +4,13 @@ /** * Convert the generator function into a {@linkcode TransformStream}. * + * @typeparam I The type of the chunks in the source stream. + * @typeparam O The type of the chunks in the transformed stream. + * @param transformer A function to transform. + * @param writableStrategy An object that optionally defines a queuing strategy for the stream. + * @param readableStrategy An object that optionally defines a queuing strategy for the stream. + * @returns A {@linkcode TransformStream} that transforms the source stream as defined by the provided transformer. + * * @example Build a transform stream that multiplies each value by 100 * ```ts * import { toTransformStream } from "@std/streams/to-transform-stream"; @@ -52,10 +59,6 @@ * [{ "name": "Alice", "age": 30 }, { "name": "Bob", "age": 25 }], * ); * ``` - * - * @param transformer A function to transform. - * @param writableStrategy An object that optionally defines a queuing strategy for the stream. - * @param readableStrategy An object that optionally defines a queuing strategy for the stream. */ export function toTransformStream( transformer: (src: ReadableStream) => Iterable | AsyncIterable, diff --git a/streams/writable_stream_from_writer.ts b/streams/writable_stream_from_writer.ts index 770ab512d87b..52df58ea38ff 100644 --- a/streams/writable_stream_from_writer.ts +++ b/streams/writable_stream_from_writer.ts @@ -22,6 +22,21 @@ export interface WritableStreamFromWriterOptions { /** * Create a {@linkcode WritableStream} from a {@linkcode https://jsr.io/@std/io/doc/types/~/Writer | Writer}. * + * @param writer A `Writer` to convert into a `WritableStream`. + * @param options Options for the `writableStreamFromWriter` function. + * @returns A `WritableStream` of `Uint8Array`s. + * + * @example Convert `Deno.stdout` into a writable stream + * ```ts + * // Note that you can directly get the writer from `Deno.stdout` by + * // `Deno.stdout.writable`. This example is just for demonstration purposes; + * // definitely not a recommended way. + * + * import { writableStreamFromWriter } from "@std/streams/writable-stream-from-writer"; + * + * const stdoutStream = writableStreamFromWriter(Deno.stdout); + * ``` + * * @deprecated This will be removed in 1.0.0. Use {@linkcode https://jsr.io/@std/io/doc/~/toWritableStream | toWritableStream} instead. */ export function writableStreamFromWriter( diff --git a/streams/writer_from_stream_writer.ts b/streams/writer_from_stream_writer.ts index 9c25403142f2..7e9c875dc316 100644 --- a/streams/writer_from_stream_writer.ts +++ b/streams/writer_from_stream_writer.ts @@ -3,15 +3,20 @@ import type { Writer } from "@std/io/types"; +export type { Writer }; + /** * Create a {@linkcode https://jsr.io/@std/io/doc/types/~/Writer | Writer} from a {@linkcode WritableStreamDefaultWriter}. * + * @param streamWriter A `WritableStreamDefaultWriter` to convert into a `Writer`. + * @returns A `Writer` that writes to the `WritableStreamDefaultWriter`. 
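+ *
+ * @example Collect written bytes in memory (minimal sketch)
+ * ```ts
+ * import { writerFromStreamWriter } from "@std/streams/writer-from-stream-writer";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * // An in-memory sink stands in for any `WritableStream`.
+ * const chunks: Uint8Array[] = [];
+ * const writableStream = new WritableStream<Uint8Array>({
+ *   write(chunk) {
+ *     chunks.push(chunk);
+ *   },
+ * });
+ *
+ * const writer = writerFromStreamWriter(writableStream.getWriter());
+ * await writer.write(new TextEncoder().encode("done"));
+ * assertEquals(chunks.length, 1);
+ * ```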
+ * * @example Read from a file and write to stdout using a writable stream * ```ts * import { copy } from "@std/io/copy"; * import { writerFromStreamWriter } from "@std/streams/writer-from-stream-writer"; * - * using file = await Deno.open("./deno.land.html", { read: true }); + * using file = await Deno.open("./README.md", { read: true }); * * const writableStream = new WritableStream({ * write(chunk): void { diff --git a/streams/zip_readable_streams.ts b/streams/zip_readable_streams.ts index a41be71c4133..0fc890aebb38 100644 --- a/streams/zip_readable_streams.ts +++ b/streams/zip_readable_streams.ts @@ -10,6 +10,9 @@ * order, and the finished one will not add any more data. If you want to cancel * the other streams when one of them ends, use {@linkcode earlyZipReadableStreams}. * + * @typeparam T The type of the chunks in the input/output streams. + * @returns A `ReadableStream` that will emit the zipped chunks. + * * @example Zip 2 streams with the same length * ```ts * import { zipReadableStreams } from "@std/streams/zip-readable-streams"; @@ -17,7 +20,7 @@ * * const stream1 = ReadableStream.from(["1", "2", "3"]); * const stream2 = ReadableStream.from(["a", "b", "c"]); - * const zippedStream = earlyZipReadableStreams(stream1, stream2); + * const zippedStream = zipReadableStreams(stream1, stream2); * * assertEquals( * await Array.fromAsync(zippedStream), From 17109561161001b83a947f64e6edb94ce2052cb5 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 18:41:05 +0900 Subject: [PATCH 28/34] add streams/mod.ts to doc checker entrypoints --- _tools/check_docs.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/_tools/check_docs.ts b/_tools/check_docs.ts index 18370635bb05..a62931ed928f 100644 --- a/_tools/check_docs.ts +++ b/_tools/check_docs.ts @@ -33,6 +33,7 @@ const ENTRY_POINTS = [ "../internal/mod.ts", "../jsonc/mod.ts", "../media_types/mod.ts", + "../streams/mod.ts", "../ulid/mod.ts", "../webgpu/mod.ts", "../http/mod.ts", From bbf4667121380b46815c561c017858618c0fd449 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 18:47:30 +0900 Subject: [PATCH 29/34] fix type error --- streams/byte_slice_stream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/byte_slice_stream.ts b/streams/byte_slice_stream.ts index 12b279c0d320..c3af61beeb25 100644 --- a/streams/byte_slice_stream.ts +++ b/streams/byte_slice_stream.ts @@ -33,7 +33,7 @@ import { assert } from "@std/assert/assert"; * const rangedStream = response.body! 
* .pipeThrough(new ByteSliceStream(3, 8)); * const collected = await Array.fromAsync(rangedStream); - * assertEquals(collected[0].length, 6); + * assertEquals(collected[0]?.length, 6); * ``` */ export class ByteSliceStream extends TransformStream { From 32b416c6601f144a88c4bfbcfb2cbf168be54c34 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 18:51:59 +0900 Subject: [PATCH 30/34] tweak test data so spell checker will not complain --- streams/delimiter_stream.ts | 2 +- streams/to_json.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/streams/delimiter_stream.ts b/streams/delimiter_stream.ts index 2451a13481d0..1f3c5afa6d08 100644 --- a/streams/delimiter_stream.ts +++ b/streams/delimiter_stream.ts @@ -50,7 +50,7 @@ export interface DelimiterStreamOptions { * import { DelimiterStream } from "@std/streams/delimiter-stream"; * import { assertEquals } from "@std/assert/assert-equals"; * - * const inputStream = ReadableStream.from(["foo;", "bar;ba", "z;"]); + * const inputStream = ReadableStream.from(["foo;", "bar;baz", ";"]); * * const transformed = inputStream.pipeThrough(new TextEncoderStream()) * .pipeThrough( diff --git a/streams/to_json.ts b/streams/to_json.ts index bebaa5fa5ba5..4269b2b1a8fd 100644 --- a/streams/to_json.ts +++ b/streams/to_json.ts @@ -18,8 +18,8 @@ import { toText } from "./to_text.ts"; * * const stream = ReadableStream.from([ * "[1, true", - * ', [], {}, "hel', - * 'lo", null]', + * ', [], {}, "hello', + * '", null]', * ]); * const json = await toJson(stream); * assertEquals(json, [1, true, [], {}, "hello", null]); From 08ef77667e501227086dd24176ba46ead1ee3369 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 19:37:50 +0900 Subject: [PATCH 31/34] fix example code to make it work in windows --- streams/iterate_reader.ts | 4 ++-- streams/reader_from_iterable.ts | 5 +++-- streams/reader_from_stream_reader.ts | 5 +++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/streams/iterate_reader.ts b/streams/iterate_reader.ts index 29d78758aea9..5ba911279299 100644 --- a/streams/iterate_reader.ts +++ b/streams/iterate_reader.ts @@ -20,7 +20,7 @@ export type { Reader, ReaderSync }; * ```ts * import { iterateReader } from "@std/streams/iterate-reader"; * - * using f = await Deno.open("/etc/passwd"); + * using f = await Deno.open("./README.md"); * for await (const chunk of iterateReader(f)) { * console.log(chunk); * } @@ -30,7 +30,7 @@ export type { Reader, ReaderSync }; * ```ts * import { iterateReader } from "@std/streams/iterate-reader"; * - * using f = await Deno.open("/etc/passwd"); + * using f = await Deno.open("./README.md"); * const it = iterateReader(f, { * bufSize: 1024 * 1024 * }); diff --git a/streams/reader_from_iterable.ts b/streams/reader_from_iterable.ts index 8239b79d4de1..ae498b5bd9d6 100644 --- a/streams/reader_from_iterable.ts +++ b/streams/reader_from_iterable.ts @@ -11,11 +11,12 @@ import type { Reader } from "@std/io/types"; * @param iterable An iterable or async iterable of `Uint8Array`s to convert into a `Reader`. * @returns A `Reader` that reads from the iterable. 
* - * @example Write `Deno.build` information to `/dev/null` 3 times every second + * @example Write `Deno.build` information to the blackhole 3 times every second * ```ts * import { readerFromIterable } from "@std/streams/reader-from-iterable"; * import { copy } from "@std/io/copy"; * import { delay } from "@std/async/delay"; + * import { devNull } from "node:os"; * * const reader = readerFromIterable((async function* () { * for (let i = 0; i < 3; i++) { @@ -25,7 +26,7 @@ import type { Reader } from "@std/io/types"; * } * })()); * - * using blackhole = await Deno.open("/dev/null", { write: true }); + * using blackhole = await Deno.open(devNull, { write: true }); * await copy(reader, blackhole); * ``` * diff --git a/streams/reader_from_stream_reader.ts b/streams/reader_from_stream_reader.ts index 8bad3daa2f1d..960a831107e1 100644 --- a/streams/reader_from_stream_reader.ts +++ b/streams/reader_from_stream_reader.ts @@ -10,13 +10,14 @@ import type { Reader } from "@std/io/types"; * @param streamReader A `ReadableStreamDefaultReader` to convert into a `Reader`. * @returns A `Reader` that reads from the `streamReader`. * - * @example Copy the response body of a fetch request to `/dev/null` + * @example Copy the response body of a fetch request to the blackhole * ```ts * import { copy } from "@std/io/copy"; * import { readerFromStreamReader } from "@std/streams/reader-from-stream-reader"; + * import { devNull } from "node:os"; * * const res = await fetch("https://deno.land"); - * using blackhole = await Deno.open("/dev/null", { write: true }); + * using blackhole = await Deno.open(devNull, { write: true }); * * const reader = readerFromStreamReader(res.body!.getReader()); * await copy(reader, blackhole); From 9cba9c66995a2cfa4784daff1b9ee1e1b669e5d7 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Fri, 24 May 2024 19:43:26 +0900 Subject: [PATCH 32/34] fix example code to make it work in windows (2) --- streams/iterate_reader.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/iterate_reader.ts b/streams/iterate_reader.ts index 5ba911279299..cd51bd617e1c 100644 --- a/streams/iterate_reader.ts +++ b/streams/iterate_reader.ts @@ -62,7 +62,7 @@ export function iterateReader( * ```ts * import { iterateReaderSync } from "@std/streams/iterate-reader"; * - * using f = Deno.openSync("/etc/passwd"); + * using f = Deno.openSync("./README.md"); * for (const chunk of iterateReaderSync(f)) { * console.log(chunk); * } @@ -72,7 +72,7 @@ export function iterateReader( * ```ts * import { iterateReaderSync } from "@std/streams/iterate-reader"; * - * using f = await Deno.open("/etc/passwd"); + * using f = await Deno.open("./README.md"); * const iter = iterateReaderSync(f, { * bufSize: 1024 * 1024 * }); From acde22aee0cdbe2690c9a3549ae93ec987f509dd Mon Sep 17 00:00:00 2001 From: Yoshiya Hinosawa Date: Mon, 27 May 2024 14:20:31 +0900 Subject: [PATCH 33/34] Update streams/early_zip_readable_streams.ts --- streams/early_zip_readable_streams.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index 48015c27a672..37cf6eddc49f 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -7,7 +7,7 @@ * another chunk. * * If a stream ends before other ones, the others will be cancelled after the - * last chunk of said straem is read. See the examples below for more + * last chunk of said stream is read. 
See the examples below for more * compresensible information. If you want to continue reading the other streams * even after one of them ends, use {@linkcode zipReadableStreams}. * From 67f6732942bf8f9553ebba7165630c4eb68dd3e9 Mon Sep 17 00:00:00 2001 From: Yoshiya Hinosawa Date: Mon, 27 May 2024 16:10:13 +0900 Subject: [PATCH 34/34] Update streams/early_zip_readable_streams.ts --- streams/early_zip_readable_streams.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index 37cf6eddc49f..800392dabc6a 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -8,7 +8,7 @@ * * If a stream ends before other ones, the others will be cancelled after the * last chunk of said stream is read. See the examples below for more - * compresensible information. If you want to continue reading the other streams + * comprehensible information. If you want to continue reading the other streams * even after one of them ends, use {@linkcode zipReadableStreams}. * * @typeparam T The type of the chunks in the input streams.
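+ *
+ * @example A short stream ends the zip early (a minimal sketch of the
+ * cancellation behavior described above)
+ * ```ts
+ * import { earlyZipReadableStreams } from "@std/streams/early-zip-readable-streams";
+ * import { assertEquals } from "@std/assert/assert-equals";
+ *
+ * const stream1 = ReadableStream.from(["1"]);
+ * const stream2 = ReadableStream.from(["a", "b", "c"]);
+ *
+ * // Once `stream1` finishes, `stream2` is cancelled; only "a" is read from it.
+ * assertEquals(
+ *   await Array.fromAsync(earlyZipReadableStreams(stream1, stream2)),
+ *   ["1", "a"],
+ * );
+ * ```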