Wednesday, 8 June 2011

Asynchronous Data Processing With the TPL Dataflow Library

The Microsoft Task Parallel Library (TPL) defines a set of data structures and methods for partitioning the processing performed by an application into asynchronous tasks. The primary abstraction implemented by the TPL is the Task class, which provides a layer over the .NET Framework ThreadPool enabling you to build applications that can take advantage of multi-threading quickly and easily. A key advantage of the TPL is that it is scalable, adapting automatically to the number of available processor cores and the current workload of the computer without the need for you to modify your code.

The TPL Dataflow Library (TDF) builds on the TPL with features that enable you to build services that can produce and consume data asynchronously. The TDF is based around a set of datatypes that can act as sources and sinks of data, and linkages that connect sources to sinks.

The simplest sink is provided by the generic ActionBlock type. This class implements a method named Post that receives data. You can specify the processing to be performed for each item received by associating a delegate with the ActionBlock object. The following example shows how to create an ActionBlock object that passes all integer data it receives to the method named ProcessData.

ActionBlock<int> intProcessor = new ActionBlock<int>(i => ProcessData(i));
...
// Send data to the intProcessor ActionBlock
intProcessor.Post(99);
intProcessor.Post(1);
intProcessor.Post(101);

As each item is received, it is handled asynchronously and the ProcessData method runs by using a Task scheduled on a separate thread. An overload is available for the ActionBlock constructor that enables you to specify the scheduler to use for arranging how and when to run each Task.
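The scheduler, together with other execution settings, can be supplied through an ExecutionDataflowBlockOptions object. The following is a minimal console sketch rather than a definitive implementation. It assumes the current System.Threading.Tasks.Dataflow NuGet package and a compiler that supports async Main. The ActionBlockOptionsSketch class and its ProcessData method are hypothetical stand-ins, and the Complete and Completion members are used only so that the program can wait for the queued items to finish.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockOptionsSketch
{
    // Hypothetical stand-in for the ProcessData method mentioned in the text.
    private static int ProcessData(int i) => i * 2;

    public static async Task<int[]> RunAsync()
    {
        // Thread-safe collection, because up to two items are processed at once.
        var results = new ConcurrentBag<int>();

        var options = new ExecutionDataflowBlockOptions
        {
            TaskScheduler = TaskScheduler.Default, // the thread-pool scheduler
            MaxDegreeOfParallelism = 2             // assumption: allow two items in flight
        };

        var intProcessor = new ActionBlock<int>(i => results.Add(ProcessData(i)), options);

        // Send data to the intProcessor ActionBlock
        intProcessor.Post(99);
        intProcessor.Post(1);
        intProcessor.Post(101);

        // Signal that no more data will arrive, then wait for queued items to finish.
        intProcessor.Complete();
        await intProcessor.Completion;

        // Processing order is not guaranteed with parallelism, so sort before returning.
        int[] sorted = results.ToArray();
        Array.Sort(sorted);
        return sorted;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync())); // prints "2, 198, 202"
    }
}
```

If MaxDegreeOfParallelism is left at its default value of 1, the block processes one item at a time in the order in which the items were posted.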

The ActionBlock class is an example of an executor block; other executor block types are available, such as the TransformBlock and TransformManyBlock classes. The TDF also includes a number of classes that simply implement data buffering; you can retrieve and process each item individually. A prime example is the generic BufferBlock class, illustrated in the following example.

BufferBlock<int> buffer = new BufferBlock<int>();

// Add data to the BufferBlock
buffer.Post(99);
buffer.Post(100);

// Retrieve data from the BufferBlock
int data = buffer.Receive();
Task<int> t = buffer.ReceiveAsync();

The Receive method of the BufferBlock class retrieves the next available item synchronously, blocking if no item is available. The ReceiveAsync method performs the receive operation asynchronously by using a Task.

You can connect a BufferBlock object to an ActionBlock object by using the LinkTo method. In the following example, the data sent to the BufferBlock object is automatically forwarded to the ActionBlock object for asynchronous processing.

ActionBlock<int> intProcessor = new ActionBlock<int>(i => ProcessData(i));
BufferBlock<int> buffer = new BufferBlock<int>();

// Connect the ActionBlock to the BufferBlock
// - items received by the BufferBlock are passed to the ActionBlock for processing
buffer.LinkTo(intProcessor);
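
The same linking pattern extends to the other executor blocks. The following sketch, which is illustrative only, chains a BufferBlock to a TransformBlock and then to an ActionBlock. It assumes the current System.Threading.Tasks.Dataflow NuGet package. The LinkedPipelineSketch class and the "item" formatting are hypothetical, and the DataflowLinkOptions.PropagateCompletion setting is used so that completing the first block eventually completes the last one.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LinkedPipelineSketch
{
    public static async Task<List<string>> RunAsync()
    {
        var output = new List<string>();

        var buffer = new BufferBlock<int>();
        // TransformBlock receives an int and produces a string for the next block.
        var formatter = new TransformBlock<int, string>(i => "item " + i);
        // An ActionBlock handles one item at a time by default, so a plain List is safe here.
        var collector = new ActionBlock<string>(s => output.Add(s));

        // Forward data and completion along the chain.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        buffer.LinkTo(formatter, linkOptions);
        formatter.LinkTo(collector, linkOptions);

        // Add data to the BufferBlock; it flows through the linked blocks automatically.
        buffer.Post(99);
        buffer.Post(100);

        // No more data: wait until the final block has processed everything.
        buffer.Complete();
        await collector.Completion;
        return output;
    }

    public static async Task Main()
    {
        foreach (string s in await RunAsync())
        {
            Console.WriteLine(s);
        }
    }
}
```
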
A useful variation on the BufferBlock type is the BroadcastBlock class. A BufferBlock object hands each item to only one of its linked targets, whereas a BroadcastBlock object passes each item of data to every linked executor block. You also provide a delegate to a cloning method that specifies any transformations to apply to the data before sending it to each executor block.
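
As a small console sketch of this behavior (not a definitive implementation), the following code links one BroadcastBlock to two ActionBlock objects and checks that both receive every item. It assumes the current System.Threading.Tasks.Dataflow NuGet package. The BroadcastSketch class and the plotter and logger names are hypothetical. Because int is a value type, the cloning delegate simply returns its argument.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastSketch
{
    public static async Task<(List<int> Plotted, List<int> Logged)> RunAsync()
    {
        var plotted = new List<int>();
        var logged = new List<int>();

        // Cloning delegate: an int is copied by value, so return it unchanged.
        var broadcaster = new BroadcastBlock<int>(i => i);

        // Two independent consumers; each uses its own list.
        var plotter = new ActionBlock<int>(i => plotted.Add(i));
        var logger = new ActionBlock<int>(i => logged.Add(i));

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        broadcaster.LinkTo(plotter, linkOptions);
        broadcaster.LinkTo(logger, linkOptions);

        for (int i = 1; i <= 3; i++)
        {
            broadcaster.Post(i);
        }

        // Complete the broadcaster and wait for both consumers to drain.
        broadcaster.Complete();
        await Task.WhenAll(plotter.Completion, logger.Completion);
        return (plotted, logged);
    }

    public static async Task Main()
    {
        var (plotted, logged) = await RunAsync();
        Console.WriteLine("plotted: " + string.Join(", ", plotted));
        Console.WriteLine("logged:  " + string.Join(", ", logged));
    }
}
```

Note that a BroadcastBlock retains only the most recent item. Both targets in this sketch have unbounded input queues and therefore accept every item, but a target with a bounded capacity that falls behind may miss items.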

As a complete worked example, the following WPF application captures data generated by an event source and plots this data on a graph (the graphCanvas element that occupies the major part of the window). The user clicks the Plot Graph button to initiate the event source and start collecting data. The data is also logged to a file, just to show how to use a BroadcastBlock object to direct data to multiple destinations.



<Window x:Class="GraphDemo.GraphWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        Title="Graph Demo" Height="343" Width="675" ResizeMode="CanResize" SizeToContent="WidthAndHeight">
    <Grid>
        <Button Height="23" HorizontalAlignment="Left" Margin="12,38,0,0" Name="plotButton" VerticalAlignment="Top" Width="75" Click="plotButton_Click">Plot Graph</Button>
        <Canvas Height="304" Name="graphCanvas" Background="Black" Margin="105,0,0,0" VerticalAlignment="Top" HorizontalAlignment="Left" Width="548" />
    </Grid>
</Window>

The following listing shows the complete code for the event source, implemented by the EventDataSource class. This class raises a DataPointGenerated event for each datum. The data itself consists of Point objects. This data is generated by the GenerateData method, which slowly performs a complex trigonometric calculation (the actual function is immaterial, but this method produces data which results in a rather pretty graph).

using System;
using System.Threading;
using System.Windows;

namespace GraphDemo
{
    public delegate void DataPointGeneratedDelegate(object sender, PointEventArgs pt);

    public class PointEventArgs : EventArgs
    {
        public Point Data;
    }

    class EventDataSource
    {
        #region public members

        public event DataPointGeneratedDelegate DataPointGenerated = null;

        public void GenerateData(int sizeOfXDimension, int sizeOfYDimension)
        {
            int a = sizeOfXDimension / 2;
            int b = a * a;
            int c = sizeOfYDimension / 2;

            for (int x = 0; x < a; x++)
            {
                int s = x * x;
                double p = Math.Sqrt(b - s);
                for (double i = -p; i < p; i += 3)
                {
                    double r = Math.Sqrt(s + i * i) / a;
                    double q = (r - 1) * Math.Sin(24 * r);
                    double y = i / 3 + (q * c);
                    Thread.Sleep(10);
                    Point pt = new Point((int)(-x + a), (int)(y + c));
                    raiseDataPointGeneratedEvent(new PointEventArgs { Data = pt });
                    pt = new Point((int)(x + a), (int)(y + c));
                    raiseDataPointGeneratedEvent(new PointEventArgs { Data = pt });
                }
            }
        }

        #endregion

        #region private members

        private void raiseDataPointGeneratedEvent(PointEventArgs pt)
        {
            if (DataPointGenerated != null)
                DataPointGenerated(this, pt);
        }

        #endregion
    }
}

Finally, the following code shows the application logic behind the GraphWindow window.

using System;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Media;
using System.Windows.Shapes;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace GraphDemo
{
    public partial class GraphWindow : Window
    {
        private int pixelWidth = 600;
        private int pixelHeight = 300;
        private EventDataSource dataSource = null;

        public GraphWindow()
        {
            InitializeComponent();
            dataSource = new EventDataSource();
        }

        private void plotButton_Click(object sender, RoutedEventArgs e)
        {
            TaskScheduler uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();

            BroadcastBlock<Point> pointBroadcaster = new BroadcastBlock<Point>(pt => pt);
            ActionBlock<Point> graphPlotter = new ActionBlock<Point>(pt => plotPoint(pt, graphCanvas), new ExecutionDataflowBlockOptions { TaskScheduler = uiScheduler });
            ActionBlock<Point> dataLogger = new ActionBlock<Point>(pt => logPoint(pt));
            pointBroadcaster.LinkTo(graphPlotter);
            pointBroadcaster.LinkTo(dataLogger);

            dataSource.DataPointGenerated += (s, pt) => pointBroadcaster.Post<Point>(pt.Data);
            Task.Factory.StartNew(() => dataSource.GenerateData(pixelWidth, pixelHeight));
        }

        // Log the data – code not shown
        private void logPoint(Point pt)
        {
            ...
        }

        // Plot the data as a graph on the specified Canvas
        private void plotPoint(Point pointToPlot, Canvas c)
        {
            Rectangle point = new Rectangle();
            point.Width = 0.5;
            point.Height = 0.5;
            point.Fill = new SolidColorBrush(Colors.White);

            double xCoord = c.Width / (this.pixelWidth / pointToPlot.X);
            double yCoord = c.Height / (this.pixelHeight / pointToPlot.Y);

            Canvas.SetLeft(point, xCoord);
            Canvas.SetTop(point, yCoord);
            c.Children.Add(point);
        }
    }
}

The plotButton_Click method runs when the user clicks the Plot Graph button. This method creates an instance of the BroadcastBlock class; the cloning method specified simply copies the data as-is to each executor block.

The graphPlotter ActionBlock executor takes each Point object passed to it and invokes the plotPoint method to plot the corresponding point on the graphCanvas element of the WPF window. Note that the Task created to actually run the plotPoint method must execute on the UI thread, so the ExecutionDataflowBlockOptions object provided to the ActionBlock constructor specifies that it should use the task scheduler for the user interface synchronization context. The dataLogger ActionBlock executor logs the same Point data to a file (the code is omitted from the example). In this case, the Task created to handle this data should not run on the UI thread, so the code does not specify any scheduler options, allowing the TDF to employ its default scheduling semantics.

Both ActionBlock executors are attached to the pointBroadcaster buffer block by using the LinkTo method. The code then subscribes to the DataPointGenerated event of the event source and posts the data from each event to the pointBroadcaster object. Finally, the code starts the event source running asynchronously.

When you start the application and click Plot Graph, the application slowly displays the graph in the WPF window (the event source raises a pair of events roughly every 10 milliseconds, and there are a large number of data points). However, the user interface remains responsive (you can move the window around) because the data is generated on a separate thread and the plotPoint and logPoint methods are executed asynchronously.


The TDF is a highly extensible collection of types for implementing asynchronous data processing based on the producer/consumer model. This article has shown some of the basic features, but there are several other executor and buffer classes available. Furthermore, you can define your own custom executor and buffer classes by implementing the ISourceBlock, ITargetBlock, and IPropagatorBlock interfaces of the TDF. Visit the Introduction to TPL Dataflow page on the Microsoft Web site to download the library and documentation.
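
As a hedged illustration of a custom block, the following sketch builds an IPropagatorBlock from two existing blocks by using the DataflowBlock.Encapsulate helper method, instead of implementing the interfaces by hand. It assumes the current System.Threading.Tasks.Dataflow NuGet package. The CustomBlockSketch class, the CreateEvenFilter method, and the even-number filtering rule are all hypothetical, chosen only to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CustomBlockSketch
{
    // Builds a propagator that forwards only even numbers (hypothetical rule).
    public static IPropagatorBlock<int, int> CreateEvenFilter()
    {
        var source = new BufferBlock<int>();
        var target = new ActionBlock<int>(i =>
        {
            if (i % 2 == 0)
            {
                source.Post(i);
            }
        });

        // When the target side finishes, pass completion (or the fault) to the source side.
        target.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                ((IDataflowBlock)source).Fault(t.Exception);
            }
            else
            {
                source.Complete();
            }
        });

        // Present the pair of blocks to callers as a single propagator block.
        return DataflowBlock.Encapsulate(target, source);
    }

    public static async Task<List<int>> RunAsync()
    {
        var output = new List<int>();
        IPropagatorBlock<int, int> filter = CreateEvenFilter();
        var collector = new ActionBlock<int>(i => output.Add(i));

        filter.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 6; i++)
        {
            filter.Post(i);
        }

        filter.Complete();
        await collector.Completion;
        return output;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync())); // prints "2, 4, 6"
    }
}
```

Implementing the interfaces directly gives full control over how messages are offered, reserved, and consumed; composing existing blocks in this way is a simpler starting point when that level of control is not needed.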
