EnumerablePartitioner.cs
Go to the documentation of this file.
1 //
2 // EnumerablePartitioner.cs
3 //
4 // Author:
5 // Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
6 //
7 // Copyright (c) 2009 Jérémie "Garuma" Laval
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
15 //
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
18 //
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 // THE SOFTWARE.
26 
27 #if NET_4_0
28 
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
33 
34 namespace System.Collections.Concurrent.Partitioners
35 {
36  // Represent a chunk partitioner
/// <summary>
/// Chunk partitioner over an arbitrary <see cref="IEnumerable{T}"/>: every partition
/// pulls items from one shared source enumerator under a lock, and each item is
/// keyed by its position in the source.
/// </summary>
internal class EnumerablePartitioner<T> : OrderablePartitioner<T>
{
	const int InitialPartitionSize = 1;
	const int PartitionMultiplier = 2;

	readonly IEnumerable<T> source;
	readonly int initialPartitionSize;
	readonly int partitionMultiplier;

	/// <summary>
	/// Creates a partitioner whose chunks start at one element and double in size
	/// each time a partition comes back for more.
	/// </summary>
	/// <param name="source">Sequence to partition.</param>
	public EnumerablePartitioner (IEnumerable<T> source)
		: this (source, InitialPartitionSize, PartitionMultiplier)
	{
	}

	/// <summary>
	/// Creates a partitioner with an explicit chunking policy. Passing 1 for both
	/// values gives striped partitioning (used by Take and Skip, for instance).
	/// </summary>
	/// <param name="source">Sequence to partition.</param>
	/// <param name="initialPartitionSize">Size of the first chunk a partition takes; at least 1.</param>
	/// <param name="partitionMultiplier">Growth factor applied to the chunk size after each chunk; at least 1.</param>
	/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
	/// <exception cref="ArgumentOutOfRangeException">A size or multiplier below 1 was given.</exception>
	public EnumerablePartitioner (IEnumerable<T> source, int initialPartitionSize, int partitionMultiplier)
		// keys ordered in each partition, not ordered across partitions, keys normalized
		: base (true, false, true)
	{
		if (source == null)
			throw new ArgumentNullException ("source");
		// A chunk size below 1 would make a partition take nothing and spin forever.
		if (initialPartitionSize < 1)
			throw new ArgumentOutOfRangeException ("initialPartitionSize");
		if (partitionMultiplier < 1)
			throw new ArgumentOutOfRangeException ("partitionMultiplier");

		this.source = source;
		this.initialPartitionSize = initialPartitionSize;
		this.partitionMultiplier = partitionMultiplier;
	}

	/// <summary>
	/// Splits the source into <paramref name="partitionCount"/> enumerators that share
	/// a single source enumerator.
	/// </summary>
	/// <param name="partitionCount">Number of partitions to create; must be positive.</param>
	/// <returns>One key/value enumerator per partition.</returns>
	/// <exception cref="ArgumentOutOfRangeException"><paramref name="partitionCount"/> is not positive.</exception>
	/// <remarks>
	/// The shared source enumerator is disposed as soon as it is exhausted, or when the
	/// last of the returned enumerators has been disposed, whichever happens first.
	/// </remarks>
	public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions (int partitionCount)
	{
		if (partitionCount <= 0)
			throw new ArgumentOutOfRangeException ("partitionCount");

		IEnumerator<KeyValuePair<long, T>>[] enumerators
			= new IEnumerator<KeyValuePair<long, T>>[partitionCount];

		PartitionerState state = new PartitionerState (source.GetEnumerator (), partitionCount);
		bool isSimple = initialPartitionSize == 1 && partitionMultiplier == 1;

		for (int i = 0; i < enumerators.Length; i++) {
			IEnumerator<KeyValuePair<long, T>> inner
				= isSimple ? GetPartitionEnumeratorSimple (state) : GetPartitionEnumerator (state);
			// The wrapper guarantees the partition is released exactly once on Dispose,
			// even if the iterator underneath was never started.
			enumerators[i] = new PartitionEnumerator (inner, state);
		}

		return enumerators;
	}

	// Simpler than the general case (no intermediate list); used when
	// initialPartitionSize == partitionMultiplier == 1, i.e. one element per request.
	IEnumerator<KeyValuePair<long, T>> GetPartitionEnumeratorSimple (PartitionerState state)
	{
		while (true) {
			long index = -1;
			T value = default (T);
			bool taken = false;

			lock (state.SyncLock) {
				if (!state.Finished) {
					if (state.Source.MoveNext ()) {
						index = state.Index++;
						value = state.Source.Current;
						taken = true;
					} else {
						state.MarkFinished ();
					}
				}
			}

			if (!taken)
				yield break;

			// Yield outside the lock so a slow consumer never blocks the other partitions.
			yield return new KeyValuePair<long, T> (index, value);
		}
	}

	// General case: takes a chunk of up to `count` items under the lock, hands them
	// out, then grows the chunk size by partitionMultiplier for the next request.
	IEnumerator<KeyValuePair<long, T>> GetPartitionEnumerator (PartitionerState state)
	{
		int count = initialPartitionSize;
		List<T> list = new List<T> ();

		while (true) {
			list.Clear ();
			long first;

			lock (state.SyncLock) {
				first = state.Index;

				for (int i = 0; i < count && !state.Finished; i++) {
					if (state.Source.MoveNext ()) {
						list.Add (state.Source.Current);
						state.Index++;
					} else {
						state.MarkFinished ();
					}
				}
			}

			// count is always >= 1, so an empty chunk means the source is finished.
			if (list.Count == 0)
				yield break;

			for (int i = 0; i < list.Count; i++)
				yield return new KeyValuePair<long, T> (first + i, list[i]);

			// Saturate instead of overflowing: a wrapped (negative or zero) count would
			// take no items and loop forever.
			count = (int) Math.Min ((long) count * partitionMultiplier, (long) int.MaxValue);
		}
	}

	// Forwards to a partition iterator and releases the partition's share of the
	// source enumerator when disposed.
	sealed class PartitionEnumerator : IEnumerator<KeyValuePair<long, T>>
	{
		readonly IEnumerator<KeyValuePair<long, T>> inner;
		readonly PartitionerState state;
		int released;

		public PartitionEnumerator (IEnumerator<KeyValuePair<long, T>> inner, PartitionerState state)
		{
			this.inner = inner;
			this.state = state;
		}

		public KeyValuePair<long, T> Current {
			get { return inner.Current; }
		}

		object System.Collections.IEnumerator.Current {
			get { return inner.Current; }
		}

		public bool MoveNext ()
		{
			return inner.MoveNext ();
		}

		public void Reset ()
		{
			inner.Reset ();
		}

		public void Dispose ()
		{
			// Only the first Dispose call counts towards releasing the source.
			if (Interlocked.Exchange (ref released, 1) != 0)
				return;

			inner.Dispose ();
			state.ReleasePartition ();
		}
	}

	// State shared by all partitions created from one GetOrderablePartitions call.
	class PartitionerState
	{
		// True once the source is exhausted or every partition has been released.
		// Read and written only while holding SyncLock.
		public bool Finished;
		// Key that will be given to the next item taken from the source.
		public long Index = 0;
		public readonly object SyncLock = new object ();
		public readonly IEnumerator<T> Source;

		int activePartitions;
		bool sourceDisposed;

		public PartitionerState (IEnumerator<T> source, int partitionCount)
		{
			Source = source;
			activePartitions = partitionCount;
		}

		// Marks the source as finished and disposes it (once). After this no partition
		// touches Source again, because each one checks Finished under the lock first.
		public void MarkFinished ()
		{
			lock (SyncLock) {
				Finished = true;
				if (sourceDisposed)
					return;
				sourceDisposed = true;
				Source.Dispose ();
			}
		}

		// Called when a partition enumerator is disposed; the last one out disposes
		// the source so it is not leaked when consumers stop early.
		public void ReleasePartition ()
		{
			lock (SyncLock) {
				if (--activePartitions == 0)
					MarkFinished ();
			}
		}
	}
}
152 }
153 #endif