Wrapping ConcurrentQueue<T> to make it eventful

Image: the process I discovered pulling items out of the ConcurrentQueue in a tight, CPU-intensive loop

One of the projects I inherited has been merrily sat on its server, bubbling away and doing what it needs to do, since before I joined the company. A few days ago another service on the same machine needed some adjustment, and at that point I noticed that the project in question was constantly sat at 25% CPU utilisation. Anything that's constantly sat at 25% CPU must be running in a tight loop, I thought, so off I went to have a look at the code.

This particular project is responsible for reading files as and when they appear, doing a little bit of processing and data transformation and then writing the output to a database. The actual nitty gritty of what happens is that files come in, a FileSystemWatcher triggers, reads the file and pumps the details into a ConcurrentQueue<T> (the producer), and then another thread sits and reads items from the queue (the consumer). The problem is that the code that reads from the queue sits in a tight loop attempting to de-queue items and process them, and it keeps spinning even when the queue is empty. In this specific instance there's no business requirement that items be processed in FIFO order, so a queue isn't strictly necessary and it wouldn't matter if items got processed out of order.

Aside: Without any other reason I'd have left this alone on the basis that it's working and I've got plenty else to do. However, an aspiration to rationalise the number of servers in use comes into play here, and one process constantly using 25% of the available CPU to do nothing most of the time isn't going to help with that.

There are a couple of quick fixes available to reduce the amount the CPU was being pummelled. The easiest and quickest is to slow the loop down by adding:

Thread.Sleep(1000); // Or your choice of sleep duration

to the loop that attempts to de-queue items.
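The original consumer loop isn't shown in this post, so the following is only a minimal sketch of what that quick fix might look like. The class name, the string item type, the file names and the three-second cut-off are all hypothetical. The one design choice worth noting is that it sleeps only when the queue is empty, so a backlog still gets drained at full speed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class PollingConsumerSketch
{
    public static void Main()
    {
        // Hypothetical queue contents standing in for the file details.
        var queue = new ConcurrentQueue<string>();
        queue.Enqueue("file-a.csv");
        queue.Enqueue("file-b.csv");

        // Stop after three seconds so the sketch terminates; a real service
        // would tie this to its own shutdown signal.
        using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
        {
            while (!cts.IsCancellationRequested)
            {
                if (queue.TryDequeue(out var item))
                {
                    Console.WriteLine($"Processing {item}");
                }
                else
                {
                    // Nothing to do: give the CPU back instead of spinning.
                    Thread.Sleep(1000);
                }
            }
        }
    }
}
```

The trade-off is latency: a newly queued item can wait up to the full sleep duration before anything looks at it.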

The other was to have the code that de-queues items be triggered by something happening, which of course is the entire point of this post. Taking a look at the documentation for ConcurrentQueue<T> there weren't any events to be seen, so time to add some of my own.

Making ConcurrentQueue<T> eventful

As you probably guessed from the title of the post, I opted to go down the route of making ConcurrentQueue<T> raise an event when an item is queued. This gave me two choices: either inherit from ConcurrentQueue<T>, or encapsulate it. In this specific instance I opted to encapsulate, as the calling code only makes use of three points of access: the methods Enqueue and TryDequeue and the Count property. This reduces the amount of code I have to write, but more importantly it reduces the number of entry points to ConcurrentQueue<T> that I have to implement, reason about and get right. The less code you write, the less code there is that can potentially introduce a bug.

There are two events that I'm interested in, an item being added to the queue and an item being removed from the queue:

public event EventHandler ItemEnqueued;
public event EventHandler<ItemDequeuedEventArgs<T>> ItemDequeued;

The ItemDequeuedEventArgs looks like this:

using System;

namespace RW.EventfulConcurrentQueue
{
    public sealed class ItemDequeuedEventArgs<T> : EventArgs
    {
        public T Item { get; set; }
    }
}

It's notable that I've opted for a specialised EventArgs for instances where an item is removed from the queue, but not for when an item is added. The rationale here is that handing the item itself to the code that's listening for items to be queued feels like a bad idea, as it could lead to bad practice sneaking in further down the line. The code that's subscribed to the ItemEnqueued event should care about knowing that an item has been added to the queue, but not specifically what. If it does need to examine the item then consideration should be given to adding a pass-through to the TryPeek method on ConcurrentQueue<T>.
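For what it's worth, ConcurrentQueue<T> exposes peeking as TryPeek(out T result), so a pass-through could be as small as the sketch below. The class name PeekableQueueSketch is made up for illustration and the rest of the wrapper is left out to keep it short.

```csharp
using System.Collections.Concurrent;

// Hypothetical, trimmed-down wrapper showing only the peek pass-through.
public sealed class PeekableQueueSketch<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    public void Enqueue(T item) => _queue.Enqueue(item);

    // Returns the item at the head of the queue without removing it,
    // or false if the queue is currently empty.
    public bool TryPeek(out T result) => _queue.TryPeek(out result);
}
```

Bear in mind that with more than one consumer, the item you peeked at may already have been de-queued by someone else by the time you act on it.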

The ItemDequeued event has been treated a little differently, though. I've added it in anticipation of tweaking the code responsible for en-queuing items. By being told when an item has been taken off the queue, that code or another watch-dog thread can keep an eye out for things that haven't been processed.

There's not a lot of code required to achieve the wrapping of ConcurrentQueue<T>, so I'll show the whole class now:

using System;
using System.Collections.Concurrent;

namespace RW.EventfulConcurrentQueue
{
    public sealed class EventfulConcurrentQueue<T>
    {
        // The wrapped queue; never reassigned after construction.
        private readonly ConcurrentQueue<T> _queue;

        public EventfulConcurrentQueue()
        {
            _queue = new ConcurrentQueue<T>();
        }

        // Pass-through for the Count property used by the calling code.
        public int Count => _queue.Count;

        public void Enqueue(T item)
        {
            // Add the item first, then tell listeners about it.
            _queue.Enqueue(item);
            OnItemEnqueued();
        }

        public bool TryDequeue(out T result)
        {
            var success = _queue.TryDequeue(out result);

            // Only raise the event if an item was actually removed.
            if (success)
            {
                OnItemDequeued(result);
            }
            return success;
        }

        public event EventHandler ItemEnqueued;
        public event EventHandler<ItemDequeuedEventArgs<T>> ItemDequeued;

        void OnItemEnqueued()
        {
            ItemEnqueued?.Invoke(this, EventArgs.Empty);
        }

        void OnItemDequeued(T item)
        {
            ItemDequeued?.Invoke(this, new ItemDequeuedEventArgs<T> { Item = item });
        }
    }
}

The main thing here (and the thing that caught me out!) is to make sure that the ItemEnqueued event is raised after the item has been added to the queue. Raise it first and you could (as I did) end up in a situation where the code that's listening for the event receives it and attempts to de-queue an item before it's even been added to the queue.
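The consuming side isn't shown in this post, so here is one possible sketch of it rather than the code that went into the project. Event handlers run synchronously on whichever thread calls Enqueue, so in this sketch the handler does nothing but signal an AutoResetEvent, and a separate consumer thread waits on that signal and drains the queue. The wrapper is repeated in condensed form so the sketch compiles on its own. The EventDrivenConsumerSketch class, the file names and the stop-after-three-items condition are all hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Condensed copy of the wrapper so this sketch is self-contained.
public sealed class EventfulConcurrentQueue<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    public event EventHandler ItemEnqueued;

    public void Enqueue(T item)
    {
        _queue.Enqueue(item);                        // add first...
        ItemEnqueued?.Invoke(this, EventArgs.Empty); // ...then notify
    }

    public bool TryDequeue(out T result) => _queue.TryDequeue(out result);
}

public static class EventDrivenConsumerSketch
{
    public static void Main()
    {
        var queue = new EventfulConcurrentQueue<string>();

        using (var itemsAvailable = new AutoResetEvent(false))
        {
            // The handler runs on the producer's thread, so it only signals.
            queue.ItemEnqueued += (sender, e) => itemsAvailable.Set();

            var processed = 0; // only touched by the consumer thread
            var consumer = new Thread(() =>
            {
                // Hypothetical stop condition so the sketch terminates.
                while (processed < 3)
                {
                    // Blocks without burning CPU until something is queued.
                    itemsAvailable.WaitOne();

                    // Drain fully: several signals can collapse into one wake-up.
                    while (queue.TryDequeue(out var item))
                    {
                        Console.WriteLine($"Processing {item}");
                        processed++;
                    }
                }
            });
            consumer.Start();

            queue.Enqueue("file-a.csv");
            queue.Enqueue("file-b.csv");
            queue.Enqueue("file-c.csv");

            consumer.Join();
        }
    }
}
```

Draining in an inner loop matters here because an AutoResetEvent doesn't count signals: two items queued in quick succession may wake the consumer only once.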

About Rob

I've been interested in computing since the day my Dad purchased his first business PC (an Amstrad PC 1640 for anyone interested) which introduced me to MS-DOS batch programming and BASIC.

My skillset has matured somewhat since then, which you'll probably see from the posts here. You can read a bit more about me on the about page of the site, or check out some of the other posts on my areas of interest.
