이번 포스트에서는 생성 연산자
를 정리하겠습니다.
just
just
는 인자로 받은 데이터들을 순서대로 통지하는 Observable을 생성하는 연산자 입니다.
주의할 점은, just 연산자는 Observable의 생성시점이 선언한 시점이기 때문에 원하는 동작으로 수행되지 않을 수 있으므로 사용할 때 주의해야 합니다.
그러므로, just 연산자 혹은 from 연산자를 사용해야할 경우에는 Observable을 지연생성해주는 create, fromCallable을 사용하거나 defer로 감싸주는 것을 권장합니다.
fun main() {
Observable
.just(0,1,2,3)
.map { it * 2}
.subscribe {
println(it)
}
}
실행결과
craete
create
는 Observable을 구독할 때 마다 ObservableOnSubscribe의 콜백이 새로운 ObservableEmitter를 인스턴스를 호출하며 해당 Emitter의 콜백을 통해서 스트림을 제어하는 onNext(), onError(), onComplete() 함수를 호출해서 데이터를 전달할 수 있습니다.
onNext
-> 데이터를 발행을 알립니다.
onComplete
-> 스트림을 종료를 알립니다.
onError
-> 스트림에 오류가 발생했음을 알립니다. onComplete와 마찬가지로 스트림을 종료합니다.
fun main() {
Observable.create<String> { emitter ->
// Hello 전달
emitter.onNext("Hello")
// World 전달
emitter.onNext("World")
// 완료
emitter.onComplete()
}.subscribe { println(it) }
}
실행결과
defer
defer
는 구독(subscribe) 하기 전까지는 Observable을 생성을 연기(Lazy) 합니다.
내부적으로 들여다보면, defer의 파라미터로 Func0<R>이 존재하는데 subscribe를 할 때마다 호출되기 때문에 매번 새로운 Observable을 생성합니다. 즉, subscribe()가 호출되는 시점까지 stream의 생성을 미루다가 subscribe()가 호출이 될때마다 새로운 Observable을 생성 합니다.
Observable의 생성 시점이 구독시점이기 때문에 외부 참조 변수를 사용하더라도 항상 최신의 데이터를 유지할 수 있습니다.
defer vs just
fun main() {
var name = "seunghwan"
val justObservable = Observable.just(name)
val deferObservable = Observable.defer { Observable.just(name) }
name = "seosh817"
justObservable.subscribe {
println("justObservable name: $it")
}
deferObservable.subscribe {
println("deferObservable name: $it")
}
}
위의 코드를 통해 defer와 just의 실행결과의 차이점을 알아보겠습니다.
먼저, name이라는 String 변수에 "seunghwan"이라고 선언 합니다.
그리고 하나는 just, 하나는 defer로 감싸서 Observable을 각각 생성해줍니다.
그 후, name 변수에 seosh817이라고 재선언을 해줍니다.
그리고 각각의 observable을 subscribe 해주면
just로 생성한 Observable은 맨 처음에 선언한 "seunghwan"이라는 문자열을 방출하고 있고
defer로 생성한 Observable은 Observable을 생성한 이후에 재선언한 "seosh817"이라는 문자열을 방출하고 있습니다.
이런 결과가 나온 이유는
just로 생성한 Observable은 선언한 시점에 "seunghwan"이라는 문자열을 가진 Observable이 생성되었기 때문이고,
defer로 생성한 Observable은 선언했을 때 Observable을 생성하고 있지 않다가 구독 시점에 Observable을 생성하기 때문에 재선언한 "seosh817"을 가진 Observable이 생성되었기 때문입니다.
실행결과
fromCallable
fromCallable
은 defer와 마찬가지로 Observable 생성을 구독(subscribe)시점으로 지연합니다.
defer와의 차이점은 fromCallable은 생성자를 반환하지 않아도 된다는 점입니다.
fun main() {
var name = "seunghwan"
val justObservable = Observable.just(name)
val fromCallableObservable = Observable.fromCallable { name }
name = "seosh817"
justObservable.subscribe {
println("justObservable name: $it")
}
fromCallableObservable.subscribe {
println("fromCallableObservable name: $it")
}
Thread.sleep(1000)
}
실행결과
fromArray
fromArray
는 array를 받아서 Observable을 생성합니다.
*는 spread 형태 (kotlin operator spread)
val items = arrayOf("Hello", "World")
Observable.fromArray(*items)
.subscribe { println(it) }
실행결과
fromIterable
fromIterable
은 fromArray와는 다르게 iterable을 implements 한 클래스를 파라미터로 받아서 Observable을 만드는 연산자입니다.
val list2 = listOf(4, 5, 6)
Observable.fromIterable(
list2
).subscribe {
println(it)
}
실행결과
interval
interval
은 initialDelay를 시작으로 period 주기로 Long 값을 0부터 n까지 증가시키면서 데이터를 전달하는 Observable을 만드는 연산자입니다.
즉, 특정 주기마다 반복해야 할 때 사용합니다.
interval은 별도의 설정이 없으면 Schedulers.computation()의 스케쥴러에서 실행됩니다. 별도의 스레드에서 처리를 하기 때문에, 해당 스레드의 작업이 끝날때 까지 프로그램이 끝나면 안되므로 Thread.Sleep()를 통해 기다려 줍니다.
interval 메소드로 생성한 Observable은 완료할 수가 없으므로 완료 통지가 필요하다면 대표적으로 두가지 방법이 존재합니다.
- take()를 통해 통지할 데이터를 제한하는 방법
- Interval() 대신에 IntervalRange()를 사용하는 방법
val dateTimeFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("hh:mm:ss.SSS")
Observable.interval(0, 1000L, TimeUnit.MILLISECONDS)
.subscribe {data ->
println(LocalTime.now().format(dateTimeFormatter) + "data = $data")
}
Thread.sleep(3000L)
실행결과
timer
timer
는 정해진 delay 이후에 특정 값을 전달 하는 Observable을 생성하는 연산자입니다.
스케쥴러 설정을 해주지 않으면 default로 Schedulers.computation() 스케쥴러에서 실행됩니다.
이것도 마찬가지로 별도의 스레드에서 동작하기 때문에 Thread.sleep()로 프로그램이 종료하지 못하도록 설정했습니다
println("start:\t\t ${System.currentTimeMillis()}")
Observable.timer(1000, TimeUnit.MILLISECONDS)
.subscribe {data ->
println("subscribe:\t ${System.currentTimeMillis()} + data = $data")
}
Thread.sleep(2000)
실행결과
range
range
는 start부터 count만큼 값을 1씩 증가하는 값을 생산하는 Observable을 만드는 operator입니다.
주의 n부터 m까지가 아니라 n부터 m만큼 입니다.
Observable.range(2, 5)
.subscribe { println(it) }
실행결과
repeat
repeat
은 Observable을 원하는 갯수만큼 반복시켜서 데이터를 전달하는 연산자입니다.
val observable = Observable.just("Hello", "World")
.doOnSubscribe { println("subscribe") }
.repeat(2)
observable.subscribe { println(it) }
실행결과
References
https://brunch.co.kr/@lonnie/18
https://selfish-developer.com/entry/RxJava-defer-fromCallable