Reactive Extensions - Simple asynchronous repository

In Silverlight, all web service calls are asynchronous. Therefore, when implementing a repository in Silverlight, we have to do things a little differently than we would in ASP.NET or WPF.

Let’s take an example. We have a website exposing a list of customers through a WCF service. We want our Silverlight application to list all these customers inside a ListBox. The service can return tens of thousands of customers, so we cannot retrieve all of them in a single call.

Here is the definition of the service:

[ServiceContract(Name = "CustomerService")]
public interface ICustomerService {
    [OperationContract]
    int Count();

    [OperationContract]
    IEnumerable<Customer> Get(int start, int count);
}

We can see that to retrieve all the customers, we first have to know how many there are using the Count method and then call the Get method to retrieve a certain number of them.
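The count-then-page arithmetic can be sketched on its own, independently of WCF. This is only an illustration: the Paging class and its Ranges method are hypothetical names, not part of the service or of the article's sources. Unlike the listings in this article, the sketch clamps the last request to the number of items actually remaining.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the service contract.
public static class Paging
{
    // Returns the (start, count) pairs needed to cover `total` items
    // in chunks of at most `step` items.
    public static List<(int Start, int Count)> Ranges(int total, int step)
    {
        if (step <= 0) throw new ArgumentOutOfRangeException(nameof(step));

        var ranges = new List<(int Start, int Count)>();
        for (int start = 0; start < total; start += step)
        {
            // The last chunk may be smaller than `step`.
            ranges.Add((start, Math.Min(step, total - start)));
        }
        return ranges;
    }
}
```

For 1,200 customers and a step of 500, Paging.Ranges(1200, 500) gives three calls: (0, 500), (500, 500) and (1000, 200).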

Let’s see what a traditional asynchronous implementation of a repository using this service could look like:

public class CustomerAsyncRepository
{
    public void GetAll(Action<IEnumerable<Customer>> callback)
    {
        var list = new List<Customer>();
        var client = new CustomerServiceClient();
        client.CountCompleted +=
            (sender, e) =>
            {
                if (e.Result > 1000)
                {
                    var state = new GetState { Count = e.Result, Offset = 0, Step = 500 };
                    ((CustomerServiceClient)sender).GetAsync(state.Offset, state.Step, state);
                }
                else ((CustomerServiceClient)sender).GetAsync(0, e.Result);
            };

        client.GetCompleted += (sender, e) =>
        {
            list.AddRange(e.Result);

            var state = e.UserState as GetState;

            if (state != null && state.Offset + state.Step < state.Count)
            {
                state.Offset += state.Step;
                ((CustomerServiceClient)sender).GetAsync(state.Offset, state.Step, state);
            }
            else {
                ((CustomerServiceClient)sender).CloseAsync();
                callback(list);
            }
        };

        client.CountAsync();
    }

    private class GetState {
        public int Offset { get; set; }
        public int Step { get; set; }
        public int Count { get; set; }
    }
}

The GetAll method takes a callback as a parameter and invokes it once all the customers have been retrieved. The callback receives the complete list of customers, and it is then up to the caller to do whatever is necessary (here, setting the ItemsSource of a ListBox to that list). The method also checks the number of customers to retrieve: if there are more than a thousand, it switches to a mode where it fetches the customers in groups of 500, calling the service as many times as necessary. To make that possible, we created the GetState class, which represents the current state of this multi-call operation.

Here is how the GetAll method is called and its result used:

var repository = new CustomerAsyncRepository();
repository.GetAll(customers => lstBox.ItemsSource = new ObservableCollection<Customer>(customers));

While this technique works and is completely asynchronous, it has some flaws. The main issue is that we have to wait until we have all the customers before invoking the callback, which could take a minute or more. Even if half of the customers have already been retrieved, the user does not know it and keeps waiting. To give the user a chance to work with the data already loaded, we could invoke the callback after every call to the web service, but then the caller of GetAll would not know when all the customers have been retrieved. There are still ways to work around this, but I’ll stop here and start cleaning things up with Rx.

With Rx, instead of having a callback as a parameter, the GetAll method would just return an IObservable of Customer. This IObservable represents an asynchronous collection of elements where the data is pushed to us instead of being pulled like with IEnumerable (this is called the duality between IEnumerable and IObservable). Furthermore, IObservable provides a way to know when all elements have been retrieved which is perfect for us.
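To make the pull/push distinction concrete, here is a minimal sketch that uses only the IObserver interface from the System namespace, without the Rx library. DelegateObserver and PushVersusPull are hypothetical names introduced for illustration; Rx ships its own, more complete equivalents.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: an IObserver<T> built from three delegates.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { _onError(error); }
    public void OnCompleted() { _onCompleted(); }
}

public static class PushVersusPull
{
    // Pull: the consumer drives the iteration and asks for each item.
    public static List<string> Pull(IEnumerable<int> source)
    {
        var log = new List<string>();
        foreach (var item in source) log.Add("pulled " + item);
        log.Add("done"); // the end of the loop is the completion signal
        return log;
    }

    // Push: the producer drives and hands each item to the observer.
    public static List<string> Push(Action<IObserver<int>> producer)
    {
        var log = new List<string>();
        var observer = new DelegateObserver<int>(
            item => log.Add("pushed " + item),
            error => log.Add("failed: " + error.Message),
            () => log.Add("done")); // completion is an explicit notification
        producer(observer);
        return log;
    }
}
```

Calling PushVersusPull.Push(o => { o.OnNext(1); o.OnNext(2); o.OnCompleted(); }) records the same items as PushVersusPull.Pull(new[] { 1, 2 }), but the producer decides when each one arrives.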

After a little bit of refactoring, the repository would look like this:

public class CustomerReactiveRepository
{
    public IObservable<Customer> GetAll()
    {
        return Observable.Create<Customer>(observer => OnSubscribe(observer));
    }

    private static Action OnSubscribe(IObserver<Customer> observer)
    {
        try {
            var client = new CustomerServiceClient();
            client.CountCompleted += (sender, e) =>
            {
                // A failed call is reported through e.Error; reading e.Result would throw.
                if (e.Error != null)
                {
                    observer.OnError(e.Error);
                    return;
                }

                if (e.Result > 1000)
                {
                    var state = new GetState { Count = e.Result, Offset = 0, Step = 500 };
                    ((CustomerServiceClient)sender).GetAsync(state.Offset, state.Step, state);
                }
                else ((CustomerServiceClient)sender).GetAsync(0, e.Result);
            };

            client.GetCompleted += (sender, e) =>
            {
                if (e.Error != null)
                {
                    observer.OnError(e.Error);
                    return;
                }

                // Push each customer to the subscriber as soon as its batch arrives.
                foreach (var c in e.Result)
                    observer.OnNext(c);

                var state = e.UserState as GetState;

                if (state != null && state.Offset + state.Step < state.Count)
                {
                    state.Offset += state.Step;
                    ((CustomerServiceClient)sender).GetAsync(state.Offset, state.Step,
                                                                state);
                }
                else {
                    ((CustomerServiceClient)sender).CloseAsync();
                    observer.OnCompleted();
                }
            };

            client.CountAsync();
        }
        catch (Exception e)
        {
            // Only catches failures raised synchronously while setting up the calls.
            observer.OnError(e);
        }

        return () => { };
    }

    private class GetState {
        public int Offset { get; set; }
        public int Step { get; set; }
        public int Count { get; set; }
    }
}

Most of the code here remains unchanged; only the signature of the method and the way the customers are returned really change. The GetAll method now returns an IObservable of Customer by using the Observable.Create method. This method takes as a parameter a delegate that is invoked whenever the observable is subscribed to, and that receives an IObserver of Customer. The IObserver is for IObservable what IEnumerator is for IEnumerable.

IObserver provides three important methods :

  • OnNext (the method to invoke when we want to push a new item into the observable)
  • OnCompleted (the method to invoke when all the elements have been pushed, this terminates the observable)
  • OnError (the method to invoke when the process of getting all the elements encountered an error, this terminates the observable)
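The three methods above follow a simple contract: any number of OnNext calls, followed by at most one OnCompleted or OnError, after which nothing more is delivered. As far as I know, Observable.Create enforces this on your behalf; the sketch below only illustrates the rule with a hand-written observer. RecordingObserver is a hypothetical class, not an Rx type.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records notifications and ignores
// anything received after the sequence has terminated.
public sealed class RecordingObserver<T> : IObserver<T>
{
    private bool _terminated;

    public List<string> Log { get; } = new List<string>();

    public void OnNext(T value)
    {
        if (_terminated) return; // nothing is delivered after termination
        Log.Add("OnNext(" + value + ")");
    }

    public void OnError(Exception error)
    {
        if (_terminated) return;
        _terminated = true; // an error ends the sequence
        Log.Add("OnError(" + error.Message + ")");
    }

    public void OnCompleted()
    {
        if (_terminated) return;
        _terminated = true; // completion ends the sequence
        Log.Add("OnCompleted");
    }
}
```

If a producer calls OnNext(1), OnNext(2), OnCompleted() and then, by mistake, OnNext(3), the log contains only the first three notifications.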

Whenever we receive new customers, we just push them into the observable and they become immediately available to the subscriber.

The OnSubscribe method returns a delegate that frees the resources used by the process. Rx invokes it when the subscription ends, either because the subscriber unsubscribes or because the observable terminates. Here we don’t free anything, so we just return an empty method. One consequence is that unsubscribing early does not stop the remaining service calls.
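A non-empty delegate could, for example, set a flag that stops further paging. The sketch below simulates this without WCF or Rx: FakePagedSource stands in for the service client, and each call to DeliverNextPage plays the role of one GetCompleted event. Both classes are hypothetical and exist only for this illustration.

```csharp
using System;

// Hypothetical stand-in for the paged service: integers instead of customers.
public sealed class FakePagedSource
{
    private readonly int _total;
    private readonly int _step;
    private int _offset;
    private bool _cancelled;
    private bool _finished;
    private IObserver<int> _observer;

    public FakePagedSource(int total, int step)
    {
        if (step <= 0) throw new ArgumentOutOfRangeException(nameof(step));
        _total = total;
        _step = step;
    }

    // Same shape as the article's OnSubscribe: returns the cleanup delegate.
    public Action Subscribe(IObserver<int> observer)
    {
        _observer = observer;
        return () => _cancelled = true; // invoking it stops any further paging
    }

    // Simulates one completed service call.
    // Returns false when no further page will be delivered.
    public bool DeliverNextPage()
    {
        if (_observer == null || _cancelled || _finished) return false;

        int end = Math.Min(_offset + _step, _total);
        for (int i = _offset; i < end; i++) _observer.OnNext(i);
        _offset = end;

        if (_offset >= _total)
        {
            _finished = true;
            _observer.OnCompleted();
            return false;
        }
        return true;
    }
}

// Hypothetical observer that only counts what it receives.
public sealed class CountingObserver : IObserver<int>
{
    public int Items { get; private set; }
    public bool Completed { get; private set; }

    public void OnNext(int value) { Items++; }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}
```

After one page of a 1,200-item source with a step of 500, invoking the returned delegate leaves the observer with 500 items and no completion notification; later calls to DeliverNextPage do nothing.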

So now let’s see how this repository is used :

var customers = new ObservableCollection<Customer>();
lstBox.ItemsSource = customers;
var repository = new CustomerReactiveRepository();
repository.GetAll().Subscribe(customers.Add);

We create an ObservableCollection as before and use it as the ItemsSource of the ListBox. We then subscribe to the observable returned by the GetAll method, passing it a delegate to the Add method of that ObservableCollection.

This way, the ListBox is populated as new items become available, which makes for a much better user experience.

By using Rx, we also get access to its LINQ operators, which give us more control over the sequence. For example, we could easily tell Rx to run the get operation on a thread-pool thread and to deliver the results on the dispatcher thread, like this:

repository.GetAll()
    .ObserveOn(new DispatcherSynchronizationContext())
    .SubscribeOn(Scheduler.ThreadPool)
    .Subscribe(customers.Add);

I invite you to download the sources used to write this article from my SkyDrive, so you can better understand how this approach improves both the code and the user experience.
