변환 연산자
는 만들어진 데이터의 흐름을 변환하는 연산자 입니다.
buffer
buffer
는 데이터를 바로 전달하지 않고 buffer의 size만큼 모일 때 까지 기다렸다가 List의 형태로 데이터를 방출하는 연산자입니다.
count는 몇개씩 모아서 방출할 건지를 정하는 변수이고
skip은 몇개의 데이터가 모아졌을 때 전달할 건지 정하는 변수입니다.
뒤로가기 키를 눌렀을 때 호출되는 메소드인 onBackPressed()에서 buffer 연산자를 이용해서 두개의 시간값을 비교해서 뒤로가기를 빠르게 눌렀을 때 종료하는 기능을 만들 수 있습니다.
fun main() {
Observable.fromIterable(0..10)
.buffer(2, 1)
.subscribe { println(it)}
Thread.sleep(1000)
}
실행결과
flatMap
flatMap
은 데이터가 들어오면 여러개의 Observable을 모아서 하나의 새로운 Observable로 만들어 주는 연산자입니다.
즉, RxJava의 flatMap 연산자는 Collections API의 flatMap과 매우 유사하며 데이터 하나당 Observable 하나가 만들어지고 최종적으로는 각각의 Observable이 flatten된 하나의 Observable을 구독하는 형태가 됩니다.
결국, 사용할 때에는 flatMap 람다식 내부에서 각각의 데이터를 처리하는 생산자를 반환해주면 subscribe를 할 때는 이 Observable을 모은 Observable을 구독하게 되는 것입니다.
flatMap은 concatMap과 달리 다음 데이터가 들어오더라도 이전 데이터가 flatMap을 처리했는지에 대해 신경쓰지 않고 다음 데이터에 대한 flatMap에 대한 처리를 수행하기 때문에 일반적으로는 병렬처리를 해야할 때 사용 합니다.
fun main() {
Observable.fromIterable(0..2)
.flatMap({
Observable.interval(100L, TimeUnit.MILLISECONDS)
.take(3)
.map(Long::toInt)
.map {
list[it]
}
},{value, newData-> // Bifunction()
"${Thread.currentThread().getName()} value :$value, newData: $newData"
}).subscribe(::println)
Thread.sleep(1200)
}
실행결과
concatMap
concatMap
은 다음 데이터가 생성이 되어도 이전 데이터가 concatMap 연산자를 전부 처리할 때까지 데이터의 발행을 기다립니다. 일반적으로 순서가 보장되어야 할 때 사용합니다.
fun main() {
Observable.fromIterable(0..2)
.concatMap {
Observable.interval(100L, TimeUnit.MILLISECONDS)
.take(3)
.map(Long::toInt)
.map {
"${Thread.currentThread().getName()} value : $it, newData: ${list[it]}"
}
}.subscribe(::println)
Thread.sleep(1000)
}
실행결과
switchMap
switchMap
은 새로운 데이터가 생성이 되면 이전 Observable은 완료(onComplete)를 통지하고 생성된 데이터에 대한 처리를 수행 합니다.
map
map
은 스트림의 데이터를 다른 형태로 바꿀 때 사용하는 연산자입니다. flatMap, concatMap, switchMap과는 다르게 Observable을 더 추가로 만들거나 하지 않습니다.
fun main() {
Observable
.fromIterable(0..5)
.map { "hello: $it"}
.subscribe {
println(it)
}
Thread.sleep(1000)
}
실행결과
scan
scan
은 이전 데이터와 현재 데이터를 합쳐서 전달하는 연산자입니다.
하지만, 첫 데이터는 조합하지 않고 그대로 전달합니다.
fun main() {
Observable.fromIterable(0..3)
.scan { t1: Int, t2: Int -> t1 + t2 }
.subscribe { println(it) }
Thread.sleep(1000)
}
실행결과
GroupBy
GroupBy
는 Observable을 조건에 따라 여러개의 다른 Observable로 나눕니다.
예를들면, 아래의 예제는 range 연산자로 1~12의 데이터를 생성합니다.
그리고, groupBy 연산자의 조건에 it %3을 주면, 3으로 나눈 나머지가 같은 [0, 1, 2] 3개의 그룹의 Observable을 각각 생성합니다.
즉, 아래와 같은 결과가 나옵니다.
나머지 0 -> [3, 6, 9 , 12]
나머지 1 -> [1, 4, 7, 10]
나머지 2 -> [2, 5, 8, 11]
fun main() {
Observable.range(1, 12)
.groupBy { it % 3 }
.flatMapSingle { group ->
group
.toList()
.map {
group.key to it
}
}
.subscribe({ (key, list) ->
println("observable: key: $key, list: $list")
}, {
})
Thread.sleep(1000)
}
실행결과