RxJS
07 Mar 2019
개념
-
리액티브 프로그래밍
-
데이터 스트림이란 역속적인 데이터의 흐름을 말하며 리액티브 프로그래밍은 기본적으로 모든 것을 데이터 스트림으로 봅니다.
-
동기 / 비동기와 관계없이 데이터를 생산하는 것이라면 무엇이든 시간축을 따라 연속적으로 흐르는 데이터 스트림으로 처리합니다.
-
다양한 테이터를 데이터 스트림이라는 하나의 일관된 형식으로 만들고, 이 데이터 스트림을 subscribe하여 데이터 스트림의 상태 변화에 반응하는 방식으로 동작합니다.
-
-
옵저버블(Observable)
데이터 스트림을 생성하고 방출하는 객체 - 데이터 생산자
-
옵저버(Observer)
옵저버블이 방출한 값을 획득하여 사용하는 객체 - 데이터 소비자
-
구독(subscription)
옵저버는 옵저버블에 연결되어 옵저버블의 상태를 관찰
Pull vs Push
-
Pull
소비자가 Data 생산자로 부터 data를 언제 받을지 결정합니다. 생산자는 소비자에서 언제 data가 전달되었는지 알 수 없습니다. 모든 Javascript Function은 pull system입니다.
-
Push
Javascript에서 Promise가 push system의 일반적인 type입니다. Promise는 등록된 callback(소비자)에게 받은 data를 전달합니다.
Tutorial
Converting to Observavles
// From one or multiple values
Rx.Observable.of('foo', 'bar');
// From array of values
Rx.Observable.from([1,2,3]);
// From an event
Rx.Observable.fromEvent(document.querySelector('button'), 'click');
// From a Promise
Rx.Observable.fromPromise(fetch('/users'));
// From a callback (last argument is a callback)
// fs.exists = (path, cb(exists))
var exists = Rx.Observable.bindCallback(fs.exists);
exists('file.txt').subscribe(exists => console.log('Does file exist?', exists));
// From a callback (last argument is a callback)
// fs.rename = (pathA, pathB, cb(err, result))
var rename = Rx.Observable.bindNodeCallback(fs.rename);
rename('file.txt', 'else.txt').subscribe(() => console.log('Renamed!'));
Creating observables
외부적으로 새로운 event를 생성하는 방법
var myObservable = new Rx.Subject();
myObservable.subscribe(value => console.log(value));
myObservable.next('foo');
내부적으로 새로운 event를 생성하는 방법
var myObservable = Rx.Observable.create(observer => {
observer.next('foo');
setTimeout(() => observer.next('bar'), 1000);
});
myObservable.subscribe(value => console.log(value));
Controlling the flow
// typing "hello world"
var input = Rx.Observable.fromEvent(document.querySelector('input'), 'input');
// Filter out target values less than 3 characters long
input.filter(event => event.target.value.length > 2)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "hel"
// Delay the events
input.delay(200)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "h" -200ms-> "e" -200ms-> "l" ...
// Only let through an event every 200 ms
input.throttleTime(200)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "h" -200ms-> "w"
// Let through latest event after 200 ms
input.debounceTime(200)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "o" -200ms-> "d"
// Stop the stream of events after 3 events
input.take(3)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "hel"
// Passes through events until other observable triggers an event
var stopStream = Rx.Observable.fromEvent(document.querySelector('button'), 'click');
input.takeUntil(stopStream)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "hello" (click)
Producing values
// typing "hello world"
var input = Rx.Observable.fromEvent(document.querySelector('input'), 'input');
// Pass on a new value
input.map(event => event.target.value)
.subscribe(value => console.log(value)); // "h"
// Pass on a new value by plucking it
input.pluck('target', 'value')
.subscribe(value => console.log(value)); // "h"
// Pass the two previous values
input.pluck('target', 'value').pairwise()
.subscribe(value => console.log(value)); // ["h", "e"]
// Only pass unique values through
input.pluck('target', 'value').distinct()
.subscribe(value => console.log(value)); // "helo wrd"
// Do not pass repeating values through
input.pluck('target', 'value').distinctUntilChanged()
.subscribe(value => console.log(value)); // "helo world"
Reference
- https://poiemaweb.com/angular-rxjs
- http://reactivex.io/rxjs/manual/tutorial.html#converting-to-observables