We’ve gone over some simple examples and looked a bit into the implementation of primitives like Future. Now, let’s take a moment to think about how we can bring these examples closer to the real world.

A comment on one of the previous posts noted that, were we to go with the simplest possible implementation of our asset loading code, we’d be doing file I/O against multiple files at the same time, and that in practice, this is much slower than simply loading from those files sequentially. This turns out to be a common problem when we’re building asynchronous systems; we’re often trying to solve problems that require use of a limited resource – whether it be the hard disk, the GPU, or network bandwidth. If we don’t carefully manage our use of those shared resources, at best, our application will run much slower than it would otherwise – and if we’re unlucky, it will work incorrectly or fail to work at all.

There are a few ways we can manage access to these shared resources. One of the simplest is to serialize access to a resource using a lock or (in cases where we don’t mind a little sharing) a semaphore. This is easy to do in most languages, and as long as we’re using worker threads to access the resource, things will work okay. It’s far from optimal, though – here are a few example gotchas, followed by a sketch of what this approach looks like:

  • Using a lock/semaphore means that every work item waiting its turn ties up a worker thread while it blocks, wasting valuable resources.
  • Locks and semaphores provide no way to prioritize or order work, so work with tight latency requirements – like streaming in the next 2 seconds of background music – cannot pre-empt work with looser latency requirements, like streaming in a higher-resolution version of an on-screen texture.
  • Relying on thread synchronization also makes it hard to interact with our pending work items – let’s say we started loading the face texture for a character, but he just disconnected from the server. We want to cancel that work item, since it’s not useful anymore, but the work item’s thread is currently blocked, waiting to acquire the resource loader lock.
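
To make those trade-offs concrete, here’s a minimal sketch of the lock-based approach. DiskLock and LoadTextureWithLock are hypothetical names; Texture and the Future type (with its SetResult(result, error) method) are the same ones used in the rest of this post:

private static readonly object DiskLock = new object();

public Future<Texture> LoadTextureWithLock (string filename) {
    var f = new Future<Texture>();

    ThreadPool.QueueUserWorkItem((_) => {
        // The worker thread sits here, blocked, until it wins the lock – so every
        //  pending load ties up a thread, and there's no way to reorder or cancel it.
        lock (DiskLock) {
            try {
                f.SetResult(new Texture(filename), null);
            } catch (Exception e) {
                f.SetResult(null, e);
            }
        }
    });

    return f;
}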

To solve the problems caused by locks, we can complicate things a little and introduce a work queue – any time we start a work item, we push it onto our queue and return a Future that we will manually complete once the work is done. This addresses the issues with the previous solution – we can pull items out of the work queue if they’re cancelled, and maintain multiple queues or a sorted queue if we want to prioritize more important work over less important work. Let’s begin by building a simple texture loader that ensures textures are loaded one at a time.

In order to make this easier, we’ll introduce a simple utility class called a ‘blocking queue’. Dequeue operations on this queue always produce a Future instead of directly producing a value, and if you dequeue from the container while it is empty, you get a future that will be completed once a value is stored into the queue. This ends up being a good fit for pipelining work.

public class BlockingQueue<T> {
    private readonly object _Lock = new object();
    private readonly Queue<Future<T>> _WaitingFutures = new Queue<Future<T>>();
    private readonly Queue<T> _Queue = new Queue<T>();

    public int Count {
        get {
            lock (_Lock) {
                return _Queue.Count - _WaitingFutures.Count;
            }
        }
    }

    public int DequeueMultiple (IList<T> output, int maximum) {
        for (int i = 0; i < maximum; i++) {
            lock (_Lock) {
                if (_Queue.Count == 0)
                    return i;

                output.Add(_Queue.Dequeue());
            }
        }

        return maximum;
    }

    public T[] DequeueAll () {
        lock (_Lock) {
            T[] result = new T[_Queue.Count];
            _Queue.CopyTo(result, 0);
            _Queue.Clear();
            return result;
        }
    }

    public Future<T> Dequeue () {
        var f = new Future<T>();
        lock (_Lock) {
            if (_Queue.Count > 0)
                f.Complete(_Queue.Dequeue());
            else
                _WaitingFutures.Enqueue(f);
        }
        return f;
    }

    public void Enqueue (T value) {
        Future<T> wf;

        while (true)
            lock (_Lock) {
                if (_WaitingFutures.Count > 0) {
                    wf = _WaitingFutures.Dequeue();

                    try {
                        wf.Complete(value);
                        if (wf.Completed)
                            return;
                    } catch (FutureDisposedException) {
                    }
                } else {
                    _Queue.Enqueue(value);
                    return;
                }
            }
    }
}
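
Before we look at how it works, here’s a rough sketch of the queue’s behavior from a consumer’s point of view (the values are just placeholders; inside a task you would normally yield on the returned future rather than touching Result directly):

var queue = new BlockingQueue<string>();

// The queue is empty, so this future starts out incomplete...
var f = queue.Dequeue();

// ...and is completed as soon as a value arrives.
queue.Enqueue("hello");
// f.Result is now "hello".

// If a value is already waiting, Dequeue returns an already-completed future.
queue.Enqueue("world");
var g = queue.Dequeue();
// g.Result is immediately "world".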

As you can see, the blocking queue is pretty simple. It’s built out of a pair of queues – one for values awaiting a caller to Dequeue them, and another for futures awaiting a caller to Enqueue a value to fill them. Dequeueing a value pulls first from the value queue, and if it is empty, it creates a Future and pushes that onto the waiter queue instead. Enqueueing a value pulls first from the waiter queue, and if that’s empty, it pushes the value into the value queue. With this blocking queue primitive, we can build a simple pipelined texture loader:

public class PendingTextureLoad {
    public readonly Future<Texture> Future = new Future<Texture>();
    public string Filename;
}

protected readonly BlockingQueue<PendingTextureLoad> TextureLoadQueue = new BlockingQueue<PendingTextureLoad>();

protected IEnumerator<object> TextureLoaderTask () {
    PendingTextureLoad item;

    while (true) {
        {
            var f = TextureLoadQueue.Dequeue();
            yield return f;
            item = f.Result;
        }

        ThreadPool.QueueUserWorkItem(LoadSingleTexture, item);

        // Wait for the callback to finish loading the texture before we proceed to the next work
        //  item from the queue.
        yield return item.Future;
    }
}

// A callback to be run from the threadpool that loads our texture from disk.
// Implicitly we're assuming here that we don't need any synchronization, because we will only
//  ever be running one of these at a time. In practice, we probably will need some kind of
//  synchronization around the Texture constructor, depending on the API we're using.
protected static void LoadSingleTexture (object _) {
    var ptl = (PendingTextureLoad)_;
    try {
        var texture = new Texture(ptl.Filename);
        ptl.Future.SetResult(texture, null);
    } catch (Exception e) {
        ptl.Future.SetResult(null, e);
    }
}

public Future<Texture> LoadTexture (string filename) {
    var ptl = new PendingTextureLoad {
        Filename = filename
    };

    TextureLoadQueue.Enqueue(ptl);
    return ptl.Future;
}

TextureLoaderTask is left running in the background, responsible for pulling work items off the texture load queue one at a time and completing them. In this case, we assume it’s not possible to load a texture asynchronously (it rarely is), so we do the actual load operation on the thread pool. Consumers of this system call LoadTexture with a filename, and get back a Future that will contain the texture once the load is complete.
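
From the consumer’s perspective it looks something like the sketch below – ‘loader’ is a hypothetical instance of the class above, and the filename is just an example:

// Inside some task (an IEnumerator<object> method):
var f = loader.LoadTexture("textures/grass.png");

// Yield until the loader task and its threadpool callback have finished with our request.
yield return f;

var texture = f.Result;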

We now know we won’t be paying the cost of simultaneous I/O, because we’ll be loading one texture at a time, and we won’t be tying up a bunch of idle threads waiting to acquire a lock. However, there’s still a lot of room for improvement in the real world. Even though we’re only performing one I/O operation at a time, platter-based storage devices like hard disks have two kinds of I/O: random access and sequential. Random access I/O is prohibitively expensive, because each operation requires the drive head to seek and the platter to rotate into the correct position before the read or write can begin. Sequential I/O is significantly faster, because the drive doesn’t have to seek between reads – the head is already in the right position after completing the previous operation. Right now, we’re still going to pay the cost of random access on every texture load.

If we’re willing to incur some latency on individual texture loads, routing all loads through the queue lets us minimize the cost of random access I/O. Instead of pulling work items off the queue one at a time, we can pull them off in groups: add a short delay to batch up texture load requests that arrive within a short window, then sort the batch based on location on disk, like this:

protected IEnumerator<object> TextureLoaderTask () {
    var sleep = new Sleep(0.5); // Wait half a second for work items to build up.
    var batch = new List<PendingTextureLoad>();

    while (true) {
        // Pull a single work item from the queue. Our task will be put to sleep if
        //  the queue is empty.
        {
            var f = TextureLoadQueue.Dequeue();
            yield return f;
            batch.Add(f.Result);
        }

        // We were woken back up by a work item. Put our task back to sleep for a
        //  fixed period of time to allow more work items to build up.
        yield return sleep;

        // Now that we've slept for a while, more items may be in the queue. Attempt
        //  to pull up to 31 more items into the batch.
        TextureLoadQueue.DequeueMultiple(batch, 31);

        // Since the contents of the batch can be processed in any order, we need to
        //  wait until the callback has finished running as a whole, so we use a
        //  helper method to do that instead of waiting for the individual futures
        //  in the batch.
        yield return Future.RunInThread(
            () => LoadTextureBatch(batch)
        );

        // Make sure to empty out the batch before filling it with items in the next
        //  iteration.
        batch.Clear();
    }
}

// We've changed the callback to operate on a batch of work items instead of a single
//  work item.
protected static void LoadTextureBatch (List<PendingTextureLoad> batch) {
    // Sort the work items by their position on disk.
    batch.Sort((lhs, rhs) =>
        GetDiskLocation(lhs.Filename).CompareTo(
            GetDiskLocation(rhs.Filename)
        )
    );

    // Now walk through the batch in sorted order and load the textures.
    foreach (var ptl in batch) {
        try {
            var texture = new Texture(ptl.Filename);
            ptl.Future.SetResult(texture, null);
        } catch (Exception e) {
            ptl.Future.SetResult(null, e);
        }
    }
}

Of course, this solution is unlikely to accomplish much when using the regular file system, because there is no way to guarantee that a set of files is laid out sequentially on disk, or even to prevent them from being fragmented. However, if we store our textures in an archive file format like ZIP, we can use the information stored within the ZIP file to determine the correct order in which to load them, and as long as the archive file is not significantly fragmented, we will pay little or no cost for disk seeks – even if our textures are not directly adjacent, they will be close to each other and the drive head will not have to move very far.
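
The GetDiskLocation function used when sorting the batch isn’t shown above. Here’s one hypothetical way it might look if our textures live inside an archive whose entry offsets we’ve read ahead of time – the ArchiveOffsets table is an assumption, not part of the loader above:

// Hypothetical sketch: a table of byte offsets within the archive, built once by reading
//  the archive's directory (for ZIP, the central directory records each entry's offset).
protected static readonly Dictionary<string, long> ArchiveOffsets = new Dictionary<string, long>();

protected static long GetDiskLocation (string filename) {
    long offset;
    if (ArchiveOffsets.TryGetValue(filename, out offset))
        return offset;

    // Files we know nothing about sort to the end of the batch.
    return long.MaxValue;
}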

Batching up work like this allows us to benefit from other improvements as well – for example, if we have multiple CPU cores to utilize and texture decompression is particularly expensive, we can do all the I/O for a batch sequentially and then use multiple threads to decompress the textures in parallel, without having to invest a lot of work into threading machinery.
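
As a rough sketch of what that might look like, suppose we split each load into a sequential read step and a CPU-bound decompress step. ReadCompressedTexture and DecompressTexture are hypothetical helpers standing in for whatever our texture pipeline actually provides, and Parallel.ForEach comes from System.Threading.Tasks (.NET 4 and later):

// A variant of LoadTextureBatch: do the disk reads sequentially (in sorted order),
//  then spread the CPU-heavy decompression across all available cores.
protected static void LoadTextureBatch (List<PendingTextureLoad> batch) {
    batch.Sort((lhs, rhs) =>
        GetDiskLocation(lhs.Filename).CompareTo(GetDiskLocation(rhs.Filename))
    );

    // Sequential I/O: read the raw bytes for each texture, one at a time.
    var compressed = new byte[batch.Count][];
    for (int i = 0; i < batch.Count; i++)
        compressed[i] = ReadCompressedTexture(batch[i].Filename);

    // Parallel CPU work: decompress each texture on the threadpool.
    Parallel.ForEach(batch, (ptl, state, index) => {
        try {
            ptl.Future.SetResult(DecompressTexture(compressed[index]), null);
        } catch (Exception e) {
            ptl.Future.SetResult(null, e);
        }
    });
}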

For one real-world example, I use this kind of batching in my Data Mangler key-value store in order to perform read operations in parallel across available CPU cores. Because individual work items still occur sequentially, and each work item is either a read or a write (types aren’t mixed), I don’t have to invest effort in complex locking schemes used by more sophisticated database systems. As a result, consumers of the library can queue up dozens of database operations at once, and then wait for the futures while the work is done in parallel across all the system’s available cores, without having to do any locking or synchronization.

At this point I’m out of good ideas for ways to go into depth on this topic, so I’d welcome any questions or suggestions from those of you who’ve been reading. Got a problem you’d like to tackle with these techniques? Curious about potential drawbacks? Let me know, and I can go into more depth in future posts.