Rxjava를 사용할 때 예외처리 과정에서 retryWhen()을 사용하면 재시도를 구현할 수 있습니다.
이번 포스트에서는 retryWhen 메소드에 대해 정리해보겠습니다.
retryWhen의 마블 다이어그램

retryWhen의 마블 다이어그램을 해석
먼저, Observable에서 에러가 발생됩니다. 그러면 retryWhen을 타게 되는데
retryWhen에서 데이터가 발행되면 -> upstream을 재실행합니다.
retryWhen에서 에러가 발행되면 -> 최종적으로 error를 발행합니다.
Flowable의 retryWhen 구현부
public abstract class Flowable<T> implements Publisher<T> {
// ...
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> retryWhen(
final Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
ObjectHelper.requireNonNull(handler, "handler is null");
return RxJavaPlugins.onAssembly(new FlowableRetryWhen<T>(this, handler));
}
}
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
retryWhen함수는 Rxjava에서 제공하는 Function인터페이스인 handler 인자로 받습니다.
그리고 handler는 제네릭으로 Flowable<Throwable>과 Publisher<*>를 받게 됩니다.
결국, Function의 apply()함수는 Flowable<Throwable>을 매개변수로 받고, Publisher<*>를 return하게 됩니다.
Function<Flowable<Throwable>, Publisher<*>>를 상속한 RetryWithDelaySingle 이용
Function<Flowable<Throwable>, Publisher<*>>를 상속한 RetryWithDelaySingle을 만들어줍니다.
apply() 함수에서는 timer()를 이용해서 retryCount 만큼 delayMillis의 딜레이마다 데이터를 던지게 만들어주고
retryCount가 maxRetries를 넘어가면 error를 던지도록 만들어줍니다.
import io.reactivex.Flowable
import io.reactivex.functions.Function
import org.reactivestreams.Publisher
import java.util.concurrent.TimeUnit
class RetryWithDelaySingle(private val maxRetries: Int, private val retryDelayMillis: Int) : Function<Flowable<Throwable>, Publisher<*>> {
constructor(retryDelayMillis: Int) : this(5, retryDelayMillis)
private var retryCount = 0
override fun apply(flowable: Flowable<Throwable>): Flowable<*> = flowable
.flatMap { throwable ->
if (maxRetries < 0 || ++retryCount <= maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
Flowable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
} else Flowable.error(throwable)
}
}
일정시간의 간격으로 5번의 재시도 후에 error를 던지도록 해줍니다.
fun main() {
Single.error<Throwable>(IllegalArgumentException())
.doOnError {
println("error")
}
.retryWhen(RetryWithDelaySingle(5, 500))
.subscribe({
println("onSuccess()")
}, {
println("onError()")
})
Thread.sleep(2500L)
}
실행결과

첫번째 실행 이후, 뒤에 5번의 재시도를 실행하고, 최종적으로 onError()를 던지는 것을 확인할 수 있습니다.
retryWhen에서 delay()와 take()를 이용
fun main() {
Single.error<Throwable>(IllegalArgumentException())
.doOnError {
println("error")
}
.retryWhen {
it
.delay(500, TimeUnit.MILLISECONDS)
.take(5)
}
.subscribe({
println("onSuccess()")
}, {
println("onError()")
})
Thread.sleep(5000L)
}
람다식을 사용할 경우에는 간단하게 Publisher<*>를 반환해주면 되므로, 위와 같이 delay와 take를 조합하여 간단하게 만들어 줄 수 있습니다.
실행결과도 예제1과 같은 동작을 수행하게 됩니다.
실행결과

감사합니다!
