Publié lundi 30 avril 2012 15:45 par Groc

Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service

Dans cet article, je vais montrer un exemple d’utilisation de rx pour la construction d’une couche de service dans une application wp7. Pour l’exemple, j’utilise un service qui expose en json une liste de joueurs et une méthode pour récupérer le score d’un joueur.

Basiquement, dans ce genre de scénario, avec ou sans rx, la première étape consiste à créer une web request.

var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players");

Le premier opérateur qui nous sera utile est celui qui gère automatiquement le pattern async utilisé par les webrequest. Ainsi, la méthode ci dessous me retournera une séquence observable de webresponse.

Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()

A partir de là, je peux utiliser des opérateurs de projection tels que Select ou SelectMany afin d’extraire une liste d’objets depuis le flux json de la réponse. Si une exception était levée à ce stade, elle serait automatiquement captée par l’observer.

Ainsi, je peux écrire la première méthode de ma couche de service.

public IObservable<Player> GetPlayers() { var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players"); return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)() .SelectMany(response => { using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>)); return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>; } }); }

Rx nous propose des opérateurs de base pour gérer automatiquement timeout et politique de retry. Ainsi, dans l’exemple ci dessous, en cas de congestion réseau, si le temps d’execution de la requête dépasse 20 secondes, une exception sera levée, et un nouvel essai aura lieu, et ainsi de suite 3 fois avant que l’exception soit remontée à mon observer.

public IObservable<Player> GetPlayers() { var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players"); return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)() .Timeout(TimeSpan.FromSeconds(20)) .Retry(3) .SelectMany(response => { using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>)); return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>; } }); }

Enfin, dans mon view model, mon observer recevra simplement les objets player un par un, et je pourrais gérer propement mon erreur.

playerService.GetPlayers() .Subscribe(player => { _players.Add(player); }, error => { });

Mamheureusement, si on execute l’application à ce stade, l’exception suivante apparaît.

image

En fait, le traitement effectué par la méthode FromAsyncPattern est déporté sur le thread pool. A partir de là, les opérations qui suivent sont executées sur le même thread, y compris les callbacks de l’observer.

Pour preuve, ajoutons quelques infos de debug …

private void Button_Click(object sender, RoutedEventArgs e) { Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); var playerService = new PlayerService(); playerService.GetPlayers() .Subscribe(player => { Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); _players.Add(player); }, error => { }); }
public IObservable<Player> GetPlayers() { var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players"); return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)() .Timeout(TimeSpan.FromSeconds(20)) .Retry(3) .SelectMany(response => { Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>)); return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>; } }); }

Rx offre la possibilité de refaire passer le traitement sur un autre scheduler via la méthode ObserveOn, et, dans le cas du dispatcher, directement via la méthode ObserveOnDispatcher.

playerService.GetPlayers() .ObserveOnDispatcher() .Subscribe(player => { Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); _players.Add(player); }, error => { });

Il y a un effet que j’aime bien sur WP7, c’est le fait de voir les éléments “popper” un par un dans une liste. Je me suis donc fait mon petit opérateur pour cet effet… La méthode zip renvoie le résullutat d’un selecteur que l’on applique à deux sources observables. L’observer sera alors notifié dès qu’un élément sera dispo dans les deux collections. Ainsi si je fais un zip entre mon observable de player et un timer de n ms, je recevrais bien un player tous les n ms.

public static IObservable<T> Cadence<T>(this IObservable<T> observable, TimeSpan cadence) { return observable.Zip(Observable.Timer(DateTime.Now, cadence), (r, t) => r); }

Je peux appliquer mon nouvel opérateur.

playerService.GetPlayers() .Cadence(TimeSpan.FromMilliseconds(100)) .ObserveOnDispatcher() .Subscribe(player => { _players.Add(player); }, error => { });

Il y a un point important qui peut encore poser problème : la création d’une webrequest peut lever une exception et nous n’avons pas géré ce cas. Par exemple avec …

var request = (HttpWebRequest)WebRequest.Create("htt://localhost/WcfService1/Service1.svc/Players");

Pour remédier à ce problème, nous allons créer une source observable comme vu dans le précédent article, et nous déporterons la création de la requête sur le threadpool. Ainsi la création ne sera plus bloquante et l’erreur pourra être gérée proprement via l’observer.

public static IObservable<WebResponse> GetResponseWithRetries(string url) { return Observable.Create<WebRequest>(observer => { Scheduler.ThreadPool.Schedule(() => { try { observer.OnNext((HttpWebRequest)WebRequest.Create(url)); observer.OnCompleted(); } catch (Exception e) { observer.OnError(e); } }); return () => { }; }) .SelectMany(r => Observable.FromAsyncPattern<WebResponse>(r.BeginGetResponse, r.EndGetResponse)()) .Timeout(TimeSpan.FromSeconds(20)) .Retry(3); }

La méthode de notre couche de service peut donc s’écrire de la manière suivante …

public IObservable<Player> GetPlayers() { return ObservableExtensions.GetResponseWithRetries("http://localhost/WcfService1/Service1.svc/Players") .SelectMany(response => { using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>)); return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>; } }); }

Ou dans le cas d’une autre méthode…

public IObservable<int> GetPlayerRankValue(int id) { return ObservableExtensions.GetResponseWithRetries("http://localhost/WcfService1/Service1.svc/PlayerRank/" + id) .Select(response => { using (var responseStream = response.GetResponseStream()) { var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(Rank)); return dataContractJsonSerializer.ReadObject(responseStream) as Rank; } }) .Select(rank => rank.Value); }

Enfin, je peux même combiner très facilement le résultat de mes requêtes …

public IObservable<Player> GetPlayersWithRank() { return from player in GetPlayers() select new Player { Id = player.Id, Name = player.Name, Rank = GetPlayerRankValue(player.Id).Single() }; }

Voila, j’espère que cet article vous aura aidé à construire des applications plus réactives grâce à Rx !

Ce post vous a plu ? Ajoutez le dans vos favoris pour ne pas perdre de temps à le retrouver le jour où vous en aurez besoin :

Classé sous ,

# re: Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service @ lundi 30 avril 2012 16:31

Bien content de voir enfin des gens utiliser Rx (et correctement).

Juste un detail.

Dans ta methode GetResponseWithRetries, au lieu de faire un Schedule dans le Create tu pourrais simplement faire un SubscribeOn(Scheduler.ThreadPool) apres le Create :)

Aussi dans ta premiere methode GetPlayers il vaudrait mieux englober le FromAsyncPattern dans un Observable.Defer pour deferer l'execution jusqu'a ce que quelqu'un fasse un subscribe dessus.

Sinon rien a redire.

Sinon rien à redire.

broux

# re: Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service @ lundi 30 avril 2012 17:36

Merci !

Je note pour le subscribeon.

Ensuite pour le deffer, ce n'est pas nécessaire, avec Create, j'ai déjà une séquence qui n'est exécutée que lorsque que l'on s'y subscribe.

Groc

# re: Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service @ lundi 30 avril 2012 17:55

Oui oui pour le Create c'est pas utile mais je parlais de la toute premiere methode GetPlayers de l'article (celle qui utilise juste Observable.FromAsyncPattern)

broux

# re: Reactive Extensions : Consommer des services avec Rx Partie 2, consommer un service @ lundi 30 avril 2012 18:33

Ah d'acc !

Ben du coup oui :)

Groc


Les 10 derniers blogs postés

- [TFS] Supprimer un projet de Visual Studio Online par Blog de Jérémy Jeanson le il y a 1 heure et 23 minutes

- Nouveau blog en anglais / New blog in english ! par Le blog de Patrick [MVP SharePoint] le 09-18-2014, 18:42

- [ #Yammer ] From Mailbox to Yammer and back / De votre messagerie vers Yammer et retour ! par Le blog de Patrick [MVP SharePoint] le 09-15-2014, 11:31

- [ #Office 365 ] New service settings panel / Nouveau panneau de paramétrage des services par Le blog de Patrick [MVP SharePoint] le 09-11-2014, 08:50

- Problème de déploiement pour une démo SharePoint/TFS? par Blog de Jérémy Jeanson le 09-10-2014, 21:52

- [ #Office365 ] Delve first impressions / Premières impressions sur Delve par Le blog de Patrick [MVP SharePoint] le 09-09-2014, 16:57

- [ #Office365 ] How to change Administration console language ? / Comment changer la langue de la console d’administration ? par Le blog de Patrick [MVP SharePoint] le 09-09-2014, 08:25

- [ #SharePoint 2013 ] Suppression de bases de données en état “Pas de Réponse” par Le blog de Patrick [MVP SharePoint] le 09-04-2014, 14:10

- Changer l’adresse d’une ferme Office Web Apps associée à SharePoint par Blog de Jérémy Jeanson le 09-01-2014, 22:21

- Une ferme #SharePoint 2013 dans @Azure en quelques clics (1ère partie) ! par Le blog de Patrick [MVP SharePoint] le 08-28-2014, 18:52