
A Deep Dive into Priority Queue, Concurrent Queue, and Channel in C#


Sample Code

I’ve used concurrent programming tools at work on and off, but I never really explored what concurrent programming entails.

This article aims to fill that gap and strengthen my foundational knowledge of the topic.

I’ve always believed that the most effective way to learn is by doing, so I created a simple demo program. If you’re interested, you can find this demo using the link above.

Introduction

Handling massive amounts of data is one of the core challenges in enterprise projects, and concurrent programming is often a key optimization strategy for addressing it.

When a project scales and gains a large user base, even the most powerful servers will eventually encounter bottlenecks.

Imagine developing a global project where users worldwide are sending tens of thousands of requests simultaneously. Can your server handle all of them efficiently (assuming unlimited server capacity is not an option)?

Concurrent programming is a set of techniques for handling many simultaneous requests. Instead of overloading the server by processing everything at once, it’s often more efficient to queue incoming tasks and process them at a controlled rate.

Today, I’ll introduce three queueing tools provided by .NET and usable from C#: priority queues (PriorityQueue<TElement, TPriority>), concurrent queues (ConcurrentQueue<T>), and channels (Channel<T>).

There are other concurrency tools, such as distributed queues, message queues, and job queues. If you’re interested, feel free to explore those on your own, as their principles are fundamentally similar.

This article doesn’t dive deeply into technical details, so the code examples will be light on explanations. Instead, the focus will be on the characteristics and basic principles of each queue.

Priority Queue

From the name alone, you might already have a rough idea of what a priority queue is.

Unlike a plain First-In-First-Out (FIFO) queue, a priority queue dequeues items by their assigned priority rather than by arrival order. Note that .NET's PriorityQueue<TElement, TPriority> does not guarantee FIFO order among items that share the same priority.

In C#, a lower priority value means a higher priority: the item with the smallest value is dequeued first. Keep this in mind, as it might confuse some people.
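To make the ordering rule concrete, here is a minimal sketch using PriorityQueue<TElement, TPriority>. The task names and priority values are made up for illustration and are not taken from the demo project.

```csharp
using System;
using System.Collections.Generic;

// Elements are strings, priorities are ints.
// The element with the smallest priority value is dequeued first.
var queue = new PriorityQueue<string, int>();
queue.Enqueue("routine report", 5);
queue.Enqueue("urgent alert", 1);
queue.Enqueue("nightly cleanup", 9);

var order = new List<string>();
while (queue.TryDequeue(out var task, out var priority))
{
    order.Add(task);
    Console.WriteLine($"{priority}: {task}");
}
// Prints:
// 1: urgent alert
// 5: routine report
// 9: nightly cleanup
```

If you would rather have larger values win, one option is to pass a reversed comparer to the constructor, for example `new PriorityQueue<string, int>(Comparer<int>.Create((a, b) => b.CompareTo(a)))`.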

In my demo project, I first created a background service. This background service generates a task for the priority queue every few seconds.

I also created an API to manually add high-priority tasks to the priority queue.

Using this approach, we can clearly observe how a priority queue behaves.

Below is a snippet of the output. You’ll notice that when high-priority and low-priority tasks are added to the queue simultaneously, the high-priority tasks are processed first.

[Screenshot: priority queue output]

Concurrent Queue

A concurrent queue is a thread-safe FIFO queue: multiple threads can add and remove items at the same time, which lets several workers process tasks in parallel. In .NET, this is ConcurrentQueue<T>.

For example, banks often set up two or more ATMs to serve customers. This is done to handle requests faster and more efficiently by increasing the number of machines.

However, simply adding more ATMs isn’t practical. Banks have limited space, and too many machines can overcrowd the area, reducing overall efficiency.

Similarly, concurrent queues require a limit to prevent the negative effects of unlimited high concurrency.

Managing the number of concurrent accesses can be done through simple logic or by using a lock. In this example, I’ll use a semaphore to control access to shared resources.

Semaphore

Using a semaphore is straightforward: you specify an initial value that represents the number of concurrent accesses allowed.

For example, if the semaphore’s initial value is 2, it will decrement by 1 every time a thread acquires it. When the value reaches 0, no further threads can enter, and they must wait until a thread that currently holds the semaphore releases it.

Once a thread finishes its task and releases the resource, the semaphore’s value increments by 1. At that point, a waiting thread can proceed, decrementing the semaphore’s value again.

The concept is simple. In code, you just need to declare a SemaphoreSlim with an initial value:

CSHARP
private readonly SemaphoreSlim _semaphore;

public ConcurrentTaskService()
{
    // Allow at most two tasks to run at the same time
    _semaphore = new SemaphoreSlim(2);
}
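Declaring the semaphore is only half of the picture; each task also has to acquire and release it. The sketch below shows one common pattern, `WaitAsync` followed by `Release` in a `finally` block. The names `ProcessAsync`, `running`, and `maxObserved` are hypothetical helpers added here to count how many tasks are inside the guarded section at once; they are not part of the demo project.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var semaphore = new SemaphoreSlim(2); // at most two tasks inside the guarded section
var gate = new object();              // protects the two counters below
var running = 0;                      // tasks currently inside the guarded section
var maxObserved = 0;                  // highest value 'running' ever reached

async Task ProcessAsync(int id)
{
    await semaphore.WaitAsync();      // count goes down; waits while it is 0
    try
    {
        lock (gate)
        {
            running++;
            maxObserved = Math.Max(maxObserved, running);
        }

        await Task.Delay(200);        // simulated work

        lock (gate) { running--; }
        Console.WriteLine($"Task {id} finished at {DateTime.Now:HH:mm:ss.fff}");
    }
    finally
    {
        semaphore.Release();          // count goes back up; one waiter may proceed
    }
}

// Start six tasks at once; the semaphore lets them through two at a time.
await Task.WhenAll(Enumerable.Range(1, 6).Select(id => ProcessAsync(id)));
Console.WriteLine($"Max concurrent tasks: {maxObserved}"); // never exceeds 2
```

Releasing in `finally` matters: if the work throws, the slot is still returned, so waiting tasks are not stuck forever.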

How can we demonstrate a concurrent queue?

First, I used a loop to push some tasks onto the concurrent queue, paired with a background service that processes the stored tasks.

Then, I implemented an API to retrieve the stored tasks. To make it easier to understand, one of the columns includes the task completion time, allowing us to see the effect of the semaphore more clearly.

Here’s a screenshot of the results. From this, you can observe that since my semaphore's initial value is set to 2, the completion times for every two tasks are almost identical.

[Screenshot: stored tasks with their completion times]
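The same behavior can be reproduced in a few lines. This is a rough sketch, not the demo's actual code: it uses plain integers as tasks, a 300 ms delay as stand-in work, and a second ConcurrentQueue (`completed`, a hypothetical name) as a thread-safe log of completion times.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var queue = new ConcurrentQueue<int>();
for (var n = 1; n <= 6; n++)
{
    queue.Enqueue(n); // a loop stores six tasks
}

var semaphore = new SemaphoreSlim(2);
var completed = new ConcurrentQueue<(int Id, long Ms)>(); // thread-safe result log
var start = DateTime.UtcNow;
var workers = new List<Task>();

// TryDequeue never blocks: it returns false once the queue is empty.
while (queue.TryDequeue(out var taskId))
{
    await semaphore.WaitAsync(); // wait until one of the two slots is free
    var current = taskId;        // copy for the closure below
    workers.Add(Task.Run(async () =>
    {
        try
        {
            await Task.Delay(300); // simulated work
            completed.Enqueue((current, (long)(DateTime.UtcNow - start).TotalMilliseconds));
        }
        finally
        {
            semaphore.Release();
        }
    }));
}
await Task.WhenAll(workers);

foreach (var (id, ms) in completed)
{
    Console.WriteLine($"Task {id} completed after ~{ms} ms");
}
```

With two slots, the printed times should cluster in pairs roughly one delay apart, which is the pattern visible in the screenshot.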

Channel

Channels are similar to concurrent queues in principle, but they come with more built-in features and functions, allowing developers to configure settings based on their specific needs.

The main difference between channels and concurrent queues is that channels support asynchronous reads and writes, making them ideal for scenarios involving pipelines or data streams.

Another key difference is that channels allow you to control the queue’s capacity. While both priority queues and concurrent queues can also achieve this, it requires manual programming logic.

You can explore the finer details on your own, but here’s an overview.

When using channels, there are two main considerations: queue capacity and concurrency model.

  1. Do you need to limit the queue’s capacity? If so, create the channel with Channel.CreateBounded<T>; if not, use Channel.CreateUnbounded<T>.
  2. Will items be processed sequentially or concurrently? Because channel reads can be awaited, a single consumer loop processes items in FIFO order. For concurrent processing, you can combine the channel with a semaphore.
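The capacity choice in the first point can be sketched as follows. The capacity of 2 and the integer payload are arbitrary values chosen for illustration.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Unbounded: writes always succeed immediately; memory is the only limit.
var unbounded = Channel.CreateUnbounded<int>();
var unboundedWrite = unbounded.Writer.TryWrite(42); // true

// Bounded: holds at most 2 items. With FullMode.Wait, WriteAsync waits for free space.
var bounded = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait
});

var first = bounded.Writer.TryWrite(1);  // true
var second = bounded.Writer.TryWrite(2); // true
var third = bounded.Writer.TryWrite(3);  // false: the channel is full
Console.WriteLine($"{first} {second} {third}"); // True True False

// WriteAsync does not fail on a full channel; it completes once a reader frees a slot.
var pendingWrite = bounded.Writer.WriteAsync(3).AsTask();
var pendingBeforeRead = !pendingWrite.IsCompleted;

var item = await bounded.Reader.ReadAsync(); // takes 1 and frees a slot
await pendingWrite;                          // now the write of 3 goes through
```

This waiting behavior is what gives bounded channels their backpressure: a fast producer is slowed down to the pace of the consumer instead of filling memory.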

Channels are typically used for more complex tasks, so whether to use FIFO or concurrency depends on the development requirements.

Since we've already demonstrated concurrency using Semaphore with a Concurrent Queue, this example will use FIFO.

Here, I’m simulating a background service that sends emails.

Sending emails can be time-consuming, not only because of the email service but also because complex emails may involve database queries.

In my demo, I’m using an unbounded channel, which is registered as a singleton so that the producer and the background consumer share the same instance.

CSHARP
builder.Services.AddSingleton(_ => Channel.CreateUnbounded<TriggerEmail>()); // Register Channel

Next, I’ll use a loop to write tasks to the channel, then use the background service to process these tasks over time.

The code is simple, so I’ll provide it here directly:

CSHARP
using System.Threading.Channels;

public class ChannelTaskService : BackgroundService
{
    // Optional: could cap the number of concurrent sends; unused in this sequential (FIFO) example
    private readonly SemaphoreSlim _semaphore;
    private readonly Channel<TriggerEmail> _channel;
    private readonly ILogger<ChannelTaskService> _logger;

    public ChannelTaskService(Channel<TriggerEmail> channel, ILogger<ChannelTaskService> logger)
    {
        _channel = channel;
        _logger = logger;
        _semaphore = new SemaphoreSlim(2);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Use the Reader property of the injected channel
        var channelReader = _channel.Reader;

        // WaitToReadAsync returns false only once the writer is completed and the channel is empty
        while (await channelReader.WaitToReadAsync(stoppingToken))
        {
            // Process one email at a time: each await finishes before the next iteration starts
            var email = await channelReader.ReadAsync(stoppingToken);

            // Simulate some work; passing the token lets shutdown interrupt the delay
            await Task.Delay(5000, stoppingToken);

            _logger.LogInformation("Email {Subject} Sent!", email.Subject);
        }
    }
}

Here's the final result:

[Screenshot: channel demo output]
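The writing side is not shown in the service above. A minimal sketch of a producer loop paired with a sequential consumer might look like this. To keep it self-contained, the channel carries plain subject strings instead of the demo's TriggerEmail type, and the subjects and the 100 ms delay are invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();
var sent = new List<string>(); // only the single consumer below writes to this list

// Consumer: ReadAllAsync yields items in the order they were written and
// ends once the writer is completed and the channel is drained.
var consumer = Task.Run(async () =>
{
    await foreach (var subject in channel.Reader.ReadAllAsync())
    {
        await Task.Delay(100); // simulated send
        sent.Add(subject);
        Console.WriteLine($"Email {subject} Sent!");
    }
});

// Producer: a loop queues five emails without waiting for them to be sent.
for (var i = 1; i <= 5; i++)
{
    await channel.Writer.WriteAsync($"Newsletter #{i}");
}
channel.Writer.Complete(); // signal that no more items will be written

await consumer;
```

Because there is a single consumer that awaits each send before reading the next item, the emails go out in the order they were written.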

Comparison

Here is a table comparing PriorityQueue, Channel, and ConcurrentQueue based on their key features, use cases, and characteristics:

| Feature | PriorityQueue | Channel | ConcurrentQueue |
| --- | --- | --- | --- |
| Type | Queue with priority order | Queue with async support (bounded/unbounded) | Simple, thread-safe FIFO queue |
| Thread-Safe | No (requires external locking) | Yes | Yes |
| Blocking Operations | No (non-blocking; uses TryDequeue) | Yes (can await when full/empty) | No (non-blocking; uses TryDequeue) |
| Asynchronous Support | No | Yes (async methods like WriteAsync) | No (synchronous but can integrate with async) |
| Backpressure | No | Yes (bounded channels provide backpressure) | No |
| Capacity Limit | No (unbounded) | Yes (configurable, bounded or unbounded) | No (unbounded by default) |
| Priority Ordering | Yes (process items by priority) | No | No |
| Use Case | Task scheduling with priorities | Async producer-consumer scenarios, pipelines | Simple producer-consumer scenarios |
| Performance | O(log n) enqueue and dequeue (heap-based), so slower than a plain FIFO queue | Overhead for async workflows, but scalable | High performance for simple FIFO cases |
| Multiple Consumers/Producers | Only with external synchronization | Yes | Yes |
| Use in async/await Patterns | Not designed for async operations | Ideal for async/await and data pipelines | Not ideal for async workflows |
| Example Use Cases | Task scheduling, urgency-based processing | Real-time data processing, event pipelines | Background tasks, logging, event dispatching |