Skip to content

Streaming Handlers

Foundatio Mediator supports streaming handlers that can return IAsyncEnumerable<T> for scenarios where you need to process and return data incrementally, such as large datasets, real-time feeds, or progressive data processing.

Basic Streaming Handler

csharp
public class ProductStreamHandler
{
    public static async IAsyncEnumerable<Product> Handle(
        GetProductsStreamQuery query,
        IProductRepository repository,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var product in repository.GetProductsAsync(query.CategoryId, cancellationToken))
        {
            // Process each product before yielding
            product.CalculateDiscountPrice();
            yield return product;
        }
    }
}

Consuming Streaming Results

Basic Consumption

csharp
public class ProductController : ControllerBase
{
    private readonly IMediator _mediator;

    public ProductController(IMediator mediator)
    {
        _mediator = mediator;
    }

    [HttpGet("stream")]
    public async IAsyncEnumerable<Product> GetProductsStream(
        [FromQuery] string categoryId,
        CancellationToken cancellationToken)
    {
        var query = new GetProductsStreamQuery(categoryId);

        await foreach (var product in _mediator.Invoke<IAsyncEnumerable<Product>>(query, cancellationToken))
        {
            yield return product;
        }
    }
}

Processing with LINQ

csharp
public async Task ProcessProductsAsync()
{
    var query = new GetProductsStreamQuery("electronics");

    await foreach (var product in _mediator.Invoke<IAsyncEnumerable<Product>>(query)
        .Where(p => p.Price > 100)
        .Take(50))
    {
        await ProcessProductAsync(product);
    }
}

Real-World Examples

Large Dataset Processing

csharp
public record GetOrdersStreamQuery(DateTime StartDate, DateTime EndDate, int BatchSize = 100);

public class OrderStreamHandler
{
    public static async IAsyncEnumerable<OrderSummary> Handle(
        GetOrdersStreamQuery query,
        IOrderRepository repository,
        ILogger<OrderStreamHandler> logger,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        logger.LogInformation("Starting order stream for period {Start} to {End}",
            query.StartDate, query.EndDate);

        var totalProcessed = 0;

        await foreach (var batch in repository.GetOrderBatchesAsync(
            query.StartDate, query.EndDate, query.BatchSize, cancellationToken))
        {
            foreach (var order in batch)
            {
                var summary = new OrderSummary
                {
                    Id = order.Id,
                    CustomerEmail = order.CustomerEmail,
                    Total = order.Total,
                    Status = order.Status,
                    ProcessedAt = DateTime.UtcNow
                };

                totalProcessed++;

                if (totalProcessed % 1000 == 0)
                {
                    logger.LogInformation("Processed {Count} orders", totalProcessed);
                }

                yield return summary;
            }
        }

        logger.LogInformation("Completed order stream. Total processed: {Total}", totalProcessed);
    }
}

Real-Time Data Feed

csharp
public record GetLiveStockPricesQuery(string[] Symbols);

public class StockPriceStreamHandler
{
    public static async IAsyncEnumerable<StockPrice> Handle(
        GetLiveStockPricesQuery query,
        IStockPriceService stockService,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Subscribe to real-time stock price updates
        await foreach (var priceUpdate in stockService.SubscribeToSymbols(query.Symbols, cancellationToken))
        {
            var stockPrice = new StockPrice
            {
                Symbol = priceUpdate.Symbol,
                Price = priceUpdate.CurrentPrice,
                Change = priceUpdate.PriceChange,
                Volume = priceUpdate.Volume,
                Timestamp = priceUpdate.Timestamp
            };

            yield return stockPrice;
        }
    }
}

File Processing

csharp
public record ProcessCsvFileQuery(string FilePath);

public class CsvProcessorHandler
{
    public static async IAsyncEnumerable<CustomerRecord> Handle(
        ProcessCsvFileQuery query,
        ICsvParser csvParser,
        IValidator<CustomerRecord> validator,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var line in csvParser.ReadLinesAsync(query.FilePath, cancellationToken))
        {
            if (csvParser.TryParseCustomer(line, out var customer))
            {
                var validationResult = await validator.ValidateAsync(customer, cancellationToken);

                if (validationResult.IsValid)
                {
                    yield return customer;
                }
                else
                {
                    // Could yield error records or log validation failures
                    await LogValidationErrorAsync(line, validationResult.Errors);
                }
            }
        }
    }
}

Advanced Streaming Patterns

Streaming with Transformation

csharp
public class DataTransformStreamHandler
{
    public static async IAsyncEnumerable<ProcessedData> Handle(
        TransformDataStreamQuery query,
        IDataSource dataSource,
        ITransformationService transformer,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var rawData in dataSource.GetDataStreamAsync(query.Filter, cancellationToken))
        {
            // Apply transformations
            var transformed = await transformer.TransformAsync(rawData, cancellationToken);

            // Apply business rules
            if (transformer.MeetsBusinessCriteria(transformed))
            {
                yield return transformed;
            }
        }
    }
}

Streaming with Aggregation

csharp
public class SalesReportStreamHandler
{
    public static async IAsyncEnumerable<SalesMetrics> Handle(
        GenerateSalesReportQuery query,
        ISalesRepository repository,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var currentPeriod = query.StartDate;

        while (currentPeriod <= query.EndDate)
        {
            var endPeriod = currentPeriod.AddDays(query.PeriodDays);

            var sales = await repository.GetSalesForPeriodAsync(currentPeriod, endPeriod, cancellationToken);

            var metrics = new SalesMetrics
            {
                Period = currentPeriod,
                TotalSales = sales.Sum(s => s.Amount),
                OrderCount = sales.Count(),
                AverageOrderValue = sales.Average(s => s.Amount),
                TopProduct = sales.GroupBy(s => s.ProductId)
                    .OrderByDescending(g => g.Sum(s => s.Amount))
                    .First().Key
            };

            yield return metrics;
            currentPeriod = endPeriod;
        }
    }
}

Conditional Streaming

csharp
public class ConditionalStreamHandler
{
    public static async IAsyncEnumerable<ProcessedItem> Handle(
        ProcessItemsQuery query,
        IItemRepository repository,
        IBusinessRuleEngine ruleEngine,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in repository.GetItemsAsync(query.Filter, cancellationToken))
        {
            // Apply business rules to determine if item should be processed
            var ruleResult = await ruleEngine.EvaluateAsync(item, cancellationToken);

            if (ruleResult.ShouldProcess)
            {
                var processed = new ProcessedItem
                {
                    Id = item.Id,
                    Data = item.Data,
                    ProcessingRules = ruleResult.AppliedRules,
                    ProcessedAt = DateTime.UtcNow
                };

                // Additional conditional logic
                if (ruleResult.RequiresEnrichment)
                {
                    processed = await EnrichItemAsync(processed, cancellationToken);
                }

                yield return processed;
            }
        }
    }
}

Error Handling in Streams

Graceful Error Recovery

csharp
public class RobustStreamHandler
{
    public static async IAsyncEnumerable<Result<ProcessedData>> Handle(
        ProcessDataStreamQuery query,
        IDataProcessor processor,
        ILogger<RobustStreamHandler> logger,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in GetDataStreamAsync(query, cancellationToken))
        {
            Result<ProcessedData> result;

            try
            {
                var processed = await processor.ProcessAsync(item, cancellationToken);
                result = Result<ProcessedData>.Success(processed);
            }
            catch (ProcessingException ex)
            {
                logger.LogWarning(ex, "Failed to process item {ItemId}", item.Id);
                result = Result<ProcessedData>.Failed($"Processing failed: {ex.Message}");
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Unexpected error processing item {ItemId}", item.Id);
                result = Result<ProcessedData>.Failed("Unexpected processing error");
            }

            yield return result;
        }
    }
}

Circuit Breaker Pattern

csharp
public class CircuitBreakerStreamHandler
{
    private static int _consecutiveFailures = 0;
    private const int MaxFailures = 5;

    public static async IAsyncEnumerable<ProcessedData> Handle(
        ProcessStreamQuery query,
        IExternalService externalService,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in GetItemsAsync(query, cancellationToken))
        {
            if (_consecutiveFailures >= MaxFailures)
            {
                throw new InvalidOperationException("Circuit breaker is open due to consecutive failures");
            }

            try
            {
                var result = await externalService.ProcessAsync(item, cancellationToken);
                _consecutiveFailures = 0; // Reset on success
                yield return result;
            }
            catch (Exception)
            {
                _consecutiveFailures++;
                throw;
            }
        }
    }
}

Performance Considerations

Buffering for Better Performance

csharp
public class BufferedStreamHandler
{
    public static async IAsyncEnumerable<ProcessedBatch> Handle(
        ProcessBatchStreamQuery query,
        IDataSource dataSource,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var buffer = new List<DataItem>(query.BatchSize);

        await foreach (var item in dataSource.GetDataAsync(cancellationToken))
        {
            buffer.Add(item);

            if (buffer.Count >= query.BatchSize)
            {
                var batch = await ProcessBatchAsync(buffer, cancellationToken);
                yield return batch;
                buffer.Clear();
            }
        }

        // Process remaining items
        if (buffer.Count > 0)
        {
            var finalBatch = await ProcessBatchAsync(buffer, cancellationToken);
            yield return finalBatch;
        }
    }
}

Memory-Efficient Processing

csharp
public class MemoryEfficientHandler
{
    public static async IAsyncEnumerable<string> Handle(
        ProcessLargeFileQuery query,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        using var fileStream = new FileStream(query.FilePath, FileMode.Open, FileAccess.Read);
        using var reader = new StreamReader(fileStream);

        string? line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            if (cancellationToken.IsCancellationRequested)
                yield break;

            // Process line without loading entire file into memory
            var processed = ProcessLine(line);

            if (!string.IsNullOrEmpty(processed))
                yield return processed;
        }
    }
}

Integration with ASP.NET Core

Streaming API Endpoints

csharp
[ApiController]
[Route("api/[controller]")]
public class DataController : ControllerBase
{
    private readonly IMediator _mediator;

    public DataController(IMediator mediator)
    {
        _mediator = mediator;
    }

    [HttpGet("stream")]
    public async IAsyncEnumerable<DataItem> GetDataStream(
        [FromQuery] string filter,
        CancellationToken cancellationToken)
    {
        var query = new GetDataStreamQuery(filter);

    await foreach (var item in _mediator.Invoke<IAsyncEnumerable<DataItem>>(query, cancellationToken))
        {
            yield return item;
        }
    }

    [HttpGet("csv")]
    public async Task<IActionResult> ExportToCsv(
        [FromQuery] string filter,
        CancellationToken cancellationToken)
    {
        var query = new GetDataStreamQuery(filter);

        Response.Headers.Add("Content-Type", "text/csv");
        Response.Headers.Add("Content-Disposition", "attachment; filename=data.csv");

    await foreach (var item in _mediator.Invoke<IAsyncEnumerable<DataItem>>(query, cancellationToken))
        {
            var csv = $"{item.Id},{item.Name},{item.Value}\n";
            await Response.WriteAsync(csv, cancellationToken);
        }

        return new EmptyResult();
    }
}

SignalR Integration

csharp
public class LiveDataHub : Hub
{
    private readonly IMediator _mediator;

    public LiveDataHub(IMediator mediator)
    {
        _mediator = mediator;
    }

    public async Task SubscribeToData(string filter)
    {
        var query = new GetLiveDataStreamQuery(filter);

    await foreach (var data in _mediator.Invoke<IAsyncEnumerable<LiveData>>(query, Context.ConnectionAborted))
        {
            await Clients.Caller.SendAsync("DataUpdate", data, Context.ConnectionAborted);
        }
    }
}

Best Practices

1. Always Use CancellationToken

csharp
public static async IAsyncEnumerable<T> Handle(
    StreamQuery query,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // Always check for cancellation
    if (cancellationToken.IsCancellationRequested)
        yield break;

    // Pass cancellation token to async operations
    await foreach (var item in source.GetItemsAsync(cancellationToken))
    {
        yield return item;
    }
}

2. Handle Backpressure

csharp
public static async IAsyncEnumerable<T> Handle(
    StreamQuery query,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    using var semaphore = new SemaphoreSlim(Environment.ProcessorCount);

    await foreach (var item in source.GetItemsAsync(cancellationToken))
    {
        await semaphore.WaitAsync(cancellationToken);

        try
        {
            var processed = await ProcessItemAsync(item, cancellationToken);
            yield return processed;
        }
        finally
        {
            semaphore.Release();
        }
    }
}

3. Provide Progress Reporting

csharp
public static async IAsyncEnumerable<ProcessingResult<T>> Handle(
    StreamQuery query,
    IProgress<ProcessingProgress>? progress = null,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var totalItems = await GetTotalItemCountAsync(query);
    var processedCount = 0;

    await foreach (var item in source.GetItemsAsync(cancellationToken))
    {
        var result = await ProcessItemAsync(item, cancellationToken);
        processedCount++;

        progress?.Report(new ProcessingProgress(processedCount, totalItems));

        yield return new ProcessingResult<T>(result, processedCount, totalItems);
    }
}

4. Implement Proper Cleanup

csharp
public static async IAsyncEnumerable<T> Handle(
    StreamQuery query,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    IDisposable? resource = null;

    try
    {
        resource = await AcquireResourceAsync();

        await foreach (var item in ProcessWithResourceAsync(resource, query, cancellationToken))
        {
            yield return item;
        }
    }
    finally
    {
        resource?.Dispose();
    }
}

Streaming handlers are powerful for processing large datasets, real-time data feeds, and scenarios where you need to return results incrementally. They provide excellent memory efficiency and allow for responsive, scalable applications.

Released under the MIT License.