SystemFiberScheduler.cs
Go to the documentation of this file.
1 /*
2 
3 Author: Aaron Oneal, http://aarononeal.info
4 
5 Copyright (c) 2012 Spicy Pixel, http://spicypixel.com
6 
7 Permission is hereby granted, free of charge, to any person obtaining
8 a copy of this software and associated documentation files (the
9 "Software"), to deal in the Software without restriction, including
10 without limitation the rights to use, copy, modify, merge, publish,
11 distribute, sublicense, and/or sell copies of the Software, and to
12 permit persons to whom the Software is furnished to do so, subject to
13 the following conditions:
14 
15 The above copyright notice and this permission notice shall be
16 included in all copies or substantial portions of the Software.
17 
18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
22 LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23 OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
24 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 
26 */
27 using System;
28 using System.Collections;
31 using System.Threading;
32 
33 namespace SpicyPixel.Threading
34 {
47  {
55  {
56  return StartNew(null, CancellationToken.None, 0f);
57  }
58 
/// <summary>
/// Starts a scheduler on a new thread without an initial fiber.
/// </summary>
/// <remarks>
/// Forwards to the (fiber, token, updatesPerSecond) overload with a null fiber.
/// </remarks>
/// <param name="token">Cancellation token handed to the scheduler's run loop.</param>
/// <param name="updatesPerSecond">Update rate handed to the run loop; defaults to 0.</param>
/// <returns>The scheduler obtained from the newly started thread.</returns>
public static SystemFiberScheduler StartNew(CancellationToken token, float updatesPerSecond = 0f)
{
    Fiber noInitialFiber = null;
    return StartNew(noInitialFiber, token, updatesPerSecond);
}
75 
/// <summary>
/// Starts a scheduler on a new thread and runs the given fiber on it.
/// </summary>
/// <remarks>
/// Forwards to the (fiber, token, updatesPerSecond) overload with
/// <see cref="CancellationToken.None"/> and an update rate of 0.
/// </remarks>
/// <param name="fiber">The fiber handed to the scheduler's run loop.</param>
/// <returns>The scheduler obtained from the newly started thread.</returns>
public static SystemFiberScheduler StartNew(Fiber fiber)
{
    CancellationToken neverCancelled = CancellationToken.None;
    return StartNew(fiber, neverCancelled, 0f);
}
89 
/// <summary>
/// Starts a new thread, captures that thread's current scheduler, and runs the
/// scheduler loop on it.
/// </summary>
/// <remarks>
/// The calling thread blocks until the new thread has published its scheduler,
/// so the returned reference is assigned before this method returns. The run
/// loop itself keeps executing on the new thread after this method returns.
/// </remarks>
/// <param name="fiber">Fiber passed to <c>Run</c>; may be null.</param>
/// <param name="token">Cancellation token passed to <c>Run</c>.</param>
/// <param name="updatesPerSecond">Update rate passed to <c>Run</c>; defaults to 0.</param>
/// <returns>The scheduler that is current on the newly started thread.</returns>
public static SystemFiberScheduler StartNew(Fiber fiber, CancellationToken token, float updatesPerSecond = 0f)
{
    SystemFiberScheduler backgroundScheduler = null;

    // The event is only needed for the hand-off below, so release its
    // OS handle as soon as the hand-off has completed.
    using(var wait = new ManualResetEvent(false))
    {
        // Setup a thread to run the scheduler
        var thread = new Thread(() => {
            backgroundScheduler = (SystemFiberScheduler)FiberScheduler.Current;

            // Signal only after the scheduler reference has been stored
            wait.Set();

            FiberScheduler.Current.Run(fiber, token, updatesPerSecond);
        });
        thread.Start();

        // Block until the new thread has published its scheduler
        wait.WaitOne();
    }

    return backgroundScheduler;
}
121 
125  private const int MaxStackDepth = 10;
126 
131  [ThreadStatic]
132  private static int stackDepthQueueFiber = 0;
133 
137  private ConcurrentQueue<Fiber> executingFibers = new ConcurrentQueue<Fiber>();
138 
143 
144  // A future queue may include waitingFibers (e.g. waiting on a signal or timeout)
145 
149  private float currentTime;
150 
154  private ManualResetEvent runWaitHandle = new ManualResetEvent(true);
155 
159  private ManualResetEvent disposeWaitHandle = new ManualResetEvent(false);
160 
165  private AutoResetEvent schedulerEventWaitHandle = new AutoResetEvent(false);
166 
/// <summary>
/// Gets the event that is signaled while the run loop is not executing.
/// </summary>
/// <remarks>
/// The event is created signaled; <c>Run</c> resets it before entering its
/// loop and sets it again when it exits.
/// </remarks>
protected ManualResetEvent RunWaitHandle
{
    get
    {
        return this.runWaitHandle;
    }
}
180 
/// <summary>
/// Gets the handle that is signaled when the scheduler is being disposed.
/// </summary>
/// <remarks>
/// <c>Dispose(true)</c> sets the underlying event, and the run loop checks it
/// to decide when to stop.
/// </remarks>
protected WaitHandle DisposeWaitHandle
{
    get
    {
        return this.disposeWaitHandle;
    }
}
194 
/// <summary>
/// Gets the auto-reset handle that is signaled when the scheduler should
/// re-evaluate its state.
/// </summary>
/// <remarks>
/// The underlying event is set when a fiber is queued for execution and when
/// an abort is requested.
/// </remarks>
protected WaitHandle SchedulerEventWaitHandle
{
    get
    {
        return this.schedulerEventWaitHandle;
    }
}
216 
/// <summary>
/// Gets the number of entries currently in the execution queue.
/// </summary>
protected int ExecutingFiberCount
{
    get
    {
        return this.executingFibers.Count;
    }
}
230 
/// <summary>
/// Gets the number of entries currently in the sleeping queue.
/// </summary>
protected int SleepingFiberCount
{
    get
    {
        return this.sleepingFibers.Count;
    }
}
244 
249  {
250  }
251 
/// <summary>
/// Queues a fiber on this scheduler, executing it inline when that is allowed.
/// </summary>
/// <remarks>
/// The fiber runs immediately when inlining is enabled, the caller is on the
/// scheduler thread, and the per-thread nesting depth of this method is below
/// <c>MaxStackDepth</c>. Otherwise it is appended to the execution queue.
/// </remarks>
/// <param name="fiber">The fiber to queue.</param>
protected sealed override void QueueFiber(Fiber fiber)
{
    // Completion callbacks can ask to queue a fiber that has already run
    // and moved on to another state. Such fibers would be skipped by the
    // run loop anyway, so bail out early.
    if(fiber.Status != FiberStatus.WaitingToRun)
        return;

    // Track nesting so inline execution cannot recurse without bound
    Interlocked.Increment(ref stackDepthQueueFiber);

    try
    {
        // Note: Some applications may prefer to always queue so the
        // run loop controls performance more strictly.
        bool runInline = AllowInlining
            && SchedulerThread == Thread.CurrentThread
            && stackDepthQueueFiber < MaxStackDepth;

        if(runInline)
            ExecuteFiberInternal(fiber);
        else
            QueueFiberForExecution(fiber);
    }
    finally
    {
        // Leaving this nesting level
        Interlocked.Decrement(ref stackDepthQueueFiber);
    }
}
299 
/// <summary>
/// Appends the fiber to the execution queue and wakes the scheduler.
/// </summary>
/// <param name="fiber">The fiber to run on a later update.</param>
private void QueueFiberForExecution(Fiber fiber)
{
    this.executingFibers.Enqueue(fiber);

    // A newly queued fiber changes when the next update must happen,
    // so signal the run loop to re-evaluate.
    this.schedulerEventWaitHandle.Set();
}
314 
/// <summary>
/// Appends the fiber to the sleeping queue together with its wake time.
/// </summary>
/// <param name="fiber">The fiber to put to sleep.</param>
/// <param name="timeToWake">Scheduler time at which the fiber should wake.</param>
private void QueueFiberForSleep(Fiber fiber, float timeToWake)
{
    // No scheduler signal is raised here: a fiber is only put to sleep as
    // the result of a yield instruction it returned while executing on the
    // main thread, so the run loop cannot be blocked in a wait at this point.
    sleepingFibers.Enqueue(new Tuple<Fiber, float>(fiber, timeToWake));
}
334 
/// <summary>
/// Runs one scheduler cycle at the given time.
/// </summary>
/// <remarks>
/// Records the time, then processes the execution queue followed by the
/// sleeping queue.
/// </remarks>
/// <param name="time">The current scheduler time.</param>
protected void Update(float time)
{
    this.currentTime = time;

    this.UpdateExecutingFibers();
    this.UpdateSleepingFibers();
}
355 
/// <summary>
/// Executes every fiber that was in the execution queue when the call began.
/// </summary>
private void UpdateExecutingFibers()
{
    // A null entry marks the end of this update's work; anything queued
    // after it is left for the next update.
    executingFibers.Enqueue(null);

    for(;;)
    {
        Fiber next;
        bool dequeued = executingFibers.TryDequeue(out next);

        // Stop when the queue is drained or the marker is reached
        if(!dequeued || next == null)
            return;

        // Completed fibers are simply dropped
        if(!next.IsCompleted)
            ExecuteFiberInternal(next);
    }
}
378 
/// <summary>
/// Wakes sleeping fibers whose wake time has arrived or whose token was cancelled.
/// </summary>
/// <remarks>
/// Fibers that are not yet due are put back on the sleeping queue.
/// </remarks>
private void UpdateSleepingFibers()
{
    // A null entry marks the end of this update's pass over the queue
    sleepingFibers.Enqueue(null);

    for(;;)
    {
        Tuple<Fiber, float> entry;

        // Stop when the queue is drained or the marker is reached
        if(!sleepingFibers.TryDequeue(out entry) || entry == null)
            return;

        Fiber sleeper = entry.Item1;

        // Completed fibers are simply dropped
        if(sleeper.IsCompleted)
            continue;

        // Run when due or cancelled, otherwise keep sleeping
        bool due = entry.Item2 <= currentTime;
        if(due || sleeper.cancelToken.IsCancellationRequested)
            ExecuteFiberInternal(sleeper);
        else
            sleepingFibers.Enqueue(entry);
    }
}
407 
/// <summary>
/// Signals the scheduler event so the run loop wakes after an abort request.
/// </summary>
/// <param name="fiber">The fiber for which the abort was requested (not used here).</param>
protected sealed override void AbortRequested(Fiber fiber)
{
    this.schedulerEventWaitHandle.Set();
}
418 
/// <summary>
/// Gets the earliest wake time among the sleeping fibers.
/// </summary>
/// <remarks>
/// If any sleeping fiber's cancellation token has been cancelled, the wake
/// time is reported as 0 so the caller wakes immediately.
/// </remarks>
/// <param name="fiberWakeTime">
/// Receives the earliest wake time, 0 when a sleeping fiber was cancelled,
/// or -1 when there is no sleeping fiber.
/// </param>
/// <returns>
/// <c>true</c> if a wake time was found; <c>false</c> if no fiber is sleeping.
/// </returns>
protected bool GetNextFiberWakeTime(out float fiberWakeTime)
{
    fiberWakeTime = -1f;

    // Track "found" separately rather than comparing against the -1
    // placeholder, so a wake time that happens to equal -1 is not
    // mistaken for "nothing found yet" and overwritten by a later one.
    bool found = false;
    float earliest = 0f;

    // Find the earliest wake time; an empty queue falls straight through
    foreach(var sleeper in sleepingFibers)
    {
        // The update pass uses a null entry as its end marker; ignore it
        // if one is present.
        if(sleeper == null)
            continue;

        if(sleeper.Item1.cancelToken.IsCancellationRequested)
        {
            fiberWakeTime = 0f; // wake immediately
            return true;
        }

        if(!found || sleeper.Item2 < earliest)
        {
            earliest = sleeper.Item2;
            found = true;
        }
    }

    if(found)
        fiberWakeTime = earliest;

    return found;
}
458 
/// <summary>
/// Coroutine that yields on a fiber and then cancels the given source.
/// </summary>
/// <param name="waitOnFiber">The fiber yielded to the scheduler before cancelling.</param>
/// <param name="cancelSource">The source to cancel once the coroutine resumes.</param>
/// <returns>An enumerator suitable for running as a fiber.</returns>
private IEnumerator CancelWhenComplete(Fiber waitOnFiber, CancellationTokenSource cancelSource)
{
    // Hand the fiber to the scheduler and suspend here
    yield return waitOnFiber;

    // Resumed: raise the cancellation
    cancelSource.Cancel();
}
464 
/// <summary>
/// Runs the blocking scheduler loop until it is cancelled, the scheduler is
/// disposed, or the main fiber completes.
/// </summary>
/// <remarks>
/// Each iteration calls <c>Update</c> with the seconds elapsed since the loop
/// started. When <paramref name="updatesPerSecond"/> is greater than 0 the loop
/// then waits out the remainder of the update period (1 / updatesPerSecond
/// seconds). While nothing is queued for execution, the loop waits for a
/// scheduler signal or for the next sleeping fiber's wake time.
/// On exit both queues are cleared, the time is reset, and the run wait handle
/// is signaled.
/// </remarks>
/// <param name="fiber">Optional main fiber; when it completes the loop exits. May be null.</param>
/// <param name="token">Token that stops the loop when cancelled.</param>
/// <param name="updatesPerSecond">Maximum update rate, or 0 for no rate limit.</param>
/// <exception cref="ObjectDisposedException">The scheduler has been disposed.</exception>
/// <exception cref="InvalidOperationException">Run is already executing.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="updatesPerSecond"/> is negative.</exception>
public override void Run(Fiber fiber, CancellationToken token, float updatesPerSecond)
{
    if(isDisposed)
        throw new ObjectDisposedException(GetType().FullName);

    // Run is not re-entrant, make sure we're not running
    if(!runWaitHandle.WaitOne(0))
        throw new InvalidOperationException("Run is already executing and is not re-entrant");

    // Verify arguments
    if(updatesPerSecond < 0f)
        throw new ArgumentOutOfRangeException("updatesPerSecond", "The updatesPerSecond must be >= 0");

    // Minimum time between updates (duration). The period is the inverse
    // of the rate: ticks-per-second divided by updates-per-second.
    long updateIntervalTicks = 0;
    if(updatesPerSecond > 0f)
    {
        double intervalTicks = (double)TimeSpan.TicksPerSecond / (double)updatesPerSecond;

        // Keep the period within what a millisecond wait timeout can express
        double maxIntervalTicks = (double)int.MaxValue * (double)TimeSpan.TicksPerMillisecond;
        updateIntervalTicks = (long)(intervalTicks < maxIntervalTicks ? intervalTicks : maxIntervalTicks);
    }

    var mainFiberCompleteCancelSource = new CancellationTokenSource();

    // Get a base time for better precision. UTC is used because only
    // elapsed time matters and local time can jump at DST transitions.
    long baseTicks = DateTime.UtcNow.Ticks;

    // Build wait list to terminate execution
    var waitHandleList = new List<WaitHandle>(4);
    waitHandleList.Add(schedulerEventWaitHandle);
    waitHandleList.Add(disposeWaitHandle);

    if(token.CanBeCanceled)
        waitHandleList.Add(token.WaitHandle);

    try
    {
        if(fiber != null)
        {
            // Add the main fiber to the wait list so when it completes
            // the wait handle falls through.
            waitHandleList.Add(mainFiberCompleteCancelSource.Token.WaitHandle);

            // Start the main fiber if it isn't running yet
            if(fiber.Status == FiberStatus.Created)
                fiber.Start(this);

            // Start another fiber that waits on the main fiber to complete.
            // When it does, it raises a cancellation.
            Fiber.Factory.StartNew(CancelWhenComplete(fiber, mainFiberCompleteCancelSource), this);
        }

        // The scheduler event stays in the list so that queueing a fiber or
        // requesting an abort wakes the loop from either wait below.
        WaitHandle[] waitHandles = waitHandleList.ToArray();

        runWaitHandle.Reset();

        while(true)
        {
            // Stop executing if cancelled
            if(ShouldStopRunning(token, mainFiberCompleteCancelSource))
                return;

            // Snap current time
            long startTicks = DateTime.UtcNow.Ticks;

            // Update using this time marker (and convert ticks to s)
            Update((float)((double)(startTicks - baseTicks) / (double)TimeSpan.TicksPerSecond));

            // Only sleep to next frequency cycle if one was specified
            if(updatesPerSecond > 0f)
            {
                // Sleep at least until next update
                long elapsedTicks = DateTime.UtcNow.Ticks - startTicks;
                long sleepTicks = updateIntervalTicks - elapsedTicks;
                if(sleepTicks > 0)
                {
                    WaitHandle.WaitAny(waitHandles, TicksToWaitMilliseconds(sleepTicks));

                    // Stop executing if cancelled
                    if(ShouldStopRunning(token, mainFiberCompleteCancelSource))
                        return;
                }
            }

            // Now keep sleeping until it's time to update
            while(ExecutingFiberCount == 0)
            {
                // Assume we wait forever (e.g. until a signal); -1 is the infinite timeout
                int wakeMilliseconds = -1;

                // If there are sleeping fibers, then set a wake time
                float wakeMarkerInSeconds;
                if(GetNextFiberWakeTime(out wakeMarkerInSeconds))
                {
                    long wakeTicks = baseTicks;
                    wakeTicks += (long)((double)wakeMarkerInSeconds * (double)TimeSpan.TicksPerSecond);
                    wakeTicks -= DateTime.UtcNow.Ticks;

                    // If there was a waiting fiber and it's already past time to awake then stop waiting
                    if(wakeTicks <= 0)
                        break;

                    wakeMilliseconds = TicksToWaitMilliseconds(wakeTicks);
                }

                // There was no waiting fiber and we will wait for another signal,
                // or there was a waiting fiber and we wait until that time.
                WaitHandle.WaitAny(waitHandles, wakeMilliseconds);

                // Stop executing if cancelled
                if(ShouldStopRunning(token, mainFiberCompleteCancelSource))
                    return;
            }
        }
    }
    finally
    {
        // Clear queues
        Fiber dequeuedFiber;
        while(executingFibers.TryDequeue(out dequeuedFiber));

        Tuple<Fiber, float> dequeuedSleepingFiber;
        while(sleepingFibers.TryDequeue(out dequeuedSleepingFiber));

        // Reset time
        currentTime = 0f;

        // Set for dispose
        runWaitHandle.Set();
    }
}

/// <summary>
/// Reports whether the run loop should exit: the caller's token was cancelled,
/// the main fiber completed, or disposal has been signaled.
/// </summary>
private bool ShouldStopRunning(CancellationToken token, CancellationTokenSource mainFiberCompleteCancelSource)
{
    return (token.CanBeCanceled && token.IsCancellationRequested)
        || mainFiberCompleteCancelSource.IsCancellationRequested
        || disposeWaitHandle.WaitOne(0);
}

/// <summary>
/// Converts a positive tick duration to whole milliseconds, capped at
/// <see cref="int.MaxValue"/> so it is always a valid wait timeout.
/// </summary>
private static int TicksToWaitMilliseconds(long ticks)
{
    long milliseconds = ticks / TimeSpan.TicksPerMillisecond;
    return milliseconds > int.MaxValue ? int.MaxValue : (int)milliseconds;
}
628 
/// <summary>
/// Executes a fiber and then any chain of fibers that instructions switch to.
/// </summary>
/// <remarks>
/// After each execution the returned instruction is processed. A fiber that is
/// still alive and was not placed on a queue by its instruction is re-queued
/// for the next update. The chain ends when a fiber completes or when no next
/// fiber is indicated.
/// </remarks>
/// <param name="fiber">The first fiber to execute.</param>
private void ExecuteFiberInternal(Fiber fiber)
{
    Fiber active = fiber;

    while(active != null)
    {
        var instruction = ExecuteFiber(active);

        // A completed fiber ends the chain
        if(active.IsCompleted)
            break;

        // Let the instruction place the fiber on a special queue and/or
        // name a fiber to switch to
        bool handledByInstruction;
        Fiber successor;
        OnFiberInstruction(active, instruction, out handledByInstruction, out successor);

        // Re-check completion because processing the instruction may have
        // executed something inline that changed the fiber's state. A fiber
        // that is still alive and unqueued goes back on the execution queue
        // (never inline) to run on the next Update().
        bool needsRequeue = !handledByInstruction && !active.IsCompleted;
        if(needsRequeue)
            QueueFiberForExecution(active);

        // Follow the switch, if any
        active = successor;
    }
}
675 
/// <summary>
/// Applies the scheduling effect of an instruction returned by a fiber.
/// </summary>
/// <param name="fiber">The fiber that returned the instruction.</param>
/// <param name="instruction">The instruction to process.</param>
/// <param name="fiberQueued">
/// Set to <c>true</c> when the fiber was parked by the instruction (waiting on
/// another fiber's completion or sleeping); otherwise <c>false</c>.
/// </param>
/// <param name="nextFiber">
/// Set to the fiber to switch to for a yield-to-fiber instruction; otherwise null.
/// </param>
private void OnFiberInstruction(Fiber fiber, FiberInstruction instruction, out bool fiberQueued, out Fiber nextFiber)
{
    fiberQueued = false;
    nextFiber = null;

    YieldUntilComplete waitForCompletion = instruction as YieldUntilComplete;
    if(waitForCompletion != null)
    {
        // The once-only guard exists because this was meant to support
        // completions arriving from other threads. Currently fibers must
        // belong to the same thread, which the instructions enforce.
        //
        // FIXME: If multiple schedulers are supported in the future this
        // callback could occur from another thread and therefore after
        // Dispose(). Would probably need a lock.
        int resumed = 0;

        waitForCompletion.Fiber.ContinueWith ((f) => {
            // Only the first completion notification resumes the fiber;
            // it may execute inline at that point.
            if(Interlocked.CompareExchange(ref resumed, 1, 0) == 0)
                QueueFiber(fiber);
        });

        fiberQueued = true;
    }
    else
    {
        YieldForSeconds sleepRequest = instruction as YieldForSeconds;
        if(sleepRequest != null)
        {
            QueueFiberForSleep(fiber, currentTime + sleepRequest.Seconds);
            fiberQueued = true;
        }
        else
        {
            YieldToFiber switchRequest = instruction as YieldToFiber;
            if(switchRequest != null)
            {
                // Pull the target out of any queue so it runs now instead
                RemoveFiberFromQueues(switchRequest.Fiber);
                nextFiber = switchRequest.Fiber;
                fiberQueued = false;
            }
        }
    }
}
724 
/// <summary>
/// Removes the fiber from the execution queue, or failing that from the
/// sleeping queue.
/// </summary>
/// <remarks>
/// Each queue is rotated once: a unique marker is appended, entries are
/// dequeued up to the marker, and every entry except the target is appended
/// again, which preserves the relative order of the remaining entries.
/// </remarks>
/// <param name="fiber">The fiber to remove.</param>
private void RemoveFiberFromQueues(Fiber fiber)
{
    if(executingFibers.Count > 0)
    {
        bool removed = false;

        // A fresh fiber instance serves as a marker distinct from every queued entry
        Fiber sentinel = new Fiber(() => {});
        executingFibers.Enqueue(sentinel);

        Fiber candidate;
        while(executingFibers.TryDequeue(out candidate) && candidate != sentinel)
        {
            if(candidate == fiber)
                removed = true;
            else
                executingFibers.Enqueue(candidate);
        }

        // Found in the execution queue, so the sleeping queue is not searched
        if(removed)
            return;
    }

    if(sleepingFibers.Count > 0)
    {
        var sentinelEntry = new Tuple<Fiber, float>(null, 0f);
        sleepingFibers.Enqueue(sentinelEntry);

        Tuple<Fiber, float> entry;
        while(sleepingFibers.TryDequeue(out entry) && entry != sentinelEntry)
        {
            // Null entries are kept as they are
            bool isTarget = entry != null && entry.Item1 == fiber;
            if(!isTarget)
                sleepingFibers.Enqueue(entry);
        }
    }
}
778 
779  #region IDisposable implementation
780 
784  private bool isDisposed = false;
785 
/// <summary>
/// Disposes the scheduler, stopping the run loop first when called explicitly.
/// </summary>
/// <remarks>
/// When <paramref name="disposing"/> is true, the dispose event is signaled
/// and the call blocks until the run wait handle is signaled. Calls after the
/// first do nothing.
/// </remarks>
/// <param name="disposing">
/// <c>true</c> when called to release managed state as well.
/// </param>
protected override void Dispose(bool disposing)
{
    if(!isDisposed)
    {
        if(disposing)
        {
            // Ask the run loop to stop, then wait for it to finish
            disposeWaitHandle.Set();
            runWaitHandle.WaitOne();
        }

        // Mark disposed before handing off to the base class
        isDisposed = true;

        base.Dispose(disposing);
    }
}
817  #endregion
818  }
819 }
820 
Schedules fibers for execution.
Fiber StartNew(IEnumerator coroutine)
Start executing a new fiber using the default scheduler on the thread.
sealed override void QueueFiber(Fiber fiber)
Queues the fiber for execution on the scheduler.
This class is the system default implementation of a FiberScheduler and is capable of scheduling and ...
Yield execution until the watched fiber on the same scheduler is complete.
void Run(Fiber fiber)
Run the blocking scheduler loop and perform the specified number of updates per second.
static FiberFactory Factory
Gets the default factory for creating fibers.
Definition: Fiber.cs:120
A Fiber is a lightweight means of scheduling work that enables multiple units of processing to execut...
Represents a fiber instruction to be processed by a FiberScheduler.
sealed override void AbortRequested(Fiber fiber)
Invoked when an abort has been requested.
static SystemFiberScheduler StartNew(Fiber fiber, CancellationToken token, float updatesPerSecond=0f)
Starts a new thread, creates a scheduler, starts it running, and returns it to the calling thread...
Fiber Fiber
Gets the fiber to yield to.
override void Dispose(bool disposing)
Dispose the scheduler.
void Start()
Start executing the fiber using the default scheduler on the thread.
Definition: Fiber.cs:443
Yield execution to a specific fiber belonging to the same scheduler as the current fiber...
Definition: YieldToFiber.cs:35
static FiberScheduler Current
Gets the default fiber scheduler for the thread.
static SystemFiberScheduler StartNew(Fiber fiber)
Starts a new thread, creates a scheduler, starts it running, and returns it to the calling thread...
A FiberInstruction to pause execution of a fiber for the specified duration.
override void Run(Fiber fiber, CancellationToken token, float updatesPerSecond)
Run the blocking scheduler loop and perform the specified number of updates per second.
FiberStatus Status
Gets or sets the state of the fiber.
Definition: Fiber.cs:238
void Update(float time)
Update the scheduler which causes all queued tasks to run for a cycle.
Fiber Fiber
Gets the fiber to yield to.
Definition: YieldToFiber.cs:43
SystemFiberScheduler()
Initializes a new instance of the SpicyPixel.Threading.SystemFiberScheduler class.
Fiber ContinueWith(IEnumerator continuationCoroutine)
Creates a continuation that executes asynchronously when the target fiber completes.
FiberStatus
Represents the current state of a fiber.
Definition: FiberStatus.cs:34
bool IsCompleted
Gets a value indicating whether this instance is completed.
Definition: Fiber.cs:204
bool GetNextFiberWakeTime(out float fiberWakeTime)
Gets the time of the first fiber wake up.
static SystemFiberScheduler StartNew(CancellationToken token, float updatesPerSecond=0f)
Starts a new thread, creates a scheduler, starts it running, and returns it to the calling thread...
static SystemFiberScheduler StartNew()
Starts a new thread, creates a scheduler, starts it running, and returns it to the calling thread...