Priority-Based Job Processing System in C#

Leader 24 59 119
calendar_todayschedule6 min read

This tutorial explains how to build a multithreaded, priority-based job processing system using:

  • PriorityQueue<TElement,TPriority>
  • SemaphoreSlim
  • Producer / Consumer pattern
  • Retry mechanism
  • Dead Letter Queue (DLQ)
  • Multiple workers

1. What is PriorityQueue in C#?

Definition

PriorityQueue<TElement,TPriority> is a built-in .NET collection that:

Stores elements so that the item with the highest priority is dequeued first, not the one inserted first.

Official docs:
https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.priorityqueue-2?view=net-10.0


⚙️ How it works (in this system)

private readonly PriorityQueue<Job, int> _queue = new();

Jobs are added like this:

_queue.Enqueue(job, job.Priority);

Each entry has:

  • ElementJob
  • Priorityint

Priority rules in .NET

Lower number = higher priority

Priority Meaning Execution Order
1 Highest priority First
2 Medium priority Next
5 Low priority Last

Simple example

var queue = new PriorityQueue<string, int>();

queue.Enqueue("Job A", 5);
queue.Enqueue("Job B", 1);
queue.Enqueue("Job C", 3);

Console.WriteLine(queue.Dequeue()); // Job B

⚠️ Important limitation

  • ❌ PriorityQueue is NOT thread-safe
  • ✔ This system fixes that using lock + SemaphoreSlim

2. Job Model

public record Job(
    string Name,
    int Priority,
    int RetryCount = 0,
    int MaxRetries = 3);

Why this works well

  • Immutable (record)
  • Safe for multithreading
  • Supports retry tracking

3. Blocking Priority Queue (Core Engine)

public class BlockingPriorityQueue
{
    private readonly PriorityQueue<Job, int> _queue = new();
    private readonly object _lock = new();
    private readonly SemaphoreSlim _signal = new(0);

Why these 3 components?

Component Purpose
PriorityQueue Stores jobs by priority
lock Thread safety
SemaphoreSlim Async waiting / signaling

➕ Enqueue (Producer side)

public void Enqueue(Job job)
{
    lock (_lock)
    {
        _queue.Enqueue(job, job.Priority);
        _signal.Release();
    }
}

Flow

Producer → lock → enqueue → signal workers


➖ Dequeue (Consumer side)

public async Task<Job> DequeueAsync(CancellationToken cancellationToken = default)
{
    await _signal.WaitAsync(cancellationToken);

    lock (_lock)
    {
        return _queue.Dequeue();
    }
}

Flow

Worker waits → signal → lock → dequeue highest priority job


4. Dead Letter Queue (DLQ)

public class DeadLetterQueue
{
    private readonly List<Job> _failedJobs = new();

    public void Add(Job job, Exception ex)
    {
        Console.WriteLine($"DLQ: Job {job.Name} failed permanently: {ex.Message}");
        _failedJobs.Add(job);
    }

    public IReadOnlyList<Job> GetAll() => _failedJobs;
}
```

### Purpose

DLQ stores jobs that:

* exceeded retry limit
* failed permanently
* require manual inspection

---

# 5. Job Processor (Worker System)

```csharp
public class JobProcessor(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
{
    private readonly BlockingPriorityQueue _jobQueue = jobQueue;
    private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;
```

---

## Main processing loop

```csharp
public async Task ProcessJobsAsync(CancellationToken cancellationToken = default)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            var job = await _jobQueue.DequeueAsync(cancellationToken);

            await ProcessJob(job, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Job processing cancelled.");
            break;
        }
    }
}
```

---

## Job execution logic

```csharp
private async Task ProcessJob(Job job, CancellationToken ct)
{
    try
    {
        Console.WriteLine($"Processing {job.Name}");

        await Task.Delay(500, ct);

        if (Random.Shared.Next(0, 3) == 0)
            throw new Exception("Random failure");

        Console.WriteLine($"✅ Success: {job.Name}");
    }
    catch (Exception ex)
    {
        await HandleFailure(job, ex);
    }
}
```

---

# 6. Retry Logic

```csharp
private async Task HandleFailure(Job job, Exception ex)
{
    if (job.RetryCount < job.MaxRetries)
    {
        var retryJob = job with
        {
            RetryCount = job.RetryCount + 1
        };

        Console.WriteLine($"Retrying {job.Name} ({retryJob.RetryCount})");

        await Task.Delay(1000);

        _jobQueue.Enqueue(retryJob);
    }
    else
    {
        _deadLetterQueue.Add(job, ex);
    }
}

Behavior flow

Failure → retry → requeue → success OR DLQ


7. Job Producer

public class JobProducer(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
{
    private readonly BlockingPriorityQueue _jobQueue = jobQueue;
    private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;

Publish single job

public void PublishJob(string name, int priority)
{
    var job = new Job(name, priority);

    Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");

    _jobQueue.Enqueue(job);
}
```

---

## Publish multiple jobs

```csharp
public void PublishJobs(IEnumerable<Job> jobs)
{
    foreach (var job in jobs)
    {
        Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");
        _jobQueue.Enqueue(job);
    }
}

Manual DLQ injection

public void PublishDeadLetterJob(string name, int priority)
{
    var job = new Job(name, priority);

    Console.WriteLine($"Publishing {job.Name} to DLQ");

    _deadLetterQueue.Add(job, new Exception("Manually added"));
}
```

---

# 8. System Execution

## Setup

```csharp
var queue = new BlockingPriorityQueue();
var dlq = new DeadLetterQueue();
var jobProducer = new JobProducer(queue, dlq);
var cts = new CancellationTokenSource();
```

---

## Start workers

```csharp
var job1 = new JobProcessor(queue, dlq);
var job2 = new JobProcessor(queue, dlq);

var workerTasks = new[]
{
    Task.Run(() => job1.ProcessJobsAsync(cts.Token)),
    Task.Run(() => job2.ProcessJobsAsync(cts.Token))
};
```

---

## Publish jobs

```csharp
for (int i = 0; i < 10; i++)
{
    var priority = Random.Shared.Next(1, 5);

    Console.WriteLine($"\nPublishing Job - {i} with priority {priority}");

    jobProducer.PublishJob($"Job - {i}", priority);
}
```

---

## Shutdown system

```csharp
await Task.Delay(3000);

cts.Cancel();

await Task.WhenAll(workerTasks);

Console.WriteLine("System stopped cleanly.");
```

---

# 9. Runtime behavior

1. Jobs are published with random priority
2. Stored in PriorityQueue
3. Workers wait asynchronously
4. Highest priority job executes first
5. Random failures occur
6. Failed jobs retry if allowed
7. Permanent failures go to DLQ
8. System shuts down gracefully

---

# 10. Key Concepts Learned

## ✔ PriorityQueue

Built-in heap structure for priority-based scheduling

## ✔ Thread Safety

Handled using:

* lock
* SemaphoreSlim

## ✔ Producer / Consumer

* Producer → JobProducer
* Consumer → JobProcessor

## ✔ Retry System

Automatic retry with counter tracking

## ✔ Dead Letter Queue

Stores failed jobs permanently

## ✔ Multithreading

Multiple workers process jobs concurrently

## ✔ Graceful Shutdown

Uses CancellationTokenSource

---

#  Summary

Real-world style system that includes:

* Priority scheduling
* Concurrent workers
* Async processing
* Retry logic
* Failure isolation (DLQ)
* Thread-safe queue coordination

---

**Full Implementation **

```csharp
public record Job(string Name,int Priority,int RetryCount = 0, int MaxRetries = 3);
/// <summary>
/// 
/// </summary>
public class BlockingPriorityQueue
{
    /// <summary>
    /// 
    /// </summary>
    private readonly PriorityQueue<Job, int> _queue = new();
    /// <summary>
    /// 
    /// </summary>
    private readonly object _lock = new();
    /// <summary>
    /// 
    /// </summary>
    private readonly SemaphoreSlim _signal = new(0);

    public void Enqueue(Job job)
    {
        lock (_lock)
        {
            _queue.Enqueue(job, job.Priority);

            _signal.Release();
        }
    }

    public async Task<Job> DequeueAsync(CancellationToken cancellationToken = default) { 
    
        await _signal.WaitAsync(cancellationToken);
       
        lock (_lock)
        {
            return _queue.Dequeue();
        }
    }
}


public class DeadLetterQueue
{
    private readonly List<Job> _failedJobs = new();

    public void Add(Job job, Exception ex)
    {
        Console.WriteLine($"DLQ: Job {job.Name} failed permanently: {ex.Message}");
        _failedJobs.Add(job);
    }

    public IReadOnlyList<Job> GetAll() => _failedJobs;
}


public class JobProcessor(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
{
    private readonly BlockingPriorityQueue _jobQueue = jobQueue;

    private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;

    public async Task ProcessJobsAsync(CancellationToken cancellationToken = default)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
               var job = await _jobQueue.DequeueAsync(cancellationToken);
                if (job != null) 
                { 
                    await ProcessJob(job, cancellationToken);
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Job processing cancelled.");
                break;
            }
        }
    }

    private async Task ProcessJob(Job job, CancellationToken ct)
    {

        try
        {
            Console.WriteLine($"Processing {job.Name}");

            // Simulate work
            // In a real implementation, this would be where the actual job logic goes
            // For demonstration, we just delay for a short time
            // http Call .. database operation .. etc
            await Task.Delay(500, ct);

            // Simulate random failure
            if (Random.Shared.Next(0, 3) == 0)
                throw new Exception("Random failure");

            Console.WriteLine($"✅ Success: {job.Name}");
        }
        catch (Exception ex)
        {
            await HandleFailure(job, ex);
        }
    }

    private async Task HandleFailure(Job job, Exception ex)
    {
        if (job.RetryCount < job.MaxRetries)
        {
            var retryJob = job with { RetryCount = job.RetryCount + 1 };

            Console.WriteLine($"Retrying {job.Name} ({retryJob.RetryCount})");

            await Task.Delay(1000);

            _jobQueue.Enqueue(retryJob);
        }
        else
        {
            _deadLetterQueue.Add(job, ex);
        }
    }


    public class JobProducer(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
    {
        /// <summary>
        /// Thread-safe priority queue for managing jobs.   
        /// </summary>
        /// <remarks>Enables prioritized job processing in concurrent scenarios.</remarks>
        private readonly BlockingPriorityQueue _jobQueue = jobQueue;
        /// <summary>
        /// 
        /// </summary>
        private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;
        /// <summary>
        /// Publishes a collection of jobs to the job queue.
        /// </summary>
        /// <param name="jobs">The jobs to be published.</param>
        public void PublishJobs(IEnumerable<Job> jobs)
        {
            foreach (var job in jobs)
            {
                Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");

                _jobQueue.Enqueue(job);
            }
        }

        public void PublishJob(string name, int priority)
        {
            var job = new Job(name, priority);

            Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");

            _jobQueue.Enqueue(job);
        }

        public void PublishDeadLetterJob(string name, int priority)
        {
            var job = new Job(name, priority);
            Console.WriteLine($"Publishing {job.Name} to DLQ with priority {job.Priority}");
            _deadLetterQueue.Add(job, new Exception("Manually added to DLQ"));
        }
    }
}

var queue = new BlockingPriorityQueue();
var dlq = new DeadLetterQueue();
var jobProducer = new JobProducer(queue, dlq);
var cts = new CancellationTokenSource();

try
{
    var job1 = new JobProcessor(queue, dlq);
    var job2 = new JobProcessor(queue, dlq);

    var workerTasks = new[]
    {
        Task.Run(() => job1.ProcessJobsAsync(cts.Token)),
        Task.Run(() => job2.ProcessJobsAsync(cts.Token))
    };

    // Publish jobs
    for (int i = 0; i < 10; i++)
    {
        var priority = Random.Shared.Next(1, 5);

        Console.BackgroundColor = ConsoleColor.Green;
        Console.WriteLine(Environment.NewLine + $"Publishing Job - {i} with priority {priority}");
        Console.WriteLine($"Publishing Job - {i} with priority {priority}");
        Console.ResetColor();   
        jobProducer.PublishJob($"Job - {i}", priority);
    }

    await Task.Delay(3000);

    cts.Cancel();

    await Task.WhenAll(workerTasks);

    Console.WriteLine("System stopped cleanly.");
}
catch (OperationCanceledException)
{
    Console.WriteLine("Processing cancelled.");
}
catch (Exception ex)
{
    Console.WriteLine($"Error: {ex.Message}");
}

Console.ReadKey();

🔥 Join developers growing publicly
Share your knowledge, build in public, and grow your developer presence with a global community.

More Posts

Abstract Factory Pattern Tutorial

Spyros - May 11

TechTalk 20250611: Factory Pattern - Separating Creation from Use

Methodox - Jun 11, 2025

# The Rule Pattern in C#, and Why It Works So Well with the Result Pattern

Spyros - Apr 7

DevLog 20250506 C# Video Processing Foundation Library

Methodox - May 16, 2025

The State Pattern

Spyros - Jan 17, 2025
chevron_left
7.8k Points202 Badges
Athens Greeceweb-partner.gr
43Posts
164Comments
98Connections
Passionate about building robust and scalable software solutions with a focus on .NET technologies. ... Show more

Related Jobs

View all jobs →

Commenters (This Week)

1 comment
1 comment
1 comment

Contribute meaningful comments to climb the leaderboard and earn badges!