Previous posts in this series:

In my previous posts I showed how we can apply the Future primitive, along with C# iterator functions, to tackle some real-world problems. Now, I’ll dive in a bit deeper and show how, given a relatively simple implementation of Future, you can build more complex systems around it. For the examples in this article, I’ll be using a stripped down version of the interface provided by my particular implementation of Future, which looks like this:

Future.cs

/// <summary>
/// Type-agnostic view of a future: lets callers observe completion and failure,
/// attach completion callbacks, and supply a result without knowing the result type.
/// </summary>
public interface IFuture {
    /// <summary>Whether the future has been given a result or an error.</summary>
    bool Completed { get; }

    /// <summary>Whether the future was completed with an error.</summary>
    bool Failed { get; }

    /// <summary>The error the future was completed with, if any.</summary>
    Exception Error { get; }

    /// <summary>Registers a callback that receives this future once it is complete.</summary>
    /// <param name="handler">The callback to invoke with the completed future.</param>
    void RegisterOnComplete (Action<IFuture> handler);

    /// <summary>Completes the future with an untyped result or an error.</summary>
    /// <param name="result">The result value, boxed as an object.</param>
    /// <param name="error">The error to record instead of a result.</param>
    void SetResult (object result, Exception error);
}
 
   
 
  /// <summary>
  /// Holds a value of type <typeparamref name="T"/> (or an error) that is supplied exactly once,
  /// possibly after consumers have started waiting for it. All state is guarded by a private lock,
  /// and completion handlers are always invoked outside of that lock.
  /// </summary>
  public class Future<T> : IFuture {
      // Guards every mutable field below.
      private readonly object _Lock = new object();
      private bool _Completed = false;
      // Handlers registered before completion; cleared when the future completes.
      private Action<IFuture> _OnComplete = null;
      private Exception _Error = null;
      private T _Result = default(T);

      /// <summary>Creates a future that has not been completed yet.</summary>
      public Future () {
      }

      /// <summary>Creates a future that is already completed with <paramref name="value"/>.</summary>
      public Future (T value) {
          SetResult(value, null);
      }

      /// <summary>
      /// Registers a completion callback. If the future is not yet complete the handler is queued
      /// and invoked (after any previously registered handlers) by the call that completes the
      /// future; if it is already complete the handler is invoked immediately by this call.
      /// </summary>
      /// <param name="handler">Callback that receives this future once it is complete.</param>
      public void RegisterOnComplete (Action<IFuture> handler) {
          lock (_Lock) {
              if (!_Completed) {
                  // Delegate combination appends to the invocation list, so handlers
                  // run in registration order when the future completes.
                  _OnComplete += handler;
                  return;
              }
          }

          // Already complete when we took the lock, so the handler list has been
          // drained; invoke the handler directly, outside of the lock.
          handler(this);
      }

      /// <summary>True once a result or an error has been stored.</summary>
      public bool Completed {
          get {
              lock (_Lock) {
                  return _Completed;
              }
          }
      }

      /// <summary>True if the future has been completed with a non-null error.</summary>
      public bool Failed {
          get {
              lock (_Lock) {
                  return _Completed && (_Error != null);
              }
          }
      }

      /// <summary>The error the future was completed with, or null if it completed successfully.</summary>
      /// <exception cref="FutureHasNoResultException">The future has not been completed yet.</exception>
      public Exception Error {
          get {
              lock (_Lock) {
                  if (!_Completed) {
                      throw new FutureHasNoResultException(this);
                  }

                  return _Error;
              }
          }
      }

      /// <summary>The value the future was completed with.</summary>
      /// <exception cref="FutureHasNoResultException">The future has not been completed yet.</exception>
      /// <exception cref="FutureException">The future was completed with an error; the error is the inner exception.</exception>
      public T Result {
          get {
              lock (_Lock) {
                  if (!_Completed) {
                      throw new FutureHasNoResultException(this);
                  }

                  if (_Error != null) {
                      throw new FutureException("The future's result was an error.", _Error);
                  }

                  return _Result;
              }
          }
      }

      /// <summary>
      /// Untyped completion entry point. A null <paramref name="result"/> is stored as default(T);
      /// any other value is cast to <typeparamref name="T"/>.
      /// </summary>
      /// <exception cref="FutureException">Both a result and an error were supplied.</exception>
      void IFuture.SetResult (object result, Exception error) {
          if ((error != null) && (result != null)) {
              throw new FutureException("Cannot complete a future with both a result and an error.", error);
          }

          if (result == null) {
              SetResult(default(T), error);
          } else {
              SetResult((T)result, error);
          }
      }

      /// <summary>
      /// Completes the future with the given result/error pair and then invokes every handler
      /// that was registered beforehand.
      /// </summary>
      /// <exception cref="FutureAlreadyHasResultException">The future was already completed.</exception>
      public void SetResult (T result, Exception error) {
          Action<IFuture> onComplete;

          lock (_Lock) {
              // A future should only be completed once. Completing it twice usually indicates
              // a bug in other code, so fail loudly instead of discarding one of the results.
              if (_Completed) {
                  throw new FutureAlreadyHasResultException(this);
              }

              // Take ownership of the handler list and publish the result/error pair.
              onComplete = _OnComplete;
              _OnComplete = null;
              _Result = result;
              _Error = error;
              _Completed = true;
          }

          // Run the handlers outside of the lock so they are free to query this future.
          if (onComplete != null) {
              onComplete(this);
          }
      }

      /// <summary>
      /// Reads the stored result and error without throwing.
      /// </summary>
      /// <param name="result">Receives the stored result (default(T) if none has been stored).</param>
      /// <param name="error">Receives the stored error (null if none has been stored).</param>
      /// <returns>True if the future is complete; otherwise false.</returns>
      public bool GetResult (out T result, out Exception error) {
          lock (_Lock) {
              result = _Result;
              error = _Error;
              return _Completed;
          }
      }
  }
 
   
 
  /// <summary>
  /// General-purpose exception raised by futures; always wraps the underlying
  /// cause as its inner exception.
  /// </summary>
  public class FutureException : Exception {
      /// <summary>Creates the exception from a message and the exception that caused it.</summary>
      /// <param name="message">Description of what went wrong.</param>
      /// <param name="innerException">The underlying cause.</param>
      public FutureException (string message, Exception innerException) : base(message, innerException) { }
  }
 
   
 
  /// <summary>
  /// Raised when code attempts to complete a future that has already been completed.
  /// </summary>
  public class FutureAlreadyHasResultException : InvalidOperationException {
      /// <summary>The future that was the target of the second completion attempt.</summary>
      public readonly IFuture Future;

      /// <summary>Creates the exception for the given future.</summary>
      /// <param name="future">The future that already has a result.</param>
      public FutureAlreadyHasResultException (IFuture future) : base("Future already has a result") {
          this.Future = future;
      }
  }
 
   
 
  /// <summary>
  /// Raised when code reads the result or error of a future that has not been completed.
  /// </summary>
  public class FutureHasNoResultException : InvalidOperationException {
      /// <summary>The future whose result was requested too early.</summary>
      public readonly IFuture Future;

      /// <summary>Creates the exception for the given future.</summary>
      /// <param name="future">The future that does not yet have a result.</param>
      public FutureHasNoResultException (IFuture future) : base("Future does not yet have a result") {
          this.Future = future;
      }
  }
 
   
 
  // Implement a few simple extension methods to make futures easier to use
 
  /// <summary>
  /// Convenience extension methods that express the common ways of completing a future
  /// in terms of SetResult.
  /// </summary>
  public static partial class Future {
      /// <summary>Completes a future successfully without supplying a value.</summary>
      /// <param name="future">The future to complete.</param>
      public static void Complete (this IFuture future) {
          future.SetResult(null, null);
      }

      /// <summary>Completes a typed future successfully with <paramref name="result"/>.</summary>
      /// <param name="future">The future to complete.</param>
      /// <param name="result">The value to store in the future.</param>
      // The receiver must be Future<T>: the non-generic Future is this static class,
      // and a static class cannot be used as a parameter type.
      public static void Complete<T> (this Future<T> future, T result) {
          future.SetResult(result, null);
      }

      /// <summary>Completes a future with an error instead of a value.</summary>
      /// <param name="future">The future to complete.</param>
      /// <param name="error">The error to store in the future.</param>
      public static void Fail (this IFuture future, Exception error) {
          future.SetResult(null, error);
      }
  }

A few things are worth mentioning here: We go to great lengths to ensure that information is not lost; attempting to assign a result to a Future twice throws an exception, because the alternative could result in information being silently discarded.
Likewise, we ensure that when registering an OnComplete callback, the callback is invoked even if the Future already has been completed. When dealing with traditional event handlers or synchronization mechanisms it’s possible to ‘miss’ an event by registering your callback too late, and end up with an application that’s hung silently for no reason. By invoking any callback that’s registered after our Future has become completed, we prevent that problem.
And in cases where a naive application directly accesses a Future‘s Result property without checking to see if the operation in question has completed (or failed), we throw an exception if no result is actually available. If we were writing this class from scratch, we might have considered instead returning a default value – default(T), or null – but doing so means that it’s possible for a failed operation to be missed if the end user forgets to write error handling code.

One of the simplest ways to perform some work asynchronously is to just run it on a thread, so it’s useful to have a straightforward way of doing this. We care about being able to wait for our work to complete, and we want to be able to get the result of the work, so we want to get a Future representing the work. Let’s implement a simple version of this:

RunInThreadPool.cs

using System;
 
  using System.Threading;
 
   
 
  public static partial class Future {
      /// <summary>
      /// Queues <paramref name="workItem"/> on the .NET thread pool and returns a future that
      /// receives its return value, or the exception it threw.
      /// </summary>
      /// <param name="workItem">The function to run on a thread pool thread.</param>
      /// <returns>A future representing the outcome of <paramref name="workItem"/>.</returns>
      public static Future<T> RunInThreadPool<T> (Func<T> workItem) {
          var pending = new Future<T>();

          WaitCallback runner = delegate (object state) {
              try {
                  pending.Complete(workItem());
              } catch (Exception error) {
                  // NOTE: this also catches exceptions escaping from Complete itself (for example
                  // one thrown by a completion handler), in which case Fail is being called on a
                  // future that has already been completed.
                  pending.Fail(error);
              }
          };

          ThreadPool.QueueUserWorkItem(runner);

          return pending;
      }
  }

Our solution is pretty simple: we take a function, and construct a future of the appropriate type to hold the result of the function. We shove a callback onto the .NET thread pool that will invoke our function, and then store the result into the future (or, if it fails, store the exception). The design decisions made when designing Future mean that as a result, this function will always behave the way we want it to, and avoid discarding any information. (Keen-eyed readers may spot an edge case where information can in fact be lost. This can be fixed, but I omitted the fix from the example since it’s not particularly relevant.)

Now that we have a way to take some simple work and make it asynchronous, what about waiting for work? We can easily wait for a single piece of work by registering a callback, but waiting for multiple pieces of work gets complicated – we could write an iterator function and yield upon each value in sequence, but that gets kind of awkward. We could also use a thread or threadpool work item to do this – in fact, the .NET ThreadPool includes a helper method specifically designed for this purpose – but it’s pretty silly to tie up a thread just to wait for work. As it turns out, we can implement a simple waiting primitive – let’s call it WaitForX – that allows us to construct a so-called ‘Composite Future‘ from a set of Futures that will become complete once X of the set’s Futures have become complete. Using this method, we can trivially implement WaitForAll and WaitForFirst helper methods:

WaitForX.cs

public static partial class Future {
    /// <summary>
    /// Builds a composite future that completes as soon as any one of <paramref name="futures"/>
    /// completes. The composite's result is the future that completed first.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="futures"/> is null or empty.</exception>
    public static Future<IFuture> WaitForFirst (params IFuture[] futures) {
        // Validate before reading futures.Length so that a null array is reported as an
        // ArgumentException (as WaitForAll does) rather than a NullReferenceException.
        CheckFutures(futures);

        // Trigger while the pending list is still full, i.e. on the very first completion.
        return WaitForX(futures, futures.Length);
    }

    /// <summary>
    /// Builds a composite future that completes once every one of <paramref name="futures"/>
    /// has completed. The composite's result is the future that completed last.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="futures"/> is null or empty.</exception>
    public static IFuture WaitForAll (params IFuture[] futures) {
        // Trigger when only one pending future is left, i.e. on the final completion.
        return WaitForX(futures, 1);
    }

    /// <summary>
    /// Tracks the futures that are still pending and completes <see cref="Composite"/> when a
    /// future finishes while the pending list has exactly <see cref="NumberToWaitFor"/> entries.
    /// </summary>
    private class WaitHandler {
        public readonly Future<IFuture> Composite = new Future<IFuture>();
        // Futures that have not completed yet; also serves as the lock object.
        public readonly List<IFuture> State = new List<IFuture>();
        // Size of the pending list at which the next completion finishes the composite.
        // Note this is a count of futures still pending, not of futures already completed.
        public readonly int NumberToWaitFor;

        public WaitHandler (IFuture[] futures, int numberToWaitFor) {
            State.AddRange(futures);
            NumberToWaitFor = numberToWaitFor;
        }

        /// <summary>Completion callback registered on every future being waited on.</summary>
        public void OnComplete (IFuture f) {
            bool completed = false;

            lock (State) {
                if (State.Count == NumberToWaitFor) {
                    completed = true;
                    // Emptying the list means later completions can never match the
                    // threshold again, so the composite is only completed once.
                    State.Clear();
                } else {
                    State.Remove(f);
                }
            }

            // Complete the composite outside of the lock, handing over the triggering future.
            if (completed) {
                Composite.Complete(f);
            }
        }
    }

    // Rejects a null or empty set of futures.
    private static void CheckFutures (IFuture[] futures) {
        if ((futures == null) || (futures.Length == 0)) {
            throw new ArgumentException("Must specify at least one future to wait on", "futures");
        }
    }

    /// <summary>
    /// Registers a shared <see cref="WaitHandler"/> on every future and returns its composite.
    /// </summary>
    /// <param name="futures">The futures to wait on.</param>
    /// <param name="numberToWaitFor">Pending-list size at which a completion finishes the composite.</param>
    private static Future<IFuture> WaitForX (IFuture[] futures, int numberToWaitFor) {
        CheckFutures(futures);

        var h = new WaitHandler(futures, numberToWaitFor);
        Action<IFuture> oc = h.OnComplete;

        foreach (IFuture future in futures) {
            future.RegisterOnComplete(oc);
        }

        return h.Composite;
    }
}

Basically, we fill a list with the futures we want to wait for, and then register a callback on all of those futures so that we can find out if any of them become completed. The callback checks the current count of the list, and then either removes the future that was just completed from the list, or clears the list. The first future to be completed once the list reaches the specified count value causes our Composite Future to become complete as well. This allows us to wait for an entire set of Futures, or simply wait for any one of the Futures in a set to become complete. Even better, if we’re using this mechanism to wait for the first Future in a set to complete, a reference to that Future is stored into the composite, so we can pull it out and respond to whatever work that Future represented.

Now that I’ve given you a bit of a glimpse into how these building blocks are written, in my next post, I’ll describe how we can use them to tackle the problem of scaling an application up to take advantage of multiple cores, and pipeline work so that we get good performance when doing IO.