RxJava
는 데이터를 만들고 통지하는 생산자와 통지된 데이터를 받아 처리하는 소비자로 구성됩니다.
이 생산자를 소비자가 구독해 생산자가 통지한 데이터를 소비자가 받게 됩니다.
RxJava에서 이 생산자와 소비자의 관계는 크게 두 가지로 나뉩니다.
하나는 Reactive Streams를 지원하는 Flowable과 Subscriber, 다른 하나는 Reactive Streams를 지원하지 않고 배압 기능이 없는 Observable과 Observer입니다.
구분 | 생산자 | 소비자 |
Reactive Streams 지원 | Flowable | Subscriber |
Reactive Streams 미지원 | Observable | Observer |
Flowable
은 Reactive Streams의 생산자인 Publisher를 구현한 클래스고, Subscriber는 Reactive Streams의 클래스입니다. 그래서 기본적인 매커니즘은 Reactive Streams와 같습니다. 생산자인 Flowable로 구독 시작(onSubscribe), 데이터 통지(onNext), 에러 통지(onError), 완료 통지(onComplete)를 하고 각 통지를 받은 시점에 소비자인 Subscriber로 처리합니다. 그리고 Subscription으로 데이터 개수 요청과 구독 해지를 합니다.
반면, Observable과 Observer는 Reactive Streams를 구현하지 않아서 Reactive Streams 인터페이스를 사용하지 않습니다. 하지만 기본적인 메커니즘은 Flowable과 Subscriber 구성과 거의 같습니다. 생산자인 Observable에서 구독 시작(onSubscribe), 데이터 통지(onNext), 에러 통지(onError), 완료 통지(onComplete)를 하면 Observer에서 이 통지를 받습니다.
다만, Observable과 Observer 구성은 통지하는 데이터 개수를 제어하는 배압 기능이 없기 때문에 데이터 개수를 요청하지 않습니다. 그래서 Subscription을 사용하지 않고, Disposable이라는 구독 해지 메서드가 있는 인터페이스를 사용합니다. 이 Disposable은 구독을 시작하는 시점에 onSubscribe 메서드의 인자로 Observer에 전달됩니다.
그래서 Observable과 Observer 간에 데이터를 교환할 때 Flowable과 Subscriber처럼 데이터 개수 요청은 하지 않고 데이터가 생성되자마자 Observer에 통지됩니다.
Observer
Observer는 위에서 말한대로 Reactive Streams의 인터페이스를 사용하지 않습니다. 그러므로, Subscription의 데이터 개수를 요청하는 기능이 없고, 배압기능을 사용하지 않습니다.
생산자가 소비자인 Observer를 subscribe()하면 구독을 시작합니다.
package io.reactivex;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
public interface Observer<T> {
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(@NonNull Throwable e);
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
CustomObserver
class CustomObserver<T>(
private val compositeDisposable: CompositeDisposable,
private val onNextAction: (T) -> Unit
) : Observer<T> {
override fun onComplete() {
// no-op
}
override fun onSubscribe(d: Disposable) {
compositeDisposable.add(d)
}
override fun onNext(t: T) {
onNextAction(t)
}
override fun onError(e: Throwable) {
//ErrorUtil.handleError(e)
}
}
▼ Observer 방식일 때의 subscribe() 함수
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
Subscriber
Subscriber
는 Reactive Streams의 인터페이스입니다. 그러므로, onSubscribe()로 데이터 구독을 시작하게 되면 Subscription의 request()를 통해 생산자에게 데이터 개수를 요청 할 수 있는 기능이 있으며, 이 Subscriber가 받은 데이터를 처리할 때 까지 Flowable이 데이터를 통지하지 않게 배압기능을 사용할 수 있습니다.
Flowable이 소비자인 Subscriber를 subscribe()하면 구독을 시작합니다.
package org.reactivestreams;
/**
* Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}.
* <p>
* No further notifications will be received until {@link Subscription#request(long)} is called.
* <p>
* After signaling demand:
* <ul>
* <li>One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#request(long)}</li>
* <li>Single invocation of {@link #onError(Throwable)} or {@link Subscriber#onComplete()} which signals a terminal state after which no further events will be sent.
* </ul>
* <p>
* Demand can be signaled via {@link Subscription#request(long)} whenever the {@link Subscriber} instance is capable of handling more.
*
* @param <T> the type of element signaled.
*/
public interface Subscriber<T> {
/**
* Invoked after calling {@link Publisher#subscribe(Subscriber)}.
* <p>
* No data will start flowing until {@link Subscription#request(long)} is invoked.
* <p>
* It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
* <p>
* The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
*
* @param s
* {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
*/
public void onSubscribe(Subscription s);
/**
* Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
*
* @param t the element signaled
*/
public void onNext(T t);
/**
* Failed terminal state.
* <p>
* No further events will be sent even if {@link Subscription#request(long)} is invoked again.
*
* @param t the throwable signaled
*/
public void onError(Throwable t);
/**
* Successful terminal state.
* <p>
* No further events will be sent even if {@link Subscription#request(long)} is invoked again.
*/
public void onComplete();
}
Consumer방식
아래와 같이 subscribe()함수를 consumer 방식으로 코드를 작성할 수 있습니다.
Observable.just(1, 2, 3)
.subscribe({
println(it)
}, {
println(it)
})
실행결과
위와 동일한 코드이므로 당연히 실행결과는 같고 위와는 subscribe()함수가 다른 것을 알 수 있습니다.
▼ Consumer 방식일 때의 subscribe() 함수
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
위의 메소드를 보면 알 수 있듯이 subscribe()를 하는 시점에 Disposable객체를 반환합니다.
RxJava의 생산자
RxJava의 생산자
는 데이터를 생성하고 통지하는 클래스입니다.
RxJava의 생산자들에 대해 알아보겠습니다.
Flowable
Flowable
은 0..n개의 데이터를 전달하는 생산자입니다.
Subscriber가 Flowable을 구독하게 하는 방식으로 소비자가 구독할 수 있습니다.
또한, BackPressureStarategy를 지정하여 배압기능을 사용할 수 있습니다.
BackPressureStrategy
옵션 | 설명 |
BUFFER | 통지할 수 있을 때까지 모든 데이터를 버퍼링한다. |
DROP | 통지할 수 있을 때까지 새로 생성한 데이터를 삭제한다. |
LATEST | 생성한 최신 데이터만 버퍼링하고 생성할 때마다 버퍼링하는 데이터를 교환한다. |
ERROR | 통지 대기 데이터가 버퍼 크기를 초과하면 MissingBackpressureException 에러를 통지한다. |
NONE | 특정 처리를 수행하지 않는다. 주로 onBackPressure로 시작하는 메서드로 배압 모드를 설정할 때 사용한다. |
Observable
Observable
은 0..n개의 데이터를 전달하는 생산자 입니다.
Observer를 subscribe하거나 Consumer를 subscribe하는 방식으로 소비자가 구독할 수 있습니다.
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
onSubscirbe()
데이터를 준비할 완료가 되면 이 함수를 호출합니다.
이 함수에서 Disposable을 매개변수로 받습니다.
onNext()
데이터를 전달할 때 이 함수를 호출합니다.
이 함수에서 데이터를 받아서 처리합니다.
onComplete()
이 함수가 호출되면 Observable이 완료된 것을 의미합니다.
onError()
에러를 전달할 때 이 함수를 호출합니다.
이 함수에서 에러를 받아서 처리합니다.
Single
Single
은 1개의 데이터를 전달하는 생산자 입니다.
SingleObserver를 subscribe하거나 Consumer를 subscribe하는 방식으로 소비자가 구독할 수 있습니다.
public interface SingleObserver<T> {
void onSubscribe(@NonNull Disposable d);
void onSuccess(@NonNull T t);
void onError(@NonNull Throwable e);
}
onSuccess()
데이터를 전달할 때 이 함수를 호출합니다.
이 함수에서 데이터를 받아서 처리합니다.
Maybe
Maybe
는 0..1개의 데이터를 전달하는 생산자 입니다.
MaybeObserver를 subscribe 하거나 Consumer를 subscribe하는 방식으로 소비자가 구독할 수 있습니다.
public interface MaybeObserver<T> {
void onSubscribe(@NonNull Disposable d);
void onSuccess(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete()
}
onSuccess()
데이터를 전달할 때 이 함수를 호출 합니다.
여기서 데이터를 처리합니다.
onComplete()
이 함수가 호출되면 Maybe가 완료된 것을 의미합니다.
Completable
Completable은 0개의 데이터를 전달하는 생산자 입니다.
CompletableObserver를 subscribe하거나 Consumer를 subscribe하는 방식으로 소비자가 구독할 수 있습니다.
public interface CompletableObserver {
void onSubscribe(@NonNull Disposable d);
void onComplete();
void onError(@NonNull Throwable e);
}
onComplete()
이 함수가 호출되면 Completable()이 완료된 것을 의미합니다.
Subejct
Rxjava의 Observable은 기본적으로 차가운 Observable
입니다.
차가운 Observable
-> 구독자가 구독한 시점부터 데이터를 발행하는 Observable입니다.
뜨거운 Observable
-> 구독자의 존재 여부와 관계 없이 데이터를 발행하는 Observable입니다. 따라서 일정 시간이 지난 이후 구독을 한다면 중간 부분부터 구독을 하게 됩니다. 구독자의 경우 모든 데이터를 수신하는 것을 보장하고 있지 않기 때문에 데이터를 발행할 때 배압을 고려해야 합니다.
Cold Observable을 구독하는 모든 옵저버들은 자신만을 위해 독립적으로 실행하는 Observable을 가지는 형태가 됩니다.
하지만, Hot Observable을 구독하는 옵저버들은 단일의 Observable을 구독하게 되는 형태가 됩니다.
이것을, Cold Observable은 Unicast하다. Hot Observable은 MultiCast하다. 라고 합니다.
차가운 Observable을 뜨거운 Observable로 만들려면 publish()나 share()같은 연산자를 사용할 수 있지만 Subject를 이용하는 것도 편리합니다.
Subject의 특징
Observable과 Observer의 성격을 둘 다 가지고 있는 클래스 입니다.
Observable이 가지고 있는 subscribe와 Observable의 연산자들을 사용할 수 있습니다.
Observer가 가지고 있는 onNext, onError, onComplete를 사용할 수 있습니다.
subscribe를 했을 때 데이터를 넘겨주는 방식에 따라 Publish, Behavior, Replay, Async로 나누어져 있습니다.
PublishSubject
구독한 시점부터 데이터를 새로 받아오는 Subject 입니다.
새로 구독하는 Observer는 이전의 데이터를 받지 못합니다.
//publishSubject
val publishSubject = PublishSubject.create<Int>()
publishSubject.subscribe { println("1번째 Observer -> $it") }
publishSubject.onNext(1)
publishSubject.subscribe { println("2번째 Obsevrer -> $it") }
publishSubject.onNext(2)
publishSubject.onNext(3)
publishSubject.subscribe { println("3번째 Observer -> $it") }
publishSubject.onNext(4)
실행결과
BehaviorSubject
BehaviorSubject는 구독하면 subscribe()한 시점에서의 마지막데이터까지 받아옵니다.
BehaviorSubject는 구독직전의 첫번째 데이터를 받아올 수 있으므로 RxBus를 사용해야 할 때 사용하기에 적합합니다.
//behaviorSubject
val behaviorSubject = BehaviorSubject.create<Int>()
behaviorSubject.subscribe { println("1번째 Observer -> $it") }
behaviorSubject.onNext(1)
behaviorSubject.subscribe { println("2번째 Observer -> $it") }
behaviorSubject.onNext(2)
behaviorSubject.onNext(3)
behaviorSubject.subscribe { println("3번째 Observer -> $it") }
behaviorSubject.onNext(4)
실행결과
ReplaySubject
ReplaySubject는 스트림의 중간에 구독을 해도 지금까지 발행된 데이터를 전부 받아올 수 있습니다.
//replaySubject
val replaySubject = ReplaySubject.create<Int>()
replaySubject.subscribe { println("1번째 Observer -> $it") }
replaySubject.onNext(1)
replaySubject.subscribe { println("2번째 Observer -> $it") }
replaySubject.onNext(2)
replaySubject.onNext(3)
replaySubject.subscribe { println("3번째 Observer -> $it") }
replaySubject.onNext(4)
실행결과
AsyncSubject
onComplete()가 되면 가장 마지막 데이터를 받게 됩니다.
//asyncSubject
val asyncSubject = AsyncSubject.create<Int>()
asyncSubject.subscribe { println("1번째 Observer -> $it") }
asyncSubject.onNext(1)
asyncSubject.subscribe { println("2번째 Observer -> $it") }
asyncSubject.onNext(2)
asyncSubject.onNext(3)
asyncSubject.subscribe { println("3번째 Observer -> $it") }
asyncSubject.onNext(4)
asyncSubject.onComplete()
실행결과
스케쥴러
Schedulers.computation()
-> 이벤트 룹에서 간단한 연산이나 콜백 처리를 위해 사용됩니다. RxComputationThreadPool라는 별도의 스레드 풀에서 돌아갑니다. 최대 갯수 개의 스레드 풀이 순환하면서 실행됩니다.
Schedulers.immediate()
-> 현재 스레드에서 즉시 수행합니다.
Schedulers.from(executor)
-> 특정 executor를 스케쥴러로 사용합니다.
Schedulers.io()
-> 동기 I/O를 별도로 처리시켜 비동기 효율을 얻기 위한 스케줄러입니다. 자체적인 스레드 풀 CachedThreadPool을 사용합니다. API 호출 등 네트워크를 사용한 호출 시 사용됩니다.
Schedulers.newThread()
-> 새로운 스레드를 만드는 스케쥴러입니다.
Schedulers.trampoline()
-> 큐에 있는 일이 끝나면 이어서 현재 스레드에서 수행하는 스케쥴러.
AndroidSchedulers.mainThread()
-> 안드로이드의 UI 스레드에서 동작합니다.
HandlerScheduler.from(handler)
-> 특정 핸들러 handler에 의존하여 동작합니다.