Publié lundi 30 avril 2012 15:45 par Groc

Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service

Dans cet article, je vais montrer un exemple d’utilisation de rx pour la construction d’une couche de service dans une application wp7. Pour l’exemple, j’utilise un service qui expose en json une liste de joueurs et une méthode pour récupérer le score d’un joueur.

Basiquement, dans ce genre de scénario, avec ou sans rx, la première étape consiste à créer une web request.

var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players");

Le premier opérateur qui nous sera utile est celui qui gère automatiquement le pattern async utilisé par les webrequest. Ainsi, la méthode ci dessous me retournera une séquence observable de webresponse.

Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()

A partir de là, je peux utiliser des opérateurs de projection tels que Select ou SelectMany afin d’extraire une liste d’objets depuis le flux json de la réponse. Si une exception était levée à ce stade, elle serait automatiquement captée par l’observer.

Ainsi, je peux écrire la première méthode de ma couche de service.

public IObservable<Player> GetPlayers() { var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players"); return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)() .SelectMany(response => { using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>)); return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>; } }); }

Rx nous propose des opérateurs de base pour gérer automatiquement timeout et politique de retry. Ainsi, dans l’exemple ci dessous, en cas de congestion réseau, si le temps d’execution de la requête dépasse 20 secondes, une exception sera levée, et un nouvel essai aura lieu, et ainsi de suite 3 fois avant que l’exception soit remontée à mon observer.

public IObservable<Player> GetPlayers() { var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players"); return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)() .Timeout(TimeSpan.FromSeconds(20)) .Retry(3) .SelectMany(response => { using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>)); return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>; } }); }

Enfin, dans mon view model, mon observer recevra simplement les objets player un par un, et je pourrais gérer propement mon erreur.

playerService.GetPlayers() .Subscribe(player => { _players.Add(player); }, error => { });

Mamheureusement, si on execute l’application à ce stade, l’exception suivante apparaît.

image

En fait, le traitement effectué par la méthode FromAsyncPattern est déporté sur le thread pool. A partir de là, les opérations qui suivent sont executées sur le même thread, y compris les callbacks de l’observer.

Pour preuve, ajoutons quelques infos de debug …

private void Button_Click(object sender, RoutedEventArgs e) { Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); var playerService = new PlayerService(); playerService.GetPlayers() .Subscribe(player => { Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); _players.Add(player); }, error => { }); }
public IObservable<Player> GetPlayers() { var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players"); return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)() .Timeout(TimeSpan.FromSeconds(20)) .Retry(3) .SelectMany(response => { Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>)); return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>; } }); }

Rx offre la possibilité de refaire passer le traitement sur un autre scheduler via la méthode ObserveOn, et, dans le cas du dispatcher, directement via la méthode ObserveOnDispatcher.

playerService.GetPlayers() .ObserveOnDispatcher() .Subscribe(player => { Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); _players.Add(player); }, error => { });

Il y a un effet que j’aime bien sur WP7, c’est le fait de voir les éléments “popper” un par un dans une liste. Je me suis donc fait mon petit opérateur pour cet effet… La méthode zip renvoie le résullutat d’un selecteur que l’on applique à deux sources observables. L’observer sera alors notifié dès qu’un élément sera dispo dans les deux collections. Ainsi si je fais un zip entre mon observable de player et un timer de n ms, je recevrais bien un player tous les n ms.

public static IObservable<T> Cadence<T>(this IObservable<T> observable, TimeSpan cadence) { return observable.Zip(Observable.Timer(DateTime.Now, cadence), (r, t) => r); }

Je peux appliquer mon nouvel opérateur.

playerService.GetPlayers() .Cadence(TimeSpan.FromMilliseconds(100)) .ObserveOnDispatcher() .Subscribe(player => { _players.Add(player); }, error => { });

Il y a un point important qui peut encore poser problème : la création d’une webrequest peut lever une exception et nous n’avons pas géré ce cas. Par exemple avec …

var request = (HttpWebRequest)WebRequest.Create("htt://localhost/WcfService1/Service1.svc/Players");

Pour remédier à ce problème, nous allons créer une source observable comme vu dans le précédent article, et nous déporterons la création de la requête sur le threadpool. Ainsi la création ne sera plus bloquante et l’erreur pourra être gérée proprement via l’observer.

public static IObservable<WebResponse> GetResponseWithRetries(string url) { return Observable.Create<WebRequest>(observer => { Scheduler.ThreadPool.Schedule(() => { try { observer.OnNext((HttpWebRequest)WebRequest.Create(url)); observer.OnCompleted(); } catch (Exception e) { observer.OnError(e); } }); return () => { }; }) .SelectMany(r => Observable.FromAsyncPattern<WebResponse>(r.BeginGetResponse, r.EndGetResponse)()) .Timeout(TimeSpan.FromSeconds(20)) .Retry(3); }

La méthode de notre couche de service peut donc s’écrire de la manière suivante …

public IObservable<Player> GetPlayers() { return ObservableExtensions.GetResponseWithRetries("http://localhost/WcfService1/Service1.svc/Players") .SelectMany(response => { using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>)); return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>; } }); }

Ou dans le cas d’une autre méthode…

public IObservable<int> GetPlayerRankValue(int id) { return ObservableExtensions.GetResponseWithRetries("http://localhost/WcfService1/Service1.svc/PlayerRank/" + id) .Select(response => { using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(Rank)); return dataContractJsonSerializer.ReadObject(responseStream) as Rank; } }) .Select(rank => rank.Value); }

Enfin, je peux même combiner très facilement le résultat de mes requêtes …

public IObservable<Player> GetPlayersWithRank() { return from player in GetPlayers() select new Player { Id = player.Id, Name = player.Name, Rank = GetPlayerRankValue(player.Id).Single() }; }

Voila, j’espère que cet article vous aura aidé à construire des applications plus réactives grâce à Rx !

Ce post vous a plu ? Ajoutez le dans vos favoris pour ne pas perdre de temps à le retrouver le jour où vous en aurez besoin :

Classé sous ,

# re: Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service @ lundi 30 avril 2012 16:31

Bien content de voir enfin des gens utiliser Rx (et correctement).

Juste un detail.

Dans ta methode GetResponseWithRetries, au lieu de faire un Schedule dans le Create tu pourrais simplement faire un SubscribeOn(Scheduler.ThreadPool) apres le Create :)

Aussi dans ta premiere methode GetPlayers il vaudrait mieux englober le FromAsyncPattern dans un Observable.Defer pour deferer l'execution jusqu'a ce que quelqu'un fasse un subscribe dessus.

Sinon rien a redire.

Sinon rien à redire.

broux

# re: Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service @ lundi 30 avril 2012 17:36

Merci !

Je note pour le subscribeon.

Ensuite pour le deffer, ce n'est pas nécessaire, avec Create, j'ai déjà une séquence qui n'est exécutée que lorsque que l'on s'y subscribe.

Groc

# re: Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service @ lundi 30 avril 2012 17:55

Oui oui pour le Create c'est pas utile mais je parlais de la toute premiere methode GetPlayers de l'article (celle qui utilise juste Observable.FromAsyncPattern)

broux

# re: Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service @ lundi 30 avril 2012 18:33

Ah d'acc !

Ben du coup oui :)

Groc


Les 10 derniers blogs postés

- Office 365: Script PowerShell pour assigner des droits Full Control à un groupe défini par Blog Technique de Romelard Fabrice le 04-30-2017, 09:22

- SharePoint 20XX: Script PowerShell pour exporter en CSV toutes les listes d’une ferme pour auditer le contenu avant migration par Blog Technique de Romelard Fabrice le 03-28-2017, 17:53

- Les pièges de l’installation de Visual Studio 2017 par Blog de Jérémy Jeanson le 03-24-2017, 13:05

- UWP or not UWP sur Visual Studio 2015 ? par Blog de Jérémy Jeanson le 03-08-2017, 19:12

- Désinstallation de .net Core RC1 Update 1 ou SDK de Core 1 Preview 2 par Blog de Jérémy Jeanson le 03-07-2017, 19:29

- Office 365: Ajouter un utilisateur ou groupe dans la liste des Site collection Administrator d’un site SharePoint Online via PowerShell et CSOM par Blog Technique de Romelard Fabrice le 02-24-2017, 18:52

- Office 365: Comment créer une document library qui utilise les ContentTypeHub avec PowerShell et CSOM par Blog Technique de Romelard Fabrice le 02-22-2017, 17:06

- [TFS] Supprimer en masse les dépendances à SQL Enterprise ou Developer avant de procéder à une migration par Blog de Jérémy Jeanson le 02-20-2017, 20:30

- Office 365: Attention au volume utilisé par les fichiers de Thèmes de SharePoint Online par Blog Technique de Romelard Fabrice le 02-07-2017, 18:19

- [SCVMM] Supprimer une machine bloquée par Blog de Jérémy Jeanson le 01-31-2017, 21:22