Що таке RxJS і як він інтегрується в Angular?
RxJS - це JavaScript-бібліотека для реактивного програмування, яка дозволяє описувати асинхронні потоки даних через Observable-и, оператори і підписки.
Теорія
TL;DR
- RxJS - це як конвеєрна стрічка: дані течуть через неї, ти підключаєшся через
subscribe(), і пропускаєш їх через оператори (фільтри, трансформації) перед отриманням. - Головна різниця від Promise: Promise вирішується один раз; Observable (спостережуваний потік) може видавати нуль, одне або багато значень з часом, і нічого не відбувається до виклику
subscribe(). - Правило вибору: кілька асинхронних джерел або операції з можливістю скасування → RxJS. Один HTTP-запит без трансформацій → Promise підходить.
- Angular 16+ постачається з RxJS 7.8+ і використовує Observable-и в HttpClient, Router і Forms за замовчуванням.
- Забути відписатися - це найпоширеніше джерело витоків пам'яті в Angular-проектах.
Швидкий приклад
import { of } from 'rxjs';
import { map, delay } from 'rxjs/operators';
const user$ = of('Alice').pipe(
map(name => `Hello, ${name}`), // трансформуємо значення
delay(1000) // імітуємо асинхронність
);
user$.subscribe(msg => console.log(msg)); // виводить "Hello, Alice" через 1сof('Alice') створює потік, який видає одне значення і завершується. pipe() з'єднує оператори без змін оригінального потоку. Нічого не запускається до виклику subscribe(). Це і є модель відкладеного виконання (lazy execution).
Ключова різниця
RxJS замінює callback-пекло (вкладені .then()) на декларативні пайплайни. Ти описуєш що робити з кожним значенням: трансформувати, відфільтрувати, скасувати, поєднати з іншим потоком. Бібліотека бере на себе тайминг і стан. Код читається зверху вниз, без ручного відстеження того, чи завершився попередній запит.
Як Angular використовує RxJS
Angular 16+ постачається з RxJS 7.8+ через @angular/core. Майже кожен асинхронний API в Angular повертає Observable:
| Функція Angular | Роль RxJS | Типовий оператор |
|---|---|---|
HttpClient | Всі запити повертають Observable<T> | switchMap для ланцюгових запитів |
| Router | router.events - це Observable<RouterEvent> | filter(e => e instanceof NavigationEnd) |
| Reactive Forms | form.valueChanges - це Observable<FormValue> | debounceTime(300) для валідації |
| Async pipe | Автоматично підписується і відписується в шаблонах | {{ data$ | async }} |
Async pipe заслуговує окремої уваги. Він очищає підписку при знищенні компонента, що прибирає найпоширеніше джерело витоків у Angular-шаблонах.
Як це працює всередині
Observable - це звичайний об'єкт з методом subscribe. Жодного спеціального рантайму. Коли ти викликаєш subscribe(), він запускає функцію-продюсер: синхронно або асинхронно залежно від джерела. Оператори повертають нові Observable-и, які проксують емісії через ланцюг підписників. У RxJS 7+ використовується TeardownLogic для очищення: коли потік завершується або виникає помилка, автоматично виконуються teardown-колбеки.
Тут важлива різниця між cold і hot Observable. Cold Observable запускає новий продюсер для кожного підписника. HttpClient - cold: кожна підписка робить новий HTTP-запит. Hot Observable ділить один продюсер між усіма підписниками. Subject - hot: нові підписники пропускають попередні емісії. Якщо обгорнути cold Observable у shareReplay(1), він поводитиметься як hot.
Коли використовувати
- Кілька асинхронних джерел, які треба поєднати (HTTP + кліки користувача) →
combineLatestабоmerge. - Операції з можливістю скасування, як пошук під час введення →
switchMapскасовує попередній inner Observable. - Потоки в реальному часі (WebSocket, інтервали) → разом з
takeUntilDestroyed()для очищення. - Простий одиночний асинхронний запит без трансформацій → нативний Promise підходить.
- Без асинхронності взагалі → звичайні функції.
Типові помилки
1. Забути відписатися
// неправильно: інтервал працює після знищення компонента
ngOnInit() {
interval(1000).subscribe(v => console.log(v));
}
// виправлення через takeUntilDestroyed (Angular 16+)
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
readonly count$ = interval(1000).pipe(
takeUntilDestroyed() // автоматично відписується при знищенні компонента
);
// шаблон: {{ count$ | async }}Observable-и не очищаються самостійно. Інтервал продовжує працювати після знищення компонента. Використовуй async pipe або takeUntilDestroyed().
2. Використовувати Observable як Promise
// неправильно: TypeError - .then is not a function
this.http.get('/api/users').then(data => console.log(data));
// виправлення
this.http.get('/api/users').subscribe(data => console.log(data));
// або явне перетворення
const data = await firstValueFrom(this.http.get('/api/users'));3. Кілька підписок на cold Observable
// неправильно: виконує 2 окремі HTTP-запити
const users$ = this.http.get('/api/users');
users$.subscribe(u => this.list = u);
users$.subscribe(u => this.count = u.length);
// виправлення: ділимо одне виконання
const users$ = this.http.get('/api/users').pipe(shareReplay(1));4. Ігнорувати помилки
// неправильно: потік зупиняється без жодного сліду
obs$.subscribe(v => console.log(v));
// виправлення: завжди обробляй помилки
obs$.subscribe({
next: v => console.log(v),
error: e => console.error('Потік впав:', e)
});Де зустрічається в реальних проектах
- Angular HttpClient →
switchMapланцюгує оновлення токена перед основним API-запитом. - Angular Router →
router.events.pipe(filter(...))для route guards і аналітики. - NgRx Effects →
actions$.pipe(ofType(loadUsers), switchMap(() => this.service.get())). - Angular Material autocomplete →
debounceTime(300) + distinctUntilChanged() + switchMapна події введення. - Angular 18 →
toSignal()конвертує Observable у сигнал для дрібнозернистої реактивності.
Питання на співбесіді
Q: Яка різниця між cold і hot Observable?
A: Cold Observable запускає новий продюсер для кожного підписника: кожна підписка на HttpClient = новий HTTP-запит. Hot Observable ділить один продюсер між підписниками: Subject і fromEvent на DOM-елементі - hot. Якщо обгорнути cold Observable у shareReplay(1), він поводитиметься як hot.
Q: Як Angular запобігає витокам пам'яті з RxJS?
A: async pipe автоматично відписується при знищенні компонента. З Angular 16 takeUntilDestroyed() робить те саме для ручних підписок у конструкторі або injection context.
Q: Яка різниця між switchMap і mergeMap?
A: switchMap скасовує попередній inner Observable при новій емісії - підходить для пошуку, де застарілі результати непотрібні. mergeMap виконує всі inner Observable паралельно - підходить для незалежних запитів, які мають завершитися всі.
Q: Як конвертувати Observable у Promise?
A: firstValueFrom(obs$) вирішується з першим значенням. lastValueFrom(obs$) чекає завершення потоку і вирішується з останнім значенням.
Q: Спроєктуй NgRx effect, який повторює HTTP-запит після відновлення з'єднання з мережею.
A: Використовуй retry({ delay: () => fromEvent(window, 'online') }) з RxJS 7+ всередині switchMap. Effect чекатиме підключення до мережі перед повтором, а не одразу надсилатиме нові запити. Додай take(1) на retry-сигнал, щоб уникнути нескінченних повторів.
Приклади
Базовий: емісії cold Observable
import { Observable } from 'rxjs';
const data$ = new Observable(sub => {
console.log('Продюсер запущено'); // виводиться при кожній підписці
sub.next(42);
sub.complete();
});
data$.subscribe(v => console.log('Підписка 1:', v)); // Продюсер запущено | Підписка 1: 42
data$.subscribe(v => console.log('Підписка 2:', v)); // Продюсер запущено знову | Підписка 2: 42Кожен виклик subscribe() створює незалежне виконання. Для HTTP це означає дві підписки = два мережевих запити. Обгорни shareReplay(1), якщо потрібно поділити один результат між кількома споживачами.
Середній: пошуковий input з debounce і скасуванням запитів
import { fromEvent } from 'rxjs';
import { debounceTime, map, distinctUntilChanged, switchMap } from 'rxjs/operators';
import { HttpClient } from '@angular/common/http';
const input = document.querySelector<HTMLInputElement>('#search')!;
fromEvent(input, 'input').pipe(
debounceTime(300), // чекаємо 300мс після останнього натискання
map(e => (e.target as HTMLInputElement).value),
distinctUntilChanged(), // пропускаємо, якщо значення не змінилось
switchMap(term => this.http.get(`/api/users?q=${term}`)) // скасовуємо попередній запит
).subscribe(results => console.log(results));switchMap - ключовий оператор у цьому патерні. Коли користувач вводить новий символ до завершення попереднього запиту, switchMap скасовує старий і починає новий. Без нього відповіді можуть прийти в неправильному порядку і перезаписати одна одну.
Просунутий: витік пам'яті і виправлення через takeUntilDestroyed
import { Component, OnInit } from '@angular/core';
import { interval } from 'rxjs';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
// неправильно: витік після знищення компонента
@Component({ selector: 'app-broken', template: '{{ count }}' })
class BrokenComponent implements OnInit {
count = 0;
ngOnInit() {
interval(1000).subscribe(v => {
this.count = v; // продовжує оновлювати знищений компонент
});
}
}
// виправлення: автоматична відписка через API Angular 16+
@Component({ selector: 'app-fixed', template: '{{ count$ | async }}' })
class FixedComponent {
readonly count$ = interval(1000).pipe(
takeUntilDestroyed() // відписується при знищенні компонента
);
}Я зустрічав такий патерн витоку майже в кожній Angular-кодовій базі, де не вистачало code review на lifecycle-методи. Комбінація async pipe і takeUntilDestroyed() зараз є стандартом, і інтерв'юери цілеспрямовано запитують про це, перевіряючи знання Angular 16+.
Коротка відповідь
Для співбесідиКоротка відповідь допоможе вам впевнено відповідати на цю тему під час співбесіди.