Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feat/CAN-source-reader
Browse files Browse the repository at this point in the history
  • Loading branch information
laysakura committed Jun 20, 2022
2 parents 2f6e405 + 32f8c52 commit 1154744
Show file tree
Hide file tree
Showing 29 changed files with 445 additions and 338 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
- make:
task: test
os: ubuntu-latest
rust: 1.56.0
rust: 1.56.1
- make:
task: test
os: macos-latest
Expand Down Expand Up @@ -213,4 +213,3 @@ jobs:
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}

9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ All other sections are for end-users.
<!-- markdownlint-disable MD024 -->
## [Unreleased]

### Changed

- Migrate dependencies `chrono` -> `time` ([#194](https://github.com/SpringQL/SpringQL/pull/194))
- SpringTimestamp::from_str can accept more strictly
- subsecond part must 9 digits
- relates security advisory [RUSTSEC-2020-0071](https://rustsec.org/advisories/RUSTSEC-2020-0071)
- [Tracking issue](https://github.com/SpringQL/SpringQL/issues/173)
- Bump up MSRV from 1.56.0 to 1.56.1 ([#199](https://github.com/SpringQL/SpringQL/pull/199))

### Added

- Implicit `ptime` column (processing time) for streams without `ROWTIME` keyword (event time) ([#195](https://github.com/SpringQL/SpringQL/pull/195))
Expand Down
2 changes: 1 addition & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ cat target/cargo-deadlinks.out | sort | uniq > target/cargo-deadlinks.sort
cat deadlinks.expected | sort | uniq > target/cargo-deadlinks.expected.sort
diff -u target/cargo-deadlinks.expected.sort target/cargo-deadlinks.sort
mlc --ignore-path target
mlc --ignore-path target --throttle 15
''']

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[![crates.io](https://img.shields.io/crates/v/springql-core.svg)](https://crates.io/crates/springql-core)
[![Crates.io](https://img.shields.io/crates/d/springql-core?label=cargo%20installs)](https://crates.io/crates/springql-core)
[![docs.rs](https://img.shields.io/badge/API%20doc-docs.rs-blueviolet)](https://docs.rs/springql-core)
![MSRV](https://img.shields.io/badge/rustc-1.56+-lightgray.svg)
![MSRV](https://img.shields.io/badge/rustc-1.56.1+-lightgray.svg)
[![codecov](https://codecov.io/gh/SpringQL/SpringQL/branch/main/graph/badge.svg?token=XI0IR5QVU3)](https://codecov.io/gh/SpringQL/SpringQL)
[![License: MIT](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/SpringQL/SpringQL/blob/master/LICENSE-MIT)
[![License: Apache 2.0](https://img.shields.io/badge/license-Apache_2.0-blue.svg)](https://github.com/SpringQL/SpringQL/blob/master/LICENSE-APACHE)
Expand Down
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ ignore = [
# temporary turn off : [Potential segfault in the time crate](https://rustsec.org/advisories/RUSTSEC-2020-0071)
# tracking issue : https://github.com/SpringQL/SpringQL/issues/173
"RUSTSEC-2020-0071",

]
# Threshold for security vulnerabilities, any vulnerability with a CVSS score
# lower than the range specified will be ignored. Note that ignored advisories
Expand Down
6 changes: 1 addition & 5 deletions foreign-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,8 @@ publish = false

[dependencies]
anyhow = "^1.0"

serde_json = "1.0"

env_logger = "0.9"
log = "0.4"

chrono = "0.4"

csv = "1.1"
time = {version="0.3.9", features = ["formatting", "parsing", "macros"]}
4 changes: 2 additions & 2 deletions foreign-service/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
};

use anyhow::Result;
use chrono::Duration;
use time::Duration;

/// Runs as a TCP server and write(2)s foreign rows to socket.
pub struct ForeignSource {
Expand Down Expand Up @@ -62,7 +62,7 @@ impl ForeignSource {
}

log::info!("[ForeignSource] No message left. Wait forever...");
thread::sleep(Duration::hours(1).to_std().unwrap());
thread::sleep(Duration::hours(1).try_into().unwrap());

Ok(())
}
Expand Down
8 changes: 4 additions & 4 deletions foreign-service/src/source/source_input/timed_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod timer;
use std::{path::Path, thread, time::Duration};

use anyhow::{Context, Result};
use chrono::{DateTime, FixedOffset};
use time::{format_description::well_known, OffsetDateTime};

use crate::source::source_input::timed_stream::{
file_parser::FileParser, file_type::FileType, timer::Timer,
Expand All @@ -33,10 +33,10 @@ impl TimedStream {
file_type: FileType,
file_path: P,
timestamp_field: String,
virt_initial_datetime: DateTime<FixedOffset>,
virt_initial_datetime: OffsetDateTime,
) -> Result<Self> {
let file_parser = FileParser::new(file_type, file_path)?;
let timer = Timer::new(virt_initial_datetime.into());
let timer = Timer::new(virt_initial_datetime);
Ok(Self {
timestamp_field,
timer,
Expand Down Expand Up @@ -64,7 +64,7 @@ impl Iterator for TimedStream {
)
})?;
let timestamp =
DateTime::parse_from_rfc3339(timestamp_s)
OffsetDateTime::parse(timestamp_s, &well_known::Rfc3339)
.with_context(||
format!(
r#"timestamp field "{}" is not in RFC 3339 format. Correct example: "1996-12-19T16:39:57-08:00""#,
Expand Down
14 changes: 7 additions & 7 deletions foreign-service/src/source/source_input/timed_stream/timer.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use chrono::{DateTime, Duration, Utc};
use time::{Duration, OffsetDateTime};

#[derive(Eq, PartialEq, Debug)]
pub struct Timer {
real_initial_datetime: DateTime<Utc>,
real_initial_datetime: OffsetDateTime,
elapsed: Duration,

virt_initial_datetime: DateTime<Utc>,
virt_initial_datetime: OffsetDateTime,
}

impl Timer {
pub fn new(virt_initial_datetime: DateTime<Utc>) -> Self {
let real_initial_datetime = Utc::now();
pub fn new(virt_initial_datetime: OffsetDateTime) -> Self {
let real_initial_datetime = OffsetDateTime::now_utc();
let elapsed = Duration::seconds(0);
Timer {
real_initial_datetime,
Expand All @@ -21,13 +21,13 @@ impl Timer {
}
}

pub fn virt_current_datetime(&mut self) -> DateTime<Utc> {
pub fn virt_current_datetime(&mut self) -> OffsetDateTime {
self.update_clock();
self.virt_initial_datetime + self.elapsed
}

fn update_clock(&mut self) {
let now = Utc::now();
let now = OffsetDateTime::now_utc();
self.elapsed = now - self.real_initial_datetime;
}
}
5 changes: 2 additions & 3 deletions springql-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ authors = ["Sho Nakatani <lay.sakura@gmail.com>"]
license = "MIT OR Apache-2.0"

edition = "2021"
rust-version = "1.56.1"

categories = ["embedded"]
description = "SpringQL: Open-source stream processor for IoT devices and in-vehicle computers"
Expand All @@ -17,9 +18,6 @@ repository = "https://github.com/SpringQL/SpringQL"
[dependencies]
anyhow = "1.0"
thiserror = "1.0"

chrono = "0.4"

serde = {version = "1.0", features = ["derive"], default-features = false}
serde_json = "1.0"

Expand All @@ -46,6 +44,7 @@ reqwest = {version = "0.11", features = ["json", "blocking"], default-features =
once_cell = "1.8"

parking_lot = "0.12"
time = {version="0.3.9", features = ["formatting", "parsing", "macros"]}

socketcan = "1.7"

Expand Down
5 changes: 4 additions & 1 deletion springql-core/src/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod foreign_info;

use thiserror::Error;

use crate::{api::error::foreign_info::ForeignInfo, pipeline::StreamName};
use crate::{api::error::foreign_info::ForeignInfo, pipeline::StreamName, time::TimeError};

/// Result type
pub type Result<T> = std::result::Result<T, SpringError>;
Expand Down Expand Up @@ -68,4 +68,7 @@ pub enum SpringError {
/// Column index
i_col: usize,
},

#[error("Time conversion error {0}")]
Time(TimeError),
}
2 changes: 1 addition & 1 deletion springql-core/src/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl ValueExprPh2 {
SqlValue::NotNull(NnSqlValue::Timestamp(ts)),
SqlValue::NotNull(NnSqlValue::Duration(resolution)),
) => {
let ts_floor = ts.floor(resolution.to_duration());
let ts_floor = ts.floor(resolution.to_duration())?;
Ok(SqlValue::NotNull(NnSqlValue::Timestamp(ts_floor)))
}
_ => Err(SpringError::Sql(anyhow!(
Expand Down
4 changes: 3 additions & 1 deletion springql-core/src/stream_engine/autonomous_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,11 @@ impl AutonomousExecutor {
log::trace!("{:?}", e)
}

// IO Error / Value Error
SpringError::ForeignIo { .. }
| SpringError::SpringQlCoreIo(_)
| SpringError::Unavailable { .. } => log::warn!("{:?}", e),
| SpringError::Unavailable { .. }
| SpringError::Time(_) => log::warn!("{:?}", e),

SpringError::InvalidOption { .. }
| SpringError::InvalidFormat { .. }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl GroupAggregateWindowSubtask {
.lock()
.expect("another thread accessing to window gets poisoned")
.dispatch(expr_resolver, tuple, ())
.expect("dispatch failed")
}

pub fn get_window_mut(&self) -> MutexGuard<AggrWindow> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl JoinSubtask {
.lock()
.expect("another thread accessing to window gets poisoned")
.dispatch(expr_resolver, tuple, dir)
.expect("dispatch failed")
}

pub fn get_window_mut(&self) -> MutexGuard<JoinWindow> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub use join_window::JoinWindow;
pub use panes::{AggrPane, GroupByValues, JoinDir, JoinPane, Pane, Panes};

use crate::{
api::SpringError,
expr_resolver::ExprResolver,
stream_engine::{
autonomous_executor::{
Expand All @@ -20,6 +21,8 @@ use crate::{
},
};

type Success<T> = (Vec<T>, WindowInFlowByWindowTask);

pub trait Window {
type Pane: Pane;

Expand All @@ -37,22 +40,19 @@ pub trait Window {
expr_resolver: &ExprResolver,
tuple: Tuple,
arg: <<Self as Window>::Pane as Pane>::DispatchArg,
) -> (
Vec<<<Self as Window>::Pane as Pane>::CloseOut>,
WindowInFlowByWindowTask,
) {
) -> Result<Success<<Self::Pane as Pane>::CloseOut>, SpringError> {
let rowtime = tuple.rowtime().as_timestamp();

if rowtime < self.watermark().as_timestamp() {
// too late tuple does not have any chance to be dispatched nor to close a pane.
(Vec::new(), WindowInFlowByWindowTask::zero())
Ok((Vec::new(), WindowInFlowByWindowTask::zero()))
} else {
self.watermark_mut().update(rowtime);
let wm = *self.watermark();

let window_in_flow_dispatch = self
.panes_mut()
.panes_to_dispatch(rowtime)
.panes_to_dispatch(rowtime)?
.map(|pane| pane.dispatch(expr_resolver, &tuple, arg.clone()))
.fold(WindowInFlowByWindowTask::zero(), |acc, window_in_flow| {
acc + window_in_flow
Expand All @@ -71,7 +71,7 @@ pub trait Window {
},
);

(out, window_in_flow_dispatch + window_in_flow_close)
Ok((out, window_in_flow_dispatch + window_in_flow_close))
}
}
}
Loading

0 comments on commit 1154744

Please sign in to comment.