Task Parallel Library : 2 of n
Demo code source : Tasks2.zip
Introduction
This is the 2nd part of my proposed series of articles on TPL. Last time I introduced Tasks, and covered this ground:
- Thread Versus Tasks
- Creating Tasks
- Trigger Methods/Properties
- Handling Exceptions
- Cancelling Tasks
- SynchronizationContext
Article Series Roadmap
This is article 2 of a possible 6, which I hope people will like. Shown below is the rough outline of what I would like to cover.
- Starting Tasks / Trigger Operations / ExceptionHandling / Cancelling / UI Synchronization
- Continuations / Cancelling Chained Tasks (This article)
- Parallel For / Custom Partitioner / Aggregate Operations
- Parallel LINQ
- Pipelines
- Advanced Scenarios / v.Next For Tasks
Now, I am aware that some folk will simply read this article and say that it is similar to what is currently available on MSDN, and I partly agree with that. However, there are several reasons why I have still chosen to take on the task of writing up these articles, which are as follows:
- It will only really be the first couple of articles that show similar ideas to MSDN. After that, I feel the material I get into will not be on MSDN, and will be the result of some TPL research on my part, which I will be outlining in the article(s). So you will benefit from my research, which you can just read... Aye, nice
- There will be screen shots of live output here, which is something MSDN does not have that much of, and which may help some readers to reinforce the article(s) text
- There may be some readers out there who have never even heard of the Task Parallel Library, so would not come across it in MSDN. You know the old story: you have to know what you are looking for in the 1st place.
- I enjoy threading articles, so I like doing them: I did them, will do them, have done them, and continue to do them
All that said, if people who have read this article truly think it is too similar to MSDN (which I still hope it won't be), let me know that as well, and I will try to adjust the upcoming articles to make amends.
Table Of Contents
Anyway what I am going to cover in this article is as follows:
- Some More TPL Background
- Continuation, What's that
- Simple continuation
- WPF Synchronization
- Continue "WhenAny"
- Continue "WhenAll"
- Using a continuation for exception handling
- Using continuation as pipeline
- Catching exception in continuation antecedent
- Cancelling a continuation
Some More TPL Background
This section is something I really should have talked about in the 1st article, but I did not, so I am including it here instead. I hope to explain why the designers of TPL did things the way they did, and how that benefits us all.
The Default Task Scheduler
TPL relies on a scheduler to organise and run tasks. In .NET 4, the default Task scheduler (which you can swap out) is tightly integrated with the ThreadPool. As such, if you use the default Task scheduler, the worker threads that run the Tasks are managed by the ThreadPool, where there are generally at least as many worker Threads as there are cores on the target PC. When there are more Tasks than there are worker Threads, some Tasks must be queued until a ThreadPool worker thread becomes free to service them.
This is a similar concept to the one used by the existing ThreadPool.QueueUserWorkItem(..). In fact, you could think of the default Task scheduler as an improved ThreadPool, where the work items are simply Tasks.
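To make the comparison concrete, here is a minimal sketch (the `QueueComparison` class and `SumTo` helper are made-up names for illustration) that queues the same piece of work both ways. With ThreadPool.QueueUserWorkItem(..) there is no handle to wait on, so we have to signal completion ourselves; with the default Task scheduler we get a Task<int> back and can simply read its Result.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class QueueComparison
{
    // Hypothetical helper: 1 + 2 + ... + n
    static int SumTo(int n)
    {
        int sum = 0;
        for (int i = 1; i <= n; i++) sum += i;
        return sum;
    }

    public static int[] SumBothWays(int n)
    {
        // 1) Raw ThreadPool work item: nothing is returned to us, so completion
        //    is signalled manually and the result comes back via a captured variable.
        int poolResult = 0;
        using (ManualResetEvent done = new ManualResetEvent(false))
        {
            ThreadPool.QueueUserWorkItem(state =>
            {
                poolResult = SumTo((int)state);
                done.Set();
            }, n);
            done.WaitOne();
        }

        // 2) Task on the default scheduler (also serviced by ThreadPool threads):
        //    the Task is the handle, and reading Result waits for it to finish.
        Task<int> task = Task.Factory.StartNew<int>(state => SumTo((int)state), n);

        return new[] { poolResult, task.Result };
    }
}
```

Calling `QueueComparison.SumBothWays(100)` should give 5050 from both approaches; the difference is purely in how much plumbing you have to write to get the answer back.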
The default scheduler is capable of better performance than the standard ThreadPool alone as the number of cores increases; we shall examine why below.
The Standard ThreadPool
The ThreadPool is basically a single global First In First Out (FIFO) queue of work items, from which the worker threads dequeue the work they are assigned to do.
This is OK until the number of cores increases, at which point the queue becomes a bottleneck, as it can only be accessed by one worker thread at a time. When there are only a few large, coarse-grained parallel work items to deal with, the synchronization cost of ensuring single access to this global queue is small. But when you have much finer-grained parallelism going on (as you would with Tasks), the synchronization cost of working with this single global queue starts to become a bottleneck.
Tasks were always designed to scale to the number of cores available, and I have read that .NET is capable of running millions of Tasks efficiently. To handle that, a different approach from the centralized queue had to be taken; I will talk about this more decentralized approach to scheduling below.
Decentralized Local Queues
The .NET framework provides each worker thread from the ThreadPool with its own local task queue. The local queues distribute the load and alleviate much of the need to use the single global queue. There are as many local queues as there are worker threads, as well as the single global queue, and all of them operate concurrently.
The idea is that a worker thread takes work from its own local queue in Last In First Out (LIFO) order. If it finds no work there, it may have to go back to the single global queue (and incur a heavier synchronization cost for doing so).
There is one more trick that the TPL designers managed to get into play. If a worker Thread's local queue is empty, it could just go back to the global queue for more, but the TPL designers also let it steal work from its neighbours' local queues, taking items in FIFO order.
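As far as I understand the .NET 4 scheduler, Tasks created by code that is already running on a ThreadPool worker (for example, child Tasks created inside another Task) go onto that worker's local queue, whereas Tasks created from other threads go onto the global queue; TaskCreationOptions.PreferFairness asks the scheduler to use the global queue instead. The sketch below (class and method names are hypothetical) shows both cases. Queue placement is not something the program can observe, so the comments describe the scheduler's intended behaviour rather than anything the code proves.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LocalQueueDemo
{
    // The parent runs on a ThreadPool worker, so the child Tasks it creates are
    // queued on that worker's local queue, where idle workers may steal them.
    public static long SumOfSquares(int count)
    {
        long total = 0;
        Task parent = Task.Factory.StartNew(() =>
        {
            for (int i = 1; i <= count; i++)
            {
                int value = i; // copy the loop variable for the closure
                Task.Factory.StartNew(() =>
                {
                    Interlocked.Add(ref total, (long)value * value);
                }, TaskCreationOptions.AttachedToParent);
            }
        });
        parent.Wait(); // also waits for the attached child Tasks
        return Interlocked.Read(ref total);
    }

    // PreferFairness hints that this Task should go to the global FIFO queue,
    // so Tasks scheduled sooner are more likely to run sooner.
    public static int FairTask()
    {
        Task<int> task = Task.Factory.StartNew<int>(() => 42, TaskCreationOptions.PreferFairness);
        return task.Result;
    }
}
```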
Former MVP and now Microsoft employee Daniel Moth has an excellent blog post, with some highly intuitive diagrams, that illustrates all of this:
http://www.danielmoth.com/Blog/New-And-Improved-CLR-4-Thread-Pool-Engine.aspx
It is well worth reading that post.
Anyway, I am sorry about that slight digression; I just felt I needed to get that out there. OK, so now on to continuations.
Continuation, What's that
Simply put, continuations allow Tasks to be chained together. While this does not sound like that much of a big deal by itself, what makes the continuation concept really shine is that you can have selective continuations. That is, you could have a continuation that only fires when a whole group of Tasks finish, or a continuation that only fires when one of many Tasks finishes, or a continuation that only fires when a Task fails or is cancelled. Continuations afford us that level of freedom, and by using it we can achieve very fine-grained control over many aspects of our parallel code, rather than having just one monolithic chunk of threaded code.
In this article I have purposely designed the Task chains to be quite small, but you really can make these chains as small or as large as you see fit.
Simple continuation
Demo code project : SimpleContinuation
I have not really got too much to say about this small code snippet/demo apart from saying that it is a continuation, and truth be known that is probably all I need to say, as that really is pretty much all there is to creating and using a continuation. Dead easy really.
// create the task
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
ints.Add(i);
}
return ints;
}, 2000);
try
{
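//setup a continuation for task, keeping hold of it so we can wait on it
Task continuation = taskWithFactoryAndState.ContinueWith((ant) =>
{
List<int> result = ant.Result;
foreach (int resultValue in result)
{
Console.WriteLine("Task produced {0}", resultValue);
}
});
//exceptions from the task or the continuation only surface when we
//wait on the continuation, so this is what lets the catch below see them
continuation.Wait();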
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'",
ex.Message));
}
}
Console.ReadLine();
And here is a small screen shot, not very exciting I know, it does get better though.
WPF Synchronization
Demo code project : WPFDispatcherSynchonizationContext
This is something that I covered in the previous article, under UI Synchronization and more specifically WPF Synchronization. I have not changed that code at all, but I have included it here again to show that marshalling work back to the thread that owns a UI control is a very valid reason to use a TPL continuation. As I say, this code snippet will not make much sense unless you go and read the relevant sections of the 1st article, which explain the problem we are trying to solve.
private void btnDoIt_Click(object sender, RoutedEventArgs e)
{
//create CancellationTokenSource, so we can use the ContinueWith overload that
//takes a TaskScheduler (here one wrapping the UI SynchronizationContext)
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
Task taskWithFactoryAndState1 =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
// This is not run on the UI thread.
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
ints.Add(i);
}
return ints;
}, 10000, token).ContinueWith(ant =>
{
//updates UI no problem as we are using correct SynchronizationContext
lstBox.ItemsSource = ant.Result;
}, token, TaskContinuationOptions.None,
TaskScheduler.FromCurrentSynchronizationContext());
}
And here is a screen shot of the demo running
Continue "WhenAny"
Demo code project : ContinueWhenAny
One requirement when working with parallel programming might be to try and run a group of algorithms over a dataset and use the one that performed best. These algorithms could literally be anything, for example a custom experimental search. I did not have time to write a whole slew of custom experimental search algorithms, so I opted instead for something a little better known: "Sorting Algorithms". I am using examples from past C# competition winner http://www.codeproject.com/KB/recipes/SortVisualization.aspx by Kanasz Robert.
The basic idea here is that I want to wait only until the 1st algorithm (i.e. hopefully the fastest one) achieves its goal.
I have squirreled away the algorithms into a little helper DLL called "ContinueWhen.Common" in the attached VS2010 solution, but as far as getting the concept of waiting for only one of many running tasks before a continuation runs, this code should be easy enough to understand without needing to see the actual sorting algorithms.
static void Main(string[] args)
{
//create a list of random numbers to sort
Random rand = new Random();
List<int> unsortedList = new List<int>();
int numberOfItemsToSort = 5000;
for (int i = 0; i < numberOfItemsToSort; i++)
{
unsortedList.Add(rand.Next(numberOfItemsToSort));
}
//create 3 tasks to run 3 different sorting algorithms
Task<SortingTaskResult>[] tasks =
new Task<SortingTaskResult>[3];
//Bubble Sort Task
tasks[0] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
//copy
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
//run algorithm
List<int> result = SortingAlgorithms.BubbleSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Bubble Sort");
}, unsortedList);
//Selection Sort Task
tasks[1] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
//copy
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
//run algorithm
List<int> result = SortingAlgorithms.SelectionSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Selection Sort");
}, unsortedList);
//Counting Sort Task
tasks[2] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
//copy
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
//run algorithm
List<int> result = SortingAlgorithms.CountingSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Counting Sort");
}, unsortedList);
//Wait for any of them (assuming nothing goes wrong)
Task.Factory.ContinueWhenAny(
tasks,
(Task<SortingTaskResult> antecedent) =>
{
Console.WriteLine(antecedent.Result.ToString());
});
Console.ReadLine();
}
The above code shows us creating 3 Tasks, one for each sorting algorithm, and then a single continuation that waits for Any of the Tasks (i.e. the 1st of the 3 Tasks to finish).
Here is what is produced when the listing above is run.
It can be seen that "Selection Sort" won the race, even though it was not the 1st Task to be started. It won because it is a better algorithm than whichever other algorithm happened to have the use of an available CPU core at that time. I only have 2 CPU cores on the laptop I wrote this test code on, so chances are that if I had 4 CPU cores, the 3rd algorithm (Counting Sort) may have ended up winning, as on paper it is the better algorithm.
The other interesting thing to note is that because we are waiting for only 1 of the Tasks in a group (array) to finish, we are ONLY able to use the Result from the single Task that we waited on, as shown in this screen shot.
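The three task bodies above are almost identical, so one way to tidy them up is a small factory method. This is a self-contained sketch under a few assumptions: `TimedResult` is a made-up stand-in for the article's SortingTaskResult, `StartTimedSort` is a hypothetical helper, and List.Sort / LINQ's OrderBy stand in for the algorithms in ContinueWhen.Common. It also uses the ContinueWhenAny overload that returns a value, so the caller gets the winner back as a Task<TimedResult>.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class SortRace
{
    // Hypothetical stand-in for the article's SortingTaskResult class
    public sealed class TimedResult
    {
        public string Name;
        public long ElapsedMs;
        public List<int> Sorted;
    }

    // Hypothetical helper: wraps any sorting function in a timed Task that
    // works on a private copy of the input, so the Tasks never share a list.
    public static Task<TimedResult> StartTimedSort(
        string name, Func<List<int>, List<int>> sort, List<int> source)
    {
        return Task.Factory.StartNew(() =>
        {
            Stopwatch watch = Stopwatch.StartNew();
            List<int> sorted = sort(new List<int>(source));
            watch.Stop();
            return new TimedResult { Name = name, ElapsedMs = watch.ElapsedMilliseconds, Sorted = sorted };
        });
    }

    public static TimedResult RunRace(List<int> data)
    {
        Task<TimedResult>[] tasks =
        {
            StartTimedSort("List.Sort", list => { list.Sort(); return list; }, data),
            StartTimedSort("LINQ OrderBy", list => list.OrderBy(x => x).ToList(), data)
        };

        // The continuation fires once, for whichever Task finishes first,
        // and hands that Task's Result on as its own result.
        Task<TimedResult> firstFinished = Task.Factory.ContinueWhenAny(
            tasks, (Task<TimedResult> winner) => winner.Result);
        return firstFinished.Result;
    }
}
```

Which entry wins can vary from run to run, so only rely on the winner's Result, not on which algorithm it happened to be.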
Continue "WhenAll"
Demo code project : ContinueWhenAll
ContinueWhenAll is an interesting one, and I can think of a number of times it would be very useful. Say you have split up your parallel work, but must wait for all the parts to finish before moving on to the next step. Or, to use the experimental algorithm idea again, we might be quite interested in seeing various characteristics of how our custom algorithms performed, so we must wait for them all to complete before continuing.
I have again chosen to use sorting algorithms, as they are a simple concept. The idea is that we want to run various sorting algorithms over an unsorted list, and wait for all the different algorithms to complete before we continue.
Here is the code that does that.
static void Main(string[] args)
{
//create a list of random numbers to sort
Random rand = new Random();
List<int> unsortedList = new List<int>();
int numberOfItemsToSort = 5000;
for (int i = 0; i < numberOfItemsToSort; i++)
{
unsortedList.Add(rand.Next(numberOfItemsToSort));
}
//create 3 tasks to run 3 different sorting algorithms
Task<SortingTaskResult>[] tasks =
new Task<SortingTaskResult>[3];
//Bubble Sort Task
tasks[0] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
//copy
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
//run algorithm
List<int> result = SortingAlgorithms.BubbleSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Bubble Sort");
}, unsortedList);
//Selection Sort Task
tasks[1] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
//copy
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
//run algorithm
List<int> result = SortingAlgorithms.SelectionSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Selection Sort");
}, unsortedList);
//Counting Sort Task
tasks[2] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
//copy
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
//run algorithm
List<int> result = SortingAlgorithms.CountingSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Counting Sort");
}, unsortedList);
//Wait for all of them (assuming nothing goes wrong)
Task.Factory.ContinueWhenAll(
tasks,
(antecedents) =>
{
foreach (Task<SortingTaskResult> task in antecedents)
{
Console.WriteLine(task.Result.ToString());
}
});
Console.ReadLine();
}
It can be seen that this time the continuation only kicked in when all 3 sorting Tasks had completed. Here are the results of running this snippet:
The other interesting thing to note is that because we are waiting for all the Tasks in a group (array) to finish, we are able to use the Result from every Task that we waited on, as shown in this screen shot.
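Because ContinueWhenAll hands the continuation every antecedent, the continuation can also produce a combined result rather than just printing. Here is a minimal sketch along those lines; `SortOutcome`, `StartSort` and `CompareAll` are hypothetical names, and List.Sort / LINQ OrderBy again stand in for the article's sorting algorithms.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SortReport
{
    // Hypothetical result type, standing in for the article's SortingTaskResult
    public sealed class SortOutcome
    {
        public string Name;
        public List<int> Sorted;
    }

    static Task<SortOutcome> StartSort(string name, Func<List<int>, List<int>> sort, List<int> data)
    {
        // Each Task sorts its own copy, so the Tasks never share a mutable list
        return Task.Factory.StartNew<SortOutcome>(
            () => new SortOutcome { Name = name, Sorted = sort(new List<int>(data)) });
    }

    public static List<string> CompareAll(List<int> data)
    {
        Task<SortOutcome>[] tasks =
        {
            StartSort("List.Sort", list => { list.Sort(); return list; }, data),
            StartSort("LINQ OrderBy", list => list.OrderBy(x => x).ToList(), data)
        };

        // Runs once, after every Task has completed; the array it receives is in
        // the same order as 'tasks', so every Result is available here.
        Task<List<string>> summary = Task.Factory.ContinueWhenAll(
            tasks,
            (Task<SortOutcome>[] finished) => finished
                .Select(t => t.Result.Name + ": " + string.Join(",", t.Result.Sorted))
                .ToList());

        return summary.Result;
    }
}
```

For the input 3, 1, 2 this should produce one line per algorithm, each showing 1,2,3.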
Using a continuation for exception handling
Demo code project : UsingContinuationForExceptionHandling
When I was talking about the different ways to handle Task Exceptions in the 1st article, I also mentioned that there was another technique which I did not show at the time. Well, now is the time to show that other way. It is pretty simple really: we just use continuations. The idea is that we have one continuation that runs if the antecedent Task ran to completion, and another that runs if the antecedent Task was put into the Faulted state. This is easily achieved using the TaskContinuationOptions that we can supply when we create a Task continuation. Here is some example code to illustrate what I mean:
// create the task
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
ints.Add(i);
if (i > 100)
{
InvalidOperationException ex =
new InvalidOperationException("oh no its > 100");
ex.Source = "taskWithFactoryAndState";
throw ex;
}
}
return ints;
}, 2000);
//and setup a continuation for it that runs only when it has faulted
taskWithFactoryAndState.ContinueWith((ant) =>
{
AggregateException aggEx = ant.Exception;
Console.WriteLine("OOOOPS : The Task exited with Exception(s)");
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'",
ex.Message));
}
}, TaskContinuationOptions.OnlyOnFaulted);
//and setup a continuation for it that runs only when it ran to completion
taskWithFactoryAndState.ContinueWith((ant) =>
{
List<int> result = ant.Result;
foreach (int resultValue in result)
{
Console.WriteLine("Task produced {0}", resultValue);
}
}, TaskContinuationOptions.OnlyOnRanToCompletion);
Console.ReadLine();
And to show you what happens when we run this lot, here is a screen shot of the demo:
See how only one of the continuations ran, which is the one that should run "OnlyOnFaulted".
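One detail worth knowing here: the continuation whose condition is not met does not just sit there; the TPL moves it to the Canceled state, so if you later Wait() on it you get an AggregateException. The sketch below (the `ContinuationStatusDemo` name is made up) reproduces the faulted case and returns the final status of both continuations.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationStatusDemo
{
    public static TaskStatus[] Run()
    {
        // Antecedent that always faults
        Task<int> antecedent = Task.Factory.StartNew<int>(() =>
        {
            throw new InvalidOperationException("oh no");
        });

        // Runs, and observes the antecedent's exception
        Task onFaulted = antecedent.ContinueWith(
            ant => Console.WriteLine("Handled: " + ant.Exception.InnerException.Message),
            TaskContinuationOptions.OnlyOnFaulted);

        // Condition not met, so the TPL cancels this continuation
        Task onSuccess = antecedent.ContinueWith(
            ant => Console.WriteLine("Result: " + ant.Result),
            TaskContinuationOptions.OnlyOnRanToCompletion);

        // WaitAll waits for both, then throws because onSuccess was cancelled
        try { Task.WaitAll(onFaulted, onSuccess); }
        catch (AggregateException) { /* expected: onSuccess is Canceled */ }

        return new[] { onFaulted.Status, onSuccess.Status };
    }
}
```

Running it should report RanToCompletion for the OnlyOnFaulted continuation and Canceled for the OnlyOnRanToCompletion one.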
Using continuation as pipeline
Demo code project : UsingContinuationsAsPipelines
I did mention at the beginning of this article that you can use continuations to chain tasks together, and that these chains can be as simple or as complex as you choose. I have not gone nuts or anything, but I have come up with a small example, shown below, which is just slightly larger than the examples you have seen so far. What it does illustrate, though, is that you can quite easily continue a continuation.
static void Main(string[] args)
{
// create the task
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
ints.Add(i);
}
return ints;
}, 10);
//and setup a continuation for it only on ran to completion, where this continuation
//returns a result too, which will be used by yet another continuation
taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
{
List<int> parentResult = ant.Result;
List<int> result = new List<int>();
foreach (int resultValue in parentResult)
{
Console.WriteLine("Parent Task produced {0}, which will be squared by continuation",
resultValue);
result.Add(resultValue * resultValue);
}
return result;
}, TaskContinuationOptions.OnlyOnRanToCompletion)
//Another continuation
.ContinueWith((ant) =>
{
List<int> parentResult = ant.Result;
foreach (int resultValue in parentResult)
{
Console.WriteLine("Parent Continuation Task produced Square of {0}",
resultValue);
}
}, TaskContinuationOptions.OnlyOnRanToCompletion);
Console.ReadLine();
}
It's certainly not rocket science, this one. All I am doing is creating an initial Task that creates and returns a list of numbers. This List<int> produced by the 1st Task is then passed to a continuation, where each value from the original Task (the antecedent) is printed, and where a new List<int> is created containing the square of each of those values. The result of this continuation is then fed to yet another continuation, which prints the results of the squaring continuation Task (the antecedent to this continuation).
Each of the continuations assumes an ideal world, and will only run if its antecedent Task runs to completion.
Here is a small demo of this one running
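The same chain can also hand a final value back to the caller, since each ContinueWith<TNewResult> returns a new Task<TNewResult>. Here is a compact sketch (the `SquarePipeline` name is made up) that uses LINQ to keep each stage to one line: produce 0..count-1, square the values, then sum them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SquarePipeline
{
    public static int SumOfSquares(int count)
    {
        Task<int> last = Task.Factory
            // Stage 1: produce 0..count-1
            .StartNew<List<int>>(() => Enumerable.Range(0, count).ToList())
            // Stage 2: square each value from the antecedent
            .ContinueWith(ant => ant.Result.Select(x => x * x).ToList(),
                          TaskContinuationOptions.OnlyOnRanToCompletion)
            // Stage 3: sum the squares
            .ContinueWith(ant => ant.Result.Sum(),
                          TaskContinuationOptions.OnlyOnRanToCompletion);

        // Reading Result on the last stage waits for the whole chain
        return last.Result;
    }
}
```

If an earlier stage faults, the later stages are cancelled (because of OnlyOnRanToCompletion), so reading last.Result would then throw an AggregateException rather than returning a value.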
Catching exception in continuation antecedent
Demo code project : CatchExceptionInAntecedent
So we have now seen several examples of using Tasks/continuations, and we have seen that we can use continuations to run when things go to plan, and also to run when the original Task fails to complete its job. But sometimes we might simply want an unconditional continuation that always runs, whether or not the original Task completes successfully, and have the continuation decide what to do if something is not right about the status of the original Task.
Here is an example of how we would check for an Exception in the original Task from within the continuation, where we rethrow the original Exception that was provided by the original Task. Since in this example the continuation rethrows an Exception, we need to make sure that the Exception it throws will be observed in some way (I talked about Exception observing last time, when I talked about trigger methods/properties such as Wait() / Result). As such, I Wait() on the continuation.
Here is the code:
try
{
// create the task
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
Console.WriteLine("In TaskWithFactoryAndState");
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
Console.WriteLine("taskWithFactoryAndState, creating Item: {0}", i);
ints.Add(i);
if (i == 5)
throw new InvalidOperationException("Don't like 5 its vulgar and dirty");
}
return ints;
}, 100);
//Setup a continuation which checks the antecedent's status, and rethrows
//the antecedent's exception if it faulted
Task<List<int>> continuation = taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
{
if (ant.Status == TaskStatus.Faulted)
throw ant.Exception.InnerException;
Console.WriteLine("In Continuation, no problems in Antecedent");
List<int> parentResult = ant.Result;
List<int> result = new List<int>();
foreach (int resultValue in parentResult)
{
Console.WriteLine("Parent Task produced {0}, which will be squared by continuation",
resultValue);
result.Add(resultValue * resultValue);
}
return result;
});
//wait for the continuation, so the exception it rethrows is observed
continuation.Wait();
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message));
}
}
Console.WriteLine("Finished");
And here is a small demo screen shot of this running. See how we caught the original Exception of the original Task and successfully rethrew it (preserving the Exception's type and message) up to the point where it got caught by the try/catch.
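If you would rather hand a value back than print, the same unconditional-continuation pattern works with a continuation that returns a result. Here is a minimal sketch; `AntecedentCheck`, `CountItems` and its `failAt` parameter are hypothetical, with `failAt` simply choosing which item makes the producer throw (pass -1 for no failure).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AntecedentCheck
{
    public static Task<int> CountItems(int failAt)
    {
        // Producer that faults when it reaches 'failAt'
        Task<List<int>> producer = Task.Factory.StartNew<List<int>>(() =>
        {
            List<int> ints = new List<int>();
            for (int i = 0; i < 10; i++)
            {
                if (i == failAt)
                    throw new InvalidOperationException("Don't like " + i);
                ints.Add(i);
            }
            return ints;
        });

        // Unconditional continuation: it always runs and decides for itself
        return producer.ContinueWith<int>(ant =>
        {
            if (ant.IsFaulted)
                throw ant.Exception.InnerException; // surfaces via Wait()/Result on the continuation
            return ant.Result.Count;
        });
    }
}
```

`CountItems(-1).Result` should give 10, while `CountItems(5).Wait()` throws an AggregateException whose inner exception is the producer's InvalidOperationException.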
Cancelling a continuation
Demo code project : CancellingContinuations
One of the most obvious things you may want to do with a continuation is cancel it, right? Well, luckily you have already seen all the tricks of the trade for doing this in the 1st article; remember the CancellationTokenSource object we looked at last time?
There is not much more to it than that: we create a new CancellationTokenSource, and pass the CancellationToken from it to any TPL Tasks/continuations we want to be affected when the CancellationToken is cancelled.
The same rules you saw in the 1st article still apply: we must be good and ensure that any Task/continuation that expects and makes use of a CancellationToken throws an Exception when cancellation is requested, for example by calling ThrowIfCancellationRequested() (remember, this is vital to ensure the Task transitions to the correct state).
Anyway, I am probably talking too much when the code speaks for itself. The code is simple: we have an original Task that creates a List<int>, whose numbers are then used inside a continuation, where they are squared/printed and returned. However, 5 seconds after the original Task is created, the CancellationToken that was passed to both the original Task and the continuation is cancelled.
CancellationTokenSource tokenSource
= new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
try
{
// create the task
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
Console.WriteLine("In TaskWithFactoryAndState");
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
tokenSource.Token.ThrowIfCancellationRequested();
ints.Add(i);
Console.WriteLine("taskWithFactoryAndState, creating Item: {0}", i);
Thread.Sleep(1000); // simulate some work
}
return ints;
}, 10000, tokenSource.Token);
Thread.Sleep(5000); //wait 5 seconds then cancel the running Task
tokenSource.Cancel();
//Setup a continuation which will not run
taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
{
Console.WriteLine("In Continuation");
List<int> parentResult = ant.Result;
List<int> result = new List<int>();
foreach (int resultValue in parentResult)
{
Console.WriteLine("Parent Task produced {0}, which will be squared by continuation",
resultValue);
result.Add(resultValue * resultValue);
}
return result;
}, tokenSource.Token);
taskWithFactoryAndState.Wait();
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message));
}
}
finally
{
tokenSource.Dispose();
}
Console.WriteLine("Finished");
Console.ReadLine();
Here is the results:
Notice how the continuation did not even kick in at all, and we only created 5 items; that is because the CancellationToken was cancelled.
It's that easy to cancel a continuation, you have to love this TPL stuff man.
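A continuation can also be used to react to cancellation, rather than just being cancelled along with its antecedent. The sketch below (the class name is made up, and the 100ms/10ms timings are arbitrary) cancels a running Task, then reports the status of the antecedent, of a continuation that shares the token, and of a clean-up continuation registered with TaskContinuationOptions.OnlyOnCanceled.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancelDemo
{
    public static TaskStatus[] Run()
    {
        using (CancellationTokenSource cts = new CancellationTokenSource())
        {
            CancellationToken token = cts.Token;

            Task<int> work = Task.Factory.StartNew<int>(() =>
            {
                while (true)
                {
                    // Throwing OperationCanceledException for the same token that was
                    // passed to StartNew is what makes the Task Canceled, not Faulted
                    token.ThrowIfCancellationRequested();
                    Thread.Sleep(10); // simulate some work
                }
            }, token);

            // Shares the token, so it is cancelled as soon as the token is
            Task<int> next = work.ContinueWith<int>(ant => ant.Result * 2, token);

            // Only runs if the antecedent ends up Canceled
            Task cleanup = work.ContinueWith(
                ant => Console.WriteLine("Antecedent was cancelled, tidying up"),
                TaskContinuationOptions.OnlyOnCanceled);

            Thread.Sleep(100);
            cts.Cancel();

            cleanup.Wait();
            try { next.Wait(); } catch (AggregateException) { /* next was cancelled */ }

            return new[] { work.Status, next.Status, cleanup.Status };
        }
    }
}
```

This should report Canceled for both the antecedent and the token-sharing continuation, and RanToCompletion for the clean-up continuation.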
That's It For Now
I know this article did not have as much to say as the 1st; the thing is, continuations are surprisingly easy to get to grips with, so there was less to say. The next 2 articles are likely to be of about the same sort of size as this one, but the ones after that will have some more meat.
That is all I wanted to say in this article. I hope you liked it and want more. If you did like this article and would like more, could you spare some time to leave a comment and a vote? Many thanks.
Hopefully I'll see you at the next one, and the one after that, and the one after that... yes, 6 in total, I had better get busy