ConcurrentBag.cs
Go to the documentation of this file.
1 //
2 // ConcurrentBag.cs
3 //
4 // Author:
5 // Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
6 //
7 // Copyright (c) 2009 Jérémie "Garuma" Laval
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
15 //
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
18 //
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 // THE SOFTWARE.
26 
27 #if NET_4_0
28 using System;
29 using System.Collections;
31 using System.Diagnostics;
32 using System.Runtime.InteropServices;
33 
34 using System.Threading;
35 using System.Threading.Tasks;
36 
37 namespace System.Collections.Concurrent
38 {
39  [ComVisible (false)]
40  [DebuggerDisplay ("Count={Count}")]
41  [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
42  public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IEnumerable<T>, IEnumerable
43  {
44  // We store hints in a long
45  long hints;
46 
47  int count;
48  // The container area is where bag are added foreach thread
50  // The staging area is where non-empty bag are located for fast iteration
52 
53  public ConcurrentBag ()
54  {
55  }
56 
57  public ConcurrentBag (IEnumerable<T> collection) : this ()
58  {
59  foreach (T item in collection)
60  Add (item);
61  }
62 
63  public void Add (T item)
64  {
65  int index;
66  CyclicDeque<T> bag = GetBag (out index);
67  bag.PushBottom (item);
68  AddHint (index);
69  Interlocked.Increment (ref count);
70  }
71 
72  bool IProducerConsumerCollection<T>.TryAdd (T element)
73  {
74  Add (element);
75  return true;
76  }
77 
78  public bool TryTake (out T result)
79  {
80  result = default (T);
81 
82  if (count == 0)
83  return false;
84 
85  int hintIndex;
86  CyclicDeque<T> bag = GetBag (out hintIndex, false);
87  bool ret = true;
88 
89  if (bag == null || bag.PopBottom (out result) != PopResult.Succeed) {
90  var self = bag;
91  foreach (var other in staging) {
92  // Try to retrieve something based on a hint
93  ret = TryGetHint (out hintIndex) && (bag = container[hintIndex]).PopTop (out result) == PopResult.Succeed;
94 
95  // We fall back to testing our slot
96  if (!ret && other.Value != self) {
97  var status = other.Value.PopTop (out result);
98  while (status == PopResult.Abort)
99  status = other.Value.PopTop (out result);
100  ret = status == PopResult.Succeed;
101  hintIndex = other.Key;
102  bag = other.Value;
103  }
104 
105  // If we found something, stop
106  if (ret)
107  break;
108  }
109  }
110 
111  if (ret) {
112  TidyBag (hintIndex, bag);
113  Interlocked.Decrement (ref count);
114  }
115 
116  return ret;
117  }
118 
119  public bool TryPeek (out T result)
120  {
121  result = default (T);
122 
123  if (count == 0)
124  return false;
125 
126  int hintIndex;
127  CyclicDeque<T> bag = GetBag (out hintIndex, false);
128  bool ret = true;
129 
130  if (bag == null || !bag.PeekBottom (out result)) {
131  var self = bag;
132  foreach (var other in staging) {
133  // Try to retrieve something based on a hint
134  ret = TryGetHint (out hintIndex) && container[hintIndex].PeekTop (out result);
135 
136  // We fall back to testing our slot
137  if (!ret && other.Value != self)
138  ret = other.Value.PeekTop (out result);
139 
140  // If we found something, stop
141  if (ret)
142  break;
143  }
144  }
145 
146  return ret;
147  }
148 
149  void AddHint (int index)
150  {
151  // We only take thread index that can be stored in 5 bits (i.e. thread ids 1-15)
152  if (index > 0xF)
153  return;
154  var hs = hints;
155  // If cas failed then we don't retry
156  Interlocked.CompareExchange (ref hints, (long)(((ulong)hs) << 4 | (uint)index), (long)hs);
157  }
158 
159  bool TryGetHint (out int index)
160  {
161  /* Funny little thing to know, since hints is a long (because CAS has no ulong overload),
162  * a shift-right operation is an arithmetic shift which might set high-order right bits
163  * to 1 instead of 0 if the number turns negative.
164  */
165  var hs = hints;
166  index = 0;
167 
168  if (Interlocked.CompareExchange (ref hints, (long)(((ulong)hs) >> 4), hs) == hs)
169  index = (int)(hs & 0xF);
170 
171  return index > 0;
172  }
173 
174  public int Count {
175  get {
176  return count;
177  }
178  }
179 
180  public bool IsEmpty {
181  get {
182  return count == 0;
183  }
184  }
185 
186  object System.Collections.ICollection.SyncRoot {
187  get {
188  return this;
189  }
190  }
191 
192  bool System.Collections.ICollection.IsSynchronized {
193  get {
194  return true;
195  }
196  }
197 
198  IEnumerator IEnumerable.GetEnumerator ()
199  {
200  return GetEnumeratorInternal ();
201  }
202 
203  public IEnumerator<T> GetEnumerator ()
204  {
205  return GetEnumeratorInternal ();
206  }
207 
208  IEnumerator<T> GetEnumeratorInternal ()
209  {
210  foreach (var bag in container)
211  foreach (T item in bag.Value.GetEnumerable ())
212  yield return item;
213  }
214 
215  void System.Collections.ICollection.CopyTo (Array array, int index)
216  {
217  T[] a = array as T[];
218  if (a == null)
219  return;
220 
221  CopyTo (a, index);
222  }
223 
224  public void CopyTo (T[] array, int index)
225  {
226  int c = count;
227  if (array.Length < c + index)
228  throw new InvalidOperationException ("Array is not big enough");
229 
230  CopyTo (array, index, c);
231  }
232 
233  void CopyTo (T[] array, int index, int num)
234  {
235  int i = index;
236 
237  foreach (T item in this) {
238  if (i >= num)
239  break;
240 
241  array[i++] = item;
242  }
243  }
244 
245  public T[] ToArray ()
246  {
247  int c = count;
248  T[] temp = new T[c];
249 
250  CopyTo (temp, 0, c);
251 
252  return temp;
253  }
254 
255  int GetIndex ()
256  {
257  return Thread.CurrentThread.ManagedThreadId;
258  }
259 
260  CyclicDeque<T> GetBag (out int index, bool createBag = true)
261  {
262  index = GetIndex ();
263  CyclicDeque<T> value;
264  if (container.TryGetValue (index, out value))
265  return value;
266 
267  var bag = createBag ? container.GetOrAdd (index, new CyclicDeque<T> ()) : null;
268  if (bag != null)
269  staging.TryAdd (index, bag);
270  return bag;
271  }
272 
273  void TidyBag (int index, CyclicDeque<T> bag)
274  {
275  if (bag != null && bag.IsEmpty) {
276  if (staging.TryRemove (index, out bag) && !bag.IsEmpty)
277  staging.TryAdd (index, bag);
278  }
279  }
280  }
281 }
282 #endif
ConcurrentBag(IEnumerable< T > collection)