Tuesday, 31 May 2011

Handling Events with the Microsoft Reactive Extensions

The Microsoft Reactive Extensions (RX) are a library of methods and types that extend the LINQ standard query operators to encompass event-based data sources and asynchronous operations. RX extends the ability of LINQ to handle dynamic observable collections.

In the traditional LINQ model, you define an enumerable collection of objects, and then iterate over that collection and process each item in turn. The collection that you iterate over must provide a means of enumerating elements in that collection, and so it commonly implements the IEnumerable interface (either directly or indirectly, possibly via the IQueryable interface). The IEnumerable interface defines the GetEnumerator method which returns an IEnumerator object. This object actually does the work of retrieving elements from the collection, providing a property called Current which returns the current item from the collection, and a method named MoveNext which is used to move on to the next item in the collection (returning true if there is such an item, or false if there are no more items). LINQ enables you to write code that looks like this, where customers is an enumerable collection of customer objects:
var customersAgedOver30 = from cust in customers
where cust.Age > 30
select cust;

foreach (var customer in customersAgedOver30)
{
ProcessCustomer(customer);
}

This much is “old hat”, and most .NET Framework developers are familiar with this model. However, the scheme specified by implementing the IEnumerable interface is focused on explicitly pulling data, on-demand, from the data source. If there is no data left, the MoveNext method of the enumerator returns false, and the application processing the data assumes that that is no more data to fetch so it stops trying to retrieve any more.

RX takes the view that not all collections match this behavior. Instead, RX enables you to define an observable collection. An observable collection is a dynamic set containing a potentially infinite number of elements; new elements may be added to the collection at any time. If you want to examine the data in this collection, then merely attempting to enumerate the elements that it contains is no longer an appropriate strategy (enumerating an infinite collection will take a very long time!). The Observer pattern provides an ideal solution to this problem, based on pushing data out to parties that are interested in it. This pattern has been around for years, and is well-documented by Gamma et al in their Design Patterns book.

The Observer pattern defines two types of entity: subjects that expose data, and observers that need to know when this data changes. A subject provides a means to enable observers to register their interest, and then notifies these observers when data is updated. In the past, the .NET Framework has supported the Observer pattern through the ObservableCollection class, but RX enables a more generalized implementation through the Observer and Notification types, together with a number of extension methods that make the functionality of these types accessible through LINQ. Using RX, you can easily convert an enumerable collection into an observable one simply by applying the ToObservable extension method, and then you can create an observer by using the Subscribe method of the observable collection; this method expects you to provide a delegate to a method that runs as each element in the collection is observed.

In its simplest form, you might simply consider RX as a simple recasting of code to iterate through enumerable collections. However, the real power of RX lies in being able to apply LINQ semantics to data that was previously non-enumerable. Consider a device capturing data such as the locations and magnitude of seismic waves caused by an earthquake and raising an event each time a new shock was detected. If you were writing a Windows program to capture and process this data, you would typically add a handler that listens to the corresponding events emitted by the device with code similar to this:

EventDataSource earthQuakeDataSource = new EventDataSource();
...
earthQuakeDataSource.EarthquakeDetected += (source, eventArgs) => ProcessData(eventArgs);


Now suppose that you wanted to filter the data so that it only detected earthquakes of magnitude 5 or more. You might amend the code in this way.

earthQuakeDataSource.EarthquakeDetected += source, eventArgs) =>
{
if (eventArgs.Magntitude >= 5)
ProcessData(eventArgs);
};

Additionally, consider what you might need to do if the eventArgs object contains a lot of information that is superfluous to the ProcessData method, and that you only want to pass the data in the Location property (specifying the coordinates of the epicenter of the earthquake) of this object to the ProcessData method:

earthQuakeDataSource.EarthquakeDetected += (source, eventArgs) =>
{
if (eventArgs.Magntitude >= 5)
ProcessData(eventArgs.Location);
};

Each of these coding changes is arguably quite small, but each one starts to obscure the information that you are passing to the ProcessData method. As you add more conditions (for example, suppose you wanted to refine the data further and only capture the details of earthquakes recorded in Alaska), the code could quickly become much more complicated. If you need to change the magnitude and location requirements at a later date, you could quite easily miss the code that implements them.

If you think about it, what you are actually doing is applying a predicate to filter the data, and then performing a projection operation. This is exactly the sort of thing that LINQ is good at with its where and select operators. However, events are not an enumerable data source, and applying LINQ to them was quite tricky. RX fixes this.

With RX you can observe events using the static FromEventPattern method of the Observable class. This is a generic method that takes the event source and the name of the event as arguments. You can then subscribe to this observable collection and arrange for a piece of code to be run to be run each time a new event is detected, as follows.

var earthquakeEvents = Observable.FromEventPattern<QuakeEventArgs>(earthQuakeDataSource, "QuakeDetected");

var subscription = earthquakeEvents.Subscribe(args => ProcessData(args.EventArgs.Location));

Note: Although it exhibits some collection behavior, the earthquakeEvents variable is not really a true collection; the data defined for each event is not retained in any form of queryable structure, and once the corresponding notification to call the ProcessData method has been fired the data that defines the event is discarded.

The Observable class provides extension methods that enable you to apply LINQ operators, so to filter and project the earthquake event data as specified earlier you can simply add the appropriate where and select clauses:

var earthquakeEvents = from evt in Observable.FromEventPattern<QuakeEventArgs>(earthQuakeDataSource, "QuakeDetected")
where evt.Magnitude >= 5
select evt.EventArgs.Location;

var subscription = earthquakeEvents.Subscribe(args => ProcessData(args));

At first glance it might seem that the FromEventPattern really does little more than provide an alternative syntax for handling events, but as soon as you adopt this approach you can quickly start to gain from many of the other benefits that RX provides. To extend the earthquake device scenario, suppose that you wanted to stop monitoring for earthquake events if the user pressed the Escape key on the keyboard. In the traditional approach, you might add another event handler to listen for keyboard events, filter these events to determine whether the user had pressed the Escape key, and then unsubscribe the earthquakeDataSource from the EarthquakeDetected event. You can achieve the same results with RX by observing the KeyDown event, and then applying the TakeUntil method to combine the two event observations together, as follows:
var earthquakeEvents = from evt in Observable.FromEventPattern<QuakeEventArgs>(earthQuakeDataSource, "QuakeDetected")
where evt.Magnitude >= 5
select evt.EventArgs.Location;

var escapeKeyPressed = from key in Observable.FromEventPattern<KeyEventArgs>(this, "KeyDown")
where key.EventArgs.Key == Key.Escape
select key;

var dataUntilEscPressed = earthquakeEvents.TakeUntil(escapeKeyPressed);

var subscription = dataUntilEscPressed.Subscribe(args => ProcessData(args));

The TakeUntil method causes the subscription to the first observable event to be canceled if an instance of the event defined by its parameter occurs.

Using RX to subscribe events enables you to separate the code that handles the events from the definitions of those events, leading to more easily readable and maintainable code. Additionally, the ability that RX provides for composing and combining events together makes for a very elegant solution to many common event-handling problems.

No comments: