Bienvenue sur CodeWise ! Si vous êtes nouveau ici, vous voudrez sans doute découvrir notre Starter Kit Angular : il contient une roadmap du parcours d'un dev Angular, ainsi qu'un cookbook des commandes les plus utiles et un extrait de formation. Cliquez ici pour le télécharger (c’est gratuit !)
Dans cet article, nous allons découvrir les Observables avec RxJS.
C'est quoi RxJS ?
RxJS (Reactive extensions for JavaScript) est une librairie de programmation réactive qui permet aux développeurs de composer facilement des flux de données asynchrones.
Elle fournit une interface pour combiner, transformer des données provenant de sources diverses.
Concrètement, dans un projet Angular on utilise RxJS pour faire circuler l'information.
Par exemple, quand l'utilisateur se déconnecte et qu'on doit modifier l'apparence de plusieurs composants en réponse à ce changement, on utilise RxJS pour prévenir tous les composants du changement.
Le système se base sur le design pattern Observer. Un Observable émet des informations, tandis que des Observers écoutent et réagissent à ces informations.
Définitions
Avant de voir le détail du fonctionnement, quelques définitions.
Observable
Les Observables sont des objets qui émettent des messages à travers les différentes parties de votre application. Ils sont fréquemment utilisés dans Angular en tant que technique pour gérer les événements, la programmation asynchrone ou des flux de données.
Observer
Un Observer est un objet contenant 3 callbacks :
next
: callback en cas de succèserror
: callback en cas d’erreurcomplete
: callback quand l’Observable se termine
La souscription d'un Observer déclenche l'exécution de l'Observable (par défaut). On dit alors que l'Observable est lazy car il attend une souscription pour déclencher le traitement.
Les callbacks contenues dans l'Observer peuvent être utilisées par l'Observable pour gérer les différents cas (réussite, erreur et complétion).
Quand un Observer se souscrit à un Observable, cela veut dire qu'il écoute les données émises par cet Observable pour pouvoir y réagir. On peut le désinscrire si besoin pour stopper l'écoute.
Les cas error
et complete
sont facultatifs. Une callback par défaut leur est assignée si vous n'en fournissez pas.
Stream
Un stream (ou flux en français) représente une suite infinie d'éléments, comme un courant d’eau par exemple.
En informatique, un stream est une source de donnée qui sera traitée de façon séquentielle plutôt que globale parce que les données sont potentiellement illimitées.
Dans le cas d'RxJS, un stream part d'un Observable qui représente la source d'émission des données, et termine sur un Observer (mono-stream) ou plusieurs Observers (multi-stream) qui vont récupérer les données émises.
Voyons maintenant comment on se sert d'RxJS dans nos applications.
Souscrire à un Observable
Il y a 2 manières de souscrire à un Observable : la manière raccourcie et la manière complète.
Souscription raccourcie
En méthode raccourcie, il vous suffit de passer une fonction callback au subscribe.
1
this.observable$.subscribe(data => console.log(data))
Un Observer sera généré pour vous et cette fonction sera considérée comme la fonction next
de l'Observer. Une fonction par défaut sera utilisée pour error
et complete
.
Souscription complète
Dans le cas d'une souscription complète, vous devez passer directement un objet de type Observer.
1
2
3
4
5
this.observable$.subscribe({
next: data => console.log(data),
error: error => console.error(error),
complete: () => console.log('Execution completed')
});
Utilisez cette méthode si vous devez gérer les erreurs ou surcharger la complétion.
Créer un Observable
Un Observable se construit à partir d'une fonction pure qui décrit son traitement.
Cette fonction prend en paramètre un Observer. Elle se sert des callbacks contenues dans l'Observer faire réagir les Observers pendant son traitement.
Voici comment on crée un Observable de manière générale.
1
2
3
4
5
6
7
8
9
this.observable$ = new Observable(observer: Observer => {
// Traitement (ex : appel à un serveur)
if (result) {
observer.next(result);
} else if (error) {
observer.error(error);
}
observer.complete();
})
Les Observers qui se souscrivent à cet Observable seront utilisés comme décrit dans la fonction de traitement.
Exemple : créer un Observable timer
Prenons l'exemple d'un Observable qui émet le temps passé toutes les secondes.
1
2
3
4
this.timer$ = new Observable(obv: Observer => {
let count = 0;
setInterval(() => obv.next(++count), 1000);
})
Chaque seconde, l’Observable fait appel à la fonction next
de l'Observer, en lui passant la valeur du compteur. Un Observer qui se souscrit à cet Observable sera donc déclenché toutes les secondes.
Remarquez que cet Observable ne fait appel qu'à la fonction next
de l'Observer. Il ne fait pas directement appel à la fonction error
. Il ne pourra donc échouer que si une exception est levée pendant l'exécution.
Il ne fait pas non plus appel à la fonction complete
. Cet Observable ne se termine donc jamais.
Types d’Observables RxJS
Plain Observable
Les Observables simples (plain Observable) sont mono-streams, c'est-à-dire que chaque Observer souscrit possède une exécution indépendante de l’Observable.
De plus, un plain Observable est lazy, c'est à dire qu'il ne s'exécute qu’au moment où un Observer s'y souscrit.
Exemple
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-subscribe',
templateUrl: './subscribe.component.html',
styleUrls: ['./subscribe.component.css']
})
export class SubscribeComponent implements OnInit {
timer$: Observable<any>;
constructor() {
this.timer$ = new Observable(obv => {
let count = 0;
setInterval(() => obv.next(++count), 1000);
})
}
addObserver(): void {
this.timer$.subscribe(data => console.log(data));
}
}
A chaque appel de addObserver(), un nouvel Observer se souscrit à l'Observable.
Das ce cas, chaque souscription d'un Observer déclenche une exécution de l'Observable. Et l'Observer qui a déclenché le traitement est le seul à recevoir les évènements envoyés par cette exécution (mono-stream).
Subject
Un Subject est un type complexe d’Observable, qui permet de diffuser à plusieurs Observers les données d'une même source (multi-stream).
Le Subject est à la fois Observable et Observer.
Le Subject est un Observable : vous pouvez donc y souscrire, en fournissant un Observer qui commencera à recevoir des valeurs.
De manière interne au Subject, l'appel à subscribe() ne déclenche pas d'exécution qui délivre des valeurs. L'Observer est simplement ajouté à une liste d'Observers, de la même manière que fonctionnerait une méthode addListener() dans un autre langage.
Le Subject est un Observer : c'est un objet qui contient les méthodes next(v)
, error(e)
et complete()
.
Pour que votre Subject émette une valeur, appelez simplement la méthode next(theValue)
, et elle sera multicastée aux Observers qui ont souscrit au Subject.
Vous pouvez utiliser un Subject de 2 manières différentes.
Emettre des valeurs manuellement
Dans l'exemple ci-dessous, on attache 2 Observers au Subject, et on émet des valeurs manuellement depuis le Subject :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
Multicaster les valeurs venant d'un Observable
Du fait que le Subject est un Observer, on peut le souscrire à un Observable pour multicaster les valeurs de l'Observable.
On pourrait prendre l'analogie d'une multi-prise qui permet de rediriger le courant d'une source unique vers plusieurs appareils.

1 - Les Observers se souscrivent au Subject, ce qui ne déclenche pas de traitement.
2 - Ensuite le Subject se souscrit à un Observable. L'Observable lance alors son traitement et émet des évènements au Subject.
3 - Le Subject redirige alors les informations qu'il recoit à tous ses Observers.
Utiliser un Subject permet de faire en sorte que tous les Observers recoivent la même information. Ce qui n'est pas possible avec un plain Observable seul car il est mono-stream et déclenche un nouveau traitement à chaque souscription.
C'est la même chose si on veut que plusieurs appareils recoivent le courant d'une même source : on doit utiliser une multi-prise.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { Subject, from } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
const observable = from([1, 2, 3]);
observable.subscribe(subject); // You can subscribe providing a Subject
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
Ici, une seul exécution est déclenchée car seul le Subject s'est souscrit à l'Observable, 1 fois.
Les Observers recoivent tous la même valeur, puisqu'ils sont reliés au Subject qui multicaste à tous ses Observers.
BehaviorSubject
Le BehaviorSubject est une extension du Subject qui permet aussi le multicasting. A la différence du Subject, il émet une valeur lors de la souscription. Il garde en mémoire une valeur et émet cette valeur à tout nouveau souscripteur.
Si la valeur est modifiée, tous les souscripteurs enregistrés reçoivent la nouvelle valeur.
Exemple :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(123);
// two new subscribers will get initial value => output: 123, 123
subject.subscribe(console.log);
subject.subscribe(console.log);
// two subscribers will get new value => output: 456, 456
subject.next(456);
// new subscriber will get latest value (456) => output: 456
subject.subscribe(console.log);
// all three subscribers will get new value => output: 789, 789, 789
subject.next(789);
// output: 123, 123, 456, 456, 456, 789, 789, 789
Remarquez qu'il faut nécessairement fournir une valeur de départ au BehaviorSubject.
Utilisez un BehaviorSubject si vous souhaitez un Observable multicast qui émet une valeur à la souscription.
Les opérateurs RxJS
Les opérateurs sont des fonctions. Il en existe 2 types :
Les opérateurs pipeables, qui sont des opérateurs qui prennent en entrée un Observable et renvoient en sortie un nouvel Observable modifié.
Et les opérateurs de création qui peuvent être appelés pour créer un Observable.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { Component, OnInit } from '@angular/core';
import { interval, Observable } from 'rxjs';
import { map } from 'rxjs/operators';
@Component({
selector: 'app-subscribe',
templateUrl: './subscribe.component.html',
styleUrls: ['./subscribe.component.css']
})
export class SubscribeComponent implements OnInit {
observable$: Observable<any>;
constructor() {
this.observable$ = interval(1000);
this.observable$
.pipe(map((x) => x * 2))
.subscribe(data => console.log(data))
}
ngOnInit(): void {
}
}
Dans cet exemple, l’Observable enverra toutes les secondes la valeur d'un compteur qui s'incrémente. Exactement comme le timer que nous avions créé manuellement dans les exemples précédents.
Pour créer l'Observable, on a utilisé l'opérateur de création "interval".
Vous pouvez remarquer aussi l'utilisation de la fonction pipe()
qui contient l’opérateur map()
.
Cet opérateur transforme les données de l’Observable avant de les émettre vers l'Observer.
Pipe()
est en fait la méthode qui permet d'ajouter des opérateurs pipeables entre l'Observable et les Observers. Cette méthode prend autant d'opérateurs que l'on souhaite en paramètre.
Conclusion
Voilà pour cette intro à RxJS. Cet article est assez complexe, n’hésitez pas à laisser un commentaire pour nous aider à l'améliorer. En espérant que cela vous a plu !
Téléchargez votre Starter Kit Angular

En souscrivant vous recevrez :
- Une roadmap d'un développeur Angular
- Un cookbook Angular des commandes les plus utiles
- Un extrait de formation
100% gratuit.