Skip to content

Commit

Permalink
Support multiple values in stream-cons + use stream-cons in stream op…
Browse files Browse the repository at this point in the history
…erations

The stream protocol and stream operations support multiple values.
However, stream-cons does not. The consequences are:

- Multiple values in streams can't be easily used by users.
- Internal stream operations are implemented in a convoluted way
  to support multiple values, which is error-prone.

  There are many mistakes in the current implementation,
  including but not limited to:

  - Incorrect laziness
  - Linear time memory consumption on operations that should take
    constant time

  See
  https://racket.discourse.group/t/stream-filter-not-in-constant-space/1643
  for the discussion.

This commit adds a support of multiple values to stream-cons.
We then utilize this stream-cons on the stream operations
to fix the issues mentioned above.

Note that there are still known deficiencies in the current
implementation. These deficiencies are not degradation, however, because
SRFI-41 also has them. Furthermore, the data definition of streams in
Racket makes it difficult to fix these deficiencies without a large
refactoring, so I would like to postpone fixing these issues
to the next PRs.

See https://srfi-email.schemers.org/srfi-41/msg/21844204/
for the discussion.
sorawee authored Feb 14, 2023
1 parent 65a760a commit ba2e0f3
Showing 5 changed files with 278 additions and 95 deletions.
37 changes: 23 additions & 14 deletions pkgs/racket-doc/scribblings/reference/sequences.scrbl
Original file line number Diff line number Diff line change
@@ -1180,10 +1180,11 @@ stream, but plain lists can be used as streams, and functions such as
that is like the one produced by @racket[(stream-lazy rest-expr)].

The first element of the stream as produced by @racket[first-expr]
must be a single value. The @racket[rest-expr] must produce a stream
can be multiple values. The @racket[rest-expr] must produce a stream
when it is evaluated, otherwise the @exnraise[exn:fail:contract?].

@history[#:changed "8.0.0.12" @elem{Added @racket[#:eager] options.}]}
@history[#:changed "8.0.0.12" @elem{Added @racket[#:eager] options.}
#:changed "8.8.0.7" @elem{Changed to allow multiple values.}]}

@defform*[[(stream-lazy stream-expr)
(stream-lazy #:who who-expr stream-expr)]]{
@@ -1223,23 +1224,32 @@ stream, but plain lists can be used as streams, and functions such as

@history[#:added "8.0.0.12"]}

@defform[(stream e ...)]{
@defform[#:literals (values)
(stream elem-expr ...)
#:grammar ([elem-expr (values single-expr ...)
single-expr])]{
A shorthand for nested @racket[stream-cons]es ending with
@racket[empty-stream]. As a match pattern, @racket[stream]
matches a stream with as many elements as @racket[e]s,
and each element must match the corresponding @racket[e] pattern.
matches a stream with as many elements as @racket[elem-expr]s,
and each element must match the corresponding @racket[elem-expr] pattern.
The pattern @racket[elem-expr] can be @racket[(values single-expr ...)], which matches against
multiple valued elements in the stream.

@history[#:changed "8.8.0.7" @elem{Changed to allow multiple values.}]
}

@defform[(stream* e ... tail)]{
A shorthand for nested @racket[stream-cons]es, but the @racket[tail]
@defform[(stream* elem-expr ... tail-expr)]{
A shorthand for nested @racket[stream-cons]es, but the @racket[tail-expr]
must produce a stream when it is forced, and that stream is used as the rest of the stream instead of
@racket[empty-stream]. Similar to @racket[list*] but for streams.
As a match pattern, @racket[stream*] is similar to a @racket[stream] pattern,
but the @racket[tail] pattern matches the ``rest'' of the stream after the last @racket[e].
but the @racket[tail-expr] pattern matches the ``rest'' of the stream after the last @racket[elem-expr].

@history[#:added "6.3"
#:changed "8.0.0.12" @elem{Changed to delay @racket[rest-expr] even
if zero @racket[expr]s are provided.}]}
if zero @racket[expr]s are provided.}
#:changed "8.8.0.7" @elem{Changed to allow multiple values.}]
}

@defproc[(in-stream [s stream?]) sequence?]{
Returns a sequence that is equivalent to @racket[s].
@@ -1349,8 +1359,7 @@ stream, but plain lists can be used as streams, and functions such as
Returns a stream whose elements are the elements of @racket[s] for
which @racket[f] returns a true result. Although the new stream is
constructed lazily, if @racket[s] has an infinite number of elements
where @racket[f] returns a false result in between two elements
where @racket[f] returns a true result, then operations on this
where @racket[f] returns a false result, then operations on this
stream will not terminate during the infinite sub-stream.
}

@@ -1371,16 +1380,16 @@ stream, but plain lists can be used as streams, and functions such as
allows @racket[for/stream] and @racket[for*/stream] to iterate over infinite
sequences, unlike their finite counterparts.

Please note that these forms do not support returning @tech{multiple values}.

@examples[#:eval sequence-evaluator
(for/stream ([i '(1 2 3)]) (* i i))
(stream->list (for/stream ([i '(1 2 3)]) (* i i)))
(stream-ref (for/stream ([i '(1 2 3)]) (displayln i) (* i i)) 1)
(stream-ref (for/stream ([i (in-naturals)]) (* i i)) 25)
(stream-ref (for/stream ([i (in-naturals)]) (values i (add1 i))) 10)
]

@history[#:added "6.3.0.9"]
@history[#:added "6.3.0.9"
#:changed "8.8.0.7" @elem{Changed to allow multiple values.}]
}

@defthing[gen:stream any/c]{
165 changes: 165 additions & 0 deletions pkgs/racket-test-core/tests/racket/stream.rktl
Original file line number Diff line number Diff line change
@@ -220,10 +220,175 @@
(test #t 'stream (match '() [(stream) #t]))
(test 1 'stream (match '(1) [(stream x) x]))
(test 3 'stream (match '(1 2) [(stream x y) (+ x y)]))
(test '(0 1 1 2) 'stream
(match (for/stream ([i 2])
(values i (add1 i)))
[(stream (values a b) (values c d)) (list a b c d)]))
(test '(1 2) 'stream* (match '(1 2) [(stream* xs) xs]))
(test 1 'stream* (match '(1 2) [(stream* hd _) hd]))
(test '(2) 'stream* (match '(1 2) [(stream* _ tl) tl]))
(test -1 'stream* (match '(1 2 3 4) [(stream* x y tl) (- x y)]))
(test '(3 4) 'stream* (match '(1 2 3 4) [(stream* x y tl) tl]))
(test '(0 1 1 2 #t) 'stream*
(match (for/stream ([i 2])
(values i (add1 i)))
[(stream* (values a b) (values c d) tl) (list a b c d (stream-empty? tl))]))

;; constructors with multiple values
(test '((1 2))
'stream-cons
(for/list ([(a b) (stream-cons (values 1 2) empty-stream)])
(list a b)))

(test '((1 2))
'stream-cons
(for/list ([(a b) (stream-cons #:eager (values 1 2) empty-stream)])
(list a b)))

(test '((1 2))
'stream-cons
(for/list ([(a b) (stream-cons (values 1 2) #:eager empty-stream)])
(list a b)))

(test '((1 2))
'stream-cons
(for/list ([(a b) (stream-cons #:eager (values 1 2) #:eager empty-stream)])
(list a b)))

(test '((1 2) (3 4))
'stream
(for/list ([(a b) (stream (values 1 2) (values 3 4))])
(list a b)))

(test '((1 2) (3 4))
'stream*
(for/list ([(a b) (stream* (values 1 2) (stream (values 3 4)))])
(list a b)))

(test '((0 1) (1 2))
'for/stream
(for/list ([(a b) (for/stream ([i 2]) (values i (add1 i)))])
(list a b)))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; testing lazy operation
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; stream-map
(let ()
(define t (stream-cons 0 t))
(define s (stream-filter negative? t))
(test #t stream? (stream-map add1 s)))

;; stream-filter
(let ()
(define t (stream-cons 0 t))
(define s (stream-filter negative? t))
(test #t stream? (stream-filter positive? s)))

(let ()
(define val #f)
(define st
(stream-cons 1
(begin
(set! val #t)
empty-stream)))

(stream-first st)
(test #f 'stream-cons val)

(define st* (stream-filter (lambda (x) #t) st))
(stream-first st*)
(test #f 'stream-filter val))


;; stream-take
(let ()
(define t (stream-cons 0 t))
(define s (stream-filter negative? t))
(test #t stream? (stream-take s 10)))

;; stream-append
(let ()
(define t (stream-cons 0 t))
(define s (stream-filter negative? t))
(test #t stream? (stream-append s s)))

;; stream-add-between
(let ()
(define t (stream-cons 0 t))
(define s (stream-filter negative? t))
(test #t stream? (stream-add-between s 1)))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; testing memoizing operation
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; stream-map
(let ()
(define acc 0)
(define (f x y)
(set! acc (add1 acc))
(values (+ 1 x) (+ 2 y)))
(define t (stream-cons (values 0 1) t))
(define s (stream-map f t))
(test '(1 3) call-with-values (λ () (stream-first s)) list)
(test '(1 3) call-with-values (λ () (stream-first s)) list)
(test 1 'stream-map acc)
(test '(1 3) call-with-values (λ () (stream-first (stream-rest s))) list)
(test '(1 3) call-with-values (λ () (stream-first (stream-rest s))) list)
(test 2 'stream-map acc))

;; stream-filter
(define-syntax-rule (terminate-quickly e)
(let ()
(define-values (xs cpu real gc) (time-apply (λ () e) '()))
(when (run-unreliable-tests? 'timing)
(test #t < real 50))
(apply values xs)))

(let ()
(define st
(for/stream ([i (in-naturals)])
(modulo i 1000000)))
(define st* (terminate-quickly (stream-filter zero? st))) ; should be fast
(terminate-quickly (stream-rest st*)) ; should be fast
(time (test 0 stream-first (stream-rest st*))) ; should take time
(test 0 'stream-filter (terminate-quickly (stream-first (stream-rest st*)))) ; should be fast
)

(let ()
(define s (stream-cons 0 s))
(define t (stream-filter (λ (x) (sleep 0.5) #t) s))
(test 0 stream-first t)
(test 0 'stream-filter (terminate-quickly (stream-first t))))

;; constant space (adapted from an example by Jacob J. A. Koot)
;; https://racket.discourse.group/t/stream-filter-not-in-constant-space/1643
(let ()
(define boxes '())

(define (gc!)
(collect-garbage)
(collect-garbage)
(collect-garbage)
(set! boxes (filter weak-box-value boxes))
(test #t <= (length boxes) 1))

(define (pred x)
(zero? (remainder x 10)))

(define (make-nats n)
(stream-cons n
(let ()
(define s (make-nats (add1 n)))
(set! boxes (cons (make-weak-box s) boxes))
s)))

(for/fold ([nats (make-nats 0)])
([i 5])
(gc!)
(stream-rest (stream-filter pred nats)))
(gc!))

(report-errs)
2 changes: 1 addition & 1 deletion racket/collects/racket/private/for.rkt
Original file line number Diff line number Diff line change
@@ -1345,7 +1345,7 @@

(define (sequence->stream s)
(unless (sequence? s)
(raise-argument-error 'sequence-generate "sequence?" s))
(raise-argument-error 'sequence->stream "sequence?" s))
(cond
[(stream? s) s]
[else
31 changes: 23 additions & 8 deletions racket/collects/racket/private/stream-cons.rkt
Original file line number Diff line number Diff line change
@@ -22,10 +22,13 @@
(require (prefix-in for: racket/private/for))

(provide stream-null stream-cons stream? stream-null? stream-pair?
stream-car stream-cdr stream-lambda stream-lazy stream-force)
stream-car stream-cdr stream-lambda stream-lazy stream-force
unpack-multivalue thunk->multivalue)

(struct multivalue (content))

;; An eagerly constructed stream has a lazy first element, and
;; normaly its rest is a lazily constructed stream.
;; normally its rest is a lazily constructed stream.
(define-struct eagerly-created-stream ([first-forced? #:mutable] [first #:mutable] rest)
#:reflection-name 'stream
#:property for:prop:stream (vector
@@ -100,18 +103,30 @@
[(for:stream? s) s]
[else (raise-argument-error 'stream-force "stream?" s)]))

;; Forces the first element of an eagerly consttructed stream
(define (unpack-multivalue v)
(cond
[(multivalue? v) (apply values (multivalue-content v))]
[else v]))

(define (thunk->multivalue thk)
(call-with-values
thk
(case-lambda
[(v) v]
[vs (multivalue vs)])))

;; Forces the first element of an eagerly constructed stream
(define (stream-force-first p)
(cond
[(eagerly-created-stream-first-forced? p)
(eagerly-created-stream-first p)]
(unpack-multivalue (eagerly-created-stream-first p))]
[else
(define thunk (eagerly-created-stream-first p))
(set-eagerly-created-stream-first! p reentrant-error)
(define v (thunk))
(define v (thunk->multivalue thunk))
(set-eagerly-created-stream-first! p v)
(set-eagerly-created-stream-first-forced?! p #t)
v]))
(unpack-multivalue v)]))

(define-syntax stream-lambda
(syntax-rules ()
@@ -130,13 +145,13 @@
(eagerly-created-stream #f (lambda () obj)
(lazily-created-stream #t (lambda () strm))))
((stream-cons #:eager obj strm)
(eagerly-created-stream #t obj
(eagerly-created-stream #t (thunk->multivalue (lambda () obj))
(lazily-created-stream #t (lambda () strm))))
((stream-cons obj #:eager strm)
(eagerly-created-stream #f (lambda () obj)
(stream-assert strm)))
((stream-cons #:eager obj #:eager strm)
(eagerly-created-stream #t obj
(eagerly-created-stream #t (thunk->multivalue (lambda () obj))
(stream-assert strm)))))

(define (stream-assert v)
Loading
Oops, something went wrong.

0 comments on commit ba2e0f3

Please sign in to comment.