Publié mercredi 25 avril 2012 12:06 par Groc

Reactive Extensions : Consommer des services avec Rx Partie 1, créer une source observable

Pour créer une source obsevable sans implémenter IObservable à la main, il y a deux principales solutions :

- utiliser la méthode Observable.Create<T> pour créer une séquence cold;

- utiliser l’instance d’une classe qui implémente ISubject<T>, et qui implémentera donc aussi IObservable<T> et IObserver<T> pour créer une séquence hot.

 

Une séquence cold signifie qu’elle est passive, c’est à dire qu’elle n’est exécutée que lorsque quelqu’un s’y abonne. Dans le cas d’un Observable.Create, la func que l’on passe en paramètre à create sera appelée à chaque subscribe. C’est typiquement ce genre de flux que l’on utilisera pour une requête asynchrone vers un web service.

Cette func reçoit en paramètre un observer et doit renvoyer une action qui sera éxécutée lors du désabonnement.

class Program { static IObservable<int> GetColdStream() { return Observable.Create<int>(observer => { observer.OnNext(1); observer.OnNext(2); observer.OnCompleted(); return () => { Console.WriteLine("unsubscribed"); }; }); } static void ConsumeStream(IObservable<int> observable) { var disposable = observable.Subscribe(n => { Console.WriteLine(n); }, exp => { }, () => { Console.WriteLine("completed"); }); disposable.Dispose(); } static void Main(string[] args) { var coldStream = GetColdStream(); ConsumeStream(coldStream); ConsumeStream(coldStream); Console.ReadLine(); } }

donne

1 2 completed unsubscribed 1 2 completed unsubscribed

 

A l’inverse une séquence hot est active, c’est à dire qu’elle vit sa vie et qu’un subscribe agit un peu comme l’abonnement à un évènement et on ne sera notifié que de ce qui se passe après s’être abonné (sauf cas particuliers ci dessous). C’est le genre de flux que l’on utiliserait pour réagir à des situations “continues” du type mouvement de la souris ou traffic UDP.

Un moyen simple de mettre votre propre séquence hot en place est d’utiliser un Subject. En fait, il y a plusieurs implémentations de ISubject<T> dans l’espace de noms System.Reactive.Subjects, mais elles ont toutes un comportement différent. Ainsi il ne faut surtout pas tomber dans  le piège du bien mal nommé AsyncSubject<T>

 

D’abord, Subject<T>, c’est l’implémentation basique de ISubject<T>. On ne sera notifié que de l’arrivée de nouveaux éléments après avoir fait un subscribe.

static void Main(string[] args) { var subject = new Subject<int>(); subject.OnNext(0); subject.Subscribe(n => { Console.WriteLine(n); }, error => { Console.WriteLine("error"); }, () => { Console.WriteLine("completed"); }); subject.OnNext(1); subject.OnNext(2); subject.OnCompleted(); Console.ReadLine(); }

… donne en sortie …

1 2 completed

ReplaySubject<T> notifiera automatiquement tout nouveau subscriber de l’intégralité des éléments qui sont passés dans le flux, y compris ceux antérieurs au subscribe.

static void Main(string[] args) { var subject = new ReplaySubject<int>(); subject.OnNext(0); subject.Subscribe(n => { Console.WriteLine(n); }, error => { Console.WriteLine("error"); }, () => { Console.WriteLine("completed"); }); subject.OnNext(1); subject.OnNext(2); subject.OnCompleted(); Console.ReadLine(); }

… donne en sortie …

0 1 2 completed

Un peu dans le même principe, BehaviorSubject<T> va conserver la dernière valeur ou une valeur par défaut (qu’il faut obligatoirement passer au constructeur).

static void Main(string[] args) { var subject = new BehaviorSubject<int>(-1); subject.Subscribe(n => { Console.WriteLine(n); }, error => { Console.WriteLine("error"); }, () => { Console.WriteLine("completed"); }); subject.OnNext(1); subject.OnNext(2); subject.OnCompleted(); Console.ReadLine(); }

… donne …

-1 1 2 completed

Enfin, AsyncSubject<T> n’a rien d’asynchrone : il ne retournera qu’une seule valeur, la dernière passée au flux, et la retournera que suite à un appel à OnCompleted();

static void Main(string[] args) { var subject = new AsyncSubject<int>(); subject.OnNext(0); subject.Subscribe(n => { Console.WriteLine(n); }, error => { Console.WriteLine("error"); }, () => { Console.WriteLine("completed"); }); subject.OnNext(1); subject.OnNext(2); subject.OnCompleted(); Console.ReadLine(); }

… donne …

2 completed

Maintenant que nous savons comment créer une source, la prochaine étape sera d’exposer le résultat d’un service au travers de cette source.

Ce post vous a plu ? Ajoutez le dans vos favoris pour ne pas perdre de temps à le retrouver le jour où vous en aurez besoin :

Classé sous ,

Les 10 derniers blogs postés

- Etendre le Team Web Access de TFS 2012 – Step 0 par Philippe Didiergeorges Aka Philess le 05-23-2013, 23:48

- Simuler facilement l’envoi de mail par Blog de Jérémy Jeanson le 05-22-2013, 12:52

- ProcDump 6.0 : support du filtrage sur messages d'exceptions .NET, des filtres multiples et du ciblage par nom de service par CoqBlog le 05-20-2013, 14:50

- Votez pour le TOP 10 des influenceurs SharePoint francophones ! par Le blog de Patrick [MVP SharePoint] le 05-20-2013, 12:59

- [Conf’SharePoint] Dernier rappel ! :-) par Le blog de Patrick [MVP SharePoint] le 05-20-2013, 09:09

- [ #SharePoint 2013 ] les modèles de sites standards… par Le blog de Patrick [MVP SharePoint] le 05-20-2013, 09:03

- 10 erreurs de compréhension concernant SharePoint… par Le blog de Patrick [MVP SharePoint] le 05-20-2013, 08:27

- Conf’SharePoint : 10 bonnes raisons pour ne pas la rater par Le petit blog de Pierre / Pierre's little blog le 05-14-2013, 02:24

- [Event] Soirée de lancement Agile .NET France à Lyon par Blog Agile/ALM de Vincent THAVONEKHAM le 05-13-2013, 01:29

- .NET / Debug : inspection de la mémoire d'applications .NET (dump ou processus live) : première livraison d'une librairie .NET par Microsoft par CoqBlog le 05-11-2013, 22:21