Saturday, January 16, 2010

Yet another thread-safe blocking queue for .NET

On one of my projects I needed a concurrent queue that supports multiple producers and multiple consumers. Unfortunately, .NET 3.5 doesn't yet have a blocking queue, and Google didn't turn up anything that quite fit the bill, so I wrote my own.

This solution is a simple one based on monitor waiting. When a consumer thread calls Dequeue, an item is returned immediately if one is available. Otherwise, the thread waits on the monitor, which releases the lock while it is blocked. When an item is enqueued, the monitor is pulsed, which wakes up one of the waiting consumer threads; that thread rechecks the queue, dequeues the item, and returns it. Pretty simple.
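using System.Collections.Generic;
using System.Threading;

public class BlockingQueue<T>
{
    // The private queue object doubles as the monitor lock.
    private readonly Queue<T> _queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (_queue)
        {
            _queue.Enqueue(item);
            // Wake one waiting consumer, if there is one.
            Monitor.Pulse(_queue);
        }
    }

    public T Dequeue()
    {
        lock (_queue)
        {
            // Wait releases the lock while blocked and reacquires it before
            // returning. Recheck the count in a loop because another
            // consumer may have taken the item first.
            while (_queue.Count == 0)
                Monitor.Wait(_queue);
            return _queue.Dequeue();
        }
    }
}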
However, one of the drawbacks of this kind of synchronization is that a consumer thread blocked in Dequeue never returns if no more items arrive; it just sits in a loop waiting on the monitor. I needed a way to signal to the consumer threads that they should stop waiting and gracefully exit (I call this "stopping the queue").

This is an easy feature to add. I simply set a boolean field and then pulse all the threads waiting on the monitor (using Monitor.PulseAll). When the consumer threads wake up, they check the field and simply return a default value instead of dequeuing.
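public class BlockingQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    // volatile because the fast-path checks below read it outside the lock.
    private volatile bool _stopped;

    // Returns false if the queue has been stopped and the item was rejected.
    public bool Enqueue(T item)
    {
        if (_stopped)
            return false;
        lock (_queue)
        {
            // Check again: Stop may have run while we waited for the lock.
            if (_stopped)
                return false;
            _queue.Enqueue(item);
            Monitor.Pulse(_queue);
        }
        return true;
    }

    // Returns default(T) once the queue has been stopped,
    // even if items are still queued.
    public T Dequeue()
    {
        if (_stopped)
            return default(T);
        lock (_queue)
        {
            if (_stopped)
                return default(T);
            while (_queue.Count == 0)
            {
                Monitor.Wait(_queue);
                // Woken either by Enqueue (Pulse) or by Stop (PulseAll).
                if (_stopped)
                    return default(T);
            }
            return _queue.Dequeue();
        }
    }

    public void Stop()
    {
        if (_stopped)
            return;
        lock (_queue)
        {
            if (_stopped)
                return;
            _stopped = true;
            // Wake every waiting consumer so each can see the flag and exit.
            Monitor.PulseAll(_queue);
        }
    }
}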
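An alternative implementation can have a stopped queue throw an exception to waiting consumers rather than returning default(T). That is worth considering when T is a value type, because default(T) (0 for int, for example) cannot be told apart from a real item.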
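The exception-throwing alternative could look roughly like the sketch below. It is only an illustration, not the code I used: the names ThrowingBlockingQueue and QueueStoppedException are made up for this example, and for brevity every access to the flag happens under the lock, so the unlocked fast-path checks are dropped.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical exception type, introduced only for this sketch.
public class QueueStoppedException : InvalidOperationException
{
    public QueueStoppedException() : base("The queue has been stopped.") { }
}

public class ThrowingBlockingQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private bool _stopped; // only read and written while holding the lock

    public void Enqueue(T item)
    {
        lock (_queue)
        {
            if (_stopped)
                throw new QueueStoppedException();
            _queue.Enqueue(item);
            Monitor.Pulse(_queue);
        }
    }

    public T Dequeue()
    {
        lock (_queue)
        {
            // Block until an item arrives or the queue is stopped.
            while (!_stopped && _queue.Count == 0)
                Monitor.Wait(_queue);
            // As in the version above, a stopped queue hands out
            // nothing, even if items are still queued.
            if (_stopped)
                throw new QueueStoppedException();
            return _queue.Dequeue();
        }
    }

    public void Stop()
    {
        lock (_queue)
        {
            _stopped = true;
            Monitor.PulseAll(_queue);
        }
    }
}
```

With this variant a consumer loop becomes a try/catch around an endless loop of Dequeue calls, and a queue of int can carry 0 as a real item without any ambiguity.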

Here is a sample app. It starts one consumer and one producer on the thread pool, and stops the queue when you press Enter:
using System;
using System.Threading;

public class Program
{
    private static readonly BlockingQueue<string> _queue =
        new BlockingQueue<string>();

    public static void Main()
    {
        ThreadPool.QueueUserWorkItem(ConsumerThread);
        ThreadPool.QueueUserWorkItem(ProducerThread);
        Console.ReadLine(); // press Enter to stop the queue
        _queue.Stop();
        Console.ReadLine(); // press Enter again to exit
    }

    private static void ProducerThread(object arg)
    {
        for (int n = 1; n <= 10; n++)
            _queue.Enqueue(Guid.NewGuid().ToString());
        Console.WriteLine("Producer thread finished.");
    }

    private static void ConsumerThread(object arg)
    {
        string item;
        // Dequeue returns null (default(string)) once the queue is stopped.
        while ((item = _queue.Dequeue()) != null)
        {
            Thread.Sleep(1000); // simulate work
            Console.WriteLine(item);
        }
        Console.WriteLine("Consumer thread finished.");
    }
}
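The consumer loop in the sample relies on null meaning "stopped", which works for string but not for value types, and it would also end early if a producer ever enqueued null. One possible way around this, sketched here under the assumption that a bool return value is acceptable, is a TryDequeue method with an out parameter. The class name TryBlockingQueue and the method name TryDequeue are invented for this illustration, and the flag is only touched under the lock to keep the sketch short.

```csharp
using System.Collections.Generic;
using System.Threading;

public class TryBlockingQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private bool _stopped; // only read and written while holding the lock

    // Returns false if the queue has been stopped and the item was rejected.
    public bool Enqueue(T item)
    {
        lock (_queue)
        {
            if (_stopped)
                return false;
            _queue.Enqueue(item);
            Monitor.Pulse(_queue);
            return true;
        }
    }

    // Blocks until an item arrives or the queue is stopped.
    // Returns true with the item, or false once the queue is stopped.
    public bool TryDequeue(out T item)
    {
        lock (_queue)
        {
            while (!_stopped && _queue.Count == 0)
                Monitor.Wait(_queue);
            if (_stopped)
            {
                item = default(T);
                return false;
            }
            item = _queue.Dequeue();
            return true;
        }
    }

    public void Stop()
    {
        lock (_queue)
        {
            _stopped = true;
            Monitor.PulseAll(_queue);
        }
    }
}
```

The consumer loop then reads `while (queue.TryDequeue(out item)) { ... }`, and the loop condition no longer depends on what values the items themselves can take.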

2 comments:

  1. Thanks a lot! This is the best of many articles on how to make a thread-safe queue. The others are bloated and seem unlikely to work well.

  2. Awesome, thanks. I needed a thread-safe queue and have to use .NET 3.5.
