BlockingCollection.cs
Go to the documentation of this file.
1 //
2 // BlockingCollection.cs
3 //
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23 //
24 //
25 
26 #if NET_4_0
27 
28 using System;
29 using System.Threading;
30 using System.Collections;
32 using System.Diagnostics;
33 using System.Runtime.InteropServices;
34 
35 namespace System.Collections.Concurrent
36 {
37  [ComVisible (false)]
38  [DebuggerDisplay ("Count={Count}")]
39  [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
40  public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
41  {
42  const int spinCount = 5;
43 
44  readonly IProducerConsumerCollection<T> underlyingColl;
45  readonly int upperBound;
46 
47  AtomicBoolean isComplete;
48  long completeId;
49 
50  /* The whole idea of the collection is to use these two long values in a transactional
51  * way to track and manage the actual data inside the underlying lock-free collection
52  * instead of directly working with it or using external locking.
53  *
54  * They are manipulated with CAS and are guaranteed to increase over time and use
55  * of the instance thus preventing ABA problems.
56  */
57  long addId = long.MinValue;
58  long removeId = long.MinValue;
59 
60  /* These events are used solely for the purpose of having an optimized sleep cycle when
61  * the BlockingCollection have to wait on an external event (Add or Remove for instance)
62  */
63  ManualResetEventSlim mreAdd = new ManualResetEventSlim (true);
64  ManualResetEventSlim mreRemove = new ManualResetEventSlim (true);
65 
66  /* For time based operations, we share this instance of Stopwatch and base calculation
67  on a time offset at each of these method call */
68  static Stopwatch watch = Stopwatch.StartNew ();
69 
70  #region ctors
72  : this (new ConcurrentQueue<T> (), -1)
73  {
74  }
75 
76  public BlockingCollection (int boundedCapacity)
77  : this (new ConcurrentQueue<T> (), boundedCapacity)
78  {
79  }
80 
82  : this (collection, -1)
83  {
84  }
85 
86  public BlockingCollection (IProducerConsumerCollection<T> collection, int boundedCapacity)
87  {
88  this.underlyingColl = collection;
89  this.upperBound = boundedCapacity;
90  this.isComplete = new AtomicBoolean ();
91  }
92  #endregion
93 
94  #region Add & Remove (+ Try)
95  public void Add (T item)
96  {
97  Add (item, CancellationToken.None);
98  }
99 
100  public void Add (T item, CancellationToken cancellationToken)
101  {
102  TryAdd (item, -1, cancellationToken);
103  }
104 
105  public bool TryAdd (T item)
106  {
107  return TryAdd (item, 0, CancellationToken.None);
108  }
109 
110  public bool TryAdd (T item, int millisecondsTimeout, CancellationToken cancellationToken)
111  {
112  if (millisecondsTimeout < -1)
113  throw new ArgumentOutOfRangeException ("millisecondsTimeout");
114 
115  long start = millisecondsTimeout == -1 ? 0 : watch.ElapsedMilliseconds;
116  SpinWait sw = new SpinWait ();
117 
118  do {
119  cancellationToken.ThrowIfCancellationRequested ();
120 
121  long cachedAddId = addId;
122  long cachedRemoveId = removeId;
123 
124  // If needed, we check and wait that the collection isn't full
125  if (upperBound != -1 && cachedAddId - cachedRemoveId > upperBound) {
126  if (millisecondsTimeout == 0)
127  return false;
128 
129  if (sw.Count <= spinCount) {
130  sw.SpinOnce ();
131  } else {
132  mreRemove.Reset ();
133  if (cachedRemoveId != removeId || cachedAddId != addId) {
134  mreRemove.Set ();
135  continue;
136  }
137 
138  mreRemove.Wait (ComputeTimeout (millisecondsTimeout, start), cancellationToken);
139  }
140 
141  continue;
142  }
143 
144  // Check our transaction id against completed stored one
145  if (isComplete.Value && cachedAddId >= completeId)
146  ThrowCompleteException ();
147 
148  // Validate the steps we have been doing until now
149  if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
150  continue;
151 
152  // We have a slot reserved in the underlying collection, try to take it
153  if (!underlyingColl.TryAdd (item))
154  throw new InvalidOperationException ("The underlying collection didn't accept the item.");
155 
156  // Wake up process that may have been sleeping
157  mreAdd.Set ();
158 
159  return true;
160  } while (millisecondsTimeout == -1 || (watch.ElapsedMilliseconds - start) < millisecondsTimeout);
161 
162  return false;
163  }
164 
165  public bool TryAdd (T item, TimeSpan timeout)
166  {
167  return TryAdd (item, (int)timeout.TotalMilliseconds);
168  }
169 
170  public bool TryAdd (T item, int millisecondsTimeout)
171  {
172  return TryAdd (item, millisecondsTimeout, CancellationToken.None);
173  }
174 
175  public T Take ()
176  {
177  return Take (CancellationToken.None);
178  }
179 
180  public T Take (CancellationToken cancellationToken)
181  {
182  T item;
183  TryTake (out item, -1, cancellationToken, true);
184 
185  return item;
186  }
187 
188  public bool TryTake (out T item)
189  {
190  return TryTake (out item, 0, CancellationToken.None);
191  }
192 
193  public bool TryTake (out T item, int millisecondsTimeout, CancellationToken cancellationToken)
194  {
195  return TryTake (out item, millisecondsTimeout, cancellationToken, false);
196  }
197 
198  bool TryTake (out T item, int milliseconds, CancellationToken cancellationToken, bool throwComplete)
199  {
200  if (milliseconds < -1)
201  throw new ArgumentOutOfRangeException ("milliseconds");
202 
203  item = default (T);
204  SpinWait sw = new SpinWait ();
205  long start = milliseconds == -1 ? 0 : watch.ElapsedMilliseconds;
206 
207  do {
208  cancellationToken.ThrowIfCancellationRequested ();
209 
210  long cachedRemoveId = removeId;
211  long cachedAddId = addId;
212 
213  // Empty case
214  if (cachedRemoveId == cachedAddId) {
215  if (milliseconds == 0)
216  return false;
217 
218  if (IsCompleted) {
219  if (throwComplete)
220  ThrowCompleteException ();
221  else
222  return false;
223  }
224 
225  if (sw.Count <= spinCount) {
226  sw.SpinOnce ();
227  } else {
228  mreAdd.Reset ();
229  if (cachedRemoveId != removeId || cachedAddId != addId) {
230  mreAdd.Set ();
231  continue;
232  }
233 
234  mreAdd.Wait (ComputeTimeout (milliseconds, start), cancellationToken);
235  }
236 
237  continue;
238  }
239 
240  if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
241  continue;
242 
243  while (!underlyingColl.TryTake (out item));
244 
245  mreRemove.Set ();
246 
247  return true;
248 
249  } while (milliseconds == -1 || (watch.ElapsedMilliseconds - start) < milliseconds);
250 
251  return false;
252  }
253 
254  public bool TryTake (out T item, TimeSpan timeout)
255  {
256  return TryTake (out item, (int)timeout.TotalMilliseconds);
257  }
258 
259  public bool TryTake (out T item, int millisecondsTimeout)
260  {
261  item = default (T);
262 
263  return TryTake (out item, millisecondsTimeout, CancellationToken.None, false);
264  }
265 
266  static int ComputeTimeout (int millisecondsTimeout, long start)
267  {
268  return millisecondsTimeout == -1 ? 500 : (int)Math.Max (watch.ElapsedMilliseconds - start - millisecondsTimeout, 1);
269  }
270  #endregion
271 
272  #region static methods
273  static void CheckArray (BlockingCollection<T>[] collections)
274  {
275  if (collections == null)
276  throw new ArgumentNullException ("collections");
277  if (collections.Length == 0 || IsThereANullElement (collections))
278  throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
279  }
280 
281  static bool IsThereANullElement (BlockingCollection<T>[] collections)
282  {
283  foreach (BlockingCollection<T> e in collections)
284  if (e == null)
285  return true;
286  return false;
287  }
288 
289  public static int AddToAny (BlockingCollection<T>[] collections, T item)
290  {
291  CheckArray (collections);
292  int index = 0;
293  foreach (var coll in collections) {
294  try {
295  coll.Add (item);
296  return index;
297  } catch {}
298  index++;
299  }
300  return -1;
301  }
302 
303  public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
304  {
305  CheckArray (collections);
306  int index = 0;
307  foreach (var coll in collections) {
308  try {
309  coll.Add (item, cancellationToken);
310  return index;
311  } catch {}
312  index++;
313  }
314  return -1;
315  }
316 
317  public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
318  {
319  CheckArray (collections);
320  int index = 0;
321  foreach (var coll in collections) {
322  if (coll.TryAdd (item))
323  return index;
324  index++;
325  }
326  return -1;
327  }
328 
329  public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan timeout)
330  {
331  CheckArray (collections);
332  int index = 0;
333  foreach (var coll in collections) {
334  if (coll.TryAdd (item, timeout))
335  return index;
336  index++;
337  }
338  return -1;
339  }
340 
341  public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
342  {
343  CheckArray (collections);
344  int index = 0;
345  foreach (var coll in collections) {
346  if (coll.TryAdd (item, millisecondsTimeout))
347  return index;
348  index++;
349  }
350  return -1;
351  }
352 
353  public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
354  CancellationToken cancellationToken)
355  {
356  CheckArray (collections);
357  int index = 0;
358  foreach (var coll in collections) {
359  if (coll.TryAdd (item, millisecondsTimeout, cancellationToken))
360  return index;
361  index++;
362  }
363  return -1;
364  }
365 
366  public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
367  {
368  item = default (T);
369  CheckArray (collections);
370  int index = 0;
371  foreach (var coll in collections) {
372  try {
373  item = coll.Take ();
374  return index;
375  } catch {}
376  index++;
377  }
378  return -1;
379  }
380 
381  public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken cancellationToken)
382  {
383  item = default (T);
384  CheckArray (collections);
385  int index = 0;
386  foreach (var coll in collections) {
387  try {
388  item = coll.Take (cancellationToken);
389  return index;
390  } catch {}
391  index++;
392  }
393  return -1;
394  }
395 
396  public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
397  {
398  item = default (T);
399 
400  CheckArray (collections);
401  int index = 0;
402  foreach (var coll in collections) {
403  if (coll.TryTake (out item))
404  return index;
405  index++;
406  }
407  return -1;
408  }
409 
410  public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan timeout)
411  {
412  item = default (T);
413 
414  CheckArray (collections);
415  int index = 0;
416  foreach (var coll in collections) {
417  if (coll.TryTake (out item, timeout))
418  return index;
419  index++;
420  }
421  return -1;
422  }
423 
424  public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
425  {
426  item = default (T);
427 
428  CheckArray (collections);
429  int index = 0;
430  foreach (var coll in collections) {
431  if (coll.TryTake (out item, millisecondsTimeout))
432  return index;
433  index++;
434  }
435  return -1;
436  }
437 
438  public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
439  CancellationToken cancellationToken)
440  {
441  item = default (T);
442 
443  CheckArray (collections);
444  int index = 0;
445  foreach (var coll in collections) {
446  if (coll.TryTake (out item, millisecondsTimeout, cancellationToken))
447  return index;
448  index++;
449  }
450  return -1;
451  }
452  #endregion
453 
454  public void CompleteAdding ()
455  {
456  // No further add beside that point
457  completeId = addId;
458  isComplete.Value = true;
459  // Wakeup some operation in case this has an impact
460  mreAdd.Set ();
461  mreRemove.Set ();
462  }
463 
464  void ThrowCompleteException ()
465  {
466  throw new InvalidOperationException ("The BlockingCollection<T> has"
467  + " been marked as complete with regards to additions.");
468  }
469 
470  void ICollection.CopyTo (Array array, int index)
471  {
472  underlyingColl.CopyTo (array, index);
473  }
474 
475  public void CopyTo (T[] array, int index)
476  {
477  underlyingColl.CopyTo (array, index);
478  }
479 
480  public IEnumerable<T> GetConsumingEnumerable ()
481  {
482  return GetConsumingEnumerable (CancellationToken.None);
483  }
484 
485  public IEnumerable<T> GetConsumingEnumerable (CancellationToken cancellationToken)
486  {
487  while (true) {
488  T item = default (T);
489 
490  try {
491  item = Take (cancellationToken);
492  } catch {
493  // Then the exception is perfectly normal
494  if (IsCompleted)
495  break;
496  // otherwise rethrow
497  throw;
498  }
499 
500  yield return item;
501  }
502  }
503 
504  IEnumerator IEnumerable.GetEnumerator ()
505  {
506  return ((IEnumerable)underlyingColl).GetEnumerator ();
507  }
508 
509  IEnumerator<T> IEnumerable<T>.GetEnumerator ()
510  {
511  return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
512  }
513 
514  public void Dispose ()
515  {
516 
517  }
518 
519  protected virtual void Dispose (bool disposing)
520  {
521 
522  }
523 
524  public T[] ToArray ()
525  {
526  return underlyingColl.ToArray ();
527  }
528 
529  public int BoundedCapacity {
530  get {
531  return upperBound;
532  }
533  }
534 
535  public int Count {
536  get {
537  return underlyingColl.Count;
538  }
539  }
540 
541  public bool IsAddingCompleted {
542  get {
543  return isComplete.Value;
544  }
545  }
546 
547  public bool IsCompleted {
548  get {
549  return isComplete.Value && addId == removeId;
550  }
551  }
552 
553  object ICollection.SyncRoot {
554  get {
555  return underlyingColl.SyncRoot;
556  }
557  }
558 
559  bool ICollection.IsSynchronized {
560  get {
561  return underlyingColl.IsSynchronized;
562  }
563  }
564  }
565 }
566 #endif
static int AddToAny(BlockingCollection< T >[] collections, T item, CancellationToken cancellationToken)
static int TakeFromAny(BlockingCollection< T >[] collections, out T item)
static int TryAddToAny(BlockingCollection< T >[] collections, T item)
IEnumerable< T > GetConsumingEnumerable(CancellationToken cancellationToken)
bool TryAdd(T item, int millisecondsTimeout)
void Add(T item, CancellationToken cancellationToken)
static int TryTakeFromAny(BlockingCollection< T >[] collections, out T item, int millisecondsTimeout)
static int TryTakeFromAny(BlockingCollection< T >[] collections, out T item, int millisecondsTimeout, CancellationToken cancellationToken)
static int TakeFromAny(BlockingCollection< T >[] collections, out T item, CancellationToken cancellationToken)
T Take(CancellationToken cancellationToken)
static int TryAddToAny(BlockingCollection< T >[] collections, T item, TimeSpan timeout)
static int TryTakeFromAny(BlockingCollection< T >[] collections, out T item)
static int TryTakeFromAny(BlockingCollection< T >[] collections, out T item, TimeSpan timeout)
static int TryAddToAny(BlockingCollection< T >[] collections, T item, int millisecondsTimeout)
bool TryTake(out T item, int millisecondsTimeout)
bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken)
BlockingCollection(IProducerConsumerCollection< T > collection, int boundedCapacity)
static int TryAddToAny(BlockingCollection< T >[] collections, T item, int millisecondsTimeout, CancellationToken cancellationToken)
bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
bool TryTake(out T item, TimeSpan timeout)
BlockingCollection(IProducerConsumerCollection< T > collection)
static int AddToAny(BlockingCollection< T >[] collections, T item)