Skip to content

Commit

Permalink
feat: add support for blind writes (#2065)
Browse files Browse the repository at this point in the history
* sample: blind write

* sample: blind write

* refactor

* add class mutation

* add class mutation

* feat: blind-writes

* refactor

* fix: lint errors

* refactor

* fix: lint errors

* fix: header checks

* refactor the blind write

* feat: add support for blind writes

chore(main): release 7.9.0 (#2053)

:robot: I have created a release *beep* *boop*
---

* **spanner:** Add support for batchWrite ([#2054](https://togithub.com/googleapis/nodejs-spanner/issues/2054)) ([06aab6e](https://togithub.com/googleapis/nodejs-spanner/commit/06aab6e39bbce9e3786f1ac631c80e8909197e92))

* **deps:** Update dependency google-gax to v4.3.4 ([#2051](https://togithub.com/googleapis/nodejs-spanner/issues/2051)) ([80abf06](https://togithub.com/googleapis/nodejs-spanner/commit/80abf06ba8ef9497318ffc597b83fb63e4408f9c))
* **deps:** Update dependency google-gax to v4.3.5 ([#2055](https://togithub.com/googleapis/nodejs-spanner/issues/2055)) ([702c9b0](https://togithub.com/googleapis/nodejs-spanner/commit/702c9b0f34e6cc34233c5aa52b97601b19f70980))
* **deps:** Update dependency google-gax to v4.3.6 ([#2057](https://togithub.com/googleapis/nodejs-spanner/issues/2057)) ([74ebf1e](https://togithub.com/googleapis/nodejs-spanner/commit/74ebf1e45cddf614c180295f3a761a8f84c5cb32))
* **deps:** Update dependency google-gax to v4.3.7 ([#2068](https://togithub.com/googleapis/nodejs-spanner/issues/2068)) ([28fec6c](https://togithub.com/googleapis/nodejs-spanner/commit/28fec6ca505d78d725efc123950be978e0c84ab7))

---
This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).

refactor: blind write method

fix: lint errors

fix: Retry with timeout (#2071)

Use `gaxOptions.timeout` during retry in streaming calls. Earlier the timeout value was only used for a single RPC not for the whole operation including retries. Now if RPC returns `Unavailable` error and the timeout value has been reached, library will throw an Deadline exceeded error.

```
const query = {
        sql: 'Select 1',
        gaxOptions: {timeout: 500}
    }
const [rows] = await database.run(query);
```

chore(main): release 7.9.1 (#2072)

:robot: I have created a release *beep* *boop*
---

* Retry with timeout ([#2071](https://togithub.com/googleapis/nodejs-spanner/issues/2071)) ([a943257](https://togithub.com/googleapis/nodejs-spanner/commit/a943257a0402b26fd80196057a9724fd28fc5c1b))

---
This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).

refactor: blind write method

test: unit test for blind write

test: unit test for blind write

refactor

fix: lint errors

feat: add support for change streams transaction exclusion option for Batch Write (#2070)

* feat: change stream transaction exclusion option for Batch Write

* refactor

docs: add doc to blindWrite method

docs: add doc to the setQueuedMutations

refactor: doc setQueuedMutations

fix: presubmit error

fix(deps): update dependency google-gax to v4.3.8 (#2077)

[![Mend Renovate](https://app.renovatebot.com/images/banner.svg)](https://renovatebot.com)

This PR contains the following updates:

| Package | Change | Age | Adoption | Passing | Confidence |
|---|---|---|---|---|---|
| [google-gax](https://togithub.com/googleapis/gax-nodejs) ([source](https://togithub.com/googleapis/gax-nodejs/tree/HEAD/gax)) | [`4.3.7` -> `4.3.8`](https://renovatebot.com/diffs/npm/google-gax/4.3.7/4.3.8) | [![age](https://developer.mend.io/api/mc/badges/age/npm/google-gax/4.3.8?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://developer.mend.io/api/mc/badges/adoption/npm/google-gax/4.3.8?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://developer.mend.io/api/mc/badges/compatibility/npm/google-gax/4.3.7/4.3.8?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://developer.mend.io/api/mc/badges/confidence/npm/google-gax/4.3.7/4.3.8?slim=true)](https://docs.renovatebot.com/merge-confidence/) |

---

<details>
<summary>googleapis/gax-nodejs (google-gax)</summary>

[Compare Source](https://togithub.com/googleapis/gax-nodejs/compare/google-gax-v4.3.7...google-gax-v4.3.8)

-   **deps:** remove rimraf in favor of native node rm function ([#&#8203;1626](https://togithub.com/googleapis/gax-nodejs/issues/1626)) ([dd87646](https://togithub.com/googleapis/gax-nodejs/commit/dd87646618d5026549920e224df7f85cbb5ff6a8))

</details>

---

📅 **Schedule**: Branch creation - "after 9am and before 3pm" (UTC), Automerge - At any time (no schedule defined).

🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied.

♻ **Rebasing**: Whenever PR is behind base branch, or you tick the rebase/retry checkbox.

🔕 **Ignore**: Close this PR and you won't be reminded about this update again.

---

 - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, check this box

---

This PR has been generated by [Mend Renovate](https://www.mend.io/free-developer-tools/renovate/). View repository job log [here](https://developer.mend.io/github/googleapis/nodejs-spanner).
<!--renovate-debug:eyJjcmVhdGVkSW5WZXIiOiIzNy40MjUuMSIsInVwZGF0ZWRJblZlciI6IjM3LjQyNS4xIiwidGFyZ2V0QnJhbmNoIjoibWFpbiIsImxhYmVscyI6W119-->

updated

updated

lint

refactor

* fix: presubmit error

* refactor: docs of the method writeAtLeastOnce

* test: unit test using mockspanner

* fix: lint errors

* docs refactor

* refactor

* refactor

---------

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>
Co-authored-by: surbhigarg92 <surbhigarg.92@gmail.com>
  • Loading branch information
3 people authored Jul 29, 2024
1 parent b91e284 commit 62fc0a4
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 1 deletion.
93 changes: 93 additions & 0 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ import {
import {CreateTableCallback, CreateTableResponse, Table} from './table';
import {
BatchWriteOptions,
CommitCallback,
CommitResponse,
ExecuteSqlRequest,
MutationGroup,
MutationSet,
RunCallback,
RunResponse,
RunUpdateCallback,
Expand Down Expand Up @@ -3346,6 +3349,95 @@ class Database extends common.GrpcServiceObject {
return proxyStream as NodeJS.ReadableStream;
}

/**
* Write mutations using a single RPC invocation without replay protection.
*
* writeAtLeastOnce writes mutations to Spanner using a single Commit RPC.
* These requests are not replay protected, meaning that it may apply mutations more
* than once, if the mutations are not idempotent, this may lead to a failure being
* reported when the mutation was applied once. Replays non-idempotent mutations may
* have undesirable effects. For example, replays of an insert mutation may produce an
* already exists error. For this reason, most users of the library will prefer to use
* {@link runTransaction} instead.
*
* However, {@link writeAtLeastOnce()} requires only a single RPC, whereas {@link runTransaction()}
* requires two RPCs (one of which may be performed in advance), and so this method may be
* appropriate for latency sensitive and/or high throughput blind writing.
*
* We recommend structuring your mutation set to be idempotent to avoid this issue.
*
* @param {MutationSet} [mutations] Set of Mutations to be applied.
* @param {CallOptions} [options] Options object for blind write request.
* @param {CommitCallback} [callback] Callback function for blind write request.
*
* @returns {Promise}
*
* @example
* ```
* const {Spanner} = require('@google-cloud/spanner');
* const spanner = new Spanner();
*
* const instance = spanner.instance('my-instance');
* const database = instance.database('my-database');
* const mutations = new MutationSet();
* mutations.upsert('Singers', {
* SingerId: 1,
* FirstName: 'Scarlet',
* LastName: 'Terry',
* });
* mutations.upsert('Singers', {
* SingerId: 2,
* FirstName: 'Marc',
* LastName: 'Richards',
* });
*
* try {
* const [response, err] = await database.writeAtLeastOnce(mutations, {});
* console.log(response.commitTimestamp);
* } catch(err) {
* console.log("Error: ", err);
* }
* ```
*/
writeAtLeastOnce(mutations: MutationSet): Promise<CommitResponse>;
writeAtLeastOnce(
mutations: MutationSet,
options: CallOptions
): Promise<CommitResponse>;
writeAtLeastOnce(mutations: MutationSet, callback: CommitCallback): void;
writeAtLeastOnce(
mutations: MutationSet,
options: CallOptions,
callback: CommitCallback
): void;
writeAtLeastOnce(
mutations: MutationSet,
optionsOrCallback?: CallOptions | CommitCallback,
callback?: CommitCallback
): void | Promise<CommitResponse> {
const cb =
typeof optionsOrCallback === 'function'
? (optionsOrCallback as CommitCallback)
: callback;
const options =
typeof optionsOrCallback === 'object' && optionsOrCallback
? (optionsOrCallback as CallOptions)
: {};
this.pool_.getSession((err, session?, transaction?) => {
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
this.writeAtLeastOnce(mutations, options, cb!);
return;
}
if (err) {
cb!(err as grpc.ServiceError);
return;
}
this._releaseOnEnd(session!, transaction!);
transaction?.setQueuedMutations(mutations.proto());
return transaction?.commit(options, cb!);
});
}

/**
* Create a Session object.
*
Expand Down Expand Up @@ -3674,6 +3766,7 @@ callbackifyAll(Database, {
'batchCreateSessions',
'batchTransaction',
'batchWriteAtLeastOnce',
'writeAtLeastOnce',
'close',
'createBatchTransaction',
'createSession',
Expand Down
10 changes: 10 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import {SessionPool} from './session-pool';
import {Table} from './table';
import {
MutationGroup,
MutationSet,
PartitionedDml,
Snapshot,
Transaction,
Expand Down Expand Up @@ -2025,6 +2026,15 @@ export {Transaction};
*/
export {MutationGroup};

/**
* {@link MutationSet} class.
*
* @name Spanner.MutationSet
* @see MutationSet
* @type {Constructor}
*/
export {MutationSet};

/**
* @type {object}
* @property {constructor} DatabaseAdminClient
Expand Down
115 changes: 115 additions & 0 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,17 @@ export class Transaction extends Dml {
return undefined;
}

/**
* This method updates the _queuedMutations property of the transaction.
*
* @public
*
* @param {spannerClient.spanner.v1.Mutation[]} [mutation]
*/
setQueuedMutations(mutation: spannerClient.spanner.v1.Mutation[]): void {
this._queuedMutations = mutation;
}

/**
* @typedef {object} CommitOptions
* @property {google.spanner.v1.IRequestOptions} requestOptions The request options to include
Expand Down Expand Up @@ -2530,6 +2541,110 @@ function buildDeleteMutation(
return mutation as spannerClient.spanner.v1.Mutation;
}

/**
* MutationSet represent a set of changes to be applied atomically to a Cloud Spanner
* database with a {@link Transaction}.
* Mutations are used to insert, update, upsert(insert or update), replace, or
* delete rows within tables.
*
* Mutations are added to a {@link Transaction} and are not executed until the
* transaction is committed via {@link Transaction#commit}.
*
* If the transaction is rolled back or encounters an error, the mutations are
* discarded.
*
* @example
* ```
* const {Spanner, Mutation} = require('@google-cloud/spanner');
* const spanner = new Spanner();
*
* const instance = spanner.instance('my-instance');
* const database = instance.database('my-database');
*
* const mutations = new MutationSet();
* mutations.insert('Singers', {SingerId: '123', FirstName: 'David'});
* mutations.update('Singers', {SingerId: '123', FirstName: 'Marc'});
*
* try {
* database.writeAtLeastOnce(mutations, (err, res) => {
* console.log("RESPONSE: ", res);
* });
* } catch(err) {
* console.log("ERROR: ", err);
* }
* ```
*/
export class MutationSet {
/**
* An array to store the mutations.
*/
private _queuedMutations: spannerClient.spanner.v1.Mutation[];

/**
* Creates a new Mutation object.
*/
constructor() {
this._queuedMutations = [];
}

/**
* Adds an insert operation to the mutation set.
* @param {string} table. The name of the table to insert into.
* @param {object|object[]} rows. A single row object or an array of row objects to insert.
*/
insert(table: string, rows: object | object[]): void {
this._queuedMutations.push(buildMutation('insert', table, rows));
}

/**
* Adds an update operation to the mutation set.
* @param {string} table. The name of the table to update.
* @param {object|object[]} rows. A single row object or an array of row objects to update.
* Each row object must contain the primary key values to indentify the row to update.
*/
update(table: string, rows: object | object[]): void {
this._queuedMutations.push(buildMutation('update', table, rows));
}

/**
* Adds an upsert operation to the mutation set.
* An upsert will insert a new row if it does not exist or update an existing row if it does.
* @param {string} table. The name of the table to upsert.
* @param {object|object[]} rows. A single row object or an array of row objects to upsert.
*/
upsert(table: string, rows: object | object[]): void {
this._queuedMutations.push(buildMutation('insertOrUpdate', table, rows));
}

/**
* Adds a replace operation to the mutation set.
* A replace operation deletes the existing row (if it exists) and inserts the new row.
* @param {string} table. The name of the table to replace.
* @param {object|object[]} rows. A single row object or an array of row objects to replace.
*/
replace(table: string, rows: object | object[]): void {
this._queuedMutations.push(buildMutation('replace', table, rows));
}

/**
* Adds a deleteRows operation to the mutation set.
* This operation deletes rows from the specified table based on their primary keys.
* @param {string} table. The name of the table to deleteRows from.
* @param {key[]} key. An array of key objects, each represeting the primary key of a row to delete.
*/
deleteRows(table: string, keys: Key[]): void {
this._queuedMutations.push(buildDeleteMutation(table, keys));
}

/**
* Returns the internal representation of the queued mutations as a protobuf message.
* @returns {spannerClient.spanner.v1.Mutation[]}. The protobuf message representing the mutations.
*/
proto(): spannerClient.spanner.v1.Mutation[] {
return this._queuedMutations;
}
}

/**
* A group of mutations to be committed together.
* Related mutations should be placed in a group.
Expand Down
Loading

0 comments on commit 62fc0a4

Please sign in to comment.