Звичайні оператори RxJS в Angular
Оператори RxJS - це чисті функції, які будують ланцюжки Observable для трансформації, фільтрації та комбінування потоків асинхронних даних в Angular.
Теорія
TL;DR
- Оператори як інструменти на конвеєрі: кожен обробляє або сортує елементи, не зупиняючи лінію
- Чотири категорії: трансформація (map, switchMap), фільтрація (filter, debounceTime), комбінування (combineLatest, forkJoin), обробка помилок (catchError, retry)
- Правило вибору: switchMap для пошуку (перемагає останній), mergeMap для паралельних запитів (всі виконуються), concatMap для збережень у порядку (черга)
- Оператори не змінюють джерельний Observable. Кожен повертає новий.
Швидкий приклад
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
of(1, 2, 3, 4).pipe(
filter(n => n % 2 === 0), // залишає: 2, 4
map(n => n * 2) // перетворює на: 4, 8
).subscribe(console.log); // виводить: 4, 8filter спрацьовує першим, map обробляє те, що пройшло. Джерело не змінюється.
Оператори вирівнювання: switchMap, mergeMap, concatMap
Ці три трапляються майже на кожній Angular-співбесіді. Всі перетворюють емісії джерела на внутрішні Observable, але по-різному вирішують питання паралельності.
switchMap скасовує попередній внутрішній Observable, коли надходить нова емісія. Користувач набрав "an", потім "ang"? Запит для "an" скасовано. Ідеально для пошукових полів.
mergeMap запускає всі внутрішні Observable паралельно. Три емісії запускають три одночасних запити. Підходить для незалежних запитів, де порядок не важливий.
concatMap ставить внутрішні Observable у чергу і обробляє їх по одному. Наступний стартує лише після того, як попередній завершився. Використовуй, коли порядок має значення.
Коли використовувати
- Пошук під час введення:
debounceTime(300)+switchMap- debounce зменшує кількість запитів, switchMap скасовує застарілі - Паралельні API-запити:
mergeMapабоforkJoin- всі запити виконуються одночасно - Збереження у порядку:
concatMap- перший запит завершується до початку другого - Форма з кількома джерелами:
combineLatest- реагує на зміну в будь-якому потоці - Одноразовий HTTP-запит:
take(1)- завершується після першої емісії - Очищення компонента:
takeUntil(destroy$)- зупиняє потік при знищенні компонента
Таблиця порівняння
| Оператор | Поведінка | Скасовує попередній? | Паралельно? | Приклад використання |
|---|---|---|---|---|
map | Трансформує кожну емісію | Ні | Ні | Витягти дані з відповіді API |
switchMap | Перемикається на внутрішній, скасовує попередній | Так | Ні | Пошукове поле |
mergeMap | Запускає всі внутрішні паралельно | Ні | Так | Завантаження кількох профілів |
concatMap | Черга внутрішніх, один за одним | Ні | Ні | Послідовне збереження |
filter | Пропускає лише якщо умова виконана | N/A | N/A | Null-перевірки, умови |
debounceTime | Чекає паузу перед емісією | N/A | N/A | Дроселювання натискань клавіш |
combineLatest | Емітує коли будь-яке джерело емітує (з усіма останніми значеннями) | Ні | Так | Кілька джерел даних |
forkJoin | Чекає завершення всіх джерел | Ні | Так | Паралельні HTTP-запити |
catchError | Перехоплює помилки, повертає запасне значення | N/A | N/A | Відновлення після помилок |
Як оператори працюють внутрішньо
Кожен оператор повертає новий Observable, який підписується на джерело та застосовує логіку через об'єкти Subscriber у ланцюжку pipe. Джерело ніколи не змінюється. Zone.js патчить асинхронні операції (fetch, таймери, події браузера), тому Angular автоматично підхоплює емісії та оновлює DOM. Саме тому після HTTP-відповіді через async pipe не потрібно вручну викликати detectChanges().
Типові помилки
Забули .pipe():
// Помилка - .map() не є прямим методом Observable в RxJS 6+
this.http.get('/api/users').map(res => res.data);
// Правильно
this.http.get('/api/users').pipe(map(res => res.data));switchMap без debounceTime перевантажує сервер:
// Помилка - кожне натискання клавіші надсилає запит
this.searchCtrl.valueChanges.pipe(
switchMap(q => this.http.get(`/api/search?q=${q}`))
);
// Правильно
this.searchCtrl.valueChanges.pipe(
debounceTime(300),
switchMap(q => this.http.get(`/api/search?q=${q}`))
);Вкладені subscribe замість вирівнювання:
// Помилка - немає очищення, callback hell
this.http.get('/api/user').subscribe(user => {
this.http.get(`/api/orders/${user.id}`).subscribe(orders => { /* ... */ });
});
// Правильно
this.http.get<User>('/api/user').pipe(
switchMap(user => this.http.get(`/api/orders/${user.id}`))
).subscribe(orders => { /* ... */ });catchError на зовнішньому потоці вбиває все після першої помилки:
// Помилка - один невдалий запит завершує весь потік
input$.pipe(
switchMap(val => this.http.get(`/api/${val}`)),
catchError(() => of([])) // потік завершується тут після першої помилки
).subscribe();
// Правильно - обробляй помилки всередині внутрішнього Observable
input$.pipe(
switchMap(val =>
this.http.get(`/api/${val}`).pipe(
catchError(() => of([])) // зовнішній потік виживає
)
)
).subscribe();Не відписуватись від довготривалих потоків:
// Помилка - витік пам'яті після знищення компонента
ngOnInit() {
this.router.events.subscribe(event => { /* ... */ });
}
// Правильно
private destroy$ = new Subject<void>();
ngOnInit() {
this.router.events.pipe(
takeUntil(this.destroy$)
).subscribe(event => { /* ... */ });
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}Де зустрічається в реальних проектах
- Angular Material autocomplete:
debounceTime + switchMapдля підказок під час введення - NGRX Effects:
switchMapабоmergeMapдля HTTP-запитів, спровокованих дією - AngularFire (Firestore):
combineLatestдля синхронізації кількох real-time колекцій - Route guards:
switchMapдля завантаження даних до завершення навігації - Очищення компонентів:
takeUntil- стандартний патерн проти витоків пам'яті
Питання на співбесіді
Q: Яка різниця між switchMap, mergeMap і concatMap?
A: switchMap скасовує попередній внутрішній Observable при новій емісії, перемагає останній. mergeMap запускає всі паралельно. concatMap ставить їх у чергу і обробляє по одному, зберігаючи порядок.
Q: Коли використовувати exhaustMap?
A: Коли потрібно ігнорувати нові емісії, поки поточний внутрішній Observable ще активний. Класичний приклад - кнопка відправки форми: перший клік обробляється, повторні кліки під час запиту ігноруються. switchMap би скасував запит в польоті; exhaustMap дає йому завершитись.
Q: Як обробити помилку, не вбиваючи зовнішній потік?
A: Загорни catchError всередині внутрішнього Observable. Якщо catchError стоїть на зовнішньому потоці, перша помилка завершує все. Всередині switchMap він ловить помилку лише для цієї конкретної емісії, зовнішній потік продовжує працювати.
Q: Навіщо поєднувати debounceTime з distinctUntilChanged?
A: debounceTime чекає паузу після введення. distinctUntilChanged пропускає емісію, якщо значення не змінилося. Разом вони усувають зайві API-запити, коли користувач робить паузу, видаляє символ і вводить той самий знову.
Q: Як налагодити ланцюжок pipe?
A: Додай tap(console.log) між операторами, щоб бачити що виходить на кожному кроці. Краще іменувати: tap(val => console.log('after filter:', val)). Розширення RxJS devtools для браузера може також візуалізувати часову шкалу потоку.
Q: Senior: розбери race condition (стан гонки) з mergeMap у пошуку і поясни виправлення.
A: З mergeMap запити A (повільний) і B (швидкий) виконуються паралельно. B повертається першим і показує правильний результат. Потім повертається A і перезаписує B застарілими даними. UI показує не те. switchMap скасовує A в момент, коли B стартує, тому до підписника доходить тільки B. Скасування можна підтвердити через finalize() всередині внутрішнього pipe - він спрацює при кожному скасуванні switchMap.
Приклади
Базовий: map і filter у ланцюжку
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
// Залишити лише парні числа, потім подвоїти їх
of(1, 2, 3, 4, 5).pipe(
filter(n => n % 2 === 0), // залишає: 2, 4
map(n => n * 2) // подвоює до: 4, 8
).subscribe(console.log); // Вивід: 4, 8filter зменшує те, що тече далі по ланцюжку, map змінює форму кожного елемента. Джерельний Observable не змінюється.
Середній: пошук під час введення через switchMap
import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged, switchMap, map } from 'rxjs/operators';
searchCtrl = new FormControl('');
users$ = this.searchCtrl.valueChanges.pipe(
debounceTime(300), // чекати 300мс після останнього натискання
distinctUntilChanged(), // пропустити якщо значення не змінилося
switchMap(query =>
this.http.get<{ users: User[] }>('/api/users', { params: { q: query } }).pipe(
map(res => res.users) // витягти масив з обгортки відповіді
)
)
);
// Шаблон: *ngFor="let user of users$ | async"Кожне нове натискання (після вікна debounce) скасовує попередній HTTP-запит. Застарілі результати до шаблону не доходять.
Просунутий: ізоляція помилок всередині switchMap
import { Subject, of } from 'rxjs';
import { switchMap, catchError, finalize } from 'rxjs/operators';
const input$ = new Subject<string>();
input$.pipe(
switchMap(val =>
this.http.get(`/api/data/${val}`).pipe(
catchError(err => {
console.warn('запит не вдався, повертаємо fallback', err);
return of([]); // тримає зовнішній потік живим
}),
finalize(() => console.log('внутрішній завершився або скасований switchMap'))
)
)
).subscribe(console.log);
// Невдалий запит повертає [] замість того, щоб зупинити потік.
// finalize спрацьовує і при завершенні, і при скасуванні.
// Наступна емісія input$ працює незалежно від того, що було раніше.Чимало команд витрачає годину на пошук того, чому потік зупиняється після першого 404. Відповідь завжди одна: catchError стоїть зовні внутрішнього pipe. Перемістити всередину. Проблема вирішена.
Коротка відповідь
Для співбесідиКоротка відповідь допоможе вам впевнено відповідати на цю тему під час співбесіди.