Skip to content

Commit

Permalink
Merge pull request #170 from SpringQL/feat/CAN-source-reader
Browse files Browse the repository at this point in the history
feat: CAN source reader
  • Loading branch information
laysakura authored Jun 20, 2022
2 parents ac7f9b9 + dc4b5ab commit 1594ceb
Show file tree
Hide file tree
Showing 25 changed files with 519 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ All other sections are for end-users.
- Implicit `ptime` column (processing time) for streams without `ROWTIME` keyword (event time) ([#195](https://github.com/SpringQL/SpringQL/pull/195))
- `BLOB` type ([#187](https://github.com/SpringQL/SpringQL/pull/187))
- `UNSIGNED INTEGER` type ([#201](https://github.com/SpringQL/SpringQL/pull/201))
- CAN source reader, which feeds SocketCAN frames into a stream with `can_id UNSIGNED INTEGER NOT NULL, can_data BLOB NOT NULL` columns ([#170](https://github.com/SpringQL/SpringQL/pull/170))

### For developers

Expand Down
2 changes: 2 additions & 0 deletions springql-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ once_cell = "1.8"
parking_lot = "0.12"
time = {version="0.3.9", features = ["formatting", "parsing", "macros"]}

socketcan = "1.7"

[dev-dependencies]
springql-foreign-service = {path = "../foreign-service"}
springql-test-logger = {path = "../test-logger"}
Expand Down
98 changes: 98 additions & 0 deletions springql-core/examples/can_source_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

//! Demo application in <https://springql.github.io/get-started/write-basic-apps/#app1-simple-arithmetic-conversion-over-a-stream>.
//!
//! Usage:
//!
//! ```bash
//! # Enable your SocketCAN interface (e.g. vcan0, slcan0) first.
//!
//! cargo run --example can_source_reader -- SOCKET_CAN_INTERFACE
//! ```
use std::env;

use springql_core::api::{SpringConfig, SpringPipeline};

fn parse_can_interface_arg() -> String {
let args: Vec<String> = env::args().collect();
assert_eq!(args.len(), 2);

let interface = args[1].clone();
log::info!("Using CAN interface: {}", interface);

interface
}

fn main() {
let can_interface = parse_can_interface_arg();

let pipeline = SpringPipeline::new(&SpringConfig::default()).unwrap();

pipeline
.command(
"
CREATE SOURCE STREAM source_can (
can_id UNSIGNED INTEGER NOT NULL,
can_data BLOB NOT NULL
);
",
)
.unwrap();

pipeline
.command(
"
CREATE SINK STREAM sink_can (
ts TIMESTAMP NOT NULL ROWTIME,
can_id UNSIGNED INTEGER NOT NULL,
can_data BLOB NOT NULL
);
",
)
.unwrap();

pipeline
.command(
"
CREATE PUMP pump_can AS
INSERT INTO sink_can (ts, can_id, can_data)
SELECT STREAM
source_can.ptime,
source_can.can_id,
source_can.can_data
FROM source_can;
",
)
.unwrap();

pipeline
.command(
"
CREATE SINK WRITER queue_can FOR sink_can
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME 'q'
);
",
)
.unwrap();

pipeline
.command(format!(
"
CREATE SOURCE READER can FOR source_can
TYPE CAN OPTIONS (
INTERFACE '{}'
);
",
can_interface
))
.unwrap();

while let Ok(row) = pipeline.pop("q") {
let ts: String = row.get_not_null_by_index(0).unwrap();
let can_id: u32 = row.get_not_null_by_index(1).unwrap();
let can_data: Vec<u8> = row.get_not_null_by_index(2).unwrap();
eprintln!("{}\t{:X}\t{:02X?}", ts, can_id, can_data);
}
}
4 changes: 4 additions & 0 deletions springql-core/src/api/error/foreign_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ use std::{
pub enum ForeignInfo {
/// Generic TCP connection.
GenericTcp(SocketAddr),

/// Socket CAN interface
SocketCAN(String),
}

impl Display for ForeignInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let detail = match self {
ForeignInfo::GenericTcp(addr) => format!("TCP connection to {:?}", addr),
ForeignInfo::SocketCAN(interface) => format!("Socket CAN interface {}", interface),
};

write!(f, "[foreign info.] {}", detail)
Expand Down
4 changes: 4 additions & 0 deletions springql-core/src/api/spring_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ timeout_msec = 3_000
net_connect_timeout_msec = 1_000
net_read_timeout_msec = 100
can_read_timeout_msec = 100
[sink_writer]
net_connect_timeout_msec = 1_000
net_write_timeout_msec = 100
Expand Down Expand Up @@ -153,6 +155,8 @@ pub struct SpringWebConsoleConfig {
pub struct SpringSourceReaderConfig {
pub net_connect_timeout_msec: u32,
pub net_read_timeout_msec: u32,

pub can_read_timeout_msec: u32,
}

/// Config related to sink writer.
Expand Down
5 changes: 4 additions & 1 deletion springql-core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#![doc = include_str!("pipeline.md")]

mod can_source;
mod field;
mod name;
mod option;
Expand All @@ -16,13 +17,15 @@ mod stream_model;
#[cfg(test)]
pub mod test_support;

pub use can_source::{CANSourceStreamColumns, CANSourceStreamModel};
pub use field::{ColumnReference, Field};
pub use name::{
AggrAlias, ColumnName, CorrelationAlias, PumpName, QueueName, SinkWriterName, SourceReaderName,
StreamName, ValueAlias,
};
pub use option::{
InMemoryQueueOptions, NetClientOptions, NetProtocol, NetServerOptions, Options, OptionsBuilder,
CANOptions, InMemoryQueueOptions, NetClientOptions, NetProtocol, NetServerOptions, Options,
OptionsBuilder,
};
pub use pipeline_graph::{Edge, PipelineGraph};
pub use pipeline_version::PipelineVersion;
Expand Down
8 changes: 8 additions & 0 deletions springql-core/src/pipeline/can_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

mod can_source_stream_columns;
mod can_source_stream_model;
mod can_source_stream_shape;

pub use can_source_stream_columns::CANSourceStreamColumns;
pub use can_source_stream_model::CANSourceStreamModel;
30 changes: 30 additions & 0 deletions springql-core/src/pipeline/can_source/can_source_stream_columns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use std::sync::Arc;

use crate::{
api::error::Result,
pipeline::can_source::can_source_stream_model::CANSourceStreamModel,
stream_engine::autonomous_executor::{ColumnValues, StreamColumns},
};

/// Column values in a CAN source stream.
#[derive(Clone, PartialEq, Debug)]
pub struct CANSourceStreamColumns(StreamColumns);

impl CANSourceStreamColumns {
/// # Failure
///
/// - `SpringError::Sql` when:
/// - `column_values` lacks any of `stream_model.columns()`.
/// - Type mismatch (and failed to convert type) with `stream_model` and `column_values`.
pub fn new(stream_model: &CANSourceStreamModel, column_values: ColumnValues) -> Result<Self> {
let stream_model = Arc::new(stream_model.as_stream_model().clone());
let stream_columns = StreamColumns::new(stream_model, column_values)?;
Ok(Self(stream_columns))
}

pub fn into_stream_columns(self) -> StreamColumns {
self.0
}
}
48 changes: 48 additions & 0 deletions springql-core/src/pipeline/can_source/can_source_stream_model.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use crate::{
api::SpringError,
pipeline::{
can_source::can_source_stream_shape::CANSourceStreamShape, StreamModel, StreamName,
},
};

use anyhow::anyhow;

#[derive(Clone, Eq, PartialEq, Debug)]
pub struct CANSourceStreamModel(StreamModel);

impl CANSourceStreamModel {
fn new(name: StreamName) -> Self {
let stream_model =
StreamModel::new(name, CANSourceStreamShape::default().into_stream_shape());
Self(stream_model)
}

pub fn as_stream_model(&self) -> &StreamModel {
&self.0
}
}

impl TryFrom<&StreamModel> for CANSourceStreamModel {
type Error = SpringError;

/// # Failure
///
/// - `SpringError::InvalidFormat` when:
/// - This stream model is not compatible to CAN source stream.
fn try_from(stream_model: &StreamModel) -> Result<Self, Self::Error> {
if stream_model.shape() == &CANSourceStreamShape::default().into_stream_shape() {
Ok(Self::new(stream_model.name().clone()))
} else {
Err(SpringError::InvalidFormat {
s: "CAN source stream format".to_string(),
source: anyhow!(
"`{}` does not conform to CAN source stream's shape: {:?}",
stream_model.name(),
stream_model
),
})
}
}
}
39 changes: 39 additions & 0 deletions springql-core/src/pipeline/can_source/can_source_stream_shape.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use crate::pipeline::{ColumnDataType, ColumnDefinition, ColumnName, SqlType, StreamShape};

#[derive(Clone, Eq, PartialEq, Debug)]
pub struct CANSourceStreamShape(StreamShape);

impl Default for CANSourceStreamShape {
fn default() -> Self {
let can_id_col = ColumnDefinition::new(
ColumnDataType::new(
ColumnName::new("can_id".to_string()),
SqlType::unsigned_integer(),
false,
),
vec![],
);

let can_data_col = ColumnDefinition::new(
ColumnDataType::new(
ColumnName::new("can_data".to_string()),
SqlType::blob(),
false,
),
vec![],
);

let stream_shape =
StreamShape::new(vec![can_id_col, can_data_col]).expect("must be a valid shape");

Self(stream_shape)
}
}

impl CANSourceStreamShape {
pub fn into_stream_shape(self) -> StreamShape {
self.0
}
}
2 changes: 2 additions & 0 deletions springql-core/src/pipeline/option.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

mod can_options;
mod in_memory_queue_options;
mod net_options;
mod options_builder;

pub use can_options::CANOptions;
pub use in_memory_queue_options::InMemoryQueueOptions;
pub use net_options::{NetClientOptions, NetProtocol, NetServerOptions};
pub use options_builder::OptionsBuilder;
Expand Down
21 changes: 21 additions & 0 deletions springql-core/src/pipeline/option/can_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use crate::{
api::error::{Result, SpringError},
pipeline::Options,
};

#[derive(Clone, Eq, PartialEq, Debug)]
pub struct CANOptions {
pub interface: String,
}

impl TryFrom<&Options> for CANOptions {
type Error = SpringError;

fn try_from(options: &Options) -> Result<Self> {
Ok(Self {
interface: options.get("INTERFACE", |s| Ok(s.to_owned()))?,
})
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

#[allow(clippy::upper_case_acronyms)]
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum SourceReaderType {
NetClient,
NetServer,
CAN,
}
4 changes: 3 additions & 1 deletion springql-core/src/pipeline/stream_model.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

mod stream_shape;
use crate::pipeline::{field::ColumnReference, name::StreamName};

pub use stream_shape::StreamShape;

use crate::pipeline::{field::ColumnReference, name::StreamName};

#[derive(Clone, Eq, PartialEq, Debug, new)]
pub struct StreamModel {
name: StreamName,
Expand Down
Loading

0 comments on commit 1594ceb

Please sign in to comment.