Parallel.cs
Go to the documentation of this file.
1 // Parallel.cs
2 //
3 // Copyright (c) 2008 Jérémie "Garuma" Laval
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24 
25 #if NET_4_0
26 using System;
29 using System.Threading;
30 using System.Runtime.InteropServices;
31 
32 namespace System.Threading.Tasks
33 {
34  public static class Parallel
35  {
36  internal static int GetBestWorkerNumber ()
37  {
38  return GetBestWorkerNumber (TaskScheduler.Current);
39  }
40 
41  internal static int GetBestWorkerNumber (TaskScheduler scheduler)
42  {
43  return Math.Min (Environment.ProcessorCount, (scheduler ?? TaskScheduler.Current).MaximumConcurrencyLevel);
44  }
45 
46  static int GetBestWorkerNumber (int from, int to, ParallelOptions options, out int step)
47  {
48  int num = GetBestWorkerNumber(options.TaskScheduler);
49  if (options != null && options.MaxDegreeOfParallelism != -1)
50  num = Math.Min (options.MaxDegreeOfParallelism, num);
51  // Integer range that each task process
52  if ((step = (to - from) / num) < 5) {
53  step = 5;
54  num = (to - from) / 5;
55  if (num < 1)
56  num = 1;
57  }
58 
59  return num;
60  }
61 
62  static void HandleExceptions (IEnumerable<Task> tasks)
63  {
64  HandleExceptions (tasks, null);
65  }
66 
67  static void HandleExceptions (IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
68  {
69  List<Exception> exs = new List<Exception> ();
70  foreach (Task t in tasks) {
71  if (t.Exception != null)
72  exs.Add (t.Exception);
73  }
74 
75  if (exs.Count > 0) {
76  if (infos != null)
77  infos.IsExceptional = true;
78 
79  throw new AggregateException (exs).Flatten ();
80  }
81  }
82 
83  static void InitTasks (Task[] tasks, int count, Action action, ParallelOptions options)
84  {
85  TaskCreationOptions creation = TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;
86 
87  for (int i = 0; i < count; i++) {
88  if (options == null)
89  tasks [i] = Task.Factory.StartNew (action, creation);
90  else
91  tasks [i] = Task.Factory.StartNew (action, options.CancellationToken, creation, options.TaskScheduler);
92  }
93  }
94 
95 #region For
96 
97  public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int> body)
98  {
99  return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
100  }
101 
102  public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)
103  {
104  return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
105  }
106 
107  public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body)
108  {
109  return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
110  }
111 
112  public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body)
113  {
114  return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
115  }
116 
117  public static ParallelLoopResult For<TLocal> (int fromInclusive,
118  int toExclusive,
119  Func<TLocal> localInit,
120  Func<int, ParallelLoopState, TLocal, TLocal> body,
121  Action<TLocal> localFinally)
122  {
123  return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
124  }
125 
126  public static ParallelLoopResult For<TLocal> (int fromInclusive,
127  int toExclusive,
128  ParallelOptions parallelOptions,
129  Func<TLocal> localInit,
130  Func<int, ParallelLoopState, TLocal, TLocal> body,
131  Action<TLocal> localFinally)
132  {
133  if (body == null)
134  throw new ArgumentNullException ("body");
135  if (localInit == null)
136  throw new ArgumentNullException ("localInit");
137  if (localFinally == null)
138  throw new ArgumentNullException ("localFinally");
139  if (parallelOptions == null)
140  throw new ArgumentNullException ("options");
141  if (fromInclusive >= toExclusive)
142  return new ParallelLoopResult (null, true);
143 
144  // Number of task toExclusive be launched (normally == Env.ProcessorCount)
145  int step;
146  int num = GetBestWorkerNumber (fromInclusive, toExclusive, parallelOptions, out step);
147 
148  Task[] tasks = new Task [num];
149 
150  StealRange[] ranges = new StealRange[num];
151  for (int i = 0; i < num; i++)
152  ranges[i] = new StealRange (fromInclusive, i, step);
153 
155 
156  int currentIndex = -1;
157 
158  Action workerMethod = delegate {
159  int localWorker = Interlocked.Increment (ref currentIndex);
160  StealRange range = ranges[localWorker];
161  int index = range.V64.Actual;
162  int stopIndex = localWorker + 1 == num ? toExclusive : Math.Min (toExclusive, index + step);
163  TLocal local = localInit ();
164 
165  ParallelLoopState state = new ParallelLoopState (infos);
166  CancellationToken token = parallelOptions.CancellationToken;
167 
168  try {
169  for (int i = index; i < stopIndex;) {
170  if (infos.IsStopped)
171  return;
172 
174 
175  if (i >= stopIndex - range.V64.Stolen)
176  break;
177 
178  if (infos.LowestBreakIteration != null && infos.LowestBreakIteration > i)
179  return;
180 
181  state.CurrentIteration = i;
182  local = body (i, state, local);
183 
184  if (i + 1 >= stopIndex - range.V64.Stolen)
185  break;
186 
187  range.V64.Actual = ++i;
188  }
189 
190  bool sixtyfour = IntPtr.Size == 8; // Environment.Is64BitProcess;
191 
192  // Try toExclusive steal fromInclusive our right neighbor (cyclic)
193  int len = num + localWorker;
194  for (int sIndex = localWorker + 1; sIndex < len; ++sIndex) {
195  int extWorker = sIndex % num;
196  range = ranges[extWorker];
197 
198  stopIndex = extWorker + 1 == num ? toExclusive : Math.Min (toExclusive, fromInclusive + (extWorker + 1) * step);
199  int stolen = -1;
200 
201  do {
202  do {
203  long old;
204  StealValue64 val = new StealValue64 ();
205 
206  old = sixtyfour ? range.V64.Value : Interlocked.CompareExchange (ref range.V64.Value, 0, 0);
207  val.Value = old;
208 
209  if (val.Actual >= stopIndex - val.Stolen - 2)
210  goto next;
211  stolen = (val.Stolen += 1);
212 
213  if (Interlocked.CompareExchange (ref range.V64.Value, val.Value, old) == old)
214  break;
215  } while (true);
216 
217  stolen = stopIndex - stolen;
218 
219  if (stolen > range.V64.Actual)
220  local = body (stolen, state, local);
221  else
222  break;
223  } while (true);
224 
225  next:
226  continue;
227  }
228  } finally {
229  localFinally (local);
230  }
231  };
232 
233  InitTasks (tasks, num, workerMethod, parallelOptions);
234 
235  try {
236  Task.WaitAll (tasks);
237  } catch {
238  HandleExceptions (tasks, infos);
239  }
240 
241  return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
242  }
243 
244  [StructLayout(LayoutKind.Explicit)]
245  struct StealValue64 {
246  [FieldOffset(0)]
247  public long Value;
248  [FieldOffset(0)]
249  public int Actual;
250  [FieldOffset(4)]
251  public int Stolen;
252  }
253 
254  class StealRange
255  {
256  public StealValue64 V64 = new StealValue64 ();
257 
258  public StealRange (int fromInclusive, int i, int step)
259  {
260  V64.Actual = fromInclusive + i * step;
261  }
262  }
263 
264 #endregion
265 
266 #region For (long)
267 
268  [MonoTODO]
269  public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long> body)
270  {
271  return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
272  }
273 
274  [MonoTODO]
275  public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body)
276  {
277  return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
278  }
279 
280  [MonoTODO]
281  public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body)
282  {
283  return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
284  }
285 
286  [MonoTODO]
287  public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long, ParallelLoopState> body)
288  {
289  return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
290  }
291 
292  [MonoTODO]
293  public static ParallelLoopResult For<TLocal> (long fromInclusive,
294  long toExclusive,
295  Func<TLocal> localInit,
296  Func<long, ParallelLoopState, TLocal, TLocal> body,
297  Action<TLocal> localFinally)
298  {
299  return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
300  }
301 
302  [MonoTODO ("See how this can be refactored with the above For implementation")]
303  public static ParallelLoopResult For<TLocal> (long fromInclusive,
304  long toExclusive,
305  ParallelOptions parallelOptions,
306  Func<TLocal> localInit,
307  Func<long, ParallelLoopState, TLocal, TLocal> body,
308  Action<TLocal> localFinally)
309  {
310  if (body == null)
311  throw new ArgumentNullException ("body");
312  if (localInit == null)
313  throw new ArgumentNullException ("localInit");
314  if (localFinally == null)
315  throw new ArgumentNullException ("localFinally");
316  if (parallelOptions == null)
317  throw new ArgumentNullException ("options");
318  if (fromInclusive >= toExclusive)
319  return new ParallelLoopResult (null, true);
320 
321  throw new NotImplementedException ();
322  }
323 
324 #endregion
325 
326 #region Foreach
327  static ParallelLoopResult ForEach<TSource, TLocal> (Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options,
328  Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action,
329  Action<TLocal> destruct)
330  {
331  if (enumerable == null)
332  throw new ArgumentNullException ("source");
333  if (options == null)
334  throw new ArgumentNullException ("options");
335  if (action == null)
336  throw new ArgumentNullException ("action");
337  if (init == null)
338  throw new ArgumentNullException ("init");
339  if (destruct == null)
340  throw new ArgumentNullException ("destruct");
341 
342  int num = Math.Min (GetBestWorkerNumber (options.TaskScheduler),
343  options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
344 
345  Task[] tasks = new Task[num];
347 
348  SimpleConcurrentBag<TSource> bag = new SimpleConcurrentBag<TSource> (num);
349  const int bagCount = 5;
350 
351  IList<IEnumerator<TSource>> slices = enumerable (num);
352 
353  int sliceIndex = -1;
354 
355  Action workerMethod = delegate {
356  IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex)];
357 
358  TLocal local = init ();
359  ParallelLoopState state = new ParallelLoopState (infos);
360  int workIndex = bag.GetNextIndex ();
361  CancellationToken token = options.CancellationToken;
362 
363  try {
364  bool cont = true;
365  TSource element;
366 
367  while (cont) {
368  if (infos.IsStopped || infos.IsBroken.Value)
369  return;
370 
372 
373  for (int i = 0; i < bagCount && (cont = slice.MoveNext ()); i++) {
374  bag.Add (workIndex, slice.Current);
375  }
376 
377  for (int i = 0; i < bagCount && bag.TryTake (workIndex, out element); i++) {
378  if (infos.IsStopped)
379  return;
380 
382 
383  local = action (element, state, local);
384  }
385  }
386 
387  while (bag.TrySteal (workIndex, out element)) {
389 
390  local = action (element, state, local);
391 
392  if (infos.IsStopped || infos.IsBroken.Value)
393  return;
394  }
395  } finally {
396  destruct (local);
397  }
398  };
399 
400  InitTasks (tasks, num, workerMethod, options);
401 
402  try {
403  Task.WaitAll (tasks);
404  } catch {
405  HandleExceptions (tasks, infos);
406  }
407 
408  return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
409  }
410 
411  public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource> body)
412  {
413  if (source == null)
414  throw new ArgumentNullException ("source");
415  if (body == null)
416  throw new ArgumentNullException ("body");
417 
418  return ForEach<TSource, object> (Partitioner.Create (source),
419  ParallelOptions.Default,
420  () => null,
421  (e, s, l) => { body (e); return null; },
422  _ => {});
423  }
424 
425  public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
426  {
427  if (source == null)
428  throw new ArgumentNullException ("source");
429  if (body == null)
430  throw new ArgumentNullException ("body");
431 
432  return ForEach<TSource, object> (Partitioner.Create (source),
433  ParallelOptions.Default,
434  () => null,
435  (e, s, l) => { body (e, s); return null; },
436  _ => {});
437  }
438 
439  public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
440  Action<TSource, ParallelLoopState, long> body)
441  {
442  if (source == null)
443  throw new ArgumentNullException ("source");
444  if (body == null)
445  throw new ArgumentNullException ("body");
446 
447 
448  return ForEach<TSource, object> (Partitioner.Create (source),
449  ParallelOptions.Default,
450  () => null,
451  (e, s, l) => { body (e, s, -1); return null; },
452  _ => {});
453  }
454 
455  public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
456  Action<TSource, ParallelLoopState> body)
457  {
458  if (body == null)
459  throw new ArgumentNullException ("body");
460 
461  return ForEach<TSource, object> (source,
462  ParallelOptions.Default,
463  () => null,
464  (e, s, l) => { body (e, s); return null; },
465  _ => {});
466  }
467 
468  public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source,
469  Action<TSource, ParallelLoopState, long> body)
470 
471  {
472  if (body == null)
473  throw new ArgumentNullException ("body");
474 
475  return ForEach<TSource, object> (source,
476  ParallelOptions.Default,
477  () => null,
478  (e, s, i, l) => { body (e, s, i); return null; },
479  _ => {});
480  }
481 
482  public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
483  Action<TSource> body)
484 
485  {
486  if (body == null)
487  throw new ArgumentNullException ("body");
488 
489  return ForEach<TSource, object> (source,
490  ParallelOptions.Default,
491  () => null,
492  (e, s, l) => { body (e); return null; },
493  _ => {});
494  }
495 
496  public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
497  ParallelOptions parallelOptions,
498  Action<TSource> body)
499  {
500  if (source == null)
501  throw new ArgumentNullException ("source");
502  if (body == null)
503  throw new ArgumentNullException ("body");
504 
505  return ForEach<TSource, object> (Partitioner.Create (source),
506  parallelOptions,
507  () => null,
508  (e, s, l) => { body (e); return null; },
509  _ => {});
510  }
511 
512  public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
513  Action<TSource, ParallelLoopState> body)
514  {
515  if (source == null)
516  throw new ArgumentNullException ("source");
517  if (body == null)
518  throw new ArgumentNullException ("body");
519 
520  return ForEach<TSource, object> (Partitioner.Create (source),
521  parallelOptions,
522  () => null,
523  (e, s, l) => { body (e, s); return null; },
524  _ => {});
525  }
526 
527  public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
528  Action<TSource, ParallelLoopState, long> body)
529  {
530  if (source == null)
531  throw new ArgumentNullException ("source");
532  if (body == null)
533  throw new ArgumentNullException ("body");
534 
535  return ForEach<TSource, object> (Partitioner.Create (source),
536  parallelOptions,
537  () => null,
538  (e, s, i, l) => { body (e, s, i); return null; },
539  _ => {});
540  }
541 
542  public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
543  Action<TSource, ParallelLoopState, long> body)
544 
545  {
546  if (body == null)
547  throw new ArgumentNullException ("body");
548 
549  return ForEach<TSource, object> (source,
550  parallelOptions,
551  () => null,
552  (e, s, i, l) => { body (e, s, i); return null; },
553  _ => {});
554  }
555 
556  public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
557  Action<TSource> body)
558  {
559  if (body == null)
560  throw new ArgumentNullException ("body");
561 
562  return ForEach<TSource, object> (source,
563  parallelOptions,
564  () => null,
565  (e, s, l) => { body (e); return null; },
566  _ => {});
567  }
568 
569  public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
570  Action<TSource, ParallelLoopState> body)
571  {
572  return ForEach<TSource, object> (source,
573  parallelOptions,
574  () => null,
575  (e, s, l) => { body (e, s); return null; },
576  _ => {});
577  }
578 
579  public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
580  Func<TSource, ParallelLoopState, TLocal, TLocal> body,
581  Action<TLocal> localFinally)
582  {
583  if (source == null)
584  throw new ArgumentNullException ("source");
585 
586  return ForEach<TSource, TLocal> ((Partitioner<TSource>)Partitioner.Create (source),
587  ParallelOptions.Default,
588  localInit,
589  body,
590  localFinally);
591  }
592 
593  public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
594  Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
595  Action<TLocal> localFinally)
596  {
597  return ForEach<TSource, TLocal> (Partitioner.Create (source),
598  ParallelOptions.Default,
599  localInit,
600  body,
601  localFinally);
602  }
603 
604  public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, Func<TLocal> localInit,
605  Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
606  Action<TLocal> localFinally)
607  {
608  return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
609  }
610 
611  public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, Func<TLocal> localInit,
612  Func<TSource, ParallelLoopState, TLocal, TLocal> body,
613  Action<TLocal> localFinally)
614  {
615  return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
616  }
617 
618  public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
619  Func<TLocal> localInit,
620  Func<TSource, ParallelLoopState, TLocal, TLocal> body,
621  Action<TLocal> localFinally)
622  {
623  if (source == null)
624  throw new ArgumentNullException ("source");
625 
626  return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
627  }
628 
629  public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
630  Func<TLocal> localInit,
631  Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
632  Action<TLocal> localFinally)
633  {
634  if (source == null)
635  throw new ArgumentNullException ("source");
636 
637  return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
638  }
639 
640  public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, ParallelOptions parallelOptions,
641  Func<TLocal> localInit,
642  Func<TSource, ParallelLoopState, TLocal, TLocal> body,
643  Action<TLocal> localFinally)
644  {
645  if (source == null)
646  throw new ArgumentNullException ("source");
647  if (body == null)
648  throw new ArgumentNullException ("body");
649 
650  return ForEach<TSource, TLocal> (source.GetPartitions, parallelOptions, localInit, body, localFinally);
651  }
652 
653  public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
654  Func<TLocal> localInit,
655  Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
656  Action<TLocal> localFinally)
657  {
658  if (source == null)
659  throw new ArgumentNullException ("source");
660  if (body == null)
661  throw new ArgumentNullException ("body");
662 
663  return ForEach<KeyValuePair<long, TSource>, TLocal> (source.GetOrderablePartitions,
664  parallelOptions,
665  localInit,
666  (e, s, l) => body (e.Value, s, e.Key, l),
667  localFinally);
668  }
669  #endregion
670 
671  #region Invoke
672  public static void Invoke (params Action[] actions)
673  {
674  if (actions == null)
675  throw new ArgumentNullException ("actions");
676 
677  Invoke (ParallelOptions.Default, actions);
678  }
679 
680  public static void Invoke (ParallelOptions parallelOptions, params Action[] actions)
681  {
682  if (parallelOptions == null)
683  throw new ArgumentNullException ("parallelOptions");
684  if (actions == null)
685  throw new ArgumentNullException ("actions");
686  if (actions.Length == 0)
687  throw new ArgumentException ("actions is empty");
688  foreach (var a in actions)
689  if (a == null)
690  throw new ArgumentException ("One action in actions is null", "actions");
691  if (actions.Length == 1) {
692  actions[0] ();
693  return;
694  }
695 
696  Task[] ts = new Task[actions.Length];
697  for (int i = 0; i < ts.Length; i++)
698  ts[i] = Task.Factory.StartNew (actions[i],
699  parallelOptions.CancellationToken,
700  TaskCreationOptions.None,
701  parallelOptions.TaskScheduler);
702 
703  try {
704  Task.WaitAll (ts, parallelOptions.CancellationToken);
705  } catch {
706  HandleExceptions (ts);
707  }
708  }
709  #endregion
710 
711  #region SpawnBestNumber, used by PLinq
712  internal static Task[] SpawnBestNumber (Action action, Action callback)
713  {
714  return SpawnBestNumber (action, -1, callback);
715  }
716 
717  internal static Task[] SpawnBestNumber (Action action, int dop, Action callback)
718  {
719  return SpawnBestNumber (action, dop, false, callback);
720  }
721 
722  internal static Task[] SpawnBestNumber (Action action, int dop, bool wait, Action callback)
723  {
724  // Get the optimum amount of worker to create
725  int num = dop == -1 ? (wait ? GetBestWorkerNumber () + 1 : GetBestWorkerNumber ()) : dop;
726 
727  // Initialize worker
728  CountdownEvent evt = new CountdownEvent (num);
729  Task[] tasks = new Task [num];
730  for (int i = 0; i < num; i++) {
731  tasks [i] = Task.Factory.StartNew (() => {
732  action ();
733  evt.Signal ();
734  if (callback != null && evt.IsSet)
735  callback ();
736  });
737  }
738 
739  // If explicitely told, wait for all workers to complete
740  // and thus let main thread participate in the processing
741  if (wait)
742  Task.WaitAll (tasks);
743 
744  return tasks;
745  }
746 #endregion
747  }
748 }
749 #endif
static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action< long > body)
Definition: Parallel.cs:281
static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action< long, ParallelLoopState > body)
Definition: Parallel.cs:287
static void Invoke(ParallelOptions parallelOptions, params Action[] actions)
Definition: Parallel.cs:680
static ParallelLoopResult For(int fromInclusive, int toExclusive, Action< int, ParallelLoopState > body)
Definition: Parallel.cs:102
static ParallelLoopResult For(long fromInclusive, long toExclusive, Action< long, ParallelLoopState > body)
Definition: Parallel.cs:275
static ParallelLoopResult For(int fromInclusive, int toExclusive, Action< int > body)
Definition: Parallel.cs:97
static ParallelLoopResult For(long fromInclusive, long toExclusive, Action< long > body)
Definition: Parallel.cs:269
static void Invoke(params Action[] actions)
Definition: Parallel.cs:672
static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action< int > body)
Definition: Parallel.cs:107
AggregateException Flatten()
static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action< int, ParallelLoopState > body)
Definition: Parallel.cs:112