Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time Based Operators #23

Open
simoniful opened this issue Apr 25, 2022 · 0 comments
Open

Time Based Operators #23

simoniful opened this issue Apr 25, 2022 · 0 comments
Labels

Comments

@simoniful
Copy link
Owner

simoniful commented Apr 25, 2022

Ch.11 Time Based Operators

A. Buffering operators

  • 과거의 요소들을 구독자에게 다시 재생하거나, 잠시 버퍼를 두고 줄 수 있다.
  • 언제 어떻게 과거와 새로운 요소들을 전달할 것인지 제어 가능

1. replay(), replayAll() - 과거 요소 리플레이

  • 설정한 버퍼의 크기 만큼 전에 onNext된 이벤트를 방출, 이 후 이벤트는 방출하던 대로 방출
    • 아래 예시에서 이벤트 생성 이 후, Observer가 구독했음에도 불구하고, 마지막 이벤트 2개가 출력
    • replayAll()을 사용할 때는 지나간 이벤트 모두 방출하므로 버퍼할 요소의 전체 개수를 정확히 알고 있는 상황에서 써야한다
  • ConnectableObservable을 리턴한다
    • 때문에 아이템을 받기 시작하려면 이것을 기본 소스에 연결해야 한다
    • connect() 메소드를 통해 연결하기 전까지는 구독자 수와 관계 없이, 아무 값도 방출하지 않는다
    • ConnectableObservable를 리턴하는 연산자들
      • replay(_:) / replayAll() / multicast(_:) / publish()
let disposeBag = DisposeBag()

let subject = PublishSubject<String>()
let bufferedObserver = subject.replay(2)

bufferedObserver.connect()

subject.onNext("이전 방출: 0")
subject.onNext("이전 방출: 1")
subject.onNext("이전 방출: 2")

bufferedObserver
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

subject.onNext("이후 버퍼와 관계없이 방출")
subject.onNext("이후 버퍼와 관계없이 방출")
subject.onNext("이후 버퍼와 관계없이 방출")

/* Prints:
이전 방출: 1
이전 방출: 2
이후 버퍼와 관계없이 방출
이후 버퍼와 관계없이 방출
이후 버퍼와 관계없이 방출
*/

2. buffer() - 시간과 용량으로 통제 가능한 버퍼

  • 버퍼의 전체용량(full capacity)에 다다랐을 때, 또는 제한 시간(deadline)에 다다랐을 때
  • 받은 요소들을 array 형태로 즉시 방출
  • source observable에서 받을 것이 없으면, 일정 간격으로 빈 array를 방출
  • Observable<[Element]> 를 리턴
let disposeBag = DisposeBag()

let source = PublishSubject<String>()
var count = 0

// 타이머 구성
let timer = DispatchSource.makeTimerSource()
timer.schedule(deadline: .now() + 2, repeating: .seconds(1))

timer.setEventHandler {
    count += 1
    source.onNext("\(count)")
}
timer.resume()

source
    // buffer의 timeSpan(deadline)
    // buffer의 count(capacity)
    .buffer(
        timeSpan: .seconds(2),
        count: 2,
        scheduler: MainScheduler.instance
    )
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)
/* Prints:
[]
["1", "2"]
["3", "4"]
["5", "6"]
["7", "8"]
["9", "10"]
["11", "12"]
["13", "14"]
["15"]
...
*/

3. window() - 시간과 용량으로 통제 가능한 버퍼

  • buffer() 함수와 아주 비슷
  • 버퍼의 전체용량(full capacity)에 다다랐을 때, 또는 제한 시간(deadline)에 다다랐을 때
  • 받은 요소들을 Observable을 형태로 즉시 방출
  • source observable에서 받을 것이 없으면, 일정 간격으로 빈 array를 방출
  • Observable 를 리턴
let disposeBag = DisposeBag()

let source = PublishSubject<String>()
var count = 0
let timer = DispatchSource.makeTimerSource()
timer.schedule(deadline: .now() + 2, repeating: .seconds(1))

timer.setEventHandler {
    count += 1
    source.onNext("\(count)")
}
timer.resume()

source
    .window(
        timeSpan: .seconds(2),
        count: 2,
        scheduler: MainScheduler.instance
    )
    .flatMap { o -> Observable<(index: Int, element: String)> in
        return o.enumerated()
    }
    .subscribe(onNext: {
        print("\($0.index)번째 Observable의 요소 \($0.element)")
    })
    .disposed(by: disposeBag)
/* Prints:
0번째 Observable의 요소 1
0번째 Observable의 요소 2
1번째 Observable의 요소 3
0번째 Observable의 요소 4
1번째 Observable의 요소 5
0번째 Observable의 요소 6
1번째 Observable의 요소 7
0번째 Observable의 요소 8
...
*/

B. Time-shifting operators

1. delaySubscription() - 구독 지연

  • 즉시 구독이 아닌 지연시간 뒤에 구독함
  • 즉, 정한 시간이후부터 들어오는 이벤트를 방출함
  • delayInSeconds에 정의된 것에 따라 지연 이후 보여질 요소들을 선택하기 시작
  • Observable 리턴 (= element 타입을 방출)
  • Hot / Cold
    • "cold" observable들은 요소를 구독할 때만 방출이 시작
    • "hot" observable들은 어떤 시점에서부터 구독과 관계없이 영구적으로 작동 (Notifications 같은)
    • 구독을 지연시켰을 때, "cold" observable이라면 지연에 따른 차이가 없다
    • 구독을 지연시켰을 때, "hot" observable이라면 예제에서와 같이 일정 요소를 건너뛰게 된다
let disposeBag = DisposeBag()

let source = PublishSubject<String>()
var count = 0
let timer = DispatchSource.makeTimerSource()
timer.schedule(deadline: .now() + 2, repeating: .seconds(1))

timer.setEventHandler {
    count += 1
    source.onNext("\(count)")
}
timer.resume()

source
    .delaySubscription(
        .seconds(5),
        scheduler: MainScheduler.instance
    )
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)
/* Prints:
// 5초뒤에
4
5
6
7
8
...
*/

2. delay() - 구독 sequence를 뒤로 미루어 받기

  • 구독을 지연시키는 대신, source observable을 즉시 구독한다.
  • 다만 요소의 방출을 설정한 시간만큼 미룬다는 것
  • delaySubscription와의 차이점은 지연시간 이후 이벤트를 처음부터 발생
let disposeBag = DisposeBag()

let source = PublishSubject<String>()
var count = 0
let timer = DispatchSource.makeTimerSource()
timer.schedule(deadline: .now() + 2, repeating: .seconds(1))

timer.setEventHandler {
    count += 1
    source.onNext("\(count)")
}
timer.resume()

source
    .delay(
        .seconds(5),
        scheduler: MainScheduler.instance
    )
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)
/* Prints:
// 5초뒤에
1
2
3
4
5
...
*/

C. Timer operators

  • 어떤 application이든 timer를 필요. iOS와 macOS에는 이에 대해 다양한 솔루션들이 존재
  • 통상적으로 NSTimer가 해당 작업을 수행
    • 혼란스러운 소유권 모델을 가지고 있어 적절한 사용이 어려웠다.
  • 최근에는 dispatch 프레임워크가 dispatch 소스를 통해 타이머를 제공
    • 확실히 + NSTimer보다는 나은 솔루션이지만, API는 여전히 랩핑 없이는 복잡
  • RxSwift는 간단하고 효과적인 timer솔루션을 제공

1. Observable.interval()

일정하게 전송된 Int값의 무한한 observable을 생성
손쉽게 dispose()로 취소할 수 있다. 구독이 취소된다는 것은 즉 타이머를 멈춘다는 것을 의미
Observable.interval(_:scheduler:)를 통해 방출된 값은 0부터 시작한다. 다른 값이 필요하다면, map(_:)을 이용

Observable<Int>
     .interval(.seconds(3), scheduler: MainScheduler.instance)
     .subscribe(onNext: {
         print($0)
     })
     .disposed(by: disposeBag)
/* Prints:
0
1
2
3
...
*/

2. Observable.timer()

  • 구독과 첫 값 방출 사이에서 "dueTime"을 설정할 수 있다.
    • 얼마나 지연시키고 첫 값을 받을지 정해주는 것
  • period는 옵셔널이다. 만약 period를 설정하지 않으면 타이머 observable은 한번만 방출된 뒤 완료될 것이다.
  • 가독성이 높고, 손쉽게 구독 관리를 통해서 종료하는 것이 가능하다
let disposeBag = DisposeBag()

Observable<Int>
    .timer(.seconds(5), period: .seconds(1), scheduler: MainScheduler.instance)
     .subscribe(onNext: {
         print($0)
     })
     .disposed(by: disposeBag)
/* Prints:
0
1
2
3
...
*/

3. Timeout

  • timeout 연산자의 주된 목적은 타이머를 시간초과(오류) 조건에 대해 구별하는 것
  • 따라서 timeout 연산자가 실행되면, RxError.TimeoutError라는 에러 이벤트를 방출
  • 만약 에러가 잡히지 않으면 sequence를 완전 종료
import UIKit
import RxSwift
import RxCocoa
import PlaygroundSupport

let disposeBag = DisposeBag()

let button = UIButton(type: .system)
button.setTitle("Press me in 5sec!", for: .normal)
button.sizeToFit()

PlaygroundPage.current.liveView = button

let _ = button
    .rx.tap
    .map { _ in "" }
    .timeout(.seconds(5), scheduler: MainScheduler.instance)
    .subscribe {
        print($0)
    }
    .disposed(by: disposeBag)

5초 이내로 클릭하지 않으면 error 발생, 클릭 시 이벤트 발생

@simoniful simoniful added the Rx label Apr 26, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant