RxJava 개념정리에 대한 포스트들을 정리할 때 참고한 책은 스다 토모유키님이 쓰신 RxJava 리액티브 프로그래밍이고 StudyFork(정석준님 안드로이드 스터디)에서 진행한 RxJava 스터디에서 공부한 내용 또한 같이 정리하였습니다.
Reactive Programming이란?
Rxjava
를 공부하기에 앞서서 Reactive Programming이란 무엇인가에 대해 알아야 할 필요가 있습니다.
Reactive Programming이란, 데이터가 통지될 때마다 관련 프로그램이 반응(Reaction)해 데이터를 처리하는
프로그래밍 방식입니다.
예를 들면, GPS 위치 정보가 변경될 때의 데이터 전송 흐름을 상상해보면 이해가 쉽습니다. 이동해 위치 정보가 변경될 때마다
데이터를 전송하고 이동을 멈추면 데이터 전송도 중지하는 것 처럼 생성되는 데이터를 한 번에 보내지 않고 각각의 데이터가 생성될 때마다
순서대로 보냅니다. 이러한 데이터 흐름을 데이터 스트림(data stream)
이라고 합니다.
다른 예를 들면, 'abc'라고 입력하면 입력 이벤트가 발생 할 때 다음과 같은 데이터가 생성된다고 생각할 수 있습니다.
- [a]
- [ab]
- [abc]
버튼을 누르는 행위에 대한 구체적인 데이터가 없더라도 '버튼을 누르다'라는 이벤트는 데이터가 생성된다고 생각할 수 있습니다.
버튼을 계속 누른다면 누른 횟수만큼 '버튼을 '누르다'라는 이벤트가 계속 발생합니다. 즉, 이벤트도 발생할 때마다 데이터를
전송하는 데이터 스트림으로 다룰 수 있습니다.
리액티브 프로그래밍은 이러한 데이터 스트림으로 데이터를 전달받은 프로그램이 그때마다 적절히 처리할 수 있게 구성되었습니다.
종합해보면 프로그램이 필요한 데이터를 직접 가져와 처리하는 것이 아니라 보내온 데이터를 받은 시점에 반응해 이를 처리하는 프로그램을 만드는 것이 Reactive Programming
입니다.
Rxjava의 특징
Rxjava
는 디자인 패턴인 옵저버(Observer) 패턴을 확장했습니다. 옵저버 패턴은 객체의 상태 변화를 관찰자들, 즉 옵저버들의 목록을 객체에 등록하여 상태 변화가 있을 때 마다
메서드 등을 통해 객체가 직접 목록의 각 옵저버에게 통지하도록 하는 디자인 패턴입니다. 이 패턴의 특징을 잘 살리면 쉽게 데이터를 생성하는 측과 데이터를 소비하는 측으로 나눌 수 있기 때문에 쉽게 데이터 스트림을 처리할 수 있습니다.
즉, 데이터를 통지하는 측이 무엇을 하더라도 데이터를 받는 측의 처리가 받은 데이터에 의해서만 바뀌게 된다면 비동기로 쉽게 전환할 수 있습니다.
코틀린으로 구현한 옵저버 패턴
typealias Observer<T> = (event:T) -> Unit
class EventSource<T> {
private var observers = mutableListOf<Observer<T>>()
fun addObserver(observer: Observer<T>) {
observers.add(observer)
}
fun notify(event: T) {
for(observer in observers) {
observer.invoke(event)
}
}
}
fun main() {
val eventSource = EventSource<String> ()
eventSource.addObserver { println("첫 번째 옵저버: $it") }
eventSource.addObserver { println("두 번째 옵저버: $it") }
eventSource.notify("Hello")
eventSource.notify("Observer Pattern")
}
실행결과
Reactive Streams
Reactive Streams
란 라이브러리나 프레임워크에 상관없이 데이터 스트림을 비동기로 다룰 수 있는 공통 메커니즘으로 이 메커니즘을 편리하게 사용할 수 있는 인터페이스를 제공합니다.
Reactvie Streams는 데이터를 만들어 통지하는 Publisher(생산자)와 통지된 데이터를 받아 처리하는 Subscriber(소비자)로 구성됩니다. Subscriber가 Publisher를 구독(subscribe)하면 Publisher가 통지한 데이터를 Subscriber가 받을 수 있습니다.
그럼 Publisher가 데이터를 통지한 후 Subscriber가 이 데이터를 받을 때 까지의 데이터 흐름을 살펴봅시다.
먼저 Publisher는 통지 준비가 끝나면 이를 Subscriber에 통지(onSubscribe)합니다. 해당 통지를 받은 Subscriber는 받고자 하는 데이터 개수를 요청합니다. 이때 Subscriber가 자신이 통지 받을 데이터 개수를 요청하지 않으면 Publisher는 통지해야 할 데이터 개수 요청을 기다리게 되므로 통지를 시작할 수 없습니다.
그 다음, Publisher는 데이터를 만들어 Subscriber에 통지(onNext)합니다. 이 데이터를 받은 Subscrbier는 받은 데이터를 사용해 처리 작업을 수행합니다. Publisher는 요청받은 만큼의 데이터를 통지한 뒤 Subscriber로부터 다음 요청이 올 때 까지 데이터 통지를 중단합니다. 이후 Subscriber가 처리 작업을 완료하면 다음에 받을 데이터 개수를 Publisher에 요청합니다. 이 요청을 보내지 않으면 Publisher는 요청 대기 상태가 돼 Subscriber에 데이터를 통지할 수 없습니다.
Publisher는 Subscriber에 모든 데이터를 통지하고 마지막으로 데이터 전송이 완료돼 정상 종료됐다고 통지(onComplete)합니다. 완료 통지를 하고 나면 Publisher는 이 구독 건에 대해 어떤 통지도 하지 않습니다. 또한, Publisher는 처리 도중에 에러가 발생하면 Subscriber에 발생한 에러 객체와 함께 에러를 통지(onError)합니다.
Subscriber가 Publisher에 통지받을 데이터 개수를 요청하는 것은 Publisher가 통지하는 데이터 개수를 제어하기 위해서입니다. 예를 들어, Publisher와 Subscriber의 처리가 각각 다른 스레드에서 진행되는데 Publisher의 통지 속도가 빠르면 Subscriber가 소화할 수 없을 만큼 많은 처리 대기 데이터가 쌓입니다. 이를 막기 위해 Publisher가 통지할 데이터 개수를 Subscriber가 처리할 수 있을 만큼 제어하는 수단이 필요합니다.
Publisher<T>
package org.reactivestreams;
/**
* A {@link Publisher} is a provider of a potentially unbounded number of sequenced elements, publishing them according to
* the demand received from its {@link Subscriber}(s).
* <p>
* A {@link Publisher} can serve multiple {@link Subscriber}s subscribed {@link #subscribe(Subscriber)} dynamically
* at various points in time.
*
* @param <T> the type of element signaled.
*/
// 데이터를 통지하는 생산자
public interface Publisher<T> {
/**
* Request {@link Publisher} to start streaming data.
* <p>
* This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
* <p>
* Each {@link Subscription} will work for only a single {@link Subscriber}.
* <p>
* A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
* <p>
* If the {@link Publisher} rejects the subscription attempt or otherwise fails it will
* signal the error via {@link Subscriber#onError}.
*
* @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
*/
// 데이터를 받는 Subscriber 등록
public void subscribe(Subscriber<? super T> s);
}
Subscriber<T>
package org.reactivestreams;
/**
* Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}.
* <p>
* No further notifications will be received until {@link Subscription#request(long)} is called.
* <p>
* After signaling demand:
* <ul>
* <li>One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#request(long)}</li>
* <li>Single invocation of {@link #onError(Throwable)} or {@link Subscriber#onComplete()} which signals a terminal state after which no further events will be sent.
* </ul>
* <p>
* Demand can be signaled via {@link Subscription#request(long)} whenever the {@link Subscriber} instance is capable of handling more.
*
* @param <T> the type of element signaled.
*/
// 데이터를 받아 처리하는 소비자
public interface Subscriber<T> {
/**
* Invoked after calling {@link Publisher#subscribe(Subscriber)}.
* <p>
* No data will start flowing until {@link Subscription#request(long)} is invoked.
* <p>
* It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
* <p>
* The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
*
* @param s
* {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
*/
// 구독 시작 시 처리
public void onSubscribe(Subscription s);
/**
* Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
*
* @param t the element signaled
*/
// 데이터 통지 시 처리
public void onNext(T t);
/**
* Failed terminal state.
* <p>
* No further events will be sent even if {@link Subscription#request(long)} is invoked again.
*
* @param t the throwable signaled
*/
// 에러 통지 시 처리
public void onError(Throwable t);
/**
* Successful terminal state.
* <p>
* No further events will be sent even if {@link Subscription#request(long)} is invoked again.
*/
// 완료 통지 시 처리
public void onComplete();
}
Subscription
package org.reactivestreams;
/**
* A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}.
* <p>
* It can only be used once by a single {@link Subscriber}.
* <p>
* It is used to both signal desire for data and cancel demand (and allow resource cleanup).
*
*/
// 생산자와 소비자를 연결하는 인터페이스
public interface Subscription {
/**
* No events will be sent by a {@link Publisher} until demand is signaled via this method.
* <p>
* It can be called however often and whenever needed—but if the outstanding cumulative demand ever becomes Long.MAX_VALUE or more,
* it may be treated by the {@link Publisher} as "effectively unbounded".
* <p>
* Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
* <p>
* A {@link Publisher} can send less than is requested if the stream ends but
* then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}.
*
* @param n the strictly positive number of elements to requests to the upstream {@link Publisher}
*/
// 통지받을 데이터 개수 요청
public void request(long n);
/**
* Request the {@link Publisher} to stop sending data and clean up resources.
* <p>
* Data may still be sent to meet previously signalled demand after calling cancel.
*/
// 구독 해지
public void cancel();
}
Processor
package org.reactivestreams;
/**
* A Processor represents a processing stage—which is both a {@link Subscriber}
* and a {@link Publisher} and obeys the contracts of both.
*
* @param <T> the type of element signaled to the {@link Subscriber}
* @param <R> the type of element signaled by the {@link Publisher}
*/
// Publisher와 Subscriber의 기능이 모두 있는 인터페이스
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Publisher와 Subscriber가 사용하는 Subscription은 통지받을 데이터 개수를 지정해 데이터 통지를 요청하거나 통지받지 않게 구독을 해지할 때 사용하는 인터페이스입니다. Subscription은 Publisher에서 인스턴스가 생성돼 통지 준비가 끝났을 때 호출하는 onSubscrie메서드의 인자로 Subscriber에 전달됩니다. 이 Subscription을 받은 Subscriber는 Subscription의 메서드를 호출해 데이터 개수를 요청하거나 구독을 해지합니다.
또한, onNext 메서드에서 이 Subscription을 사용하려면 onSubscribe 메서드로 전달받은 Subscription이 Subscriber 내부에 있어야 합니다.
val flowable = Flowable.just(1)
.subscribe(object: Subscriber<Int> {
// 데이터 개수 요청과 구독 해지를 하는 객체
private lateinit var subscription: Subscription
// 구독 시작시 처리
override fun onSubscribe(s: Subscription?) {
// 받은 Subscription을 Subscriber 내부에 보관한다.
if(s != null) {
subscription = s
// 받을 데이터 개수를 요청한다
subscription.request(1L)
}
}
override fun onNext(t: Int?) {
// 다음에 받을 데이터 개수를 요청한다.
subscription.request(1L)
}
override fun onError(t: Throwable?) {
t?.stackTrace
}
override fun onComplete() {
println("완료")
}
})
다만, Subscription의 처리가 Subscriber 외부에서는 호출되지 않는다는 것을 전제로 구현할 수 있습니다. 하지만 RxJava처럼 외부에서 구독을 해지할 방법을 제공한다면 Subscription이 비동기로 호출돼도 문제가 없게 구현해야 합니다.
추가로, Processor라는 인터페이스가 있는데, 이 Processor는 Publisher와 Subscriber 모두를 상속받아 데이터 통지와 수신이 가능합니다. 즉, Processor는 다른 Publisher를 구독하거나 다른 Subscriber가 자신을 구독하게 할 수 있습니다.
Reactive Streams의 규칙
Reactive Streams
는 앞서 소개한 인터페이스로 데이터를 통지하는 구조를 제공합니다. 하지만 이 구조가 제대로 작동하려면 Reactive Streams의 규칙을 따라야합니다.
Reactive Streams의 기본 규칙은 다음과 같습니다.
- 구독 시작 통지(onSubscribe)는 해당 구독에서 한 번만 발생한다.
- 통지는 순차적으로 이루어진다.
- null을 통지하지 않는다.
- Publisher의 처리는 완료(onComplete) 또는 에러(onError)를 통지해 종료한다.
Reactive Streams에서 구독 시작 통지는 해당 구독에서 한 번만 수행됩니다. 따라서 Subscriber의 onSubscribe 메서드는 구독을 시작해 통지 준비가 끝났을 때 한 번만 실행됩니다. 단, 다음 작업은 추천하지 않지만, 처리가 종료된 후에 같은 Publisher와 Subscriber로 subscribe 메서드를 호출하면 다시 onSubscribe 메서드가 실행됩니다. 이는 처리가 끝난 뒤에 subscribe 메서드를 호출하면 새로운 구독을 시작한다고 생각하기 때문입니다. 그러나 같은 인스턴스를 다시 사용해 subscribe 메서드를 호출할 때 Publisher나 Subscriber 내부의 관리 상태를 초기화하지 않으면 의도하지 않은 결과가 발생할 수도 있으므로 주의해야 합니다.
또한, Reactive Streams에서는 데이터 통지가 순차적으로 이루어집니다. 즉, 여러 통지를 동시에 할 수 없습니다. 이는 Reactive Streams 사양에 큰 영향을 미친 RxJava의 'Observable 규악(Observable contract) 이라는 규칙에 따른 것으로, 데이터가 동시에 통지돼 불일치가 발생하는 것을 방지합니다.
그리고 Reactive Streams에서는 null을 통지할 수 없습니다. 만약 null을 통지하면 Reactive Streams에서 NullPointerException이 발생합니다. 이는 데이터를 통지할 때만 아니라 에러를 통지할 때도 마찬가지입니다.
마지막으로 Reactive Streams에서는 완료나 에러를 통지하면 Publisher가 처리를 끝마친 것으로 판단합니다. 이는 완료나 에러 통지를 마친 구독은 더 이상 통지하지 않는다는 의미입니다. 예를 들어, 완료를 통지한 뒤에 에러가 발생했다면 이 에러는 통지하지 않으므로 정상적으로 종료됐다고 생각할 위험성이 있습니다.
Rxjava
의 또 다른 특징으로 쉬운 비동기 처리를 만들 수 있습니다. Reactive Streams 규칙의 근간이 되는 Observable 규약이라는 Rxjava 개발 가이드라인을 따른 구현이라면 직접
스레드(Thread)를 관리하는 번거로움에서 해방될 뿐만 아니라 구현도 간단하게 할 수 있습니다. 또한 동기 처리나 비동기 처리나 구현 방법에 큰 차이가 없는 것도 RxJava의 큰 특징 입니다.
RxJava는 ReactiveX(Reactive Extensions)의 JVM 구현입니다.
Rx에는 크게 3가지의 역할이 있습니다.
생산자
-> 데이터를 생산하여 전달
소비자
-> 데이터를 받아서 처리
연산자
-> 데이터를 중간에서 가공(생산, 변환, 조합, 거름) 처리
Rxjava는 소비자(Subscriber/Observer)가 생산자(Observable/Flowable/Completable/Maybe/Single)를 구독하는 형태로 만들어져 있습니다.
간단히 말하면, 생산자(Publisher)는 데이터를 통지하는 클래스이며, 소비자(Subscriber)는 통지된 데이터를 전달받아 이 데이터를 처리합니다.
3가지 역할의 예시
Observable // 생산자
.just(0, 1, 2, 3) // 생산 연산자
.map { it * 2 } // 변경 연산자
.subscribe { // 소비자
println(it)
}
그리고 각 역할들은 각각 다른 Scheduler에서 처리를 할 수 있습니다.
Disposable이란?
Disposable
은 구독을 해지하는 메서드를 포함하는 인터페이스
subscribe()함수를 실행하면 Disposable 객체가 반환 됩니다.
Disposable을 통해 데이터를 생산해서 소비까지 시간이 오래 걸리는 경우에 현재 작업을 취소할 수 있습니다.
Disposable#dispose() 함수를 통해 현재 작업을 취소할 수 있습니다.
구독 해지 뿐만 아니라 자원을 해제하는 등의 처리에도 활용 가능합니다. 만약, 처리가 끝날 때에 자원을 해제해야 한다면 dispose()메소드에 구현하면 됩니다.
예시
- 새로운 작업을 진행 할 때 이전 작업을 취소
- 화면을 떠날 때 진행 하던 작업을 취소
CompositeDisposable이란?
CompositeDisposable
은 Disposable을 저장하는 Container
CompositeDisposable은 여러 Disposable을 모아 CompositeDisposable의 dispose()메소드를 호출함으로써 가지고 있는 모든 Disposable의 dispose 메서드를 호출할 수 있는 클래스 입니다.
add(), addAll() 함수로 Disposable 객체를 추가 할 수 있으며, clear(), dispose() 함수로 dispose()를 처리 할 수 있습니다.
clear()와 dispose()의 차이는 무엇일까?
- clear()를 호출하면 호출한 CompositeDisposable 객체는 재사용 가능합니다.
- dispose()를 호출하면 호출한 CompositeDisposable 객체는 재사용이 불가능합니다. ( add()하는 시점에 dispose() 시켜버립니다.)
val compositeDisposable = CompositeDisposable() val disposable1 = Observable.just(1, 2) .subscribe { println(it) } compositeDisposable.add(disposable1) compositeDisposable.clear() // dispose() val disposable2 = Observable.just(3, 4) .delay(1, TimeUnit.SECONDS) .subscribe { println(it) } compositeDisposable.add(disposable2) Thread.sleep(2000L) val compositeDisposable2 = CompositeDisposable() val disposable3 = Observable.just(1, 2) .subscribe { println(it) } compositeDisposable2.add(disposable3) compositeDisposable2.dispose() // dispose() val disposable4 = Observable.just(3, 4) .delay(1, TimeUnit.SECONDS) .subscribe { println(it) } compositeDisposable2.add(disposable4) Thread.sleep(2000L)
실행결과
위에 실행결과를 보면 알 수 있듯이 첫번째 compositable 객체는 clear()함수를 호출해서 재사용 가능 하므로
1,2,3,4가 모두 찍히는 것을 알 수 있고
두번째 CompositeDisposable 객체는 dispose()함수의 호출로 인해 재사용이 불가능 하므로
1,2만 찍히는 것을 알 수 있습니다.
clear()함수의 내부구조
public void clear() {
if (disposed) {
return;
}
OpenHashSet<Disposable> set;
synchronized (this) {
if (disposed) {
return;
}
set = resources;
resources = null;
}
dispose(set);
}
dispose()함수의 내부구조
@Override
public void dispose() {
if (disposed) {
return;
}
OpenHashSet<Disposable> set;
synchronized (this) {
if (disposed) {
return;
}
disposed = true;
set = resources;
resources = null;
}
dispose(set);
}
clear()와는 달리 dipose()함수는 내부구조에서 disposed 라는 flag 변수를 통해
함수가 불릴 때 disposed = true가 되어서 재활용 할 수 없음을 알 수 있습니다.
마블 다이어그램
출처 : http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html
마블 다이어그램은 데이터의 흐름을 도식화한 것 입니다.
알록달록한 색깔로 되어 있는 도형들은 데이터이고 첫번 째 줄에서 시간 순으로 데이터가 발행 되는 것을 뜻 합니다.
데이터를 발행할 때에는 onNext()함수를 통해 발행 됩니다.
통지하는 데이터를 생성하거나 필터링 또는 변환하는 메서드를 연산자라고 합니다.
메서드를 순차적으로 연결해 나가는 것을 메서드 체이닝(method chaining)
이라고 하고 이러한 메서드들의 연결을 통해서
최종 데이터의 처리를 단계적으로 설정할 수 있습니다.
위 그림에서 점선 화살표는 각각 입력, 출력을 뜻하고, 연산자를 통해 변환 되는 것을 알 수 있고, 실제로는 존재하지 않는 연산자지만
flip()함수를 통해 출력된 도형의 모양이 뒤집혀 진 것을 확인 할 수 있고 결과는 새로운 Observable 객체가 생성됩니다.
파이프(ㅣ)
-> 정상적으로 데이터 발행을 완료했다는 것을 의미합니다. 이 때 onComplete() 함수가 실행 됩니다.
x 표시
-> 에러가 발생하였음을 의미합니다. 이 때 onError() 함수가 실행 됩니다.
따라서 이 마블 다이어그램은 데이터가 3개까지만 발행되고 에러가 발생되었 다는 것을 알 수 있습니다.
다음 포스트에는 생산자와 소비자에 대해 정리하겠습니다.