-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #170 from SpringQL/feat/CAN-source-reader
feat: CAN source reader
- Loading branch information
Showing
25 changed files
with
519 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. | ||
|
||
//! Demo application in <https://springql.github.io/get-started/write-basic-apps/#app1-simple-arithmetic-conversion-over-a-stream>. | ||
//! | ||
//! Usage: | ||
//! | ||
//! ```bash | ||
//! # Enable your SocketCAN interface (e.g. vcan0, slcan0) first. | ||
//! | ||
//! cargo run --example can_source_reader -- SOCKET_CAN_INTERFACE | ||
//! ``` | ||
use std::env; | ||
|
||
use springql_core::api::{SpringConfig, SpringPipeline}; | ||
|
||
fn parse_can_interface_arg() -> String { | ||
let args: Vec<String> = env::args().collect(); | ||
assert_eq!(args.len(), 2); | ||
|
||
let interface = args[1].clone(); | ||
log::info!("Using CAN interface: {}", interface); | ||
|
||
interface | ||
} | ||
|
||
fn main() { | ||
let can_interface = parse_can_interface_arg(); | ||
|
||
let pipeline = SpringPipeline::new(&SpringConfig::default()).unwrap(); | ||
|
||
pipeline | ||
.command( | ||
" | ||
CREATE SOURCE STREAM source_can ( | ||
can_id UNSIGNED INTEGER NOT NULL, | ||
can_data BLOB NOT NULL | ||
); | ||
", | ||
) | ||
.unwrap(); | ||
|
||
pipeline | ||
.command( | ||
" | ||
CREATE SINK STREAM sink_can ( | ||
ts TIMESTAMP NOT NULL ROWTIME, | ||
can_id UNSIGNED INTEGER NOT NULL, | ||
can_data BLOB NOT NULL | ||
); | ||
", | ||
) | ||
.unwrap(); | ||
|
||
pipeline | ||
.command( | ||
" | ||
CREATE PUMP pump_can AS | ||
INSERT INTO sink_can (ts, can_id, can_data) | ||
SELECT STREAM | ||
source_can.ptime, | ||
source_can.can_id, | ||
source_can.can_data | ||
FROM source_can; | ||
", | ||
) | ||
.unwrap(); | ||
|
||
pipeline | ||
.command( | ||
" | ||
CREATE SINK WRITER queue_can FOR sink_can | ||
TYPE IN_MEMORY_QUEUE OPTIONS ( | ||
NAME 'q' | ||
); | ||
", | ||
) | ||
.unwrap(); | ||
|
||
pipeline | ||
.command(format!( | ||
" | ||
CREATE SOURCE READER can FOR source_can | ||
TYPE CAN OPTIONS ( | ||
INTERFACE '{}' | ||
); | ||
", | ||
can_interface | ||
)) | ||
.unwrap(); | ||
|
||
while let Ok(row) = pipeline.pop("q") { | ||
let ts: String = row.get_not_null_by_index(0).unwrap(); | ||
let can_id: u32 = row.get_not_null_by_index(1).unwrap(); | ||
let can_data: Vec<u8> = row.get_not_null_by_index(2).unwrap(); | ||
eprintln!("{}\t{:X}\t{:02X?}", ts, can_id, can_data); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. | ||
|
||
mod can_source_stream_columns; | ||
mod can_source_stream_model; | ||
mod can_source_stream_shape; | ||
|
||
pub use can_source_stream_columns::CANSourceStreamColumns; | ||
pub use can_source_stream_model::CANSourceStreamModel; |
30 changes: 30 additions & 0 deletions
30
springql-core/src/pipeline/can_source/can_source_stream_columns.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. | ||
|
||
use std::sync::Arc; | ||
|
||
use crate::{ | ||
api::error::Result, | ||
pipeline::can_source::can_source_stream_model::CANSourceStreamModel, | ||
stream_engine::autonomous_executor::{ColumnValues, StreamColumns}, | ||
}; | ||
|
||
/// Column values in a CAN source stream. | ||
#[derive(Clone, PartialEq, Debug)] | ||
pub struct CANSourceStreamColumns(StreamColumns); | ||
|
||
impl CANSourceStreamColumns { | ||
/// # Failure | ||
/// | ||
/// - `SpringError::Sql` when: | ||
/// - `column_values` lacks any of `stream_model.columns()`. | ||
/// - Type mismatch (and failed to convert type) with `stream_model` and `column_values`. | ||
pub fn new(stream_model: &CANSourceStreamModel, column_values: ColumnValues) -> Result<Self> { | ||
let stream_model = Arc::new(stream_model.as_stream_model().clone()); | ||
let stream_columns = StreamColumns::new(stream_model, column_values)?; | ||
Ok(Self(stream_columns)) | ||
} | ||
|
||
pub fn into_stream_columns(self) -> StreamColumns { | ||
self.0 | ||
} | ||
} |
48 changes: 48 additions & 0 deletions
48
springql-core/src/pipeline/can_source/can_source_stream_model.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. | ||
|
||
use crate::{ | ||
api::SpringError, | ||
pipeline::{ | ||
can_source::can_source_stream_shape::CANSourceStreamShape, StreamModel, StreamName, | ||
}, | ||
}; | ||
|
||
use anyhow::anyhow; | ||
|
||
#[derive(Clone, Eq, PartialEq, Debug)] | ||
pub struct CANSourceStreamModel(StreamModel); | ||
|
||
impl CANSourceStreamModel { | ||
fn new(name: StreamName) -> Self { | ||
let stream_model = | ||
StreamModel::new(name, CANSourceStreamShape::default().into_stream_shape()); | ||
Self(stream_model) | ||
} | ||
|
||
pub fn as_stream_model(&self) -> &StreamModel { | ||
&self.0 | ||
} | ||
} | ||
|
||
impl TryFrom<&StreamModel> for CANSourceStreamModel { | ||
type Error = SpringError; | ||
|
||
/// # Failure | ||
/// | ||
/// - `SpringError::InvalidFormat` when: | ||
/// - This stream model is not compatible to CAN source stream. | ||
fn try_from(stream_model: &StreamModel) -> Result<Self, Self::Error> { | ||
if stream_model.shape() == &CANSourceStreamShape::default().into_stream_shape() { | ||
Ok(Self::new(stream_model.name().clone())) | ||
} else { | ||
Err(SpringError::InvalidFormat { | ||
s: "CAN source stream format".to_string(), | ||
source: anyhow!( | ||
"`{}` does not conform to CAN source stream's shape: {:?}", | ||
stream_model.name(), | ||
stream_model | ||
), | ||
}) | ||
} | ||
} | ||
} |
39 changes: 39 additions & 0 deletions
39
springql-core/src/pipeline/can_source/can_source_stream_shape.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. | ||
|
||
use crate::pipeline::{ColumnDataType, ColumnDefinition, ColumnName, SqlType, StreamShape}; | ||
|
||
#[derive(Clone, Eq, PartialEq, Debug)] | ||
pub struct CANSourceStreamShape(StreamShape); | ||
|
||
impl Default for CANSourceStreamShape { | ||
fn default() -> Self { | ||
let can_id_col = ColumnDefinition::new( | ||
ColumnDataType::new( | ||
ColumnName::new("can_id".to_string()), | ||
SqlType::unsigned_integer(), | ||
false, | ||
), | ||
vec![], | ||
); | ||
|
||
let can_data_col = ColumnDefinition::new( | ||
ColumnDataType::new( | ||
ColumnName::new("can_data".to_string()), | ||
SqlType::blob(), | ||
false, | ||
), | ||
vec![], | ||
); | ||
|
||
let stream_shape = | ||
StreamShape::new(vec![can_id_col, can_data_col]).expect("must be a valid shape"); | ||
|
||
Self(stream_shape) | ||
} | ||
} | ||
|
||
impl CANSourceStreamShape { | ||
pub fn into_stream_shape(self) -> StreamShape { | ||
self.0 | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. | ||
|
||
use crate::{ | ||
api::error::{Result, SpringError}, | ||
pipeline::Options, | ||
}; | ||
|
||
#[derive(Clone, Eq, PartialEq, Debug)] | ||
pub struct CANOptions { | ||
pub interface: String, | ||
} | ||
|
||
impl TryFrom<&Options> for CANOptions { | ||
type Error = SpringError; | ||
|
||
fn try_from(options: &Options) -> Result<Self> { | ||
Ok(Self { | ||
interface: options.get("INTERFACE", |s| Ok(s.to_owned()))?, | ||
}) | ||
} | ||
} |
2 changes: 2 additions & 0 deletions
2
springql-core/src/pipeline/source_reader_model/source_reader_type.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,9 @@ | ||
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. | ||
|
||
#[allow(clippy::upper_case_acronyms)] | ||
#[derive(Clone, Eq, PartialEq, Debug)] | ||
pub enum SourceReaderType { | ||
NetClient, | ||
NetServer, | ||
CAN, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.