Publié lundi 30 avril 2012 15:45 par Groc

Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service

Dans cet article, je vais montrer un exemple d’utilisation de rx pour la construction d’une couche de service dans une application wp7. Pour l’exemple, j’utilise un service qui expose en json une liste de joueurs et une méthode pour récupérer le score d’un joueur.

Basiquement, dans ce genre de scénario, avec ou sans rx, la première étape consiste à créer une web request.

var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players");

Le premier opérateur qui nous sera utile est celui qui gère automatiquement le pattern async utilisé par les webrequest. Ainsi, la méthode ci dessous me retournera une séquence observable de webresponse.

Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()

A partir de là, je peux utiliser des opérateurs de projection tels que Select ou SelectMany afin d’extraire une liste d’objets depuis le flux json de la réponse. Si une exception était levée à ce stade, elle serait automatiquement captée par l’observer.

Ainsi, je peux écrire la première méthode de ma couche de service.

public IObservable<Player> GetPlayers() { var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players"); return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)() .SelectMany(response => { using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>)); return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>; } }); }

Rx nous propose des opérateurs de base pour gérer automatiquement timeout et politique de retry. Ainsi, dans l’exemple ci dessous, en cas de congestion réseau, si le temps d’execution de la requête dépasse 20 secondes, une exception sera levée, et un nouvel essai aura lieu, et ainsi de suite 3 fois avant que l’exception soit remontée à mon observer.

public IObservable<Player> GetPlayers() { var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players"); return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)() .Timeout(TimeSpan.FromSeconds(20)) .Retry(3) .SelectMany(response => { using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>)); return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>; } }); }

Enfin, dans mon view model, mon observer recevra simplement les objets player un par un, et je pourrais gérer propement mon erreur.

playerService.GetPlayers() .Subscribe(player => { _players.Add(player); }, error => { });

Mamheureusement, si on execute l’application à ce stade, l’exception suivante apparaît.

image

En fait, le traitement effectué par la méthode FromAsyncPattern est déporté sur le thread pool. A partir de là, les opérations qui suivent sont executées sur le même thread, y compris les callbacks de l’observer.

Pour preuve, ajoutons quelques infos de debug …

private void Button_Click(object sender, RoutedEventArgs e) { Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); var playerService = new PlayerService(); playerService.GetPlayers() .Subscribe(player => { Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); _players.Add(player); }, error => { }); }
public IObservable<Player> GetPlayers() { var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players"); return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)() .Timeout(TimeSpan.FromSeconds(20)) .Retry(3) .SelectMany(response => { Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>)); return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>; } }); }

Rx offre la possibilité de refaire passer le traitement sur un autre scheduler via la méthode ObserveOn, et, dans le cas du dispatcher, directement via la méthode ObserveOnDispatcher.

playerService.GetPlayers() .ObserveOnDispatcher() .Subscribe(player => { Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); _players.Add(player); }, error => { });

Il y a un effet que j’aime bien sur WP7, c’est le fait de voir les éléments “popper” un par un dans une liste. Je me suis donc fait mon petit opérateur pour cet effet… La méthode zip renvoie le résullutat d’un selecteur que l’on applique à deux sources observables. L’observer sera alors notifié dès qu’un élément sera dispo dans les deux collections. Ainsi si je fais un zip entre mon observable de player et un timer de n ms, je recevrais bien un player tous les n ms.

public static IObservable<T> Cadence<T>(this IObservable<T> observable, TimeSpan cadence) { return observable.Zip(Observable.Timer(DateTime.Now, cadence), (r, t) => r); }

Je peux appliquer mon nouvel opérateur.

playerService.GetPlayers() .Cadence(TimeSpan.FromMilliseconds(100)) .ObserveOnDispatcher() .Subscribe(player => { _players.Add(player); }, error => { });

Il y a un point important qui peut encore poser problème : la création d’une webrequest peut lever une exception et nous n’avons pas géré ce cas. Par exemple avec …

var request = (HttpWebRequest)WebRequest.Create("htt://localhost/WcfService1/Service1.svc/Players");

Pour remédier à ce problème, nous allons créer une source observable comme vu dans le précédent article, et nous déporterons la création de la requête sur le threadpool. Ainsi la création ne sera plus bloquante et l’erreur pourra être gérée proprement via l’observer.

public static IObservable<WebResponse> GetResponseWithRetries(string url) { return Observable.Create<WebRequest>(observer => { Scheduler.ThreadPool.Schedule(() => { try { observer.OnNext((HttpWebRequest)WebRequest.Create(url)); observer.OnCompleted(); } catch (Exception e) { observer.OnError(e); } }); return () => { }; }) .SelectMany(r => Observable.FromAsyncPattern<WebResponse>(r.BeginGetResponse, r.EndGetResponse)()) .Timeout(TimeSpan.FromSeconds(20)) .Retry(3); }

La méthode de notre couche de service peut donc s’écrire de la manière suivante …

public IObservable<Player> GetPlayers() { return ObservableExtensions.GetResponseWithRetries("http://localhost/WcfService1/Service1.svc/Players") .SelectMany(response => { using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>)); return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>; } }); }

Ou dans le cas d’une autre méthode…

public IObservable<int> GetPlayerRankValue(int id) { return ObservableExtensions.GetResponseWithRetries("http://localhost/WcfService1/Service1.svc/PlayerRank/" + id) .Select(response => { using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(Rank)); return dataContractJsonSerializer.ReadObject(responseStream) as Rank; } }) .Select(rank => rank.Value); }

Enfin, je peux même combiner très facilement le résultat de mes requêtes …

public IObservable<Player> GetPlayersWithRank() { return from player in GetPlayers() select new Player { Id = player.Id, Name = player.Name, Rank = GetPlayerRankValue(player.Id).Single() }; }

Voila, j’espère que cet article vous aura aidé à construire des applications plus réactives grâce à Rx !

Ce post vous a plu ? Ajoutez le dans vos favoris pour ne pas perdre de temps à le retrouver le jour où vous en aurez besoin :

Classé sous ,

# re: Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service @ lundi 30 avril 2012 16:31

Bien content de voir enfin des gens utiliser Rx (et correctement).

Juste un detail.

Dans ta methode GetResponseWithRetries, au lieu de faire un Schedule dans le Create tu pourrais simplement faire un SubscribeOn(Scheduler.ThreadPool) apres le Create :)

Aussi dans ta premiere methode GetPlayers il vaudrait mieux englober le FromAsyncPattern dans un Observable.Defer pour deferer l'execution jusqu'a ce que quelqu'un fasse un subscribe dessus.

Sinon rien a redire.

Sinon rien à redire.

broux

# re: Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service @ lundi 30 avril 2012 17:36

Merci !

Je note pour le subscribeon.

Ensuite pour le deffer, ce n'est pas nécessaire, avec Create, j'ai déjà une séquence qui n'est exécutée que lorsque que l'on s'y subscribe.

Groc

# re: Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service @ lundi 30 avril 2012 17:55

Oui oui pour le Create c'est pas utile mais je parlais de la toute premiere methode GetPlayers de l'article (celle qui utilise juste Observable.FromAsyncPattern)

broux

# re: Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service @ lundi 30 avril 2012 18:33

Ah d'acc !

Ben du coup oui :)

Groc


Les 10 derniers blogs postés

- Office 365: Gestion des Thumbnails dans Office 365 Video Portal par Blog Technique de Romelard Fabrice le 07-22-2016, 17:09

- Office 365: Gestion des sous-titres dans Office 365 Video Portal par Blog Technique de Romelard Fabrice le 07-22-2016, 10:50

- SharePoint 2016 : Mais ou se trouve le “Open in file explorer” dans les modern libraries ??? par The Mit's Blog le 07-07-2016, 14:11

- Office 2016 : Au revoir le Document Information Panel (DIP) par The Mit's Blog le 07-05-2016, 17:59

- Error 1300 lors de la certification d’un app UWP sur le Store par Blog de Jérémy Jeanson le 07-02-2016, 22:46

- Lancez-vous un défi accessibilité ! par Blog de Jérémy Jeanson le 06-05-2016, 20:57

- C’est quoi l’accessibilité en informatique ? par Blog de Jérémy Jeanson le 06-05-2016, 17:24

- Module web pour rediriger les utilisateurs non autorisés par Blog de Jérémy Jeanson le 05-31-2016, 13:06

- Entity Framework et Delete par Blog de Jérémy Jeanson le 05-26-2016, 12:23

- Entity Framwork et mises à jour de données avec ou sans contraintes par Blog de Jérémy Jeanson le 05-24-2016, 13:04