ConcurrentExclusiveSchedulerPair.cs
Go to the documentation of this file.
1 //
2 // ConcurrentExclusiveSchedulerPair.cs
3 //
4 // Authors:
5 // Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
6 // Marek Safar <marek.safar@gmail.com>
7 //
8 // Copyright (c) 2011 Jérémie "Garuma" Laval
9 // Copyright 2012 Xamarin, Inc (http://www.xamarin.com)
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining a copy
12 // of this software and associated documentation files (the "Software"), to deal
13 // in the Software without restriction, including without limitation the rights
14 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 // copies of the Software, and to permit persons to whom the Software is
16 // furnished to do so, subject to the following conditions:
17 //
18 // The above copyright notice and this permission notice shall be included in
19 // all copies or substantial portions of the Software.
20 //
21 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 // THE SOFTWARE.
28 //
29 //
30 
31 #if NET_4_5
32 
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
38 
39 namespace System.Threading.Tasks
40 {
41  [DebuggerDisplay ("Concurrent={ConcurrentTaskCount}, Exclusive={ExclusiveTaskCount}")]
42  [DebuggerTypeProxy (typeof (SchedulerDebuggerView))]
44  {
// Debugger-only projection of the scheduler pair; it is the type named by the
// DebuggerTypeProxy attribute on the enclosing class.
sealed class SchedulerDebuggerView
{
	readonly ConcurrentExclusiveSchedulerPair pair;

	public SchedulerDebuggerView (ConcurrentExclusiveSchedulerPair owner)
	{
		pair = owner;
	}

	// Tasks currently sitting in the pair's concurrent queue.
	public IEnumerable<Task> ScheduledConcurrent {
		get { return pair.concurrentTasks; }
	}

	// Tasks currently sitting in the pair's exclusive queue.
	public IEnumerable<Task> ScheduledExclusive {
		get { return pair.exclusiveTasks; }
	}

	// Scheduler the pair was constructed with.
	public TaskScheduler TargetScheduler {
		get { return pair.target; }
	}
}
73 
// Upper bound on simultaneously running worker tasks; checked by SpinUpTasks and
// reported by both inner schedulers as their MaximumConcurrencyLevel.
readonly int maxConcurrencyLevel;
// Loop-iteration budget of a single worker run; -1 disables the check.
readonly int maxItemsPerTask;

// Scheduler passed to the constructor; the worker tasks are started on it via factory.
readonly TaskScheduler target;
readonly TaskFactory factory;
// Cached delegate for InternalTaskProcesser so StartNew does not allocate one per call.
readonly Action taskHandler;

readonly ConcurrentQueue<Task> concurrentTasks = new ConcurrentQueue<Task> ();
readonly ConcurrentQueue<Task> exclusiveTasks = new ConcurrentQueue<Task> ();

//readonly ReaderWriterLockSlim rwl = new ReaderWriterLockSlim ();
// Backs Complete () and the Completion property. The declaration was missing
// although both members reference it.
readonly TaskCompletionSource<object> completion = new TaskCompletionSource<object> ();
readonly InnerTaskScheduler concurrent;
readonly InnerTaskScheduler exclusive;

// Count of worker tasks that have been started and have not yet finished;
// only modified through Interlocked operations.
int numTask;
90 
// TaskScheduler facade handed out by the pair. Each instance is bound to one of
// the pair's two queues and forwards every queued task to the pair.
class InnerTaskScheduler : TaskScheduler
{
	readonly ConcurrentExclusiveSchedulerPair scheduler;
	readonly ConcurrentQueue<Task> queue;

	// scheduler: the owning pair.
	// queue: the pair's queue that tasks scheduled here are appended to.
	public InnerTaskScheduler (ConcurrentExclusiveSchedulerPair scheduler,
	                           ConcurrentQueue<Task> queue)
	{
		this.scheduler = scheduler;
		this.queue = queue;
	}

	// Number of tasks currently waiting in this scheduler's queue.
	public int TaskCount {
		get {
			return queue.Count;
		}
	}

	// Both inner schedulers report the limit configured on the owning pair.
	public override int MaximumConcurrencyLevel {
		get {
			return scheduler.maxConcurrencyLevel;
		}
	}

	protected internal override void QueueTask (Task t)
	{
		scheduler.DoQueue (t, queue);
	}

	// Only a task that is still in the Created state is run inline, and it is
	// run on the pair's target scheduler; anything else is refused.
	protected override bool TryExecuteTaskInline (Task task, bool taskWasPreviouslyQueued)
	{
		if (task.Status != TaskStatus.Created)
			return false;

		task.RunSynchronously (scheduler.target);
		return true;
	}

	// Lets the owning pair run a task through this scheduler.
	public void Execute (Task t)
	{
		TryExecuteTask (t);
	}

	// Debugger support: returns a point-in-time copy of the pending tasks so the
	// caller can enumerate without observing concurrent enqueues/dequeues.
	protected override IEnumerable<Task> GetScheduledTasks ()
	{
		return queue.ToArray ();
	}
}
140 
142  {
143  }
144 
// Targets taskScheduler and takes the concurrency limit from that scheduler's
// own MaximumConcurrencyLevel.
public ConcurrentExclusiveSchedulerPair (TaskScheduler taskScheduler)
	: this (taskScheduler, taskScheduler.MaximumConcurrencyLevel)
{
}
148 
// Forwards to the three-argument constructor with maxItemsPerTask = -1,
// the value the worker loop treats as "no per-task limit".
public ConcurrentExclusiveSchedulerPair (TaskScheduler taskScheduler, int maxConcurrencyLevel) : this (taskScheduler, maxConcurrencyLevel, -1)
{
}
153 
// Primary constructor; every other overload ends up here.
//
// taskScheduler:       scheduler the worker tasks are started on; must not be null.
// maxConcurrencyLevel: maximum number of simultaneous worker tasks, or -1 for no limit.
// maxItemsPerTask:     per-worker iteration budget, or -1 for no limit.
//
// Throws ArgumentNullException when taskScheduler is null and
// ArgumentOutOfRangeException when either limit is 0 or below -1.
public ConcurrentExclusiveSchedulerPair (TaskScheduler taskScheduler, int maxConcurrencyLevel, int maxItemsPerTask)
{
	if (taskScheduler == null)
		throw new ArgumentNullException ("taskScheduler");
	if (maxConcurrencyLevel == 0 || maxConcurrencyLevel < -1)
		throw new ArgumentOutOfRangeException ("maxConcurrencyLevel");
	if (maxItemsPerTask == 0 || maxItemsPerTask < -1)
		throw new ArgumentOutOfRangeException ("maxItemsPerTask");

	this.target = taskScheduler;
	// SpinUpTasks bails out when numTask >= maxConcurrencyLevel, so a raw -1 would
	// prevent any worker from ever starting; store "no limit" as int.MaxValue.
	this.maxConcurrencyLevel = maxConcurrencyLevel == -1 ? int.MaxValue : maxConcurrencyLevel;
	this.maxItemsPerTask = maxItemsPerTask;
	this.factory = new TaskFactory (taskScheduler);
	this.taskHandler = InternalTaskProcesser;
	this.concurrent = new InnerTaskScheduler (this, concurrentTasks);
	this.exclusive = new InnerTaskScheduler (this, exclusiveTasks);
}
164 
// Marks the Completion task as finished.
// TrySetResult is used so that calling Complete () more than once is harmless;
// SetResult would throw InvalidOperationException on the second call.
public void Complete ()
{
	completion.TrySetResult (null);
}
169 
// Scheduler whose tasks are placed on the concurrent queue.
public TaskScheduler ConcurrentScheduler {
	get { return concurrent; }
}
175 
// Pending-task count of the concurrent scheduler; referenced by the
// DebuggerDisplay string on the enclosing class.
private int ConcurrentTaskCount {
	get { return concurrent.TaskCount; }
}
181 
// Scheduler whose tasks are placed on the exclusive queue.
public TaskScheduler ExclusiveScheduler {
	get { return exclusive; }
}
187 
// Pending-task count of the exclusive scheduler; referenced by the
// DebuggerDisplay string on the enclosing class.
private int ExclusiveTaskCount {
	get { return exclusive.TaskCount; }
}
193 
// Task exposed by the completion source that Complete () signals.
public Task Completion {
	get { return completion.Task; }
}
199 
// Called by the inner schedulers' QueueTask: appends the task to the supplied
// queue, then gives SpinUpTasks the chance to start another worker.
void DoQueue (Task task, ConcurrentQueue<Task> queue)
{
	queue.Enqueue (task);

	SpinUpTasks ();
}
205 
// Body of every worker task started by SpinUpTasks (through taskHandler).
// Loops while either queue holds tasks, stopping early once the per-worker
// budget (maxItemsPerTask, -1 = unlimited) is reached. The actual dequeue/run
// step is not implemented yet and throws NotImplementedException; the intended
// reader/writer-lock based algorithm is kept below as a commented-out sketch.
//
// The worker slot taken in SpinUpTasks is released in a finally block so that
// an exception escaping the loop cannot leave numTask permanently incremented.
void InternalTaskProcesser ()
{
	int times = 0;
//	const int lockWaitTime = 2;

	try {
		while (!concurrentTasks.IsEmpty || !exclusiveTasks.IsEmpty) {
			if (maxItemsPerTask != -1 && ++times == maxItemsPerTask)
				break;

			throw new NotImplementedException ();
/*
			bool locked = false;

			try {
				if (!concurrentTasks.IsEmpty && rwl.TryEnterReadLock (lockWaitTime)) {
					locked = true;
					Task task;
					while (concurrentTasks.TryDequeue (out task)) {
						RunTask (task);
					}
				}
			} finally {
				if (locked) {
					rwl.ExitReadLock ();
					locked = false;
				}
			}

			try {
				if (!exclusiveTasks.IsEmpty && rwl.TryEnterWriteLock (lockWaitTime)) {
					locked = true;
					Task task;
					while (exclusiveTasks.TryDequeue (out task)) {
						RunTask (task);
					}
				}
			} finally {
				if (locked) {
					rwl.ExitWriteLock ();
				}
			}
*/
		}
	} finally {
		// TODO: there's a race here, task adding + spinup check may be done while here
		Interlocked.Decrement (ref numTask);
	}
}
253 
// Reserves a worker slot and starts a worker task on the target scheduler.
// Returns without starting anything when numTask has already reached
// maxConcurrencyLevel. The slot is claimed with a compare-and-swap that is
// retried whenever another thread changed numTask in between.
void SpinUpTasks ()
{
	while (true) {
		int observed = numTask;
		if (observed >= maxConcurrencyLevel)
			return;

		if (Interlocked.CompareExchange (ref numTask, observed + 1, observed) == observed)
			break;
	}

	factory.StartNew (taskHandler);
}
265 
// Executes the task through the concurrent inner scheduler.
// NOTE(review): the only references to this helper are in the commented-out
// body of InternalTaskProcesser, where it is also applied to tasks taken from
// the exclusive queue - confirm that routing those through `concurrent` is intended.
void RunTask (Task task)
{
	InnerTaskScheduler runner = concurrent;
	runner.Execute (task);
}
270  }
271 }
272 
273 #endif
ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel, int maxItemsPerTask)
ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel)