Rxjava를 사용할 때 예외처리 과정에서 retryWhen()을 사용하면 재시도를 구현할 수 있습니다.
이번 포스트에서는 retryWhen
메소드에 대해 정리해보겠습니다.
retryWhen의 마블 다이어그램
retryWhen의 마블 다이어그램을 해석
먼저, Observable에서 에러가 발생됩니다. 그러면 retryWhen
을 타게 되는데
retryWhen
에서 데이터가 발행되면 -> upstream을 재실행합니다.
retryWhen
에서 에러가 발행되면 -> 최종적으로 error를 발행합니다.
Flowable의 retryWhen 구현부
public abstract class Flowable<T> implements Publisher<T> {
// ...
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> retryWhen(
final Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
ObjectHelper.requireNonNull(handler, "handler is null");
return RxJavaPlugins.onAssembly(new FlowableRetryWhen<T>(this, handler));
}
}
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
retryWhen함수는 Rxjava에서 제공하는 Function
인터페이스인 handler 인자로 받습니다.
그리고 handler는 제네릭으로 Flowable<Throwable>과 Publisher<*>를 받게 됩니다.
결국, Function의 apply()함수는 Flowable<Throwable>을 매개변수로 받고, Publisher<*>를 return하게 됩니다.
Function<Flowable<Throwable>, Publisher<*>>를 상속한 RetryWithDelaySingle 이용
Function<Flowable<Throwable>, Publisher<*>>를 상속한 RetryWithDelaySingle
을 만들어줍니다.
apply() 함수에서는 timer()를 이용해서 retryCount 만큼 delayMillis의 딜레이마다 데이터를 던지게 만들어주고
retryCount가 maxRetries를 넘어가면 error를 던지도록 만들어줍니다.
import io.reactivex.Flowable
import io.reactivex.functions.Function
import org.reactivestreams.Publisher
import java.util.concurrent.TimeUnit
class RetryWithDelaySingle(private val maxRetries: Int, private val retryDelayMillis: Int) : Function<Flowable<Throwable>, Publisher<*>> {
constructor(retryDelayMillis: Int) : this(5, retryDelayMillis)
private var retryCount = 0
override fun apply(flowable: Flowable<Throwable>): Flowable<*> = flowable
.flatMap { throwable ->
if (maxRetries < 0 || ++retryCount <= maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
Flowable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
} else Flowable.error(throwable)
}
}
일정시간의 간격으로 5번의 재시도 후에 error를 던지도록 해줍니다.
fun main() {
Single.error<Throwable>(IllegalArgumentException())
.doOnError {
println("error")
}
.retryWhen(RetryWithDelaySingle(5, 500))
.subscribe({
println("onSuccess()")
}, {
println("onError()")
})
Thread.sleep(2500L)
}
실행결과
첫번째 실행 이후, 뒤에 5번의 재시도를 실행하고, 최종적으로 onError()를 던지는 것을 확인할 수 있습니다.
retryWhen에서 delay()와 take()를 이용
fun main() {
Single.error<Throwable>(IllegalArgumentException())
.doOnError {
println("error")
}
.retryWhen {
it
.delay(500, TimeUnit.MILLISECONDS)
.take(5)
}
.subscribe({
println("onSuccess()")
}, {
println("onError()")
})
Thread.sleep(5000L)
}
람다식을 사용할 경우에는 간단하게 Publisher<*>를 반환해주면 되므로, 위와 같이 delay와 take를 조합하여 간단하게 만들어 줄 수 있습니다.
실행결과도 예제1과 같은 동작을 수행하게 됩니다.
실행결과
감사합니다!