Reactive Extensions – Dépôt asynchrone simple

6 minutes read

En Silverlight, tous les appels de webservices sont asynchrones. Dès lors, lorsque l’on implémente un dépôt en Silverlight, on doit faire les choses d’une manière différente de ce que nous aurions fait en Asp.Net ou WPF.

Prenons un exemple. On a un site web exposant une liste de tous les clients d’une société au travers d’un service WCF. On souhaite que notre application Silverlight accède à cette liste afin de l’afficher dans une ListBox. Le service en question peut retourner des dizaines de milliers d’enregistrements. A cause de cela, on ne peux pas récupérer tous les clients au sein d’un même appel.

Voici à quoi ressemble le service :

[ServiceContract(Name = "CustomerService")]
public interface ICustomerService
{
    [OperationContract]
    int Count();

    [OperationContract]
    IEnumerable<Customer> Get(int start, int count);
}

On peut voir que pour récupérer tous les clients, on doit d’abord savoir combien il y en a en utilisant la méthode Count et on appelle ensuite la méthode Get pour en récupérer un certain nombre. Voyons comment un dépôt asynchrone utilisant ce service serait implémenté de manière classique :

public class CustomerAsyncRepository
{
    public void GetAll(Action<IEnumerable<Customer>> callback)
    {
        var list = new List<Customer>();
        var client = new CustomerServiceClient();
        client.CountCompleted +=
            (sender, e) =>
            {
                if (e.Result > 1000)
                {
                    var state = new GetState { Count = e.Result, Offset = 0, Step = 500 };
                    ((CustomerServiceClient)sender).GetAsync(state.Offset, state.Step, state);
                }
                else ((CustomerServiceClient)sender).GetAsync(0, e.Result);
            };

        client.GetCompleted += (sender, e) =>
        {
            list.AddRange(e.Result);

            var state = e.UserState as GetState;

            if (state != null && state.Offset + state.Step < state.Count)
            {
                state.Offset += state.Step;
                ((CustomerServiceClient)sender).GetAsync(state.Offset, state.Step, state);
            }
            else {
                ((CustomerServiceClient)sender).CloseAsync();
                callback(list);
            }
        };

        client.CountAsync();
    }

    private class GetState {
        public int Offset { get; set; }
        public int Step { get; set; }
        public int Count { get; set; }
    }
}

La méthode GetAll prend un callback en paramètre. Il sera appelé lorsque tous les clients auront été récupérés, charge ensuite à l’appelant de traiter la liste des clients (ici on assigne cette liste à l’ItemsSource d’une ListBox). Cette méthode vérifie aussi le nombre de clients à récupérer. Si ce nombre est supérieur à 1000, alors la méthode passe dans un mode ou elle télécharge la liste des clients par paquet de 500. Pour pouvoir faire cela, on a créé la classe GetState qui représente l’état courant de l’opération de téléchargement de la liste des clients.

Voici comment la méthode GetAll est appelée et son retour utilisé :

var repository = new CustomerAsyncRepository();
repository.GetAll(customers => lstBox.ItemsSource = new ObservableCollection<Customer>(customers));

Que cette technique fonctionne et soit complètement asynchrone est une chose, mais il y a, à mon sens, quelques problèmes. Le principal problème est que l’on doit attendre que tous les clients aient été chargés avant de pouvoir invoquer la callback, ce qui peut prendre un temps assez long. Même si on avait déjà téléchargé la moitié des données, l’utilisateur de notre application devra attendre la fin du chargement complet avant de pouvoir commencer à les utiliser. Pour donner une chance à l’utilisateur de travailler avec les données déjà chargées, on pourrait invoquer la callback à chaque retour du webservice ; cependant la méthode appelante serait dans l’incapacité de savoir quand tous les éléments seraient chargés. Il y a toujours des moyens de contourner ou de corriger ce problème mais je vais m’arrêter là et nous allons voir comment utiliser les bases de Reactive Extensions pour faire une implémentation plus souple et puissante de ce dépôt.

Avec Rx, au lieu d’avoir un callback en paramètre, la méthode GetAll revoie un IObservable de Customer. Cet IObservable représente une collection asynchrone où les données nous sont poussées au lieu qu’on ne les récupère comme avec une IEnumerable (on appelle ceci la dualité entre IObservable et IEnumerable). De plus, IObservable fournit un moyen de savoir quand tous les éléments de la collection ont été récupérés ce qui est parfait pour nous.

Après un peu de refactorisation, le dépôt ressemble à ceci :

public class CustomerReactiveRepository
{
    public IObservable<Customer> GetAll()
    {
        return Observable.Create<Customer>(observer => OnSubscribe(observer));
    }

    private static Action OnSubscribe(IObserver<Customer> observer)
    {
        try {
            var client = new CustomerServiceClient();
            client.CountCompleted += (sender, e) =>
            {
                if (e.Result > 1000)
                {
                    var state = new GetState { Count = e.Result, Offset = 0, Step = 500 };
                    ((CustomerServiceClient)sender).GetAsync(state.Offset, state.Step, state);
                }
                else ((CustomerServiceClient)sender).GetAsync(0, e.Result);
            };

            client.GetCompleted += (sender, e) =>
            {
                foreach (var c in e.Result)
                    observer.OnNext(c);

                var state = e.UserState as GetState;

                if (state != null && state.Offset + state.Step < state.Count)
                {
                    state.Offset += state.Step;
                    ((CustomerServiceClient)sender).GetAsync(state.Offset, state.Step,
                                                                state);
                }
                else {
                    ((CustomerServiceClient)sender).CloseAsync();
                    observer.OnCompleted();
                }
            };

            client.CountAsync();
        }
        catch (Exception e)
        {
            observer.OnError(e);
        }

        return () => { };
    }

    private class GetState {
        public int Offset { get; set; }
        public int Step { get; set; }
        public int Count { get; set; }
    }
}

La plupart du code reste inchangé ici, seules la signature de la méthode et la façon de retourner les clients à la fonction appelante change. La méthode GetAll renvoie maintenant un IObservable de Customer en utilisant la méthode Observable.Create. Cette méthode prend en paramètre un délégué vers une autre méthode,  invoquée lorsqu’une souscription à l’observable aura lieu et prenant en paramètre un IObserver de Customer. L’IObserver est à l’IObservable ce que l’IEnumerator est à l’IEnumerable.

IObserver fournit trois méthodes importantes :

  • OnNext (la méthode à invoquer lorsque l’on souhaite pousser un nouvel élément dans l’observable)
  • OnCompleted (la méthode à invoquer lorsque tous les éléments auront été poussés, ceci termine l’observable)
  • OnError (la méthode à invoquer lorsqu’une erreur s’est produite dans le processus de récupération des données à pousser dans l’observable, ceci termine le processus)

Dès que nous recevons un groupe de customers, on le pousse dans l’observable et ils deviennent immédiatement disponible au souscripteur.

La méthode OnSubscribe du code ci-dessus retourne un délégué vers la méthode à invoquer pour libérer les ressources utilisées par le processus de récupération des données. Cette méthode de libération sera invoquée lorsque l’observable sera terminée. Ici nous n’avons rien à libérer alors on renvoie juste une méthode vide.

Maintenant voyons comment ce dépôt est utilisé :

var customers = new ObservableCollection<Customer>();
lstBox.ItemsSource = customers;
var repository = new CustomerReactiveRepository();
repository.GetAll().Subscribe(customers.Add);

On crée une ObservableCollection et on l’utilise en tant qu’ItemsSource de la ListBox. On souscrit ensuite à l’observable renvoyé par la méthode GetAll en lui passant un délégué sur la méthode Add de l’ObservableCollection précédemment déclarée.

Ce faisant, la listbox sera alimentée dès que de nouveaux customers seront disponibles rendant l’expérience utilisateur bien meilleure.

En utilisant Rx on a aussi accès à tous ses opérateurs Linq, ce qui nous permet d’avoir un contrôle plus fin sur notre observable. Par exemple, on pourrait très facilement dire à Rx d’exécuter la récupération des customers dans un thread du pool de thread et de nous renvoyer les résultats sur le thread dispatcher ainsi :

repository.GetAll()
    .ObserveOn(new DispatcherSynchronizationContext())
    .SubscribeOn(Scheduler.ThreadPool)
    .Subscribe(customers.Add);

Dans cet article je n’ai présenté qu’un nombre limité des nombreux opérateurs facilitant encore plus le traitement des operations asynchrones offerts par les Reactive Extensions.

Je vous invite à télécharger les sources de l’application sur mon skydrive, afin que vous puissiez vous rendre compte par vous-même comment Rx améliore à la fois le code et l’expérience utilisateur.

PS : Benjamin Roux m’a proposé une version plus évoluée du dépôt implémentée avec Rx. Elle utilise un plus grand nombre de fonctions offertes par Rx et est plus en accord avec la philosophie de Rx. Si vous êtes plus à l’aise avec Rx que la cible de cet article cette implémentation peut vous plaire.

public static class ObservableExtensions
{
    public static IObservable<T> GetWithPages<T>(this IObservable<T> observable, GetState state, Func<int, int, IObservable<IEnumerable<T>>> getObservable)
    {
        for (int i = 0; i < state.Pages; i++)
        {
            observable = observable.Merge(getObservable(i * state.Step, Math.Min(state.Step, state.Count - i * state.Step)).SelectMany(__ => __));
        }

        return observable;
    }
}

public class CustomerAsyncRepository
{
    public IObservable<int> GetAll()
    {
        var client = (TestService.TestService)new TestServiceClient();

        var observable =
            Observable.Defer(() =>
                Observable.FromAsyncPattern(client.BeginCount, a => client.EndCount(a))()
                            .Select(c => new GetState { Count = c, Offset = 0, Step = 100 })
                            .SelectMany(g => Observable.Empty<int>().GetWithPages(g, Observable.FromAsyncPattern<int, int, IEnumerable<int>>(client.BeginGet, a => client.EndGet(a)))));

        return observable;
    }
}

public class GetState
{
    public int Offset { get; set; }
    public int Step { get; set; }
    public int Count { get; set; }

    public int Pages
    {
        get { return (int)Math.Ceiling(Count / (double)Step); }
    }
}

Updated:

Leave a Comment