flatMap의 원리와 Observable 스트림 종료 후 다시 이어가기
RxJava
는 데이터 스트림에 여러 연산자들을 체이닝하여 데이터를 조작할 수 있습니다.
원본 데이터에 체이닝하면서 데이터를 처리하기 위해서 거의 대부분의 경우에 flatMap, concatMap, switchMap을 사용하여 처리할 텐데, 이유는 이 연산자들은 방출되는 각각의 데이터 마다 Observable을 생성하여 데이터를 처리할 수 있게 해주므로 스레드 관리에 효과적이기 때문일 것입니다.
그런데 여기서 생산자에 따라 차이가 생기게 되며, 이 포스트에서는 flatMap을 예로 들어보겠습니다.
Single, Maybe, Completable은 1 혹은 0개의 데이터를 발생하고 스트림을 종료합니다.
그래서 이 생산자들은 어짜피 하나의 데이터거나 데이터가 없기 때문에 데이터 스트림 종료에 대한 고민할 필요가 없습니다.
하지만, Observable
은 여러개의 데이터가 스트림에 들어올 수 있으므로 종료 처리를 따로 해주어야 스트림이 종료됩니다.
그렇기 때문에 Observable은 Single의 flatMap이 Single을, flatMapMaybe가 Maybe를 return하는 것과는 다르게 동작하는 것을 알 수 있습니다. 앞의 두 생산자와는 다르게 Observable
의 flatMapSingle, flatMapMaybe 메소드는 Observable을 return합니다.
Observable의 flatMapSingle
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
return flatMapSingle(mapper, false);
}
flatMap이 Observable을 return하는 이유
를 알기 위해서는 flatMap이 어떻게 동작하는지를 알아야 합니다. flatMap은 데이터가 들어오면 여러개의 Observable을 모아서 하나의 새로운 Observable로 만들어 주는 연산자입니다.
결국, flatMap을 자세히 들여다 보면, 각각의 데이터를 처리하는 부분인 flatMap의 콜백 함수 Function<? super T, ? extends SingleSource<? extends R> mapper>()에서는 각각의 데이터를 처리 하는 생산자를 반환하며, Rx 체이닝을 할 수 있도록 자기 자신을 반환하는 형태를 가지고 있습니다. 즉, flatMap 이후의 downStream에서 구독할 때에는 이 각각의 데이터마다 생성된 Observable들을 flatten하여 평면화한 Observable을 구독하게 되는 것입니다.
(그래서 평면화 하기 전 각각의 Observable에서 어떤 데이터가 발행되는지 알고 싶다면 해당 flatMap 연산자 아래에 doOnNext 메소드를 추가해주면 확인할 수 있게됩니다.)
그래서, 위에서 이야기 했던 Single, Maybe의 flatMap이 Observable의 flatMap과는 결과가 다른 이유는 Single, Maybe는 upstream의 데이터가 무조건 1개 초과로 방출될 수 없기 때문에 flatMap으로 연결하면 flatMap의 반환 값이 그대로 downStream으로 이어지게 되는 것입니다. 그러므로, upstream의 Observable은 Rx로 체이닝 할 때 스트림이 유지되게만 한다면 무한히 붙일 붙일 수 있습니다.
이 스트림을 종료하는 해결 방법은 사실 간단한데 Observable이 스트림이 onComplete를 호출하게 만들어서 스트림을 종료하도록 만들면 됩니다. 하지만, subject와는 다르게 onComplete를 자체적으로 호출 할 수 없으니 다른 방법을 이용해야합니다.
방법은 flatMapCompletable 혹은 ignoreElements을 체이닝해서 데이터를 방출시키지 않는다면 처리할 데이터가 없어지므로 스트림이 종료되고(onComplete 호출), 새로운 Observable을 이어붙여 스트림을 이어나가면 됩니다.
flatMapCompletable로 스트림 연결하는 방법
마블다이어그램을 살펴보면 flatMapCompletable은 onComplete()를 실행하고 데이터를 발행하지 않습니다.
fun main() {
Observable.just(listOf(1, 2, 3, 4, 5, 6, 7, 8, 9))
.flatMap { Observable.fromIterable(it) }
.flatMapSingle {
Single.just("$it: flatMapSingle1 +")
}
.flatMapSingle {
Single.fromCallable {
"$it flatMapSingle2 +"
}
}
.flatMapSingle {
Single.fromCallable {
"$it flatMapSingle3"
}
}
.doOnNext {
println(it)
}
.flatMapCompletable {
Completable.create { emitter ->
if(it.length >= 2) {
emitter.onComplete()
}
emitter.onError(IllegalArgumentException(""))
}
}
.doOnComplete {
println("onCompleted")
}
.andThen(Single.just("New Stream"))
.subscribe({
println(it)
}, {
})
Thread.sleep(5000L)
}
먼저, Observable에 1, 2, 3, 4, 5, 6, 7, 8, 9를 스트림에 흘려보내줍니다.
그리고 flatMapSingle 3번으로 데이터를 처리한 후에 flatMapCompletable
로 데이터를 발행하지 않게합니다.
그러면 상위 Observable 데이터에 대한 스트림이 종료됩니다.
그 후, andThen으로 새로운 스트림을 만들어줍니다.
실행결과
실행결과를 보면 알 수 있듯이 각각의 데이터를 모두 처리하고 데이터 스트림이 종료된 이후 Single로 새로운 스트림을 만든 결과를 확인하실 수 있습니다.
ignoreElements로 스트림을 연결하는 방법
ignoreElements는 모든 데이터를 무시합니다.
fun main() {
Observable.just(listOf(1, 2, 3, 4, 5, 6, 7, 8, 9))
.flatMap { Observable.fromIterable(it) }
.flatMapSingle {
Single.just("$it: flatMapSingle1 +")
}
.flatMapSingle {
Single.fromCallable {
"$it flatMapSingle2 +"
}
}
.flatMapSingle {
Single.fromCallable {
"$it flatMapSingle3"
}
}
.doOnNext {
println(it)
}
.ignoreElements()
.doOnComplete {
println("onCompleted")
}
.andThen(Single.just("New Stream"))
.subscribe({
println(it)
}, {
})
Thread.sleep(5000L)
}
ignoreElements 또한 flatMapCompletable과 같은 원리입니다.
다만, 차이점은 flatMapCompletable은 Completable 동작을 수행하고 종료시키는 것이고
ignoreElements
는 아무 동작을 하지 않고 upstream의 데이터를 그냥 무시하고 완료시키는 것입니다.
실행결과
ignoreElements 또한 flatMapCompletable을 실행했을 때와 같은 결과가 나오는 것을 확인할 수 있습니다.
마무리
사실 Rx를 많이 사용하다보면 원리를 모르더라도 해결 방법 자체는 알 수 있는 내용입니다. 하지만, 복잡한 로직을 코드로 구현 했을 때, 생각했었던 것과 다르게 동작 하던 경우가 많았는데 세부적으로 어떻게 동작하는지 알고 난 후로는 그런 일이 줄어들었기 때문에 정리가 필요할 것 같아 정리하게 되었습니다.