Skip to content

Commit

Permalink
Merge pull request finos#2763 from finos/flexible-table
Browse files Browse the repository at this point in the history
Allow `bytes` and `string` for JSON/CSV input data with `format` option
  • Loading branch information
texodus authored Sep 23, 2024
2 parents 8b29408 + 918f6f7 commit ea5cab9
Show file tree
Hide file tree
Showing 11 changed files with 317 additions and 49 deletions.
14 changes: 14 additions & 0 deletions rust/perspective-client/docs/client/table.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ is stored and all calculation occurs.
data.
- `name` - The name of the table. This will be generated if it is not
provided.
- `format` - The explicit format of the input data, can be one of
`"json"`, `"columns"`, `"csv"` or `"arrow"`. This overrides
language-specific type dispatch behavior, which allows stringified and
byte array alternative inputs.

<div class="javascript">

Expand All @@ -51,6 +55,16 @@ import * as fs from "node:fs/promises";
const table2 = await client.table(await fs.readFile("superstore.arrow"));
```

Load a CSV from a `UInt8Array` (the default for this type is Arrow) using a
format override:

```javascript
const enc = new TextEncoder();
const table = await client.table(enc.encode("x,y\n1,2\n3,4"), {
format: "csv",
});
```

Create a table with an `index`:

```javascript
Expand Down
4 changes: 3 additions & 1 deletion rust/perspective-client/src/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ pub mod utils;

pub use crate::client::{Client, ClientHandler, Features, SystemInfo};
pub use crate::session::{ProxySession, Session};
pub use crate::table::{Schema, Table, TableInitOptions, UpdateOptions, ValidateExpressionsData};
pub use crate::table::{
Schema, Table, TableInitOptions, TableReadFormat, UpdateOptions, ValidateExpressionsData,
};
pub use crate::table_data::{TableData, UpdateData};
pub use crate::view::{OnUpdateMode, OnUpdateOptions, View, ViewWindow};

Expand Down
33 changes: 33 additions & 0 deletions rust/perspective-client/src/rust/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,34 @@ use crate::view::View;

pub type Schema = HashMap<String, ColumnType>;

#[derive(Clone, Copy, Debug, Serialize, Deserialize, TS)]
pub enum TableReadFormat {
#[serde(rename = "csv")]
Csv,

#[serde(rename = "json")]
JsonString,

#[serde(rename = "columns")]
ColumnsString,

#[serde(rename = "arrow")]
Arrow,
}

impl TableReadFormat {
pub fn parse(value: Option<String>) -> Result<Option<Self>, String> {
Ok(match value.as_deref() {
Some("csv") => Some(TableReadFormat::Csv),
Some("json") => Some(TableReadFormat::JsonString),
Some("columns") => Some(TableReadFormat::ColumnsString),
Some("arrow") => Some(TableReadFormat::Arrow),
None => None,
Some(x) => return Err(format!("Unknown format \"{}\"", x)),
})
}
}

/// Options which impact the behavior of [`Client::table`], as well as
/// subsequent calls to [`Table::update`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, TS)]
Expand All @@ -39,6 +67,10 @@ pub struct TableInitOptions {
#[ts(optional)]
pub name: Option<String>,

#[serde(default)]
#[ts(optional)]
pub format: Option<TableReadFormat>,

/// This [`Table`] should use the column named by the `index` parameter as
/// the `index`, which causes [`Table::update`] and [`Client::table`] input
/// to either insert or update existing rows based on `index` column
Expand Down Expand Up @@ -101,6 +133,7 @@ impl From<TableInitOptions> for TableOptions {
#[derive(Clone, Debug, Default, Deserialize, Serialize, TS)]
pub struct UpdateOptions {
pub port_id: Option<u32>,
pub format: Option<TableReadFormat>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion rust/perspective-js/src/rust/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ impl Client {
value: &JsTableInitData,
options: Option<JsTableInitOptions>,
) -> ApiResult<Table> {
let args = TableData::from_js_value(value)?;
let options = options
.into_serde_ext::<Option<TableInitOptions>>()?
.unwrap_or_default();

let args = TableData::from_js_value(value, options.format)?;
Ok(Table(self.client.table(args, options).await?))
}

Expand Down
74 changes: 60 additions & 14 deletions rust/perspective-js/src/rust/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use extend::ext;
use js_sys::{Array, ArrayBuffer, Function, Object, Reflect, Uint8Array, JSON};
use perspective_client::config::*;
use perspective_client::proto::*;
use perspective_client::{assert_table_api, TableData, UpdateData, UpdateOptions};
use perspective_client::{assert_table_api, TableData, TableReadFormat, UpdateData, UpdateOptions};
use wasm_bindgen::convert::TryFromJsValue;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local;
Expand Down Expand Up @@ -45,9 +45,9 @@ impl Vec<(String, ColumnType)> {

#[ext]
pub(crate) impl TableData {
fn from_js_value(value: &JsValue) -> ApiResult<TableData> {
fn from_js_value(value: &JsValue, format: Option<TableReadFormat>) -> ApiResult<TableData> {
let err_fn = || JsValue::from(format!("Failed to construct Table {:?}", value));
if let Some(result) = UpdateData::from_js_value_partial(value)? {
if let Some(result) = UpdateData::from_js_value_partial(value, format)? {
Ok(result.into())
} else if value.is_instance_of::<Object>() && Reflect::has(value, &"__get_model".into())? {
let val = Reflect::get(value, &"__get_model".into())?
Expand Down Expand Up @@ -87,19 +87,53 @@ pub(crate) impl TableData {

#[ext]
pub(crate) impl UpdateData {
fn from_js_value_partial(value: &JsValue) -> ApiResult<Option<UpdateData>> {
fn from_js_value_partial(
value: &JsValue,
format: Option<TableReadFormat>,
) -> ApiResult<Option<UpdateData>> {
let err_fn = || JsValue::from(format!("Failed to construct Table {:?}", value));
if value.is_undefined() {
Err(err_fn().into())
} else if value.is_string() {
Ok(Some(UpdateData::Csv(value.as_string().into_apierror()?)))
match format {
None | Some(TableReadFormat::Csv) => {
Ok(Some(UpdateData::Csv(value.as_string().into_apierror()?)))
},
Some(TableReadFormat::JsonString) => Ok(Some(UpdateData::JsonRows(
value.as_string().into_apierror()?,
))),
Some(TableReadFormat::ColumnsString) => Ok(Some(UpdateData::JsonColumns(
value.as_string().into_apierror()?,
))),
Some(TableReadFormat::Arrow) => Ok(Some(UpdateData::Arrow(
value.as_string().into_apierror()?.into_bytes().into(),
))),
}
} else if value.is_instance_of::<ArrayBuffer>() {
let uint8array = Uint8Array::new(value);
let slice = uint8array.to_vec();
Ok(Some(UpdateData::Arrow(slice.into())))
match format {
Some(TableReadFormat::Csv) => Ok(Some(UpdateData::Csv(String::from_utf8(slice)?))),
Some(TableReadFormat::JsonString) => {
Ok(Some(UpdateData::JsonRows(String::from_utf8(slice)?)))
},
Some(TableReadFormat::ColumnsString) => {
Ok(Some(UpdateData::JsonColumns(String::from_utf8(slice)?)))
},
None | Some(TableReadFormat::Arrow) => Ok(Some(UpdateData::Arrow(slice.into()))),
}
} else if let Some(uint8array) = value.dyn_ref::<Uint8Array>() {
let slice = uint8array.to_vec();
Ok(Some(UpdateData::Arrow(slice.into())))
match format {
Some(TableReadFormat::Csv) => Ok(Some(UpdateData::Csv(String::from_utf8(slice)?))),
Some(TableReadFormat::JsonString) => {
Ok(Some(UpdateData::JsonRows(String::from_utf8(slice)?)))
},
Some(TableReadFormat::ColumnsString) => {
Ok(Some(UpdateData::JsonColumns(String::from_utf8(slice)?)))
},
None | Some(TableReadFormat::Arrow) => Ok(Some(UpdateData::Arrow(slice.into()))),
}
} else if value.is_instance_of::<Array>() {
let rows = JSON::stringify(value)?.as_string().into_apierror()?;
Ok(Some(UpdateData::JsonRows(rows)))
Expand All @@ -108,8 +142,8 @@ pub(crate) impl UpdateData {
}
}

fn from_js_value(value: &JsValue) -> ApiResult<UpdateData> {
match TableData::from_js_value(value)? {
fn from_js_value(value: &JsValue, format: Option<TableReadFormat>) -> ApiResult<UpdateData> {
match TableData::from_js_value(value, format)? {
TableData::Schema(_) => Err(ApiError::new(
"Method cannot be called with `Schema` argument",
)),
Expand Down Expand Up @@ -237,16 +271,28 @@ impl Table {

#[doc = inherit_docs!("table/replace.md")]
#[wasm_bindgen]
pub async fn remove(&self, value: &JsValue) -> ApiResult<()> {
let input = UpdateData::from_js_value(value)?;
pub async fn remove(&self, value: &JsValue, options: Option<JsUpdateOptions>) -> ApiResult<()> {
let options = options
.into_serde_ext::<Option<UpdateOptions>>()?
.unwrap_or_default();

let input = UpdateData::from_js_value(value, options.format)?;
self.0.remove(input).await?;
Ok(())
}

#[doc = inherit_docs!("table/replace.md")]
#[wasm_bindgen]
pub async fn replace(&self, input: &JsValue) -> ApiResult<()> {
let input = UpdateData::from_js_value(input)?;
pub async fn replace(
&self,
input: &JsValue,
options: Option<JsUpdateOptions>,
) -> ApiResult<()> {
let options = options
.into_serde_ext::<Option<UpdateOptions>>()?
.unwrap_or_default();

let input = UpdateData::from_js_value(input, options.format)?;
self.0.replace(input).await?;
Ok(())
}
Expand All @@ -258,11 +304,11 @@ impl Table {
input: &JsTableInitData,
options: Option<JsUpdateOptions>,
) -> ApiResult<()> {
let input = UpdateData::from_js_value(input)?;
let options = options
.into_serde_ext::<Option<UpdateOptions>>()?
.unwrap_or_default();

let input = UpdateData::from_js_value(input, options.format)?;
self.0.update(input, options).await?;
Ok(())
}
Expand Down
4 changes: 3 additions & 1 deletion rust/perspective-js/src/rust/utils/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

use std::fmt::Display;
use std::string::FromUtf8Error;

use perspective_client::ClientError;
use wasm_bindgen::prelude::*;
Expand Down Expand Up @@ -84,7 +85,8 @@ define_api_error!(
futures::channel::oneshot::Canceled,
base64::DecodeError,
chrono::ParseError,
prost::DecodeError
prost::DecodeError,
FromUtf8Error
);

#[wasm_bindgen(inline_js = r#"
Expand Down
66 changes: 66 additions & 0 deletions rust/perspective-js/test/js/constructors/csv.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
// ┃ Copyright (c) 2017, the Perspective Authors. ┃
// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
// ┃ This file is part of the Perspective library, distributed under the terms ┃
// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

import { test, expect } from "@finos/perspective-test";
import perspective from "../perspective_client";

const CSV = "x,y,z\n1,2,3\n4,5,6";

test.describe("CSV Constructors", function () {
test("Handles String format", async function () {
const table = await perspective.table(CSV);
const v = await table.view();
const json = await v.to_json();
expect(json).toEqual([
{ x: 1, y: 2, z: 3 },
{ x: 4, y: 5, z: 6 },
]);
});

test("Handles String format with explicit format option", async function () {
const table = await perspective.table(CSV, {
format: "csv",
});
const v = await table.view();
const json = await v.to_json();
expect(json).toEqual([
{ x: 1, y: 2, z: 3 },
{ x: 4, y: 5, z: 6 },
]);
});

test("Handles ArrayBuffer format", async function () {
var enc = new TextEncoder();
const table = await perspective.table(enc.encode(CSV).buffer, {
format: "csv",
});
const v = await table.view();
const json = await v.to_json();
expect(json).toEqual([
{ x: 1, y: 2, z: 3 },
{ x: 4, y: 5, z: 6 },
]);
});

test("Handles UInt8Arry format", async function () {
var enc = new TextEncoder();
const table = await perspective.table(enc.encode(CSV), {
format: "csv",
});
const v = await table.view();
const json = await v.to_json();
expect(json).toEqual([
{ x: 1, y: 2, z: 3 },
{ x: 4, y: 5, z: 6 },
]);
});
});
46 changes: 46 additions & 0 deletions rust/perspective-js/test/js/constructors/json.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import { test, expect } from "@finos/perspective-test";
import perspective from "../perspective_client";

const TEST_JSON = { x: [1, 4], y: [2, 5], z: [3, 6] };

test.describe("JSON", function () {
test.describe("Integer columns", function () {
test("Integer columns can be updated with all JSON numeric types and cousins", async function () {
Expand Down Expand Up @@ -41,4 +43,48 @@ test.describe("JSON", function () {
]);
});
});

test("Handles String format", async function () {
const table = await perspective.table(JSON.stringify(TEST_JSON), {
format: "columns",
});
const v = await table.view();
const json = await v.to_json();
expect(json).toEqual([
{ x: 1, y: 2, z: 3 },
{ x: 4, y: 5, z: 6 },
]);
});

test("Handles UInt8Array format", async function () {
const enc = new TextEncoder();
const table = await perspective.table(
enc.encode(JSON.stringify(TEST_JSON)),
{
format: "columns",
}
);
const v = await table.view();
const json = await v.to_json();
expect(json).toEqual([
{ x: 1, y: 2, z: 3 },
{ x: 4, y: 5, z: 6 },
]);
});

test("Handles ArrayBuffer format", async function () {
const enc = new TextEncoder();
const table = await perspective.table(
enc.encode(JSON.stringify(TEST_JSON)).buffer,
{
format: "columns",
}
);
const v = await table.view();
const json = await v.to_json();
expect(json).toEqual([
{ x: 1, y: 2, z: 3 },
{ x: 4, y: 5, z: 6 },
]);
});
});
Loading

0 comments on commit ea5cab9

Please sign in to comment.