This tutorial explains how to build a multithreaded, priority-based job processing system using:
- PriorityQueue<TElement,TPriority>
- SemaphoreSlim
- Producer / Consumer pattern
- Retry mechanism
- Dead Letter Queue (DLQ)
- Multiple workers
1. What is PriorityQueue in C#?
Definition
PriorityQueue<TElement,TPriority> is a built-in .NET collection (available since .NET 6) that dequeues the element with the highest priority first, regardless of insertion order.
Official docs:
https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.priorityqueue-2?view=net-10.0
⚙️ How it works (in this system)
private readonly PriorityQueue<Job, int> _queue = new();
Jobs are added like this:
_queue.Enqueue(job, job.Priority);
Each entry has:
- Element → Job
- Priority → int
Priority rules in .NET
Lower number = higher priority
| Priority | Meaning | Execution Order |
|----------|---------|-----------------|
| 1 | Highest priority | First |
| 2 | Medium priority | Next |
| 5 | Low priority | Last |
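If a larger number should mean higher priority instead, the queue can be given a custom comparer through its constructor. A minimal sketch, assuming hypothetical element names ("Low", "Medium", "High") that are not part of this system:

```csharp
using System;
using System.Collections.Generic;

// Reverse the default ordering so that the largest number is dequeued first.
var maxFirst = new PriorityQueue<string, int>(
    Comparer<int>.Create((a, b) => b.CompareTo(a)));

maxFirst.Enqueue("Low", 1);
maxFirst.Enqueue("High", 5);
maxFirst.Enqueue("Medium", 3);

var order = new List<string>();
while (maxFirst.TryDequeue(out var name, out var priority))
{
    order.Add(name);
    Console.WriteLine($"{name} (priority {priority})");
}
// Prints High, then Medium, then Low
```

The rest of this tutorial keeps the default ordering, where 1 is dequeued before 5.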
Simple example
var queue = new PriorityQueue<string, int>();
queue.Enqueue("Job A", 5);
queue.Enqueue("Job B", 1);
queue.Enqueue("Job C", 3);
Console.WriteLine(queue.Dequeue()); // Job B
⚠️ Important limitation
- ❌ PriorityQueue is NOT thread-safe
- ✔ This system fixes that using lock + SemaphoreSlim
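To make the limitation concrete, here is a small sketch in which many threads enqueue at once. The variable names (`queue`, `gate`) are illustrative only. With the lock in place the final count is reliable; without it, concurrent calls may corrupt the internal heap or throw.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var queue = new PriorityQueue<int, int>();
var gate = new object();

// 1,000 concurrent enqueues; the lock ensures only one thread mutates the heap at a time.
Parallel.For(0, 1000, i =>
{
    lock (gate)
    {
        queue.Enqueue(i, i % 5);
    }
});

Console.WriteLine(queue.Count); // 1000
```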
2. Job Model
public record Job(
    string Name,
    int Priority,
    int RetryCount = 0,
    int MaxRetries = 3);
Why this works well
- Immutable (record)
- Safe for multithreading
- Supports retry tracking
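The points above can be illustrated with a `with` expression, which copies a record instead of modifying it. This is a sketch only: the job name "Send email" is made up, and the record is repeated so the snippet compiles on its own.

```csharp
using System;

var original = new Job("Send email", Priority: 2);

// 'with' creates a new instance; the original stays untouched.
var retried = original with { RetryCount = original.RetryCount + 1 };

Console.WriteLine(original.RetryCount);                 // 0
Console.WriteLine(retried.RetryCount);                  // 1
Console.WriteLine(ReferenceEquals(original, retried));  // False

// Same shape as the Job record above.
public record Job(string Name, int Priority, int RetryCount = 0, int MaxRetries = 3);
```

Because a worker never mutates a job that another thread may still hold, no extra synchronization is needed around the job objects themselves.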
3. Blocking Priority Queue (Core Engine)
public class BlockingPriorityQueue
{
    private readonly PriorityQueue<Job, int> _queue = new();
    private readonly object _lock = new();
    private readonly SemaphoreSlim _signal = new(0);
Why these 3 components?
| Component | Purpose |
|-----------|---------|
| PriorityQueue | Stores jobs by priority |
| lock | Thread safety |
| SemaphoreSlim | Async waiting / signaling |
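The signaling role of SemaphoreSlim is easiest to see in isolation. In this minimal sketch (not part of the system itself) the semaphore starts at 0, so a waiter stays pending until a producer calls Release:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var signal = new SemaphoreSlim(0);          // starts at 0: nothing to consume yet

Task waiter = signal.WaitAsync();           // a consumer starts waiting without blocking a thread
Console.WriteLine(waiter.IsCompleted);      // False

signal.Release();                           // a producer announces one available item
await waiter;                               // the pending wait now completes

Console.WriteLine(signal.CurrentCount);     // 0 (the single permit was consumed by the waiter)
```

In the queue above, each Release corresponds to exactly one enqueued job, which is what lets a worker dequeue as soon as its wait completes.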
➕ Enqueue (Producer side)
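public void Enqueue(Job job)
{
    lock (_lock)
    {
        // Only one thread may touch the PriorityQueue at a time.
        _queue.Enqueue(job, job.Priority);
        // One Release per job: wakes one waiting worker, or is remembered until a worker asks.
        _signal.Release();
    }
}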
Flow
Producer → lock → enqueue → signal workers
➖ Dequeue (Consumer side)
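public async Task<Job> DequeueAsync(CancellationToken cancellationToken = default)
{
    // Wait asynchronously until at least one job has been enqueued.
    await _signal.WaitAsync(cancellationToken);
    lock (_lock)
    {
        // Each Release() matches one enqueued job, so the queue is not empty here.
        return _queue.Dequeue();
    }
}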
Flow
Worker waits → signal → lock → dequeue highest priority job
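When the queue is empty, the wait step ends only through a signal or through cancellation. The sketch below shows the cancellation path using just the semaphore. The 200 ms timeout is an arbitrary value chosen for the demonstration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var signal = new SemaphoreSlim(0);     // empty queue: no signal has been released
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

var cancelled = false;
try
{
    await signal.WaitAsync(cts.Token); // waits, because nothing was enqueued
}
catch (OperationCanceledException)
{
    cancelled = true;                  // the token fired while the worker was waiting
}

Console.WriteLine(cancelled); // True
```

This is the exception a worker loop has to catch in order to stop cleanly.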
4. Dead Letter Queue (DLQ)
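public class DeadLetterQueue
{
    private readonly List<Job> _failedJobs = new();
    private readonly object _lock = new();

    public void Add(Job job, Exception ex)
    {
        Console.WriteLine($"DLQ: Job {job.Name} failed permanently: {ex.Message}");
        // List<T> is not thread-safe, and several workers may call Add concurrently.
        lock (_lock) { _failedJobs.Add(job); }
    }

    // Returns a snapshot so callers never enumerate the list while it is being modified.
    public IReadOnlyList<Job> GetAll()
    {
        lock (_lock) { return _failedJobs.ToArray(); }
    }
}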
Purpose
DLQ stores jobs that:
- exceeded retry limit
- failed permanently
- require manual inspection
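For manual inspection it often helps to keep the failure reason next to the job. One possible variation, sketched here with a hypothetical entry shape (job name, reason, timestamp) and a ConcurrentQueue in place of the List used in this tutorial:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical entry shape: job name, failure message, and failure time.
var deadLetters = new ConcurrentQueue<(string JobName, string Reason, DateTimeOffset FailedAt)>();

// Several workers can record failures at the same time without an explicit lock.
Parallel.For(0, 100, i =>
    deadLetters.Enqueue(($"Job - {i}", "Random failure", DateTimeOffset.UtcNow)));

Console.WriteLine(deadLetters.Count); // 100
```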
5. Job Processor (Worker System)
public class JobProcessor(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
{
    private readonly BlockingPriorityQueue _jobQueue = jobQueue;
    private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;
Main processing loop
public async Task ProcessJobsAsync(CancellationToken cancellationToken = default)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            var job = await _jobQueue.DequeueAsync(cancellationToken);
            await ProcessJob(job, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Job processing cancelled.");
            break;
        }
    }
}
Job execution logic
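private async Task ProcessJob(Job job, CancellationToken ct)
{
    try
    {
        Console.WriteLine($"Processing {job.Name}");
        await Task.Delay(500, ct); // simulated work
        // Fail roughly one time in three to exercise the retry path.
        if (Random.Shared.Next(0, 3) == 0)
            throw new Exception("Random failure");
        Console.WriteLine($"✅ Success: {job.Name}");
    }
    // Let cancellation propagate to the processing loop instead of counting it as a job failure.
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        await HandleFailure(job, ex);
    }
}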
6. Retry Logic
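private async Task HandleFailure(Job job, Exception ex)
{
    if (job.RetryCount < job.MaxRetries)
    {
        // Records are immutable, so create a copy with the counter incremented.
        var retryJob = job with
        {
            RetryCount = job.RetryCount + 1
        };
        Console.WriteLine($"Retrying {job.Name} ({retryJob.RetryCount})");
        await Task.Delay(1000); // fixed delay before the job is requeued
        _jobQueue.Enqueue(retryJob);
    }
    else
    {
        // Retries exhausted: hand the job to the DLQ.
        _deadLetterQueue.Add(job, ex);
    }
}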
Behavior flow
Failure → retry → requeue → success OR DLQ
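The flow above can be traced with a small single-threaded simulation. It assumes a hypothetical job that fails on every attempt and uses a plain Queue in place of the blocking priority queue, so that the number of attempts is easy to count:

```csharp
using System;
using System.Collections.Generic;

const int maxRetries = 3;
var pending = new Queue<(string Name, int RetryCount)>();
var deadLetters = new List<string>();
var attempts = 0;

pending.Enqueue(("Job A", 0));

while (pending.TryDequeue(out var job))
{
    attempts++; // this hypothetical job fails on every attempt

    if (job.RetryCount < maxRetries)
        pending.Enqueue((job.Name, job.RetryCount + 1)); // requeue with the counter incremented
    else
        deadLetters.Add(job.Name);                       // retries exhausted: move to the DLQ
}

Console.WriteLine(attempts);          // 4 (1 initial attempt + 3 retries)
Console.WriteLine(deadLetters.Count); // 1
```

With MaxRetries = 3, a job that keeps failing is therefore executed four times before it reaches the DLQ.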
7. Job Producer
public class JobProducer(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
{
    private readonly BlockingPriorityQueue _jobQueue = jobQueue;
    private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;
Publish single job
public void PublishJob(string name, int priority)
{
    var job = new Job(name, priority);
    Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");
    _jobQueue.Enqueue(job);
}
Publish multiple jobs
public void PublishJobs(IEnumerable<Job> jobs)
{
    foreach (var job in jobs)
    {
        Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");
        _jobQueue.Enqueue(job);
    }
}
Manual DLQ injection
public void PublishDeadLetterJob(string name, int priority)
{
    var job = new Job(name, priority);
    Console.WriteLine($"Publishing {job.Name} to DLQ");
    _deadLetterQueue.Add(job, new Exception("Manually added"));
}
8. System Execution
Setup
var queue = new BlockingPriorityQueue();
var dlq = new DeadLetterQueue();
var jobProducer = new JobProducer(queue, dlq);
var cts = new CancellationTokenSource();
Start workers
var worker1 = new JobProcessor(queue, dlq);
var worker2 = new JobProcessor(queue, dlq);
var workerTasks = new[]
{
    Task.Run(() => worker1.ProcessJobsAsync(cts.Token)),
    Task.Run(() => worker2.ProcessJobsAsync(cts.Token))
};
Publish jobs
for (int i = 0; i < 10; i++)
{
    var priority = Random.Shared.Next(1, 5); // upper bound is exclusive: priorities 1 to 4
    jobProducer.PublishJob($"Job - {i}", priority); // PublishJob logs the job itself
}
Shutdown system
await Task.Delay(3000);
cts.Cancel();
await Task.WhenAll(workerTasks);
Console.WriteLine("System stopped cleanly.");
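The shutdown sequence relies on each worker catching OperationCanceledException and returning normally, so that Task.WhenAll completes without throwing. A stripped-down sketch of that pattern, using a hypothetical `WorkerAsync` that only idles until it is cancelled:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();

// Hypothetical worker: idles until cancelled, then exits normally.
async Task WorkerAsync(int id, CancellationToken ct)
{
    try
    {
        await Task.Delay(Timeout.Infinite, ct);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine($"Worker {id} stopping.");
    }
}

var workers = Enumerable.Range(1, 2)
    .Select(id => Task.Run(() => WorkerAsync(id, cts.Token)))
    .ToArray();

cts.CancelAfter(TimeSpan.FromMilliseconds(200)); // request shutdown shortly after start
await Task.WhenAll(workers);                     // completes once every worker has returned

Console.WriteLine(workers.All(t => t.IsCompletedSuccessfully)); // True
```

Note that cancelling after a fixed delay, as in the listing above, stops the workers even if jobs are still waiting in the queue.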
9. Runtime behavior
- Jobs are published with random priority
- Stored in PriorityQueue
- Workers wait asynchronously
- Among the jobs currently waiting, the one with the highest priority (lowest number) is dequeued first
- Random failures occur
- Failed jobs retry if allowed
- Permanent failures go to DLQ
- System shuts down gracefully
10. Key Concepts Learned
✔ PriorityQueue
Built-in heap structure for priority-based scheduling
✔ Thread Safety
Handled using lock (mutual exclusion) and SemaphoreSlim (signaling)
✔ Producer / Consumer
- Producer → JobProducer
- Consumer → JobProcessor
✔ Retry System
Automatic retry with counter tracking
✔ Dead Letter Queue
Keeps jobs that failed permanently (in memory) for later inspection
✔ Multithreading
Multiple workers process jobs concurrently
✔ Graceful Shutdown
Uses CancellationTokenSource
Summary
A real-world-style system that includes:
- Priority scheduling
- Concurrent workers
- Async processing
- Retry logic
- Failure isolation (DLQ)
- Thread-safe queue coordination