Passa al contenuto principale

Code native .NET

.NET offre strutture dati native per la gestione di code in-process. Sono la scelta giusta quando la comunicazione avviene all'interno dello stesso processo, senza la complessità di un broker esterno.

Queue<T> — coda FIFO non thread-safe

Queue<T> implementa una coda FIFO semplice. Non è thread-safe: va usata solo in scenari single-thread o protetta con lock espliciti.

var queue = new Queue<string>();

queue.Enqueue("primo");
queue.Enqueue("secondo");
queue.Enqueue("terzo");

while (queue.TryDequeue(out var item))
{
Console.WriteLine(item); // primo, secondo, terzo
}

TryDequeue è preferibile a Dequeue perché non lancia eccezione se la coda è vuota.

ConcurrentQueue<T> — coda thread-safe

ConcurrentQueue<T> è la variante thread-safe di Queue<T>. Più thread possono accodare e togliere elementi senza lock espliciti:

var queue = new ConcurrentQueue<OrdineEvent>();

// Thread produttore
queue.Enqueue(new OrdineEvent(ordineId, "Confermato"));

// Thread consumatore
if (queue.TryDequeue(out var evt))
{
await ProcessEventAsync(evt);
}

ConcurrentQueue<T> è adatta per scenari semplici produttore/consumatore, ma non offre meccanismi di attesa bloccante (il consumatore deve fare polling). Per questo si preferisce Channel<T>.

Channel<T> — produttore/consumatore asincrono

System.Threading.Channels.Channel<T> è la soluzione moderna per la comunicazione asincrona produttore/consumatore. Supporta backpressure, è completamente asincrona e integra nativamente con async/await.

Channel non limitato

var channel = Channel.CreateUnbounded<OrdineEvent>();

// Produttore
await channel.Writer.WriteAsync(new OrdineEvent(ordineId, "Pagato"));

// Consumatore (in background)
await foreach (var evt in channel.Reader.ReadAllAsync(cancellationToken))
{
await ProcessEventAsync(evt);
}

Channel limitato (con backpressure)

var channel = Channel.CreateBounded<OrdineEvent>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait // il produttore aspetta se la coda è piena
});

BoundedChannelFullMode ha altre opzioni:

ValoreComportamento
WaitIl produttore aspetta (backpressure)
DropWriteIl messaggio nuovo viene scartato
DropOldestIl messaggio più vecchio viene scartato
DropNewestIl messaggio più nuovo viene scartato

Integrazione con IHostedService

Il pattern classico è un BackgroundService che consuma il channel:

public class OrdineEventProcessor : BackgroundService
{
private readonly ChannelReader<OrdineEvent> _reader;
private readonly ILogger<OrdineEventProcessor> _logger;

public OrdineEventProcessor(
ChannelReader<OrdineEvent> reader,
ILogger<OrdineEventProcessor> logger)
{
_reader = reader;
_logger = logger;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var evt in _reader.ReadAllAsync(stoppingToken))
{
try
{
await ProcessAsync(evt, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Errore elaborazione evento {TipoEvento} per ordine {OrdineId}",
evt.Tipo, evt.OrdineId);
}
}
}

private Task ProcessAsync(OrdineEvent evt, CancellationToken ct)
{
// logica di elaborazione
return Task.CompletedTask;
}
}
// Program.cs — registrazione
var channel = Channel.CreateBounded<OrdineEvent>(100);

builder.Services.AddSingleton(channel.Writer);
builder.Services.AddSingleton(channel.Reader);
builder.Services.AddHostedService<OrdineEventProcessor>();

Il produttore (es. un controller o un use case) inietta ChannelWriter<OrdineEvent> e scrive senza sapere nulla del consumatore.

Confronto

StrutturaThread-safeAsincronaBackpressureScenari
Queue<T>Single-thread, struttura interna
ConcurrentQueue<T>Multi-thread, polling
Channel<T>Produttore/consumatore in-process

Per code distribuite o persistenti (messaggi che sopravvivono al riavvio del processo) si usano librerie dedicate. Vedi 09-librerie-code.