This tutorial explains how to build a multithreaded, priority-based job processing system using:
- `PriorityQueue<TElement,TPriority>`
- `SemaphoreSlim`
- Producer / Consumer pattern
- Retry mechanism
- Dead Letter Queue (DLQ)
- Multiple workers
# 1. What is PriorityQueue in C#?
## Definition
`PriorityQueue<TElement,TPriority>` is a built-in .NET collection (available since .NET 6) that dequeues the element with the highest priority first, regardless of insertion order. In .NET, the highest priority is the smallest priority value.
Official docs:
https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.priorityqueue-2?view=net-10.0
## ⚙️ How it works (in this system)
```csharp
private readonly PriorityQueue<Job, int> _queue = new();
```
Jobs are added like this:
```csharp
_queue.Enqueue(job, job.Priority);
```
Each entry has:
- Element → `Job`
- Priority → `int`
## Priority rules in .NET
Lower number = higher priority

| Priority | Meaning | Execution Order |
|---|---|---|
| 1 | Highest priority | First |
| 2 | Medium priority | Next |
| 5 | Low priority | Last |
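
If a system already treats larger numbers as more urgent, the default ordering can be reversed with a custom comparer. The following is a minimal sketch, assuming the `PriorityQueue` constructor overload that accepts an `IComparer<TPriority>`; the variable names are illustrative only.

```csharp
using System;
using System.Collections.Generic;

// Reverse the default ordering so that the largest number is dequeued first.
var maxFirst = new PriorityQueue<string, int>(
    Comparer<int>.Create((x, y) => y.CompareTo(x)));

maxFirst.Enqueue("Job A", 5);
maxFirst.Enqueue("Job B", 1);
maxFirst.Enqueue("Job C", 3);

string firstOut = maxFirst.Dequeue();
Console.WriteLine(firstOut); // Job A
```

The rest of this tutorial keeps the default rule (lower number = higher priority), so no comparer is passed to the queue.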
## Simple example
```csharp
var queue = new PriorityQueue<string, int>();
queue.Enqueue("Job A", 5);
queue.Enqueue("Job B", 1);
queue.Enqueue("Job C", 3);
// The lowest priority value (1) is dequeued first.
Console.WriteLine(queue.Dequeue()); // Job B
```
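
Note that `PriorityQueue` does not guarantee first-in, first-out order for elements with the same priority. If equal-priority jobs should keep their submission order, one common workaround is a composite priority that adds a running sequence number. This is only a sketch; `EnqueueStable` and `sequence` are hypothetical names, not part of the system built in this tutorial.

```csharp
using System;
using System.Collections.Generic;

// Tuples compare element by element: first by Priority, then by Sequence.
var stableQueue = new PriorityQueue<string, (int Priority, long Sequence)>();
long sequence = 0;

// Hypothetical helper: tags every item with the order in which it arrived.
void EnqueueStable(string jobName, int priority) =>
    stableQueue.Enqueue(jobName, (priority, sequence++));

EnqueueStable("Job A", 2);
EnqueueStable("Job B", 1);
EnqueueStable("Job C", 1);
EnqueueStable("Job D", 2);

var order = new List<string>();
while (stableQueue.TryDequeue(out var next, out _))
    order.Add(next);

Console.WriteLine(string.Join(", ", order)); // Job B, Job C, Job A, Job D
```

The sketch is single-threaded; in a concurrent queue the counter would have to be incremented inside the same lock that protects the queue.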
## ⚠️ Important limitation
- ❌ `PriorityQueue` is NOT thread-safe
- ✔ This system works around that by guarding every queue access with a `lock` and using `SemaphoreSlim` to signal waiting workers
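# 2. Job Model
```csharp
public record Job(
    string Name,
    int Priority,
    int RetryCount = 0,
    int MaxRetries = 3);
```
## Why this works well
- Immutable (`record` with init-only properties)
- Safe to share between threads, because an instance is never modified after creation
- Supports retry tracking (`RetryCount`, `MaxRetries`)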
# 3. Blocking Priority Queue (Core Engine)
```csharp
public class BlockingPriorityQueue
{
    private readonly PriorityQueue<Job, int> _queue = new();
    private readonly object _lock = new();
    private readonly SemaphoreSlim _signal = new(0);
    // Enqueue and DequeueAsync are shown in the next two sections.
}
```
## Why these 3 components?

| Component | Purpose |
|---|---|
| `PriorityQueue` | Stores jobs by priority |
| `lock` | Thread safety: only one thread touches the queue at a time |
| `SemaphoreSlim` | Async waiting / signaling: workers wait until a job is available |
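## ➕ Enqueue (Producer side)
```csharp
public void Enqueue(Job job)
{
    lock (_lock)
    {
        _queue.Enqueue(job, job.Priority);
        // One release per queued job, so the semaphore count matches the queue length.
        _signal.Release();
    }
}
```
## Flow
Producer → lock → enqueue → signal workers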
## ➖ Dequeue (Consumer side)
```csharp
public async Task<Job> DequeueAsync(CancellationToken cancellationToken = default)
{
    // Waits without blocking a thread until at least one job has been enqueued.
    await _signal.WaitAsync(cancellationToken);
    lock (_lock)
    {
        return _queue.Dequeue();
    }
}
```
## Flow
Worker waits → signal → lock → dequeue highest priority job
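
The two flows above can be tried out in isolation. The following is a minimal sketch that assumes plain strings instead of `Job` and uses local functions instead of the `BlockingPriorityQueue` class; `gate`, `signal`, and the item names are illustrative. It starts a consumer before anything is queued, to show that the consumer is parked until the producer signals.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var items = new PriorityQueue<string, int>();
var gate = new object();
var signal = new SemaphoreSlim(0);

// Producer side: lock, enqueue, then signal one waiting consumer.
void Enqueue(string item, int priority)
{
    lock (gate) { items.Enqueue(item, priority); }
    signal.Release();
}

// Consumer side: wait for a signal, then take the best item under the lock.
async Task<string> DequeueAsync(CancellationToken ct = default)
{
    await signal.WaitAsync(ct);
    lock (gate) { return items.Dequeue(); }
}

// The consumer starts first and has nothing to take yet.
Task<string> pending = DequeueAsync();
Console.WriteLine(pending.IsCompleted); // False

Enqueue("low", 5);
string first = await pending;           // the waiting consumer receives "low"

Enqueue("medium", 3);
Enqueue("urgent", 1);
string second = await DequeueAsync();   // "urgent": lowest priority value wins

Console.WriteLine($"{first}, {second}"); // low, urgent
```

Because every `Enqueue` releases the semaphore exactly once, a consumer that gets past `WaitAsync` always finds an item in the queue.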
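# 4. Dead Letter Queue (DLQ)
```csharp
public class DeadLetterQueue
{
    private readonly List<Job> _failedJobs = new();
    private readonly object _lock = new();
    public void Add(Job job, Exception ex)
    {
        Console.WriteLine($"DLQ: Job {job.Name} failed permanently: {ex.Message}");
        // List<T> is not thread-safe, and several workers may call Add at once.
        lock (_lock) { _failedJobs.Add(job); }
    }
    // Returns a snapshot, so callers never enumerate the list while it changes.
    public IReadOnlyList<Job> GetAll()
    {
        lock (_lock) { return _failedJobs.ToArray(); }
    }
}
```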
## Purpose
The DLQ stores jobs that:
* exceeded retry limit
* failed permanently
* require manual inspection
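
Because several workers can report failures at the same time, the collection behind a DLQ has to tolerate concurrent writes. One option is a concurrent collection that also keeps the failure reason for later inspection. The sketch below assumes tuples instead of the `Job` record, and `AddDeadLetter` is a hypothetical helper name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// ConcurrentQueue allows many threads to add entries without an explicit lock.
var deadLetters = new ConcurrentQueue<(string JobName, string Reason)>();

// Hypothetical helper: records which job failed and why.
void AddDeadLetter(string jobName, Exception ex) =>
    deadLetters.Enqueue((jobName, ex.Message));

// Simulate many workers reporting permanent failures in parallel.
Parallel.For(0, 100, i =>
    AddDeadLetter($"Job - {i}", new InvalidOperationException("Simulated failure")));

Console.WriteLine(deadLetters.Count); // 100
```

Storing the reason next to the job name makes manual inspection easier than a console message alone.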
---
# 5. Job Processor (Worker System)
```csharp
public class JobProcessor(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
{
    // Primary constructor (C# 12): the parameters are stored in readonly fields.
    private readonly BlockingPriorityQueue _jobQueue = jobQueue;
    private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;
```
---
## Main processing loop
```csharp
public async Task ProcessJobsAsync(CancellationToken cancellationToken = default)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            var job = await _jobQueue.DequeueAsync(cancellationToken);
            await ProcessJob(job, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Job processing cancelled.");
            break;
        }
    }
}
```
---
## Job execution logic
```csharp
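private async Task ProcessJob(Job job, CancellationToken ct)
{
    try
    {
        Console.WriteLine($"Processing {job.Name}");
        // Simulate work.
        await Task.Delay(500, ct);
        // Simulate a failure in roughly one of three runs.
        if (Random.Shared.Next(0, 3) == 0)
            throw new Exception("Random failure");
        Console.WriteLine($"✅ Success: {job.Name}");
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        // Cancellation is not a job failure; it propagates to ProcessJobsAsync.
        await HandleFailure(job, ex);
    }
}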
```
---
# 6. Retry Logic
```csharp
private async Task HandleFailure(Job job, Exception ex)
{
    if (job.RetryCount < job.MaxRetries)
    {
        // Records are immutable, so a copy with an incremented counter is created.
        var retryJob = job with
        {
            RetryCount = job.RetryCount + 1
        };
        Console.WriteLine($"Retrying {job.Name} ({retryJob.RetryCount})");
        // Fixed one-second pause before the job is put back into the queue.
        await Task.Delay(1000);
        _jobQueue.Enqueue(retryJob);
    }
    else
    {
        _deadLetterQueue.Add(job, ex);
    }
}
```
## Behavior flow
Failure → retry → requeue → success OR DLQ

---
# 7. Job Producer
```csharp
public class JobProducer(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
{
    private readonly BlockingPriorityQueue _jobQueue = jobQueue;
    private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;
```
---
## Publish single job
```csharp
public void PublishJob(string name, int priority)
{
    var job = new Job(name, priority);
    Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");
    _jobQueue.Enqueue(job);
}
```
---
## Publish multiple jobs
```csharp
public void PublishJobs(IEnumerable<Job> jobs)
{
    foreach (var job in jobs)
    {
        Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");
        _jobQueue.Enqueue(job);
    }
}
```
---
## Manual DLQ injection
```csharp
public void PublishDeadLetterJob(string name, int priority)
{
    var job = new Job(name, priority);
    Console.WriteLine($"Publishing {job.Name} to DLQ");
    _deadLetterQueue.Add(job, new Exception("Manually added"));
}
```
---
# 8. System Execution
## Setup
```csharp
var queue = new BlockingPriorityQueue();
var dlq = new DeadLetterQueue();
var jobProducer = new JobProducer(queue, dlq);
var cts = new CancellationTokenSource();
```
---
## Start workers
```csharp
// Two workers share the same queue and the same DLQ.
var worker1 = new JobProcessor(queue, dlq);
var worker2 = new JobProcessor(queue, dlq);
var workerTasks = new[]
{
    Task.Run(() => worker1.ProcessJobsAsync(cts.Token)),
    Task.Run(() => worker2.ProcessJobsAsync(cts.Token))
};
```
---
## Publish jobs
```csharp
for (int i = 0; i < 10; i++)
{
    // The upper bound is exclusive, so priorities range from 1 to 4.
    var priority = Random.Shared.Next(1, 5);
    Console.WriteLine($"\nPublishing Job - {i} with priority {priority}");
    jobProducer.PublishJob($"Job - {i}", priority);
}
```
---
## Shutdown system
```csharp
await Task.Delay(3000);
cts.Cancel();
await Task.WhenAll(workerTasks);
Console.WriteLine("System stopped cleanly.");
```
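
The shutdown works because an idle worker is parked inside `SemaphoreSlim.WaitAsync`, which observes the cancellation token. The following minimal sketch isolates that mechanism with a bare semaphore and no queue or workers, which is an assumption made to keep it short; the variable names are illustrative.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var idleSignal = new SemaphoreSlim(0);            // count 0: nothing to dequeue
using var shutdown = new CancellationTokenSource();

// Parked like an idle worker inside DequeueAsync.
Task waiter = idleSignal.WaitAsync(shutdown.Token);

shutdown.Cancel();

bool wasCancelled = false;
try
{
    await waiter;
}
catch (OperationCanceledException)
{
    // The same exception type that the processing loop catches before it exits.
    wasCancelled = true;
}

Console.WriteLine(wasCancelled); // True
```

Cancellation only wakes the waiting workers. It does not drain the queue, so any jobs still queued at that moment are not processed.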
---
# 9. Runtime behavior
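1. Jobs are published with a random priority between 1 and 4
2. Each job is stored in the PriorityQueue
3. Workers wait asynchronously until a job is available
4. The job with the lowest priority value is dequeued first
5. Random failures occur (in roughly one of three runs)
6. Failed jobs are retried until MaxRetries is reached
7. Jobs that exhaust their retries go to the DLQ
8. After three seconds the token is cancelled and the workers exit; jobs still in the queue are left unprocessed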
---
# 10. Key Concepts Learned
## ✔ PriorityQueue
Built-in, heap-based collection for priority-based scheduling
## ✔ Thread Safety
Handled using:
* lock
* SemaphoreSlim
## ✔ Producer / Consumer
* Producer → JobProducer
* Consumer → JobProcessor
## ✔ Retry System
Automatic retry with counter tracking
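
As a quick worked example of the counter logic: with `MaxRetries = 3`, a job that always fails is executed four times (one initial run plus three retries) before it is dead-lettered. The sketch below replays that check without any queue. `BackoffDelay` is a hypothetical helper showing how the fixed one-second pause could be replaced by a growing delay; it is not part of the implementation in this tutorial.

```csharp
using System;
using System.Collections.Generic;

const int maxRetries = 3;
int retryCount = 0;
int attempts = 0;
bool deadLettered = false;
var delays = new List<TimeSpan>();

// Hypothetical helper: 1 s before retry 1, 2 s before retry 2, 4 s before retry 3.
TimeSpan BackoffDelay(int retry) => TimeSpan.FromSeconds(Math.Pow(2, retry - 1));

while (!deadLettered)
{
    attempts++; // the job runs and, in this sketch, always fails

    if (retryCount < maxRetries)
    {
        retryCount++;                       // same check as in HandleFailure
        delays.Add(BackoffDelay(retryCount));
    }
    else
    {
        deadLettered = true;                // retries exhausted: job goes to the DLQ
    }
}

Console.WriteLine($"attempts = {attempts}"); // attempts = 4
foreach (var delay in delays)
    Console.WriteLine($"waited {delay.TotalSeconds} s before the next attempt");
```

A growing delay gives a failing dependency more time to recover, at the cost of keeping the retrying worker idle for longer.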
## ✔ Dead Letter Queue
Holds jobs that failed permanently so they can be inspected later (in memory only in this implementation)
## ✔ Multithreading
Multiple workers process jobs concurrently
## ✔ Graceful Shutdown
Uses CancellationTokenSource
---
# Summary
The result is a real-world-style system that includes:
* Priority scheduling
* Concurrent workers
* Async processing
* Retry logic
* Failure isolation (DLQ)
* Thread-safe queue coordination
---
**Full Implementation**
```csharp
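// Requires .NET 8 / C# 12 or later (primary constructors) with implicit usings enabled.
public record Job(string Name, int Priority, int RetryCount = 0, int MaxRetries = 3);

/// <summary>
/// Thread-safe priority queue whose consumers wait asynchronously for jobs.
/// </summary>
public class BlockingPriorityQueue
{
    /// <summary>
    /// Underlying queue; the job with the lowest priority value is dequeued first.
    /// </summary>
    private readonly PriorityQueue<Job, int> _queue = new();

    /// <summary>
    /// Guards every access to <see cref="_queue"/>, which is not thread-safe.
    /// </summary>
    private readonly object _lock = new();

    /// <summary>
    /// Counts the queued jobs; consumers wait on it until a job is available.
    /// </summary>
    private readonly SemaphoreSlim _signal = new(0);

    public void Enqueue(Job job)
    {
        lock (_lock)
        {
            _queue.Enqueue(job, job.Priority);
            _signal.Release();
        }
    }

    public async Task<Job> DequeueAsync(CancellationToken cancellationToken = default)
    {
        await _signal.WaitAsync(cancellationToken);
        lock (_lock)
        {
            return _queue.Dequeue();
        }
    }
}
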
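public class DeadLetterQueue
{
    private readonly List<Job> _failedJobs = new();
    private readonly object _lock = new();

    public void Add(Job job, Exception ex)
    {
        Console.WriteLine($"DLQ: Job {job.Name} failed permanently: {ex.Message}");
        // List<T> is not thread-safe, and several workers may call Add at once.
        lock (_lock) { _failedJobs.Add(job); }
    }

    // Returns a snapshot, so callers never enumerate the list while it changes.
    public IReadOnlyList<Job> GetAll()
    {
        lock (_lock) { return _failedJobs.ToArray(); }
    }
}
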
public class JobProcessor(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
{
    private readonly BlockingPriorityQueue _jobQueue = jobQueue;
    private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;

    public async Task ProcessJobsAsync(CancellationToken cancellationToken = default)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                var job = await _jobQueue.DequeueAsync(cancellationToken);
                await ProcessJob(job, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Job processing cancelled.");
                break;
            }
        }
    }

    private async Task ProcessJob(Job job, CancellationToken ct)
    {
        try
        {
            Console.WriteLine($"Processing {job.Name}");
            // Simulate work. A real implementation would run the actual job logic here
            // (an HTTP call, a database operation, etc.).
            await Task.Delay(500, ct);
            // Simulate a failure in roughly one of three runs.
            if (Random.Shared.Next(0, 3) == 0)
                throw new Exception("Random failure");
            Console.WriteLine($"✅ Success: {job.Name}");
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Cancellation is not a job failure; it propagates to ProcessJobsAsync.
            await HandleFailure(job, ex);
        }
    }

    private async Task HandleFailure(Job job, Exception ex)
    {
        if (job.RetryCount < job.MaxRetries)
        {
            var retryJob = job with { RetryCount = job.RetryCount + 1 };
            Console.WriteLine($"Retrying {job.Name} ({retryJob.RetryCount})");
            await Task.Delay(1000);
            _jobQueue.Enqueue(retryJob);
        }
        else
        {
            _deadLetterQueue.Add(job, ex);
        }
    }
}

// JobProducer is a top-level class; nesting it inside JobProcessor would break
// the "new JobProducer(queue, dlq)" call in the program code.
public class JobProducer(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
{
    /// <summary>
    /// Thread-safe priority queue for managing jobs.
    /// </summary>
    /// <remarks>Enables prioritized job processing in concurrent scenarios.</remarks>
    private readonly BlockingPriorityQueue _jobQueue = jobQueue;

    /// <summary>
    /// Dead letter queue used for manually injected failed jobs.
    /// </summary>
    private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;

    /// <summary>
    /// Publishes a collection of jobs to the job queue.
    /// </summary>
    /// <param name="jobs">The jobs to be published.</param>
    public void PublishJobs(IEnumerable<Job> jobs)
    {
        foreach (var job in jobs)
        {
            Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");
            _jobQueue.Enqueue(job);
        }
    }

    public void PublishJob(string name, int priority)
    {
        var job = new Job(name, priority);
        Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");
        _jobQueue.Enqueue(job);
    }

    public void PublishDeadLetterJob(string name, int priority)
    {
        var job = new Job(name, priority);
        Console.WriteLine($"Publishing {job.Name} to DLQ with priority {job.Priority}");
        _deadLetterQueue.Add(job, new Exception("Manually added to DLQ"));
    }
}

// Program code (top-level statements). If everything lives in a single file, this
// part must be placed above the type declarations, as the C# compiler requires.
var queue = new BlockingPriorityQueue();
var dlq = new DeadLetterQueue();
var jobProducer = new JobProducer(queue, dlq);
var cts = new CancellationTokenSource();
try
{
    var worker1 = new JobProcessor(queue, dlq);
    var worker2 = new JobProcessor(queue, dlq);
    var workerTasks = new[]
    {
        Task.Run(() => worker1.ProcessJobsAsync(cts.Token)),
        Task.Run(() => worker2.ProcessJobsAsync(cts.Token))
    };

    // Publish jobs
    for (int i = 0; i < 10; i++)
    {
        // The upper bound is exclusive, so priorities range from 1 to 4.
        var priority = Random.Shared.Next(1, 5);
        Console.BackgroundColor = ConsoleColor.Green;
        Console.WriteLine(Environment.NewLine + $"Publishing Job - {i} with priority {priority}");
        Console.ResetColor();
        jobProducer.PublishJob($"Job - {i}", priority);
    }

    await Task.Delay(3000);
    cts.Cancel();
    await Task.WhenAll(workerTasks);
    Console.WriteLine("System stopped cleanly.");
}
catch (OperationCanceledException)
{
    Console.WriteLine("Processing cancelled.");
}
catch (Exception ex)
{
    Console.WriteLine($"Error: {ex.Message}");
}
Console.ReadKey();
```