Skip to content

Commit

Permalink
refactor(rstream-query): update types to use ISubscription
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Mar 6, 2021
1 parent 224f614 commit f299612
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
8 changes: 4 additions & 4 deletions packages/rstream-query/src/api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ILogger, IObjectOf, NULL_LOGGER } from "@thi.ng/api";
import type { ISubscribable } from "@thi.ng/rstream";
import { Fn, ILogger, IObjectOf, NULL_LOGGER } from "@thi.ng/api";
import type { ISubscription } from "@thi.ng/rstream";

export type Pattern = [any, any, any];

Expand All @@ -15,9 +15,9 @@ export type Solution = IObjectOf<any>;

export type Solutions = Set<Solution>;

export type QuerySolution = ISubscribable<Solutions>;
export type QuerySolution = ISubscription<any, Solutions>;

export type BindFn = (s: Solution) => any;
export type BindFn = Fn<Solution, any>;

export interface Edit {
index: Set<number>;
Expand Down
38 changes: 20 additions & 18 deletions packages/rstream-query/src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { illegalArgs } from "@thi.ng/errors";
import { min3id } from "@thi.ng/math";
import {
CloseMode,
ISubscribable,
ISubscription,
nextID,
Stream,
Subscription,
Expand Down Expand Up @@ -55,8 +55,8 @@ export class TripleStore implements Iterable<Triple>, IToDot {
indexS: Map<any, TripleIds>;
indexP: Map<any, TripleIds>;
indexO: Map<any, TripleIds>;
indexSelections: IObjectOf<Map<any, Subscription<Edit, TripleIds>>>;
queries: Map<string, ISubscribable<TripleIds>>;
indexSelections: IObjectOf<Map<any, ISubscription<Edit, TripleIds>>>;
queries: Map<string, ISubscription<any, TripleIds>>;
allIDs: TripleIds;

streamAll: Stream<TripleIds>;
Expand Down Expand Up @@ -186,29 +186,33 @@ export class TripleStore implements Iterable<Triple>, IToDot {
* @param id -
* @param param1 -
*/
addPatternQuery(pattern: Pattern, id?: string): ISubscribable<Triples>;
addPatternQuery(pattern: Pattern, id?: string): ISubscription<any, Triples>;
addPatternQuery(
pattern: Pattern,
id?: string,
emitTriples?: false
): ISubscribable<TripleIds>;
): ISubscription<TripleIds>;
addPatternQuery(
pattern: Pattern,
id?: string,
emitTriples?: true
): ISubscribable<Triples>;
addPatternQuery(pattern: Pattern, id?: string, emitTriples = true) {
let results: ISubscribable<TripleIds | Triples> | undefined;
): ISubscription<Triples>;
addPatternQuery(
pattern: Pattern,
id?: string,
emitTriples = true
): ISubscription {
let results: ISubscription<any, TripleIds> | undefined;
const [s, p, o] = pattern;
if (s == null && p == null && o == null) {
results = <ISubscribable<TripleIds>>this.streamAll;
results = this.streamAll;
} else {
const key = JSON.stringify(pattern);
if (!(results = this.queries.get(key))) {
const qs = this.getIndexSelection(this.streamS, s, "s");
const qp = this.getIndexSelection(this.streamP, p, "p");
const qo = this.getIndexSelection(this.streamO, o, "o");
let src: IObjectOf<Subscription<any, TripleIds>>;
let src: IObjectOf<ISubscription<any, TripleIds>>;
let xform = intersect2;
// optimize cases with 2 null terms (only needs single intersection w/ streamAll)
if (s == null && p == null) {
Expand All @@ -221,19 +225,19 @@ export class TripleStore implements Iterable<Triple>, IToDot {
src = { s: qs, p: qp, o: qo };
xform = intersect3;
}
results = <ISubscribable<TripleIds>>sync({
results = sync({
id,
src,
xform,
reset: true,
});
this.queries.set(key, <ISubscribable<TripleIds>>results);
this.queries.set(key, results);
submit(this.indexS, qs, s);
submit(this.indexP, qp, p);
submit(this.indexO, qo, o);
}
}
return emitTriples ? results.subscribe(resultTriples(this)) : results;
return emitTriples ? results.transform(resultTriples(this)) : results;
}

/**
Expand Down Expand Up @@ -377,9 +381,7 @@ export class TripleStore implements Iterable<Triple>, IToDot {
spec.bind && xforms.push(bindVars(spec.bind));
spec.select && xforms.push(filterSolutions(spec.select));
if (xforms.length) {
query = <ISubscribable<any>>(
query!.subscribe(comp.apply(null, <any>xforms))
);
query = query!.transform(comp.apply(null, <any>xforms));
}
return query!;
}
Expand Down Expand Up @@ -432,7 +434,7 @@ export class TripleStore implements Iterable<Triple>, IToDot {
stream: Stream<Edit>,
key: any,
id: string
): Subscription<any, TripleIds> {
): ISubscription<any, TripleIds> {
if (key == null) {
return this.streamAll;
}
Expand All @@ -456,7 +458,7 @@ export class TripleStore implements Iterable<Triple>, IToDot {

const submit = (
index: Map<any, TripleIds>,
stream: Subscription<Edit, TripleIds>,
stream: ISubscription<Edit, TripleIds>,
key: any
) => {
if (key != null) {
Expand Down

0 comments on commit f299612

Please sign in to comment.