LearnApplyShare

RxJS 의 이해와 사용

April 24, 2020 - [react, RxJS]

RxJS 는 이벤트 기반 프로그래밍에서 함수형 프로그래밍을 이용해 보다 선언적으로 이벤트를 처리할 수 있도록 도와준다.

RxJS는 모든 이벤트를 observable 로 추상화하여 시간에 따른 스트림으로 간주할 수 있게 한다. 그리고 각 이벤트가 observer(이벤트핸들러)에게 전달되기 전에 map, filter 등의 operator 를 이용해 이벤트를 필요한 형태로 재가공하여 observer 에게 전달할 수 있다.

RxJS 는 observable 에서 발생한 이벤트가 observer 에게 전달되기 전에 여러가지 operator 들을 이용하여 이벤트를 보다 선언적으로(직관적으로) 처리할 수 있도록 해준다.

구체적인 예를 통해서 RxJS 의 개념과 동작방식을 알아보자


일반적인 이벤트 처리 방식 예제이다

document.addEventListener('click', () => console.log('Clicked!'))

이를 RxJS 를 처리하면 아래와 같다.

import {fromEvent} from 'rxjs'

const observable = fromEvent(document, 'click')
const observer = () => console.log('Clicked!')
observable.subscribe(observer)
  1. fromEvent 를 이용해서 이벤트를 observable 객체로 만든다
  2. observer(이벤트핸들러) 를 정의한다
  3. observer 가 observable 을 구독한다

아직까지는 간단한 것을 굳이 왜 이렇게 복잡하게 해야하나 의문이 생긴다. 아직까지는 RxJS 의 능력?을 실감할 수 없다.


요건이 조금 추가되어서 클릭한 횟수를 매번 표시해야 한다고 해보자.

let count = 0
document.addEventListener('click', () => console.log(`Clicked ${++count} times`))

핸들러가 count 변수를 참조하면서 순수성을 잃었다. 하지만 RxJS 를 이용하면 아래와 같이 순수함수만을 이용하여 처리가 가능하다.

import {fromEvent} from 'rxjs'
import {scan} from 'rxjs/operators'

const observable = fromEvent(document, 'click').pipe(scan(count => count + 1, 0))
const observer = count => console.log(`Clicked ${count} times`)
observable.subscribe(observer)

Note)

RxJS 를 이용하여 이벤트를 다룰 때는 항상 아래와 같은 절차를 거치게 된다.

  1. observable 정의

    • 어떤 이벤트들을 하나의 observable 로 묶을 것인가
    • 적절한 operator 를 이용하여 obserable 을 가공
  2. observer 정의

    • observable 로 부터 이벤트를 전달받으면 어떻게 처리할 지를 정의
  3. observable 와 observer 를 연결

    • observer 와 observable 을 연결한다(observable.subscribe())

Observable

Observable 은 시간 흐름에 따라 발생하는 이벤트들의 집합이다. 단순하게 이벤트 스트림으로서 생각해 볼 수 있다.

import {Observable} from 'rxjs'

const observable = new Observable(subscriber => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.next(3)
  setTimeout(() => {
    subscriber.next(4)
    subscriber.complete()
  }, 1000)
})

console.log('just before subscribe')
observable.subscribe({
  next(x) {
    console.log('got value ' + x)
  },
  error(err) {
    console.error('something wrong occurred: ' + err)
  },
  complete() {
    console.log('done')
  },
})
console.log('just after subscribe')

위에서 정의된 observable 은 observer 에게 연속으로 1,2,3 을 푸시하고 1초 후 4를 푸시하고 마치는 간단한 observable 이다.

이 observable 에 observer 가 등록되면(observer 가 observable 을 구독하면) observer 는 연속으로 1,2,3 을 전달 받고 1초 후 4를 전달받고 끝이 난다.


Note)

  1. observable.subscribe 는 observer 를 인자로 받는다.
  2. observable 구독시 next, error, complete 3가지 핸들러 2가지 형태로 전달할 수 있다.

    1. ex1) observable.subscribe({next, error, complete})
    2. ex2) observable.subscribe(next, error, complete)
  3. observer 가 observable 을 구독할 때 1,2,3 을 전달받은 시점이 ‘just after subscribe’ 출력 전에 이루어졌다는 것에 유의한다.

    • observable 은 이벤트를 동기 또는 비동기로 발생시킬 수 있음을 꼭 기억한다.

Subscription

observable 를 구독하면(subscribe) subscription 객체가 리턴된다. subscription 객체를 이용해 해당 구독을 취소할 수 있다. subscription 은 구독을 취소하기 위한 용도로서만 사용된다.

import {interval} from 'rxjs'

const observable = interval(1000)
const subscription = observable.subscribe(x => console.log(x))
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
subscription.unsubscribe()

Subject

Subject 는 멀티캐스트를 위한 특별한 유형의 Observable 이다. 일반적으로는 하나의 observable 에는 하나의 observer 만 등록할 수 있다. 하지만 Subject 를 이용하면 하나의 observable 을 구독하는 여러개의 observer 를 생성하고 등록할 수 있다.

import { Subject, from } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

const observable = from([1, 2, 3]);

observable.subscribe(subject); // You can subscribe providing a Subject

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

위 예제는 먼저 subject 를 생성하고 해당 subject 를 구독하는 2개의 observer 를 subject 를 등록한다. 그리고 subject 가 observer 로서 observable 을 구독하게 한다. 그러면 observable 의 이벤트가 그대로 subject 에 등록된 2개의 observer 에게 모두 전달된다.


Subject 는 observable 이지만 observer 로서 또 다른 observable 을 구독할 수 있다


Ref.

https://rxjs-dev.firebaseapp.com/guide/overview