YOGAE

TODO: FIXME:

RxJS

07 Mar 2019

개념

  • 리액티브 프로그래밍

    • 데이터 스트림이란 역속적인 데이터의 흐름을 말하며 리액티브 프로그래밍은 기본적으로 모든 것을 데이터 스트림으로 봅니다.

    • 동기 / 비동기와 관계없이 데이터를 생산하는 것이라면 무엇이든 시간축을 따라 연속적으로 흐르는 데이터 스트림으로 처리합니다.

    • 다양한 테이터를 데이터 스트림이라는 하나의 일관된 형식으로 만들고, 이 데이터 스트림을 subscribe하여 데이터 스트림의 상태 변화에 반응하는 방식으로 동작합니다.

  • 옵저버블(Observable)

    데이터 스트림을 생성하고 방출하는 객체 - 데이터 생산자

  • 옵저버(Observer)

    옵저버블이 방출한 값을 획득하여 사용하는 객체 - 데이터 소비자

  • 구독(subscription)

    옵저버는 옵저버블에 연결되어 옵저버블의 상태를 관찰

Pull vs Push

  • Pull

    소비자가 Data 생산자로 부터 data를 언제 받을지 결정합니다. 생산자는 소비자에서 언제 data가 전달되었는지 알 수 없습니다. 모든 Javascript Function은 pull system입니다.

  • Push

    Javascript에서 Promise가 push system의 일반적인 type입니다. Promise는 등록된 callback(소비자)에게 받은 data를 전달합니다.

Tutorial

Converting to Observavles

// From one or multiple values
Rx.Observable.of('foo', 'bar');

// From array of values
Rx.Observable.from([1,2,3]);

// From an event
Rx.Observable.fromEvent(document.querySelector('button'), 'click');

// From a Promise
Rx.Observable.fromPromise(fetch('/users'));

// From a callback (last argument is a callback)
// fs.exists = (path, cb(exists))
var exists = Rx.Observable.bindCallback(fs.exists);
exists('file.txt').subscribe(exists => console.log('Does file exist?', exists));

// From a callback (last argument is a callback)
// fs.rename = (pathA, pathB, cb(err, result))
var rename = Rx.Observable.bindNodeCallback(fs.rename);
rename('file.txt', 'else.txt').subscribe(() => console.log('Renamed!'));

Creating observables

외부적으로 새로운 event를 생성하는 방법

var myObservable = new Rx.Subject();
myObservable.subscribe(value => console.log(value));
myObservable.next('foo');

내부적으로 새로운 event를 생성하는 방법

var myObservable = Rx.Observable.create(observer => {
  observer.next('foo');
  setTimeout(() => observer.next('bar'), 1000);
});
myObservable.subscribe(value => console.log(value));

Controlling the flow

// typing "hello world"
var input = Rx.Observable.fromEvent(document.querySelector('input'), 'input');

// Filter out target values less than 3 characters long
input.filter(event => event.target.value.length > 2)
  .map(event => event.target.value)
  .subscribe(value => console.log(value)); // "hel"

// Delay the events
input.delay(200)
  .map(event => event.target.value)
  .subscribe(value => console.log(value)); // "h" -200ms-> "e" -200ms-> "l" ...

// Only let through an event every 200 ms
input.throttleTime(200)
  .map(event => event.target.value)
  .subscribe(value => console.log(value)); // "h" -200ms-> "w"

// Let through latest event after 200 ms
input.debounceTime(200)
  .map(event => event.target.value)
  .subscribe(value => console.log(value)); // "o" -200ms-> "d"

// Stop the stream of events after 3 events
input.take(3)
  .map(event => event.target.value)
  .subscribe(value => console.log(value)); // "hel"

// Passes through events until other observable triggers an event
var stopStream = Rx.Observable.fromEvent(document.querySelector('button'), 'click');
input.takeUntil(stopStream)
  .map(event => event.target.value)
  .subscribe(value => console.log(value)); // "hello" (click)

Producing values

// typing "hello world"
var input = Rx.Observable.fromEvent(document.querySelector('input'), 'input');

// Pass on a new value
input.map(event => event.target.value)
  .subscribe(value => console.log(value)); // "h"

// Pass on a new value by plucking it
input.pluck('target', 'value')
  .subscribe(value => console.log(value)); // "h"

// Pass the two previous values
input.pluck('target', 'value').pairwise()
  .subscribe(value => console.log(value)); // ["h", "e"]

// Only pass unique values through
input.pluck('target', 'value').distinct()
  .subscribe(value => console.log(value)); // "helo wrd"

// Do not pass repeating values through
input.pluck('target', 'value').distinctUntilChanged()
  .subscribe(value => console.log(value)); // "helo world"

Reference

  • https://poiemaweb.com/angular-rxjs
  • http://reactivex.io/rxjs/manual/tutorial.html#converting-to-observables