Priority-Based Job Processing System in C#

Leader posted 4 min read

This tutorial explains how to build a multithreaded, priority-based job processing system using:

  • PriorityQueue<TElement,TPriority>
  • SemaphoreSlim
  • Producer / Consumer pattern
  • Retry mechanism
  • Dead Letter Queue (DLQ)
  • Multiple workers

1. What is PriorityQueue in C#?

Definition

PriorityQueue<TElement,TPriority> is a built-in .NET collection that:

Stores elements so that the item with the highest priority is dequeued first, not the one inserted first.

Official docs:
https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.priorityqueue-2?view=net-10.0


⚙️ How it works (in this system)

private readonly PriorityQueue<Job, int> _queue = new();

Jobs are added like this:

_queue.Enqueue(job, job.Priority);

Each entry has:

  • ElementJob
  • Priorityint

Priority rules in .NET

Lower number = higher priority

Priority Meaning Execution Order
1 Highest priority First
2 Medium priority Next
5 Low priority Last

Simple example

var queue = new PriorityQueue<string, int>();

queue.Enqueue("Job A", 5);
queue.Enqueue("Job B", 1);
queue.Enqueue("Job C", 3);

Console.WriteLine(queue.Dequeue()); // Job B

⚠️ Important limitation

  • ❌ PriorityQueue is NOT thread-safe
  • ✔ This system fixes that using lock + SemaphoreSlim

2. Job Model

public record Job(
    string Name,
    int Priority,
    int RetryCount = 0,
    int MaxRetries = 3);

Why this works well

  • Immutable (record)
  • Safe for multithreading
  • Supports retry tracking

3. Blocking Priority Queue (Core Engine)

public class BlockingPriorityQueue
{
    private readonly PriorityQueue<Job, int> _queue = new();
    private readonly object _lock = new();
    private readonly SemaphoreSlim _signal = new(0);

Why these 3 components?

Component Purpose
PriorityQueue Stores jobs by priority
lock Thread safety
SemaphoreSlim Async waiting / signaling

➕ Enqueue (Producer side)

public void Enqueue(Job job)
{
    lock (_lock)
    {
        _queue.Enqueue(job, job.Priority);
        _signal.Release();
    }
}

Flow

Producer → lock → enqueue → signal workers


➖ Dequeue (Consumer side)

public async Task<Job> DequeueAsync(CancellationToken cancellationToken = default)
{
    await _signal.WaitAsync(cancellationToken);

    lock (_lock)
    {
        return _queue.Dequeue();
    }
}

Flow

Worker waits → signal → lock → dequeue highest priority job


4. Dead Letter Queue (DLQ)

public class DeadLetterQueue
{
    private readonly List<Job> _failedJobs = new();

    public void Add(Job job, Exception ex)
    {
        Console.WriteLine($"DLQ: Job {job.Name} failed permanently: {ex.Message}");
        _failedJobs.Add(job);
    }

    public IReadOnlyList<Job> GetAll() => _failedJobs;
}

Purpose

DLQ stores jobs that:

  • exceeded retry limit
  • failed permanently
  • require manual inspection

5. Job Processor (Worker System)

public class JobProcessor(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
{
    private readonly BlockingPriorityQueue _jobQueue = jobQueue;
    private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;

Main processing loop

public async Task ProcessJobsAsync(CancellationToken cancellationToken = default)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            var job = await _jobQueue.DequeueAsync(cancellationToken);

            await ProcessJob(job, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Job processing cancelled.");
            break;
        }
    }
}

Job execution logic

private async Task ProcessJob(Job job, CancellationToken ct)
{
    try
    {
        Console.WriteLine($"Processing {job.Name}");

        await Task.Delay(500, ct);

        if (Random.Shared.Next(0, 3) == 0)
            throw new Exception("Random failure");

        Console.WriteLine($"✅ Success: {job.Name}");
    }
    catch (Exception ex)
    {
        await HandleFailure(job, ex);
    }
}

6. Retry Logic

private async Task HandleFailure(Job job, Exception ex)
{
    if (job.RetryCount < job.MaxRetries)
    {
        var retryJob = job with
        {
            RetryCount = job.RetryCount + 1
        };

        Console.WriteLine($"Retrying {job.Name} ({retryJob.RetryCount})");

        await Task.Delay(1000);

        _jobQueue.Enqueue(retryJob);
    }
    else
    {
        _deadLetterQueue.Add(job, ex);
    }
}

Behavior flow

Failure → retry → requeue → success OR DLQ


7. Job Producer

public class JobProducer(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
{
    private readonly BlockingPriorityQueue _jobQueue = jobQueue;
    private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;

Publish single job

public void PublishJob(string name, int priority)
{
    var job = new Job(name, priority);

    Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");

    _jobQueue.Enqueue(job);
}

Publish multiple jobs

public void PublishJobs(IEnumerable<Job> jobs)
{
    foreach (var job in jobs)
    {
        Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");
        _jobQueue.Enqueue(job);
    }
}

Manual DLQ injection

public void PublishDeadLetterJob(string name, int priority)
{
    var job = new Job(name, priority);

    Console.WriteLine($"Publishing {job.Name} to DLQ");

    _deadLetterQueue.Add(job, new Exception("Manually added"));
}

8. System Execution

Setup

var queue = new BlockingPriorityQueue();
var dlq = new DeadLetterQueue();
var jobProducer = new JobProducer(queue, dlq);
var cts = new CancellationTokenSource();

Start workers

var job1 = new JobProcessor(queue, dlq);
var job2 = new JobProcessor(queue, dlq);

var workerTasks = new[]
{
    Task.Run(() => job1.ProcessJobsAsync(cts.Token)),
    Task.Run(() => job2.ProcessJobsAsync(cts.Token))
};

Publish jobs

for (int i = 0; i < 10; i++)
{
    var priority = Random.Shared.Next(1, 5);

    Console.WriteLine($"\nPublishing Job - {i} with priority {priority}");

    jobProducer.PublishJob($"Job - {i}", priority);
}

Shutdown system

await Task.Delay(3000);

cts.Cancel();

await Task.WhenAll(workerTasks);

Console.WriteLine("System stopped cleanly.");

9. Runtime behavior

  1. Jobs are published with random priority
  2. Stored in PriorityQueue
  3. Workers wait asynchronously
  4. Highest priority job executes first
  5. Random failures occur
  6. Failed jobs retry if allowed
  7. Permanent failures go to DLQ
  8. System shuts down gracefully

10. Key Concepts Learned

✔ PriorityQueue

Built-in heap structure for priority-based scheduling

✔ Thread Safety

Handled using:

  • lock
  • SemaphoreSlim

✔ Producer / Consumer

  • Producer → JobProducer
  • Consumer → JobProcessor

✔ Retry System

Automatic retry with counter tracking

✔ Dead Letter Queue

Stores failed jobs permanently

✔ Multithreading

Multiple workers process jobs concurrently

✔ Graceful Shutdown

Uses CancellationTokenSource


Summary

Real-world style system that includes:

  • Priority scheduling
  • Concurrent workers
  • Async processing
  • Retry logic
  • Failure isolation (DLQ)
  • Thread-safe queue coordination

More Posts

Abstract Factory Pattern Tutorial

Spyros - May 11

TechTalk 20250611: Factory Pattern - Separating Creation from Use

Methodox - Jun 11, 2025

# The Rule Pattern in C#, and Why It Works So Well with the Result Pattern

Spyros - Apr 7

DevLog 20250506 C# Video Processing Foundation Library

Methodox - May 16, 2025

The State Pattern

Spyros - Jan 17, 2025
chevron_left

Related Jobs

View all jobs →

Commenters (This Week)

1 comment
1 comment
1 comment

Contribute meaningful comments to climb the leaderboard and earn badges!