Skip to content

Commit

Permalink
Add perspective-server crate
Browse files Browse the repository at this point in the history
Co-authored-by: Davis Silverman <sinistersnare@users.noreply.github.com>
Signed-off-by: Tim Bess <tim@prospective.dev>
  • Loading branch information
2 people authored and texodus committed Jul 4, 2024
1 parent abed771 commit be26922
Show file tree
Hide file tree
Showing 8 changed files with 392 additions and 1 deletion.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ members = [
"rust/bundle",
"rust/perspective-client",
"rust/perspective-js",
"rust/perspective-server",
]

[profile.dev]
Expand Down
2 changes: 1 addition & 1 deletion rust/perspective-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

[package]
name = "perspective-server"
version = "3.0.0-alpha.1"
version = "2.10.1"
authors = ["Andrew Stein <steinlink@gmail.com>"]
edition = "2021"
description = "A data visualization and analytics component, especially well-suited for large and/or streaming datasets."
Expand Down
115 changes: 115 additions & 0 deletions rust/perspective-server/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
// ┃ Copyright (c) 2017, the Perspective Authors. ┃
// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
// ┃ This file is part of the Perspective library, distributed under the terms ┃
// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

use std::collections::HashSet;
use std::path::Path;
use std::{fs, io};

use cmake::Config;

fn copy_dir_all(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
skip: &HashSet<&str>,
) -> io::Result<()> {
fs::create_dir_all(&dst)?;
for entry in fs::read_dir(src)? {
let entry = entry?;
let ty = entry.file_type()?;
if ty.is_dir() {
if !skip.contains(&*entry.file_name().to_string_lossy()) {
copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()), skip)?;
}
} else {
fs::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
}
}

Ok(())
}

fn cmake_build() -> Result<(), std::io::Error> {
if std::env::var("CARGO_FEATURE_EXTERNAL_CPP").is_ok() {
println!("cargo:warning=MESSAGE Building in development mode");
let root_dir_env = std::env::var("PSP_ROOT_DIR").expect("Must set PSP_ROOT_DIR");
let root_dir = Path::new(root_dir_env.as_str());
copy_dir_all(Path::join(root_dir, "cpp"), "cpp", &HashSet::from(["dist"]))?;
copy_dir_all(Path::join(root_dir, "cmake"), "cmake", &HashSet::new())?;
}

let mut dst = Config::new("cpp/perspective");
let profile = std::env::var("PROFILE").unwrap();
dst.always_configure(true);
dst.define("CMAKE_BUILD_TYPE", profile.as_str());
dst.define("PSP_WASM_BUILD", "0");
dst.define("PSP_PYTHON_BUILD", "1");
if std::env::var("CARGO_FEATURE_EXTERNAL_CPP").is_err() {
dst.env("PSP_DISABLE_CLANGD", "1");
}

dst.build_arg(format!("-j{}", num_cpus::get()));
println!("cargo:warning=MESSAGE Building cmake {}", profile);
let artifact = dst.build();

println!("cargo:warning=MESSAGE Building cxx");
cxx_build::bridge("src/ffi.rs")
.file("src/server.cpp")
.include("include")
.include("cpp/perspective/src/include")
.flag_if_supported("-std=c++17") // TODO not needed?
.flag("-fexceptions") // TODO not needed?
.static_flag(true)
.compile("perspective");

println!(
"cargo:rustc-link-search=native={}/build",
artifact.display()
);

println!("cargo:rustc-link-lib=static=psp");
link_cmake_static_archives(artifact.as_path())?;
println!("cargo:rerun-if-changed=cpp/perspective");
println!("cargo:rerun-if-changed=include/server.h");
println!("cargo:rerun-if-changed=src/server.cpp");
println!("cargo:rerun-if-changed=src/lib.rs");
Ok(())
}

/// Walk the cmake output path and emit link instructions for all archives.
/// TODO Can this be faster pls?
fn link_cmake_static_archives(dir: &Path) -> Result<(), std::io::Error> {
if dir.is_dir() {
for entry in fs::read_dir(dir)? {
let path = entry?.path();
if path.is_dir() {
link_cmake_static_archives(&path)?;
} else {
let ext = path.extension().as_ref().map(|x| x.to_string_lossy());
let stem = path.file_stem().as_ref().map(|x| x.to_string_lossy());
if ext.as_deref() == Some("a")
&& stem.as_deref() != Some("libpsp")
&& stem.as_deref() != Some("libperspective")
{
let a = stem.expect("bad")[3..].to_string();
println!("cargo:rustc-link-search=native={}", dir.display());
println!("cargo:rustc-link-lib=static={}", a);
}
}
}
}

Ok(())
}

fn main() -> Result<(), std::io::Error> {
cmake_build()
}
31 changes: 31 additions & 0 deletions rust/perspective-server/include/server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
// ┃ Copyright (c) 2017, the Perspective Authors. ┃
// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
// ┃ This file is part of the Perspective library, distributed under the terms ┃
// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

#pragma once

#include "perspective/proto_api.h"
#include <memory>
#include "rust/cxx.h"

struct ResponseBatch;

std::unique_ptr<ProtoApiServer> new_proto_server();

std::uint32_t new_session(const ProtoApiServer& self);

rust::Box<ResponseBatch> handle_request(
const ProtoApiServer& self,
std::uint32_t client_id,
const rust::Vec<std::uint8_t>& message
);

rust::Box<ResponseBatch> poll(const ProtoApiServer& self);
63 changes: 63 additions & 0 deletions rust/perspective-server/src/ffi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
// ┃ Copyright (c) 2017, the Perspective Authors. ┃
// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
// ┃ This file is part of the Perspective library, distributed under the terms ┃
// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

#[cxx::bridge]
mod ffi_internal {
extern "Rust" {
type ResponseBatch;
fn create_response_batch() -> Box<ResponseBatch>;
fn push_response(self: &mut ResponseBatch, client_id: u32, resp: Vec<u8>);
}
unsafe extern "C++" {
include!("server.h");
type ProtoApiServer;
fn new_proto_server() -> UniquePtr<ProtoApiServer>;
fn new_session(server: &ProtoApiServer) -> u32;
fn handle_request(
server: &ProtoApiServer,
client_id: u32,
val: &Vec<u8>,
) -> Box<ResponseBatch>;
fn poll(server: &ProtoApiServer) -> Box<ResponseBatch>;
}
}

pub struct Response {
pub client_id: u32,
pub resp: Vec<u8>,
}
pub struct ResponseBatch(pub Vec<Response>);

impl Deref for ResponseBatch {
type Target = Vec<Response>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl ResponseBatch {
fn push_response(&mut self, client_id: u32, resp: Vec<u8>) {
self.0.push(Response { client_id, resp });
}
}

fn create_response_batch() -> Box<ResponseBatch> {
Box::new(ResponseBatch(vec![]))
}

unsafe impl Send for ffi_internal::ProtoApiServer {}
unsafe impl Sync for ffi_internal::ProtoApiServer {}

use std::ops::Deref;

pub use ffi_internal::*;
103 changes: 103 additions & 0 deletions rust/perspective-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
// ┃ Copyright (c) 2017, the Perspective Authors. ┃
// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
// ┃ This file is part of the Perspective library, distributed under the terms ┃
// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

#![feature(lazy_cell)]

use std::collections::HashMap;
use std::sync::{Arc, LazyLock, RwLock};

use cxx::UniquePtr;

mod ffi;

#[derive(Clone)]
pub struct Server {
server: Arc<UniquePtr<ffi::ProtoApiServer>>,
}

pub type SessionCallback = Arc<dyn Fn(u32, &Vec<u8>) + 'static + Sync + Send>;

impl Default for Server {
fn default() -> Self {
let server = Arc::new(ffi::new_proto_server());
Self { server }
}
}

static CALLBACKS: LazyLock<RwLock<HashMap<u32, SessionCallback>>> =
LazyLock::new(|| RwLock::new(HashMap::new()));

#[no_mangle]
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn psp_global_session_handler(client_id: u32, data: *const u8, length: u32) {
let data = unsafe { std::slice::from_raw_parts(data, length as usize) };
let data_vec = data.to_owned();
// Print process and thread id
let thread_id = std::thread::current().id();
let process_id = std::process::id();
tracing::info!(
"Global session handler called for client_id: {}, thread_id: {:?}, process_id: {}",
client_id,
thread_id,
process_id
);

let cb = CALLBACKS
.read()
.expect("lock poisoned")
.get(&client_id)
.cloned();

if let Some(cb) = cb {
cb(client_id, &data_vec);
} else {
tracing::info!("No callback found for client_id: {}", client_id);
}
}

impl Server {
pub fn new() -> Self {
Self::default()
}

pub fn new_session(&self) -> u32 {
ffi::new_session(&self.server)
}

pub fn register_session_cb(&mut self, cb: SessionCallback) -> u32 {
let client_id = ffi::new_session(&self.server);
tracing::info!("Registering session callback for client_id: {}", client_id);
CALLBACKS
.write()
.expect("lock poisoned")
.insert(client_id, cb);
client_id
}

pub fn unregister_session_cb(&mut self, client_id: u32) {
CALLBACKS.write().expect("lock poisoned").remove(&client_id);
}

pub fn handle_request(
&self,
client_id: u32,
val: &Vec<u8>,
) -> impl Iterator<Item = (u32, Vec<u8>)> {
let response_batch = ffi::handle_request(&self.server, client_id, val);
response_batch.0.into_iter().map(|x| (x.client_id, x.resp))
}

pub fn poll(&self) -> impl Iterator<Item = (u32, Vec<u8>)> {
let response_batch = ffi::poll(&self.server);
response_batch.0.into_iter().map(|x| (x.client_id, x.resp))
}
}
Loading

0 comments on commit be26922

Please sign in to comment.