image auteur
Lucas BENTO-VERSACE 14min • 14-03-2022

Angular - RxJS : Introduction

Angular - RxJS : Introduction

Bienvenue sur CodeWise ! Si vous êtes nouveau ici, vous voudrez sans doute découvrir notre Starter Kit Angular : il contient une roadmap du parcours d'un dev Angular, ainsi qu'un cookbook des commandes les plus utiles et un extrait de formation. Cliquez ici pour le télécharger (c’est gratuit !)

Dans cet article, nous allons découvrir les Observables avec RxJS.

C'est quoi RxJS ?

RxJS (Reactive extensions for JavaScript) est une librairie de programmation réactive qui permet aux développeurs de composer facilement des flux de données asynchrones.

Elle fournit une interface pour combiner, transformer des données provenant de sources diverses.

Concrètement, dans un projet Angular on utilise RxJS pour faire circuler l'information.

Par exemple, quand l'utilisateur se déconnecte et qu'on doit modifier l'apparence de plusieurs composants en réponse à ce changement, on utilise RxJS pour prévenir tous les composants du changement.

Le système se base sur le design pattern Observer. Un Observable émet des informations, tandis que des Observers écoutent et réagissent à ces informations.

Définitions

Avant de voir le détail du fonctionnement, quelques définitions.

Observable

Les Observables sont des objets qui émettent des messages à travers les différentes parties de votre application. Ils sont fréquemment utilisés dans Angular en tant que technique pour gérer les événements, la programmation asynchrone ou des flux de données.

Observer

Un Observer est un objet contenant 3 callbacks :

  • next : callback en cas de succès
  • error : callback en cas d’erreur
  • complete : callback quand l’Observable se termine

La souscription d'un Observer déclenche l'exécution de l'Observable (par défaut). On dit alors que l'Observable est lazy car il attend une souscription pour déclencher le traitement.

Les callbacks contenues dans l'Observer peuvent être utilisées par l'Observable pour gérer les différents cas (réussite, erreur et complétion).

Quand un Observer se souscrit à un Observable, cela veut dire qu'il écoute les données émises par cet Observable pour pouvoir y réagir. On peut le désinscrire si besoin pour stopper l'écoute.

Les cas error et complete sont facultatifs. Une callback par défaut leur est assignée si vous n'en fournissez pas.

Stream

Un stream (ou flux en français) représente une suite infinie d'éléments, comme un courant d’eau par exemple.

En informatique, un stream est une source de donnée qui sera traitée de façon séquentielle plutôt que globale parce que les données sont potentiellement illimitées.

Dans le cas d'RxJS, un stream part d'un Observable qui représente la source d'émission des données, et termine sur un Observer (mono-stream) ou plusieurs Observers (multi-stream) qui vont récupérer les données émises.

Voyons maintenant comment on se sert d'RxJS dans nos applications.

Souscrire à un Observable

Il y a 2 manières de souscrire à un Observable : la manière raccourcie et la manière complète.

Souscription raccourcie

En méthode raccourcie, il vous suffit de passer une fonction callback au subscribe.

1
this.observable$.subscribe(data => console.log(data))
Souscription raccourcie

Un Observer sera généré pour vous et cette fonction sera considérée comme la fonction next de l'Observer. Une fonction par défaut sera utilisée pour error et complete.

Souscription complète

Dans le cas d'une souscription complète, vous devez passer directement un objet de type Observer.

1
2
3
4
5
this.observable$.subscribe({
  next: data => console.log(data),
  error: error => console.error(error),
  complete: () => console.log('Execution completed')
});
Souscription complète

Utilisez cette méthode si vous devez gérer les erreurs ou surcharger la complétion.

Créer un Observable

Un Observable se construit à partir d'une fonction pure qui décrit son traitement.

Cette fonction prend en paramètre un Observer. Elle se sert des callbacks contenues dans l'Observer faire réagir les Observers pendant son traitement.

Voici comment on crée un Observable de manière générale.

1
2
3
4
5
6
7
8
9
this.observable$ = new Observable(observer: Observer => {
  // Traitement (ex : appel à un serveur)
  if (result) {
    observer.next(result);
  } else if (error) {
    observer.error(error);
  }
  observer.complete();
})
Création d'un Observable

Les Observers qui se souscrivent à cet Observable seront utilisés comme décrit dans la fonction de traitement.

Exemple : créer un Observable timer

Prenons l'exemple d'un Observable qui émet le temps passé toutes les secondes.

1
2
3
4
this.timer$ = new Observable(obv: Observer => {
  let count = 0;
  setInterval(() => obv.next(++count), 1000);
})
timer.component.ts

Chaque seconde, l’Observable fait appel à la fonction next de l'Observer, en lui passant la valeur du compteur. Un Observer qui se souscrit à cet Observable sera donc déclenché toutes les secondes.

Remarquez que cet Observable ne fait appel qu'à la fonction next de l'Observer. Il ne fait pas directement appel à la fonction error. Il ne pourra donc échouer que si une exception est levée pendant l'exécution.

Il ne fait pas non plus appel à la fonction complete. Cet Observable ne se termine donc jamais.

Types d’Observables RxJS

Plain Observable

Les Observables simples (plain Observable) sont mono-streams, c'est-à-dire que chaque Observer souscrit possède une exécution indépendante de l’Observable.

De plus, un plain Observable est lazy, c'est à dire qu'il ne s'exécute qu’au moment où un Observer s'y souscrit.

Exemple

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { Component, OnInit } from '@angular/core';
import {  Observable } from 'rxjs';
 
@Component({
 selector: 'app-subscribe',
 templateUrl: './subscribe.component.html',
 styleUrls: ['./subscribe.component.css']
})
export class SubscribeComponent implements OnInit {
 
  timer$: Observable<any>;
 
  constructor() {
    this.timer$ = new Observable(obv => {
      let count = 0;
      setInterval(() => obv.next(++count), 1000);
    })
  }
 
  addObserver(): void {
    this.timer$.subscribe(data => console.log(data));
  }
}
subscribe.component.ts

A chaque appel de addObserver(), un nouvel Observer se souscrit à l'Observable.

Das ce cas, chaque souscription d'un Observer déclenche une exécution de l'Observable. Et l'Observer qui a déclenché le traitement est le seul à recevoir les évènements envoyés par cette exécution (mono-stream).

Subject

Un Subject est un type complexe d’Observable, qui permet de diffuser à plusieurs Observers les données d'une même source (multi-stream).

Le Subject est à la fois Observable et Observer.

Le Subject est un Observable : vous pouvez donc y souscrire, en fournissant un Observer qui commencera à recevoir des valeurs.

De manière interne au Subject, l'appel à subscribe() ne déclenche pas d'exécution qui délivre des valeurs. L'Observer est simplement ajouté à une liste d'Observers, de la même manière que fonctionnerait une méthode addListener() dans un autre langage.

Le Subject est un Observer : c'est un objet qui contient les méthodes next(v), error(e) et complete(). Pour que votre Subject émette une valeur, appelez simplement la méthode next(theValue), et elle sera multicastée aux Observers qui ont souscrit au Subject.

Vous pouvez utiliser un Subject de 2 manières différentes.

Emettre des valeurs manuellement

Dans l'exemple ci-dessous, on attache 2 Observers au Subject, et on émet des valeurs manuellement depuis le Subject :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import { Subject } from 'rxjs';
     
const subject = new Subject<number>();
 
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});
 
subject.next(1);
subject.next(2);
 
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
Exemple d'utilisation du Subject

Multicaster les valeurs venant d'un Observable

Du fait que le Subject est un Observer, on peut le souscrire à un Observable pour multicaster les valeurs de l'Observable.

On pourrait prendre l'analogie d'une multi-prise qui permet de rediriger le courant d'une source unique vers plusieurs appareils.

schéma subject
Multicaster un Observable avec un Subject

1 - Les Observers se souscrivent au Subject, ce qui ne déclenche pas de traitement.

2 - Ensuite le Subject se souscrit à un Observable. L'Observable lance alors son traitement et émet des évènements au Subject.

3 - Le Subject redirige alors les informations qu'il recoit à tous ses Observers.

Utiliser un Subject permet de faire en sorte que tous les Observers recoivent la même information. Ce qui n'est pas possible avec un plain Observable seul car il est mono-stream et déclenche un nouveau traitement à chaque souscription.

C'est la même chose si on veut que plusieurs appareils recoivent le courant d'une même source : on doit utiliser une multi-prise.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { Subject, from } from 'rxjs';
     
const subject = new Subject<number>();
 
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});
 
const observable = from([1, 2, 3]);
 
observable.subscribe(subject); // You can subscribe providing a Subject
 
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
Subject en tant que redirecteur multicast

Ici, une seul exécution est déclenchée car seul le Subject s'est souscrit à l'Observable, 1 fois.

Les Observers recoivent tous la même valeur, puisqu'ils sont reliés au Subject qui multicaste à tous ses Observers.

BehaviorSubject

Le BehaviorSubject est une extension du Subject qui permet aussi le multicasting. A la différence du Subject, il émet une valeur lors de la souscription. Il garde en mémoire une valeur et émet cette valeur à tout nouveau souscripteur.

Si la valeur est modifiée, tous les souscripteurs enregistrés reçoivent la nouvelle valeur.

Exemple :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject(123);

// two new subscribers will get initial value => output: 123, 123
subject.subscribe(console.log);
subject.subscribe(console.log);

// two subscribers will get new value => output: 456, 456
subject.next(456);

// new subscriber will get latest value (456) => output: 456
subject.subscribe(console.log);

// all three subscribers will get new value => output: 789, 789, 789
subject.next(789);

// output: 123, 123, 456, 456, 456, 789, 789, 789
Utilisation d'un BehaviorSubject

Remarquez qu'il faut nécessairement fournir une valeur de départ au BehaviorSubject.

Utilisez un BehaviorSubject si vous souhaitez un Observable multicast qui émet une valeur à la souscription.

Les opérateurs RxJS

Les opérateurs sont des fonctions. Il en existe 2 types :

Les opérateurs pipeables, qui sont des opérateurs qui prennent en entrée un Observable et renvoient en sortie un nouvel Observable modifié.

Et les opérateurs de création qui peuvent être appelés pour créer un Observable.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { Component, OnInit } from '@angular/core';
import { interval, Observable } from 'rxjs';
import { map } from 'rxjs/operators';
 
@Component({
 selector: 'app-subscribe',
 templateUrl: './subscribe.component.html',
 styleUrls: ['./subscribe.component.css']
})
export class SubscribeComponent implements OnInit {
 
 observable$: Observable<any>;
 
 constructor() {
   this.observable$ = interval(1000);
   this.observable$
   .pipe(map((x) => x * 2))
   .subscribe(data => console.log(data))
 }
 
 ngOnInit(): void {
 }
}
subscribe.component.ts

Dans cet exemple, l’Observable enverra toutes les secondes la valeur d'un compteur qui s'incrémente. Exactement comme le timer que nous avions créé manuellement dans les exemples précédents.

Pour créer l'Observable, on a utilisé l'opérateur de création "interval".

Vous pouvez remarquer aussi l'utilisation de la fonction pipe() qui contient l’opérateur map(). Cet opérateur transforme les données de l’Observable avant de les émettre vers l'Observer.

Pipe() est en fait la méthode qui permet d'ajouter des opérateurs pipeables entre l'Observable et les Observers. Cette méthode prend autant d'opérateurs que l'on souhaite en paramètre.

Conclusion

Voilà pour cette intro à RxJS. Cet article est assez complexe, n’hésitez pas à laisser un commentaire pour nous aider à l'améliorer. En espérant que cela vous a plu !

Téléchargez votre Starter Kit Angular

Starter Kit Angular

En souscrivant vous recevrez :

  • Une roadmap d'un développeur Angular
  • Un cookbook Angular des commandes les plus utiles
  • Un extrait de formation

100% gratuit.

Auteur

image auteur
Lucas BENTO-VERSACE Alternant chez Codewise / Étudiant en développement web lucas.bentoversace@ynov.com
"Alternant chez Codewise, passioné par l'informatique et les technologies depuis mon plus jeune âge. J'ai découvert le monde du web et depuis je ne m'en lasse pas ! Fan de jeux vidéos, de sports et de musiques. "