Skip to main content

Звичайні оператори RxJS в Angular

Оператори RxJS - це чисті функції, які будують ланцюжки Observable для трансформації, фільтрації та комбінування потоків асинхронних даних в Angular.

Теорія

TL;DR

  • Оператори як інструменти на конвеєрі: кожен обробляє або сортує елементи, не зупиняючи лінію
  • Чотири категорії: трансформація (map, switchMap), фільтрація (filter, debounceTime), комбінування (combineLatest, forkJoin), обробка помилок (catchError, retry)
  • Правило вибору: switchMap для пошуку (перемагає останній), mergeMap для паралельних запитів (всі виконуються), concatMap для збережень у порядку (черга)
  • Оператори не змінюють джерельний Observable. Кожен повертає новий.

Швидкий приклад

typescript
import { of } from 'rxjs'; import { map, filter } from 'rxjs/operators'; of(1, 2, 3, 4).pipe( filter(n => n % 2 === 0), // залишає: 2, 4 map(n => n * 2) // перетворює на: 4, 8 ).subscribe(console.log); // виводить: 4, 8

filter спрацьовує першим, map обробляє те, що пройшло. Джерело не змінюється.

Оператори вирівнювання: switchMap, mergeMap, concatMap

Ці три трапляються майже на кожній Angular-співбесіді. Всі перетворюють емісії джерела на внутрішні Observable, але по-різному вирішують питання паралельності.

switchMap скасовує попередній внутрішній Observable, коли надходить нова емісія. Користувач набрав "an", потім "ang"? Запит для "an" скасовано. Ідеально для пошукових полів.

mergeMap запускає всі внутрішні Observable паралельно. Три емісії запускають три одночасних запити. Підходить для незалежних запитів, де порядок не важливий.

concatMap ставить внутрішні Observable у чергу і обробляє їх по одному. Наступний стартує лише після того, як попередній завершився. Використовуй, коли порядок має значення.

Коли використовувати

  • Пошук під час введення: debounceTime(300) + switchMap - debounce зменшує кількість запитів, switchMap скасовує застарілі
  • Паралельні API-запити: mergeMap або forkJoin - всі запити виконуються одночасно
  • Збереження у порядку: concatMap - перший запит завершується до початку другого
  • Форма з кількома джерелами: combineLatest - реагує на зміну в будь-якому потоці
  • Одноразовий HTTP-запит: take(1) - завершується після першої емісії
  • Очищення компонента: takeUntil(destroy$) - зупиняє потік при знищенні компонента

Таблиця порівняння

ОператорПоведінкаСкасовує попередній?Паралельно?Приклад використання
mapТрансформує кожну емісіюНіНіВитягти дані з відповіді API
switchMapПеремикається на внутрішній, скасовує попереднійТакНіПошукове поле
mergeMapЗапускає всі внутрішні паралельноНіТакЗавантаження кількох профілів
concatMapЧерга внутрішніх, один за однимНіНіПослідовне збереження
filterПропускає лише якщо умова виконанаN/AN/ANull-перевірки, умови
debounceTimeЧекає паузу перед емісієюN/AN/AДроселювання натискань клавіш
combineLatestЕмітує коли будь-яке джерело емітує (з усіма останніми значеннями)НіТакКілька джерел даних
forkJoinЧекає завершення всіх джерелНіТакПаралельні HTTP-запити
catchErrorПерехоплює помилки, повертає запасне значенняN/AN/AВідновлення після помилок

Як оператори працюють внутрішньо

Кожен оператор повертає новий Observable, який підписується на джерело та застосовує логіку через об'єкти Subscriber у ланцюжку pipe. Джерело ніколи не змінюється. Zone.js патчить асинхронні операції (fetch, таймери, події браузера), тому Angular автоматично підхоплює емісії та оновлює DOM. Саме тому після HTTP-відповіді через async pipe не потрібно вручну викликати detectChanges().

Типові помилки

Забули .pipe():

typescript
// Помилка - .map() не є прямим методом Observable в RxJS 6+ this.http.get('/api/users').map(res => res.data); // Правильно this.http.get('/api/users').pipe(map(res => res.data));

switchMap без debounceTime перевантажує сервер:

typescript
// Помилка - кожне натискання клавіші надсилає запит this.searchCtrl.valueChanges.pipe( switchMap(q => this.http.get(`/api/search?q=${q}`)) ); // Правильно this.searchCtrl.valueChanges.pipe( debounceTime(300), switchMap(q => this.http.get(`/api/search?q=${q}`)) );

Вкладені subscribe замість вирівнювання:

typescript
// Помилка - немає очищення, callback hell this.http.get('/api/user').subscribe(user => { this.http.get(`/api/orders/${user.id}`).subscribe(orders => { /* ... */ }); }); // Правильно this.http.get<User>('/api/user').pipe( switchMap(user => this.http.get(`/api/orders/${user.id}`)) ).subscribe(orders => { /* ... */ });

catchError на зовнішньому потоці вбиває все після першої помилки:

typescript
// Помилка - один невдалий запит завершує весь потік input$.pipe( switchMap(val => this.http.get(`/api/${val}`)), catchError(() => of([])) // потік завершується тут після першої помилки ).subscribe(); // Правильно - обробляй помилки всередині внутрішнього Observable input$.pipe( switchMap(val => this.http.get(`/api/${val}`).pipe( catchError(() => of([])) // зовнішній потік виживає ) ) ).subscribe();

Не відписуватись від довготривалих потоків:

typescript
// Помилка - витік пам'яті після знищення компонента ngOnInit() { this.router.events.subscribe(event => { /* ... */ }); } // Правильно private destroy$ = new Subject<void>(); ngOnInit() { this.router.events.pipe( takeUntil(this.destroy$) ).subscribe(event => { /* ... */ }); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }

Де зустрічається в реальних проектах

  • Angular Material autocomplete: debounceTime + switchMap для підказок під час введення
  • NGRX Effects: switchMap або mergeMap для HTTP-запитів, спровокованих дією
  • AngularFire (Firestore): combineLatest для синхронізації кількох real-time колекцій
  • Route guards: switchMap для завантаження даних до завершення навігації
  • Очищення компонентів: takeUntil - стандартний патерн проти витоків пам'яті

Питання на співбесіді

Q: Яка різниця між switchMap, mergeMap і concatMap?
A: switchMap скасовує попередній внутрішній Observable при новій емісії, перемагає останній. mergeMap запускає всі паралельно. concatMap ставить їх у чергу і обробляє по одному, зберігаючи порядок.

Q: Коли використовувати exhaustMap?
A: Коли потрібно ігнорувати нові емісії, поки поточний внутрішній Observable ще активний. Класичний приклад - кнопка відправки форми: перший клік обробляється, повторні кліки під час запиту ігноруються. switchMap би скасував запит в польоті; exhaustMap дає йому завершитись.

Q: Як обробити помилку, не вбиваючи зовнішній потік?
A: Загорни catchError всередині внутрішнього Observable. Якщо catchError стоїть на зовнішньому потоці, перша помилка завершує все. Всередині switchMap він ловить помилку лише для цієї конкретної емісії, зовнішній потік продовжує працювати.

Q: Навіщо поєднувати debounceTime з distinctUntilChanged?
A: debounceTime чекає паузу після введення. distinctUntilChanged пропускає емісію, якщо значення не змінилося. Разом вони усувають зайві API-запити, коли користувач робить паузу, видаляє символ і вводить той самий знову.

Q: Як налагодити ланцюжок pipe?
A: Додай tap(console.log) між операторами, щоб бачити що виходить на кожному кроці. Краще іменувати: tap(val => console.log('after filter:', val)). Розширення RxJS devtools для браузера може також візуалізувати часову шкалу потоку.

Q: Senior: розбери race condition (стан гонки) з mergeMap у пошуку і поясни виправлення.
A: З mergeMap запити A (повільний) і B (швидкий) виконуються паралельно. B повертається першим і показує правильний результат. Потім повертається A і перезаписує B застарілими даними. UI показує не те. switchMap скасовує A в момент, коли B стартує, тому до підписника доходить тільки B. Скасування можна підтвердити через finalize() всередині внутрішнього pipe - він спрацює при кожному скасуванні switchMap.

Приклади

Базовий: map і filter у ланцюжку

typescript
import { of } from 'rxjs'; import { map, filter } from 'rxjs/operators'; // Залишити лише парні числа, потім подвоїти їх of(1, 2, 3, 4, 5).pipe( filter(n => n % 2 === 0), // залишає: 2, 4 map(n => n * 2) // подвоює до: 4, 8 ).subscribe(console.log); // Вивід: 4, 8

filter зменшує те, що тече далі по ланцюжку, map змінює форму кожного елемента. Джерельний Observable не змінюється.

Середній: пошук під час введення через switchMap

typescript
import { FormControl } from '@angular/forms'; import { debounceTime, distinctUntilChanged, switchMap, map } from 'rxjs/operators'; searchCtrl = new FormControl(''); users$ = this.searchCtrl.valueChanges.pipe( debounceTime(300), // чекати 300мс після останнього натискання distinctUntilChanged(), // пропустити якщо значення не змінилося switchMap(query => this.http.get<{ users: User[] }>('/api/users', { params: { q: query } }).pipe( map(res => res.users) // витягти масив з обгортки відповіді ) ) ); // Шаблон: *ngFor="let user of users$ | async"

Кожне нове натискання (після вікна debounce) скасовує попередній HTTP-запит. Застарілі результати до шаблону не доходять.

Просунутий: ізоляція помилок всередині switchMap

typescript
import { Subject, of } from 'rxjs'; import { switchMap, catchError, finalize } from 'rxjs/operators'; const input$ = new Subject<string>(); input$.pipe( switchMap(val => this.http.get(`/api/data/${val}`).pipe( catchError(err => { console.warn('запит не вдався, повертаємо fallback', err); return of([]); // тримає зовнішній потік живим }), finalize(() => console.log('внутрішній завершився або скасований switchMap')) ) ) ).subscribe(console.log); // Невдалий запит повертає [] замість того, щоб зупинити потік. // finalize спрацьовує і при завершенні, і при скасуванні. // Наступна емісія input$ працює незалежно від того, що було раніше.

Чимало команд витрачає годину на пошук того, чому потік зупиняється після першого 404. Відповідь завжди одна: catchError стоїть зовні внутрішнього pipe. Перемістити всередину. Проблема вирішена.

Коротка відповідь

Для співбесіди
Premium

Коротка відповідь допоможе вам впевнено відповідати на цю тему під час співбесіди.

Дочитали статтю?