Couple issues here. First, don't use spin locks that way - big waste of
cpu. Moreover, the sync wrapper takes a lock for all methods and
properties. So the 2nd thread is taking an releasing the lock 4 times for a
single dequeue operation. As Brian said, use a blocking queue for this kind
of producer/consumer pattern. Here is my 1 lock blocking queue below.
private void button5_Click(object sender, EventArgs e)
{
BlockingQueue<int> bq = new BlockingQueue<int>();
new Thread(delegate()
{
int i;
while( bq.TryDequeue(200, out i) )
Console.WriteLine(i);
Console.WriteLine("Consumer thread completed.");
}).Start();
for (int i = 0; i < 100; i++)
{
bq.Enqueue(i);
}
}
/// <summary>
/// Represents a first-in, first-out collection of objects.
/// </summary>
/// <typeparam name="T">Type of element queue will contain.</typeparam>
public class BlockingQueue<T> : IEnumerable<T>, ICollection
{
private bool isOpened = true;
private readonly Queue<T> q;
private readonly object syncRoot = new object();
/// <summary>
/// Initializes a new instance of the BlockingQueue class.
/// </summary>
public BlockingQueue()
{
q = new Queue<T>();
}
/// <summary>
/// Initializes a new instance of the BlockingQueue class.
/// </summary>
/// <param name="capacity">The initial number of elements the queue
can contain.</param>
public BlockingQueue(int capacity)
{
q = new Queue<T>(capacity);
}
/// <summary>
/// Initializes a new instance of the BlockingQueue class.
/// </summary>
/// <param name="collection">A collection whose elements are copied
to the new queue.</param>
public BlockingQueue(IEnumerable<T> collection)
{
q = new Queue<T>(collection);
}
/// <summary>
/// Gets the number of elements in the queue.
/// </summary>
public int Count
{
get
{
lock ( syncRoot )
{
return q.Count;
}
}
}
/// <summary>
/// Remove all objects from the BlockingQueue<T>.
/// </summary>
public void Clear()
{
lock ( syncRoot )
{
q.Clear();
}
}
/// <summary>
/// Closes the queue.
/// </summary>
public void Close()
{
lock ( syncRoot )
{
if ( ! this.isOpened )
return; // Already closed.
isOpened = false;
q.Clear();
Monitor.PulseAll(syncRoot); // resume any waiting threads
so they see the queue is closed.
}
}
/// <summary>
/// Gets a value indicating if queue is opened.
/// </summary>
public bool Opened
{
get
{
lock ( syncRoot )
{
return this.isOpened;
}
}
}
/// <summary>
/// Determines whether an element is in the
System.Collections.Generic.Queue<T>.
/// </summary>
/// <param name="item">The object to locate in the
System.Collections.Generic.Queue<T>. The value can be null for reference
types.</param>
/// <returns>true if item is found in the
System.Collections.Generic.Queue<T>; otherwise, false.</returns>
public bool Contains(T item)
{
lock ( syncRoot )
{
return q.Contains(item);
}
}
/// <summary>
/// Copies the System.Collections.Generic.Queue<T> elements to an
existing one-dimensional System.Array, starting at the specified array
index.
/// </summary>
/// <param name="array">The one-dimensional System.Array that is the
destination of the elements
/// copied from System.Collections.Generic.Queue<T>. The
System.Array must have zero-based indexing.
/// </param>
/// <param name="arrayIndex">The zero-based index in array at which
copying begins.</param>
public void CopyTo(T[] array, int arrayIndex)
{
lock ( syncRoot )
{
q.CopyTo(array, arrayIndex);
}
}
public T[] ToArray()
{
lock ( syncRoot )
{
return q.ToArray();
}
}
public IEnumerator<T> GetEnumerator()
{
return new BlockingQueue<T>.Enumerator(this, -1);
}
public IEnumerator<T> GetEnumerator(int millisecondsTimeout)
{
return new BlockingQueue<T>.Enumerator(this,
millisecondsTimeout);
}
IEnumerator IEnumerable.GetEnumerator()
{
return new BlockingQueue<T>.Enumerator(this, -1);
}
/// <summary>
/// Sets the capacity to the actual number of elements in the
System.Collections.Generic.Queue<T>,
/// if that number is less than 90 percent of current capacity.
/// </summary>
public void TrimExcess()
{
lock ( syncRoot )
{
q.TrimExcess();
}
}
/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <returns>Object in queue.</returns>
public T Dequeue()
{
return Dequeue(Timeout.Infinite);
}
/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <param name="timeout">Time to wait before returning (in
milliseconds).</param>
/// <returns>Object in queue.</returns>
public T Dequeue(int millisecondsTimeout)
{
lock ( syncRoot )
{
while ( isOpened && (q.Count == 0) )
{
if ( ! Monitor.Wait(syncRoot, millisecondsTimeout) )
throw new TimeoutException("Operation timeout");
}
if ( ! isOpened )
throw new InvalidOperationException("Queue closed");
return q.Dequeue();
}
}
public bool TryDequeue(int millisecondsTimeout, out T value)
{
lock (syncRoot)
{
while (isOpened && (q.Count == 0))
{
if ( ! Monitor.Wait(syncRoot, millisecondsTimeout) )
{
value = default(T);
return false;
}
}
if (! isOpened)
throw new InvalidOperationException("Queue closed");
value = q.Dequeue();
return true;
}
}
/// <summary>
/// Returns the object at the beginning of the BlockingQueue<T>
/// without removing it.
/// </summary>
/// <returns>The object at the beginning of the
BlockingQueue<T>.</returns>
public T Peek()
{
return Peek(-1);
}
/// <summary>
/// Returns the object at the beginning of the BlockingQueue<T>
/// without removing it.
/// </summary>
/// <returns>The object at the beginning of the
BlockingQueue<T>.</returns>
/// <param name="millisecondsTimeout">Time to wait before returning
(in milliseconds).</param>
public T Peek(int millisecondsTimeout)
{
lock ( syncRoot )
{
while ( isOpened && (q.Count == 0) )
{
if ( ! Monitor.Wait(syncRoot, millisecondsTimeout) )
throw new TimeoutException("Operation timeout");
}
if ( ! isOpened )
throw new InvalidOperationException("Queue closed");
return q.Peek();
}
}
/// <summary>
/// Adds an object to the end of the Queue.
/// </summary>
/// <param name="obj">Object to put in queue.</param>
public void Enqueue(T item)
{
lock ( syncRoot )
{
if ( ! isOpened )
throw new InvalidOperationException("Queue closed");
q.Enqueue(item);
Monitor.Pulse(syncRoot); // Move 1 waiting thread to the
"ready" queue in this monitor object.
} // Exiting lock will free thread(s) in the "ready" queue for
this monitor object.
}
[Serializable, StructLayout(LayoutKind.Sequential)]
public struct Enumerator : IEnumerator<T>, IDisposable, IEnumerator
{
private BlockingQueue<T> q;
private IEnumerator<T> e;