суббота, 11 октября 2025 г.

25.10.11, DeepSeek, EventHub, Code,

25.10.11, DeepSeek, EventHub, Code,

F:\Projects\VS\QwenSln\GS26.EventHub.sln

F:\Projects\VS\QwenSln\EventHub083\EventHub083.csproj

F:\Projects\VS\QwenSln\CaEventHub083\CaEventHub083.csproj

using System;

using System.Collections.Concurrent;

using System.Collections.Generic;

using System.Diagnostics;

using System.Diagnostics.Metrics;

using System.Linq;

using System.Threading;

using System.Threading.Channels;

using System.Threading.Tasks;


// =============================================

// ЛОГИРОВАНИЕ

// =============================================


/// <summary>
/// Minimal logging abstraction used by the components in this file.
/// One method per severity; each takes the already-formatted message text.
/// </summary>
public interface ILogger
{
    /// <summary>Logs <paramref name="message"/> at trace severity.</summary>
    void LogTrace(string message);

    /// <summary>Logs <paramref name="message"/> at debug severity.</summary>
    void LogDebug(string message);

    /// <summary>Logs <paramref name="message"/> at information severity.</summary>
    void LogInformation(string message);

    /// <summary>Logs <paramref name="message"/> at warning severity.</summary>
    void LogWarning(string message);

    /// <summary>Logs <paramref name="message"/> at error severity.</summary>
    void LogError(string message);

    /// <summary>Logs <paramref name="message"/> at critical severity.</summary>
    void LogCritical(string message);
}


/// <summary>
/// <see cref="ILogger"/> implementation that prints every message to standard output as
/// <c>HH:mm:ss.fff [LEVEL] category: message</c>, using the local clock.
/// </summary>
public class ConsoleLogger : ILogger
{
    private readonly string _categoryName;

    /// <summary>Creates a logger whose lines are labelled with <paramref name="categoryName"/>.</summary>
    /// <param name="categoryName">Label printed after the level tag; defaults to "EventHub".</param>
    public ConsoleLogger(string categoryName = "EventHub") => _categoryName = categoryName;

    /// <inheritdoc />
    public void LogTrace(string message)
    {
        Emit("TRACE", message);
    }

    /// <inheritdoc />
    public void LogDebug(string message)
    {
        Emit("DEBUG", message);
    }

    /// <inheritdoc />
    public void LogInformation(string message)
    {
        Emit("INFO", message);
    }

    /// <inheritdoc />
    public void LogWarning(string message)
    {
        Emit("WARN", message);
    }

    /// <inheritdoc />
    public void LogError(string message)
    {
        Emit("ERROR", message);
    }

    /// <inheritdoc />
    public void LogCritical(string message)
    {
        Emit("CRITICAL", message);
    }

    // Single formatting point shared by all severities.
    private void Emit(string severity, string text)
    {
        var timestamp = DateTime.Now.ToString("HH:mm:ss.fff");
        Console.WriteLine(timestamp + " [" + severity + "] " + _categoryName + ": " + text);
    }
}


// =============================================

// МЕТРИКИ И МОНИТОРИНГ

// =============================================


/// <summary>
/// Sink for the per-processor metrics emitted by the components in this file.
/// Every call identifies the emitting processor by <c>processorId</c>.
/// </summary>
public interface IMetricsRecorder
{
    /// <summary>Records that one message was accepted into a processor's queue.</summary>
    void RecordMessageEnqueued(string processorId);

    /// <summary>Records the outcome and duration (in milliseconds) of handling one message.</summary>
    void RecordMessageProcessed(string processorId, bool success, long processingTimeMs);

    /// <summary>Records that one message was discarded, with a short reason label.</summary>
    void RecordMessageDropped(string processorId, string reason);

    /// <summary>Records one retry; <paramref name="attempt"/> is the retry's ordinal number.</summary>
    void RecordRetryAttempt(string processorId, int attempt);

    /// <summary>Records the latest observed queue length of a processor.</summary>
    void RecordQueueLength(string processorId, int length);
}


/// <summary>
/// <see cref="IMetricsRecorder"/> backed by a <see cref="Meter"/>.
/// Counters and the histogram are tagged with the processor id; queue lengths are kept in a
/// dictionary and exposed through an observable gauge named "queue.length".
/// </summary>
public class MetricsRecorder : IMetricsRecorder, IDisposable
{
    private readonly Meter _meter;
    private readonly Counter<long> _messagesEnqueued;
    private readonly Counter<long> _messagesProcessed;
    private readonly Counter<long> _messagesDropped;
    private readonly Counter<long> _retryAttempts;
    private readonly Histogram<long> _processingTime;
    private readonly ObservableGauge<int> _queueLengths;

    // Latest queue length reported per processor; read lazily by the gauge callback.
    private readonly ConcurrentDictionary<string, int> _currentQueueLengths = new();

    /// <summary>Creates the meter (version "1.0.0") and all of its instruments.</summary>
    /// <param name="meterName">Name of the meter; defaults to "EventHub".</param>
    public MetricsRecorder(string meterName = "EventHub")
    {
        _meter = new Meter(meterName, "1.0.0");

        _messagesEnqueued = _meter.CreateCounter<long>("messages.enqueued");
        _messagesProcessed = _meter.CreateCounter<long>("messages.processed");
        _messagesDropped = _meter.CreateCounter<long>("messages.dropped");
        _retryAttempts = _meter.CreateCounter<long>("retry.attempts");
        _processingTime = _meter.CreateHistogram<long>("processing.time.ms");

        _queueLengths = _meter.CreateObservableGauge<int>("queue.length", () => ObserveQueueLengths());
    }

    /// <inheritdoc />
    public void RecordMessageEnqueued(string processorId)
    {
        _messagesEnqueued.Add(1, Tag("processor", processorId));
    }

    /// <inheritdoc />
    public void RecordMessageProcessed(string processorId, bool success, long processingTimeMs)
    {
        var processorTag = Tag("processor", processorId);

        _messagesProcessed.Add(1, processorTag, Tag("success", success));
        _processingTime.Record(processingTimeMs, processorTag);
    }

    /// <inheritdoc />
    public void RecordMessageDropped(string processorId, string reason)
    {
        _messagesDropped.Add(1, Tag("processor", processorId), Tag("reason", reason));
    }

    /// <inheritdoc />
    public void RecordRetryAttempt(string processorId, int attempt)
    {
        _retryAttempts.Add(1, Tag("processor", processorId), Tag("attempt", attempt));
    }

    /// <inheritdoc />
    public void RecordQueueLength(string processorId, int length)
    {
        // Last write wins: the gauge only ever reports the most recent value.
        _currentQueueLengths[processorId] = length;
    }

    /// <summary>Disposes the underlying meter.</summary>
    public void Dispose()
    {
        _meter?.Dispose();
    }

    // Produces one measurement per known processor each time the gauge is observed.
    private IEnumerable<Measurement<int>> ObserveQueueLengths()
    {
        foreach (var entry in _currentQueueLengths)
        {
            yield return new Measurement<int>(entry.Value, Tag("processor", entry.Key));
        }
    }

    // Builds a single metric tag.
    private static KeyValuePair<string, object?> Tag(string name, object? value)
    {
        return new KeyValuePair<string, object?>(name, value);
    }
}


// =============================================

// ПОЛИТИКИ ПОВТОРНЫХ ПОПЫТОК И ОШИБОК

// =============================================


/// <summary>
/// Policy selector carried by <c>ProcessingOptions.ErrorPolicy</c>.
/// The numeric values are the same ones the compiler assigned implicitly before.
/// </summary>
public enum ErrorHandlingPolicy
{
    /// <summary>Ignore policy.</summary>
    Ignore = 0,

    /// <summary>Retry policy.</summary>
    Retry = 1,

    /// <summary>Dead-letter policy.</summary>
    DeadLetter = 2
}


/// <summary>
/// Tunable settings passed to the processors in this file. All properties are mutable and
/// come with the defaults shown on each property.
/// </summary>
public class ProcessingOptions
{
    /// <summary>Number of retries allowed after the first attempt. Default: 3.</summary>
    public int MaxRetries { get; set; } = 3;

    /// <summary>Base delay before a retry. Default: 100 ms.</summary>
    public TimeSpan RetryDelay { get; set; } = TimeSpan.FromMilliseconds(100);

    /// <summary>When true, the retry delay doubles with every further retry. Default: true.</summary>
    public bool UseExponentialBackoff { get; set; } = true;

    /// <summary>Policy applied to messages that keep failing. Default: <see cref="ErrorHandlingPolicy.Retry"/>.</summary>
    public ErrorHandlingPolicy ErrorPolicy { get; set; } = ErrorHandlingPolicy.Retry;

    /// <summary>Capacity of the bounded queue in front of a processor. Default: 100.</summary>
    public int ChannelCapacity { get; set; } = 100;

    /// <summary>Maximum age of a message before it counts as expired; null (the default) disables expiry.</summary>
    public TimeSpan? MessageTimeToLive { get; set; }

    /// <summary>
    /// Shutdown timeout. Default: 10 s.
    /// NOTE(review): no code in the visible part of the file reads this value; presumably
    /// callers pass it to <c>StopAsync</c> themselves — confirm.
    /// </summary>
    public TimeSpan ShutdownTimeout { get; set; } = TimeSpan.FromSeconds(10);
}


// =============================================

// ОСНОВНЫЕ ТИПЫ ДАННЫХ

// =============================================


/// <summary>
/// Wrapper that carries a payload together with its routing key, an optional target id,
/// the UTC creation time, a retry counter and a freshly generated message id.
/// </summary>
/// <typeparam name="TInput">Payload type; must be non-null.</typeparam>
public class MessageEnvelope<TInput> where TInput : notnull
{
    /// <summary>Creates an envelope stamped with the current UTC time and a new GUID-based id.</summary>
    /// <param name="key">Routing key.</param>
    /// <param name="payload">Message payload.</param>
    /// <param name="targetId">Optional target identifier.</param>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> or <paramref name="payload"/> is null.</exception>
    public MessageEnvelope(string key, TInput payload, string? targetId = null)
    {
        if (key is null)
        {
            throw new ArgumentNullException(nameof(key));
        }

        if (payload is null)
        {
            throw new ArgumentNullException(nameof(payload));
        }

        Key = key;
        Payload = payload;
        TargetId = targetId;
        CreatedAt = DateTime.UtcNow;
        MessageId = Guid.NewGuid().ToString();
    }

    /// <summary>Routing key supplied at construction.</summary>
    public string Key { get; }

    /// <summary>Payload supplied at construction.</summary>
    public TInput Payload { get; }

    /// <summary>Optional target identifier supplied at construction.</summary>
    public string? TargetId { get; }

    /// <summary>UTC timestamp taken when the envelope was created.</summary>
    public DateTime CreatedAt { get; }

    /// <summary>Mutable retry counter; starts at zero.</summary>
    public int RetryCount { get; set; }

    /// <summary>String form of a GUID generated when the envelope was created.</summary>
    public string MessageId { get; }

    /// <summary>Tells whether the envelope is older than <paramref name="timeToLive"/>.</summary>
    /// <param name="timeToLive">Maximum allowed age; null means the envelope never expires.</param>
    /// <returns>True when a time-to-live is given and the envelope's age strictly exceeds it.</returns>
    public bool IsExpired(TimeSpan? timeToLive)
    {
        return timeToLive is { } limit && DateTime.UtcNow - CreatedAt > limit;
    }
}


// =============================================

// ИНТЕРФЕЙСЫ ОСНОВНЫХ КОМПОНЕНТОВ

// =============================================


/// <summary>
/// Contract of a startable/stoppable processor that accepts inputs through
/// <see cref="EnqueueAsync"/> and exposes a metrics snapshot.
/// </summary>
/// <typeparam name="TInput">Type of the items accepted by the processor.</typeparam>
public interface IInputProcessor<TInput> : IDisposable
{
    /// <summary>Identifier of the processor.</summary>
    string Id { get; }

    /// <summary>Starts the processor.</summary>
    Task StartAsync(CancellationToken cancellationToken = default);

    /// <summary>Stops the processor; the boolean result reports whether the stop succeeded within <paramref name="timeout"/>.</summary>
    Task<bool> StopAsync(TimeSpan timeout);

    /// <summary>Submits one input; the boolean result reports whether it was accepted.</summary>
    Task<bool> EnqueueAsync(TInput input, CancellationToken cancellationToken = default);

    /// <summary>Returns the processor's current metrics.</summary>
    ProcessingMetrics GetMetrics();

    /// <summary>Returns a task that completes once the processor has no outstanding work.</summary>
    Task WhenIdle();
}


/// <summary>
/// Contract of a module that maps string keys to asynchronous payload handlers and
/// dispatches envelopes to them.
/// </summary>
/// <typeparam name="TInput">Payload type passed to the handlers.</typeparam>
public interface IMessageHandlerModule<TInput>
{
    /// <summary>Associates <paramref name="handler"/> with <paramref name="key"/>.</summary>
    void RegisterHandler(string key, Func<TInput, Task> handler);

    /// <summary>
    /// Attempts to dispatch <paramref name="envelope"/>; the boolean result reports whether
    /// a handler took the message.
    /// </summary>
    Task<bool> TryHandleAsync(MessageEnvelope<TInput> envelope, CancellationToken cancellationToken = default);
}


/// <summary>
/// Contract of a subscriber: an identified component that owns an input processor for
/// envelopes and can be started, stopped and awaited for idleness.
/// </summary>
/// <typeparam name="TInput">Payload type carried by the envelopes.</typeparam>
public interface ISubscriber<TInput>
{
    /// <summary>Identifier of the subscriber.</summary>
    string Id { get; }

    /// <summary>Processor that receives the envelopes addressed to this subscriber.</summary>
    IInputProcessor<MessageEnvelope<TInput>> InputProcessor { get; }

    /// <summary>Starts the subscriber.</summary>
    Task StartAsync(CancellationToken cancellationToken = default);

    /// <summary>Stops the subscriber; the boolean result reports whether the stop succeeded within <paramref name="timeout"/>.</summary>
    Task<bool> StopAsync(TimeSpan timeout);

    /// <summary>Returns a task that completes once the subscriber has no outstanding work.</summary>
    Task WhenIdle();
}


// =============================================

// ОСНОВНЫЕ КОМПОНЕНТЫ С ОБРАБОТКОЙ ОШИБОК И МЕТРИКАМИ

// =============================================


/// <summary>
/// Plain data holder describing a processor's counters at one point in time.
/// All members are simple read/write properties that start at their type's default value.
/// </summary>
public class ProcessingMetrics
{
    /// <summary>Count of messages handled successfully.</summary>
    public long MessagesProcessed { get; set; }

    /// <summary>Count of messages whose handling failed.</summary>
    public long MessagesFailed { get; set; }

    /// <summary>Count of messages discarded without being handled.</summary>
    public long MessagesDropped { get; set; }

    /// <summary>Count of retries performed.</summary>
    public long RetryAttempts { get; set; }

    /// <summary>Number of messages waiting in the queue.</summary>
    public int CurrentQueueLength { get; set; }

    /// <summary>Average handling time in milliseconds.</summary>
    public double AverageProcessingTimeMs { get; set; }

    /// <summary>Whether a message is being handled right now.</summary>
    public bool IsProcessing { get; set; }
}


/// <summary>
/// Queue-backed processor. Inputs written through <see cref="EnqueueAsync"/> land in a bounded
/// channel and are handled one at a time by a single background loop that applies the retry
/// settings from <see cref="ProcessingOptions"/>.
/// </summary>
/// <remarks>
/// Changes compared with the previous revision of this class:
/// <list type="bullet">
/// <item>Message expiry now works when <typeparamref name="TInput"/> is itself a
/// <c>MessageEnvelope&lt;T&gt;</c> (the old <c>input is MessageEnvelope&lt;TInput&gt;</c> test could
/// not match in that case).</item>
/// <item><see cref="StopAsync"/> closes the queue, drains it and honours its timeout instead of
/// waiting for idleness without any bound.</item>
/// <item>Idleness is tracked with a pending-message counter, closing the window in which a
/// dequeued-but-not-yet-started message looked like "idle".</item>
/// <item>An <see cref="OperationCanceledException"/> thrown by the handler itself is treated as an
/// ordinary failure; only cancellation of the processor's own token ends the loop.</item>
/// <item><see cref="Dispose"/> is idempotent and safe after <see cref="StopAsync"/>.</item>
/// <item>The average processing time is computed in floating point over all completed messages.</item>
/// </list>
/// </remarks>
/// <typeparam name="TInput">Type of the queued items.</typeparam>
public class InputProcessor<TInput> : IInputProcessor<TInput>
{
    // Built once per closed generic type. Non-null only when TInput is MessageEnvelope<T>;
    // it then invokes MessageEnvelope<T>.IsExpired without per-message reflection.
    // A non-generic expiry interface on the envelope would be cleaner, but that type is
    // declared outside this class.
    private static readonly Func<TInput, TimeSpan?, bool>? s_envelopeExpiryCheck = CreateEnvelopeExpiryCheck();

    private readonly Channel<TInput> _channel;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly string _id;
    private readonly Func<TInput, Task> _processAction;
    private readonly ILogger _logger;
    private readonly IMetricsRecorder _metrics;
    private readonly ProcessingOptions _options;
    private Task? _processingTask;

    // Counters are updated with Interlocked so GetMetrics can read them from any thread.
    private long _messagesProcessed;
    private long _messagesFailed;
    private long _messagesDropped;
    private long _retryAttempts;
    private long _totalProcessingTimeTicks;

    // Messages accepted (or being written) but not yet fully handled or dropped.
    private int _pendingCount;
    private int _activeProcessingCount;
    private int _disposed;

    /// <summary>Creates the processor and its bounded channel; processing starts with <see cref="StartAsync"/>.</summary>
    /// <param name="id">Identifier used in logs and metrics.</param>
    /// <param name="processAction">Handler invoked for every dequeued input.</param>
    /// <param name="logger">Log sink.</param>
    /// <param name="metrics">Metrics sink.</param>
    /// <param name="options">Processing settings; defaults are used when null.</param>
    /// <exception cref="ArgumentNullException">A required argument is null.</exception>
    public InputProcessor(
        string id,
        Func<TInput, Task> processAction,
        ILogger logger,
        IMetricsRecorder metrics,
        ProcessingOptions? options = null)
    {
        _id = id ?? throw new ArgumentNullException(nameof(id));
        _processAction = processAction ?? throw new ArgumentNullException(nameof(processAction));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
        _options = options ?? new ProcessingOptions();
        _cancellationTokenSource = new CancellationTokenSource();

        var channelOptions = new BoundedChannelOptions(_options.ChannelCapacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = false
        };

        _channel = Channel.CreateBounded<TInput>(channelOptions);
    }

    /// <inheritdoc />
    public string Id => _id;

    /// <summary>Writes <paramref name="input"/> to the queue, waiting while the queue is full.</summary>
    /// <returns>True when the input was queued; false when the queue is closed, the call was canceled or an unexpected error occurred.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
    public async Task<bool> EnqueueAsync(TInput input, CancellationToken cancellationToken = default)
    {
        if (input == null) throw new ArgumentNullException(nameof(input));

        // Count the message before it becomes visible to the reader so WhenIdle can never
        // observe "nothing pending" while a message is in flight.
        var written = false;
        Interlocked.Increment(ref _pendingCount);

        try
        {
            await _channel.Writer.WriteAsync(input, cancellationToken).ConfigureAwait(false);
            written = true;

            _metrics.RecordMessageEnqueued(_id);
            UpdateQueueMetrics();
            return true;
        }
        catch (ChannelClosedException)
        {
            _logger.LogError($"Failed to enqueue message: Channel is closed for processor '{_id}'");
            _metrics.RecordMessageDropped(_id, "ChannelClosed");
            return false;
        }
        catch (OperationCanceledException)
        {
            _logger.LogDebug($"Enqueue operation was canceled for processor '{_id}'");
            return false;
        }
        catch (Exception ex)
        {
            _logger.LogError($"Unexpected error while enqueuing message for processor '{_id}': {ex.Message}");
            _metrics.RecordMessageDropped(_id, "UnexpectedError");
            return false;
        }
        finally
        {
            // Only undo the reservation when the message never reached the queue; a queued
            // message is released by the processing loop instead.
            if (!written)
            {
                Interlocked.Decrement(ref _pendingCount);
            }
        }
    }

    /// <summary>Starts the background processing loop.</summary>
    /// <exception cref="InvalidOperationException">The processor is already running.</exception>
    public Task StartAsync(CancellationToken cancellationToken = default)
    {
        if (_processingTask != null)
            throw new InvalidOperationException($"Processor '{_id}' is already running.");

        _processingTask = ProcessLoopAsync();
        _logger.LogInformation($"Processor '{_id}' started");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Stops the processor: closes the queue for new messages, lets the loop drain what is
    /// already queued, and cancels processing if that takes longer than <paramref name="timeout"/>.
    /// </summary>
    /// <returns>True when the loop finished within the timeout (or was never started); otherwise false.</returns>
    public async Task<bool> StopAsync(TimeSpan timeout)
    {
        var processingTask = _processingTask;
        if (processingTask == null) return true;

        _logger.LogInformation($"Stopping processor '{_id}'...");

        // Closing the writer first lets ReadAllAsync finish on its own once the queue is empty,
        // so already-accepted messages are processed rather than dropped. TryComplete is used
        // because Complete throws when the channel is already closed.
        _channel.Writer.TryComplete();

        try
        {
            using var timeoutCts = new CancellationTokenSource();
            var timeoutTask = Task.Delay(timeout, timeoutCts.Token);

            var finished = await Task.WhenAny(processingTask, timeoutTask).ConfigureAwait(false);
            timeoutCts.Cancel(); // release the timer when the loop won the race

            if (finished == processingTask)
            {
                await processingTask.ConfigureAwait(false); // surface exceptions, if any
                _logger.LogInformation($"Processor '{_id}' stopped successfully");
                return true;
            }

            _logger.LogWarning($"Processor '{_id}' stop timeout after {timeout.TotalSeconds}s");
            return false;
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation($"Processor '{_id}' stopped (operation canceled)");
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError($"Error while stopping processor '{_id}': {ex.Message}");
            return false;
        }
        finally
        {
            // On the normal path the loop has already ended and this is a no-op; on timeout it
            // interrupts retry delays and makes the loop exit at the next opportunity.
            CancelProcessing();
            _processingTask = null;
        }
    }

    /// <summary>
    /// Completes once every accepted message has been handled or dropped. Polls every 10 ms.
    /// </summary>
    public async Task WhenIdle()
    {
        while (Volatile.Read(ref _pendingCount) > 0)
        {
            var queueLength = _channel.Reader.Count;
            var active = Volatile.Read(ref _activeProcessingCount);

            _logger.LogDebug($"Waiting for processor '{_id}' to become idle. Queue: {queueLength}, Processing: {active > 0}, Active: {active}");
            await Task.Delay(10).ConfigureAwait(false);
        }
    }

    // Single reader loop: dequeues inputs until the channel completes or the token is canceled.
    private async Task ProcessLoopAsync()
    {
        var token = _cancellationTokenSource.Token;

        try
        {
            await foreach (var input in _channel.Reader.ReadAllAsync(token).ConfigureAwait(false))
            {
                try
                {
                    if (token.IsCancellationRequested)
                        break;

                    if (IsExpired(input))
                    {
                        _logger.LogWarning($"Message expired and will be skipped for processor '{_id}'");
                        _metrics.RecordMessageDropped(_id, "Expired");
                        Interlocked.Increment(ref _messagesDropped);
                        continue;
                    }

                    await ProcessWithRetryAsync(input, token).ConfigureAwait(false);
                    UpdateQueueMetrics();
                }
                finally
                {
                    // Release the reservation taken in EnqueueAsync whatever happened to the message.
                    Interlocked.Decrement(ref _pendingCount);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the processor is being stopped.
        }
        catch (Exception ex)
        {
            _logger.LogCritical($"Fatal error in processing loop for processor '{_id}': {ex.Message}");
        }
    }

    // Runs the handler for one input, retrying up to MaxRetries times, and updates the counters.
    private async Task ProcessWithRetryAsync(TInput input, CancellationToken cancellationToken)
    {
        Interlocked.Increment(ref _activeProcessingCount);

        try
        {
            var startTime = Stopwatch.GetTimestamp();
            var success = false;
            var attemptsMade = 0;
            Exception? lastException = null;

            for (int attempt = 0; attempt <= _options.MaxRetries && !cancellationToken.IsCancellationRequested; attempt++)
            {
                try
                {
                    if (attempt > 0)
                    {
                        Interlocked.Increment(ref _retryAttempts);
                        _metrics.RecordRetryAttempt(_id, attempt);

                        var delay = _options.UseExponentialBackoff
                            ? TimeSpan.FromMilliseconds(_options.RetryDelay.TotalMilliseconds * Math.Pow(2, attempt - 1))
                            : _options.RetryDelay;

                        _logger.LogDebug($"Retry attempt {attempt} for processor '{_id}' after {delay.TotalMilliseconds}ms");

                        await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
                    }

                    attemptsMade++;
                    await _processAction(input).ConfigureAwait(false);
                    success = true;
                    break;
                }
                // A cancellation exception counts as "we are stopping" only when OUR token was
                // canceled. One raised by the handler for its own reasons (e.g. an internal
                // timeout) is an ordinary failure and must not terminate the processing loop.
                catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
                {
                    lastException = ex;
                    _logger.LogWarning($"Processing attempt {attempt} failed for processor '{_id}': {ex.Message}");

                    if (cancellationToken.IsCancellationRequested)
                        break;
                }
            }

            var processingTimeMs = (Stopwatch.GetTimestamp() - startTime) * 1000.0 / Stopwatch.Frequency;
            var processingTicks = (long)(processingTimeMs * TimeSpan.TicksPerMillisecond);

            if (success)
            {
                Interlocked.Increment(ref _messagesProcessed);
                Interlocked.Add(ref _totalProcessingTimeTicks, processingTicks);
                _logger.LogDebug($"Message processed successfully by processor '{_id}' in {processingTimeMs:F2}ms");
            }
            else if (!cancellationToken.IsCancellationRequested)
            {
                Interlocked.Increment(ref _messagesFailed);
                Interlocked.Add(ref _totalProcessingTimeTicks, processingTicks);
                _logger.LogError($"Message processing failed after {attemptsMade} attempts for processor '{_id}': {lastException?.Message}");

                if (_options.ErrorPolicy == ErrorHandlingPolicy.DeadLetter)
                {
                    _logger.LogWarning($"Message moved to dead letter queue for processor '{_id}'");
                }
            }

            if (!cancellationToken.IsCancellationRequested)
            {
                _metrics.RecordMessageProcessed(_id, success, (long)processingTimeMs);
            }
        }
        finally
        {
            Interlocked.Decrement(ref _activeProcessingCount);
        }
    }

    // Publishes the current queue length to the metrics sink.
    private void UpdateQueueMetrics()
    {
        _metrics.RecordQueueLength(_id, _channel.Reader.Count);
    }

    /// <summary>
    /// Returns a snapshot of the counters. <c>AverageProcessingTimeMs</c> is the mean handling
    /// time over all completed messages (successful and failed).
    /// </summary>
    public ProcessingMetrics GetMetrics()
    {
        var processed = Interlocked.Read(ref _messagesProcessed);
        var failed = Interlocked.Read(ref _messagesFailed);
        var completed = processed + failed;
        var totalTicks = Interlocked.Read(ref _totalProcessingTimeTicks);

        return new ProcessingMetrics
        {
            MessagesProcessed = processed,
            MessagesFailed = failed,
            MessagesDropped = Interlocked.Read(ref _messagesDropped),
            RetryAttempts = Interlocked.Read(ref _retryAttempts),
            CurrentQueueLength = _channel.Reader.Count,
            AverageProcessingTimeMs = completed > 0
                ? TimeSpan.FromTicks(totalTicks).TotalMilliseconds / completed
                : 0,
            IsProcessing = Volatile.Read(ref _activeProcessingCount) > 0
        };
    }

    /// <summary>
    /// Cancels processing, closes the queue and releases the cancellation source.
    /// Safe to call more than once and after <see cref="StopAsync"/>.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

        CancelProcessing();
        _channel.Writer.TryComplete();
        _cancellationTokenSource.Dispose();
    }

    // Requests cancellation, tolerating a cancellation source that was already disposed.
    private void CancelProcessing()
    {
        try
        {
            _cancellationTokenSource.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Dispose ran first; nothing is left to cancel.
        }
    }

    // Tells whether the input is an envelope older than the configured time-to-live.
    private bool IsExpired(TInput input)
    {
        var timeToLive = _options.MessageTimeToLive;
        if (!timeToLive.HasValue) return false;

        if (s_envelopeExpiryCheck != null)
            return s_envelopeExpiryCheck(input, timeToLive);

        // Kept from the previous revision: covers a TInput that is a base type of the
        // envelope actually queued (for example TInput = object).
        return input is MessageEnvelope<TInput> nested && nested.IsExpired(timeToLive);
    }

    // Creates an open-instance delegate for MessageEnvelope<T>.IsExpired when TInput is such
    // an envelope; returns null for every other input type.
    private static Func<TInput, TimeSpan?, bool>? CreateEnvelopeExpiryCheck()
    {
        var inputType = typeof(TInput);
        if (!inputType.IsGenericType || inputType.GetGenericTypeDefinition() != typeof(MessageEnvelope<>))
            return null;

        var isExpired = inputType.GetMethod("IsExpired", new[] { typeof(TimeSpan?) });
        if (isExpired == null || isExpired.ReturnType != typeof(bool))
            return null;

        return (Func<TInput, TimeSpan?, bool>)isExpired.CreateDelegate(typeof(Func<TInput, TimeSpan?, bool>));
    }
}


// =============================================

// МОДУЛИ ОБРАБОТКИ СООБЩЕНИЙ

// =============================================


/// <summary>
/// Handler registry keyed by message key (case-insensitive, ordinal). An envelope is
/// dispatched to the handler registered for its key, if there is one.
/// </summary>
/// <typeparam name="TInput">Payload type passed to the handlers.</typeparam>
public class SelfHandlerModule<TInput> : IMessageHandlerModule<TInput>
{
    private readonly ConcurrentDictionary<string, Func<TInput, Task>> _handlers;
    private readonly ILogger _logger;

    /// <summary>Creates an empty registry.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
    public SelfHandlerModule(ILogger logger)
    {
        _handlers = new ConcurrentDictionary<string, Func<TInput, Task>>(StringComparer.OrdinalIgnoreCase);
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Registers <paramref name="handler"/> for <paramref name="key"/>, replacing any handler already registered for it.</summary>
    /// <exception cref="ArgumentException"><paramref name="key"/> is null, empty or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public void RegisterHandler(string key, Func<TInput, Task> handler)
    {
        if (string.IsNullOrWhiteSpace(key))
        {
            throw new ArgumentException("Key cannot be null or empty", nameof(key));
        }

        if (handler == null)
        {
            throw new ArgumentNullException(nameof(handler));
        }

        _handlers.AddOrUpdate(key, handler, (_, _) => handler);
        _logger.LogDebug($"Handler registered for key '{key}'");
    }

    /// <summary>Runs the handler registered for the envelope's key.</summary>
    /// <returns>True when a handler ran to completion; false when no handler is registered for the key.</returns>
    /// <remarks>Exceptions thrown by the handler are logged and rethrown.</remarks>
    public async Task<bool> TryHandleAsync(MessageEnvelope<TInput> envelope, CancellationToken cancellationToken = default)
    {
        // Guard clause first: the "no handler" outcome is the short path.
        if (!_handlers.TryGetValue(envelope.Key, out var registered))
        {
            _logger.LogWarning($"No handler found for key '{envelope.Key}'");
            return false;
        }

        try
        {
            await registered(envelope.Payload).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.LogError($"Error executing handler for key '{envelope.Key}': {ex.Message}");
            throw;
        }

        return true;
    }
}


/// <summary>
/// Subscription registry: for every key (case-insensitive, ordinal) it keeps the subscribers
/// registered under that key, indexed by subscriber id, and fans published payloads out to them.
/// </summary>
/// <typeparam name="TInput">Payload type delivered to subscribers.</typeparam>
public class SubscriberHandlerModule<TInput>
{
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, ISubscriber<TInput>>> _subscribersByKey;
    private readonly ILogger _logger;
    private readonly IMetricsRecorder _metrics;

    /// <summary>Creates an empty registry.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> or <paramref name="metrics"/> is null.</exception>
    public SubscriberHandlerModule(ILogger logger, IMetricsRecorder metrics)
    {
        _subscribersByKey = new ConcurrentDictionary<string, ConcurrentDictionary<string, ISubscriber<TInput>>>(StringComparer.OrdinalIgnoreCase);
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
    }

    /// <summary>Registers <paramref name="subscriber"/> under <paramref name="key"/>.</summary>
    /// <returns>True when added; false when a subscriber with the same id is already registered for the key.</returns>
    /// <exception cref="ArgumentException"><paramref name="key"/> is null, empty or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="subscriber"/> is null.</exception>
    public bool Subscribe(string key, ISubscriber<TInput> subscriber)
    {
        if (string.IsNullOrWhiteSpace(key))
        {
            throw new ArgumentException("Key cannot be null or empty", nameof(key));
        }

        if (subscriber == null)
        {
            throw new ArgumentNullException(nameof(subscriber));
        }

        var group = _subscribersByKey.GetOrAdd(key, _ => new ConcurrentDictionary<string, ISubscriber<TInput>>());

        if (!group.TryAdd(subscriber.Id, subscriber))
        {
            _logger.LogWarning($"Subscriber '{subscriber.Id}' is already subscribed to key '{key}'");
            return false;
        }

        _logger.LogInformation($"Subscriber '{subscriber.Id}' subscribed to key '{key}'");
        return true;
    }

    /// <summary>Removes the subscriber with <paramref name="subscriberId"/> from <paramref name="key"/>.</summary>
    /// <returns>True when a subscription was removed; false when there was none.</returns>
    /// <exception cref="ArgumentException">An argument is null, empty or whitespace.</exception>
    public bool Unsubscribe(string key, string subscriberId)
    {
        if (string.IsNullOrWhiteSpace(key))
        {
            throw new ArgumentException("Key cannot be null or empty", nameof(key));
        }

        if (string.IsNullOrWhiteSpace(subscriberId))
        {
            throw new ArgumentException("SubscriberId cannot be null or empty", nameof(subscriberId));
        }

        if (!_subscribersByKey.TryGetValue(key, out var group) || !group.TryRemove(subscriberId, out _))
        {
            _logger.LogWarning($"Subscriber '{subscriberId}' was not subscribed to key '{key}'");
            return false;
        }

        _logger.LogInformation($"Subscriber '{subscriberId}' unsubscribed from key '{key}'");

        // Drop the per-key collection once its last subscriber is gone.
        if (group.IsEmpty)
        {
            _subscribersByKey.TryRemove(key, out _);
        }

        return true;
    }

    /// <summary>
    /// Wraps <paramref name="payload"/> in one envelope and enqueues that same envelope on every
    /// subscriber currently registered for <paramref name="key"/>; deliveries run concurrently.
    /// </summary>
    /// <returns>True when at least one subscriber was registered for the key, regardless of whether its enqueue succeeded.</returns>
    public async Task<bool> NotifySubscribersAsync(string key, TInput payload, CancellationToken cancellationToken = default)
    {
        if (!_subscribersByKey.TryGetValue(key, out var group))
        {
            _logger.LogDebug($"No subscribers found for key '{key}'");
            return false;
        }

        // Work on a snapshot so concurrent (un)subscriptions cannot disturb the iteration.
        var recipients = group.Values.ToArray();
        var envelope = new MessageEnvelope<TInput>(key, payload);

        var deliveries = new List<Task>(recipients.Length);
        foreach (var recipient in recipients)
        {
            deliveries.Add(DeliverAsync(recipient, envelope, cancellationToken));
        }

        await Task.WhenAll(deliveries).ConfigureAwait(false);

        _logger.LogDebug($"Notified {recipients.Length} subscribers for key '{key}'");
        return recipients.Length > 0;
    }

    /// <summary>Returns a snapshot of the keys that currently have a subscriber collection.</summary>
    public IReadOnlyCollection<string> GetSubscribedKeys()
    {
        return _subscribersByKey.Keys.ToArray();
    }

    // Enqueues the envelope on one subscriber; failures are logged and counted, not rethrown
    // (cancellation exceptions are the exception and do propagate).
    private async Task DeliverAsync(ISubscriber<TInput> subscriber, MessageEnvelope<TInput> envelope, CancellationToken cancellationToken)
    {
        try
        {
            var accepted = await subscriber.InputProcessor.EnqueueAsync(envelope, cancellationToken).ConfigureAwait(false);
            if (accepted)
            {
                return;
            }

            _logger.LogWarning($"Failed to enqueue message to subscriber '{subscriber.Id}'");
            _metrics.RecordMessageDropped(subscriber.Id, "EnqueueFailed");
        }
        catch (Exception ex) when (!(ex is OperationCanceledException))
        {
            _logger.LogError($"Failed to notify subscriber '{subscriber.Id}': {ex.Message}");
            _metrics.RecordMessageDropped(subscriber.Id, "NotificationFailed");
        }
    }
}


// =============================================

// ОСНОВНЫЕ СЕРВИСЫ

// =============================================


/// <summary>
/// Thin facade over an <see cref="InputProcessor{TInput}"/> specialised for
/// <see cref="MessageEnvelope{TInput}"/> items; every member forwards to the inner processor.
/// </summary>
/// <typeparam name="TInput">Payload type carried by the envelopes.</typeparam>
public class MessageProcessor<TInput> : IDisposable
{
    private readonly IInputProcessor<MessageEnvelope<TInput>> _processor;
    private readonly ILogger _logger;

    /// <summary>Creates the inner processor with the given id, handler, sinks and options.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null (other arguments are validated by the inner processor).</exception>
    public MessageProcessor(
        string id,
        Func<MessageEnvelope<TInput>, Task> handler,
        ILogger logger,
        IMetricsRecorder metrics,
        ProcessingOptions? options = null)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _processor = new InputProcessor<MessageEnvelope<TInput>>(id, handler, logger, metrics, options);
    }

    /// <summary>Starts the inner processor.</summary>
    public Task StartAsync(CancellationToken cancellationToken = default)
    {
        return _processor.StartAsync(cancellationToken);
    }

    /// <summary>Stops the inner processor and returns its result.</summary>
    public Task<bool> StopAsync(TimeSpan timeout)
    {
        return _processor.StopAsync(timeout);
    }

    /// <summary>Enqueues <paramref name="envelope"/> on the inner processor and returns its result.</summary>
    public Task<bool> EnqueueAsync(MessageEnvelope<TInput> envelope, CancellationToken cancellationToken = default)
    {
        return _processor.EnqueueAsync(envelope, cancellationToken);
    }

    /// <summary>Returns the inner processor's metrics.</summary>
    public ProcessingMetrics GetMetrics()
    {
        return _processor.GetMetrics();
    }

    /// <summary>Returns the inner processor's idle task.</summary>
    public Task WhenIdle()
    {
        return _processor.WhenIdle();
    }

    /// <summary>Disposes the inner processor.</summary>
    public void Dispose()
    {
        _processor?.Dispose();
    }
}


/// <summary>
/// Self-contained service: messages sent with <see cref="SendMessageAsync"/> are queued on an
/// internal <see cref="MessageProcessor{TInput}"/> and dispatched to the handlers registered
/// through <see cref="RegisterHandler"/>.
/// </summary>
/// <typeparam name="TInput">Payload type; must be non-null.</typeparam>
public class RegularService<TInput> : IDisposable where TInput : notnull
{
    private readonly MessageProcessor<TInput> _messageProcessor;
    private readonly SelfHandlerModule<TInput> _handlerModule;
    private readonly ILogger _logger;

    /// <summary>Creates the handler registry and an internal processor named "<paramref name="id"/>-InputProcessor".</summary>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
    public RegularService(
        string id,
        ILogger logger,
        IMetricsRecorder metrics,
        ProcessingOptions? processingOptions = null)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _handlerModule = new SelfHandlerModule<TInput>(logger);

        var processorId = $"{id}-InputProcessor";
        _messageProcessor = new MessageProcessor<TInput>(processorId, ProcessMessageAsync, logger, metrics, processingOptions);
    }

    /// <summary>Registers <paramref name="handler"/> for <paramref name="key"/> in the internal registry.</summary>
    public void RegisterHandler(string key, Func<TInput, Task> handler)
        => _handlerModule.RegisterHandler(key, handler);

    /// <summary>Wraps <paramref name="payload"/> in an envelope and enqueues it on the internal processor.</summary>
    /// <returns>The processor's enqueue result.</returns>
    public async Task<bool> SendMessageAsync(string key, TInput payload, CancellationToken cancellationToken = default)
    {
        var message = new MessageEnvelope<TInput>(key, payload);
        var accepted = await _messageProcessor.EnqueueAsync(message, cancellationToken).ConfigureAwait(false);
        return accepted;
    }

    /// <summary>Starts the internal processor.</summary>
    public Task StartAsync(CancellationToken cancellationToken = default)
    {
        return _messageProcessor.StartAsync(cancellationToken);
    }

    /// <summary>Stops the internal processor and returns its result.</summary>
    public Task<bool> StopAsync(TimeSpan timeout)
    {
        return _messageProcessor.StopAsync(timeout);
    }

    /// <summary>Returns the internal processor's metrics.</summary>
    public ProcessingMetrics GetMetrics()
    {
        return _messageProcessor.GetMetrics();
    }

    /// <summary>Returns the internal processor's idle task.</summary>
    public Task WhenIdle()
    {
        return _messageProcessor.WhenIdle();
    }

    /// <summary>Disposes the internal processor.</summary>
    public void Dispose()
    {
        _messageProcessor?.Dispose();
    }

    // Processor callback: dispatches the envelope; the "handler found" flag is not used.
    private async Task ProcessMessageAsync(MessageEnvelope<TInput> envelope)
    {
        await _handlerModule.TryHandleAsync(envelope).ConfigureAwait(false);
    }
}


/// <summary>
/// Publish/subscribe front end: subscriptions and event fan-out are delegated to an internal
/// <see cref="SubscriberHandlerModule{TInput}"/>.
/// </summary>
/// <typeparam name="TInput">Event payload type; must be non-null.</typeparam>
public class EventHub<TInput> : IDisposable where TInput : notnull
{
    private readonly SubscriberHandlerModule<TInput> _subscriberModule;
    private readonly ILogger _logger;
    private readonly IMetricsRecorder _metrics;
    private readonly ProcessingOptions _options;

    /// <summary>Creates the hub and its subscription registry.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> or <paramref name="metrics"/> is null.</exception>
    public EventHub(
        ILogger logger,
        IMetricsRecorder metrics,
        ProcessingOptions? options = null)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
        _options = options ?? new ProcessingOptions();
        _subscriberModule = new SubscriberHandlerModule<TInput>(logger, metrics);
    }

    /// <summary>Publishes <paramref name="event"/> to every subscriber registered for <paramref name="key"/>.</summary>
    /// <exception cref="ArgumentException"><paramref name="key"/> is null, empty or whitespace.</exception>
    public async Task PublishAsync(string key, TInput @event, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(key))
        {
            throw new ArgumentException("Key cannot be null or empty", nameof(key));
        }

        _logger.LogDebug($"Publishing event '{key}'");

        var delivered = await _subscriberModule.NotifySubscribersAsync(key, @event, cancellationToken).ConfigureAwait(false);
        if (delivered)
        {
            return;
        }

        _logger.LogDebug($"Event '{key}' published but no subscribers were notified");
    }

    /// <summary>Registers <paramref name="subscriber"/> for <paramref name="key"/>; returns the registry's result.</summary>
    public bool Subscribe(string key, ISubscriber<TInput> subscriber)
    {
        return _subscriberModule.Subscribe(key, subscriber);
    }

    /// <summary>Removes the subscription of <paramref name="subscriberId"/> from <paramref name="key"/>; returns the registry's result.</summary>
    public bool Unsubscribe(string key, string subscriberId)
    {
        return _subscriberModule.Unsubscribe(key, subscriberId);
    }

    /// <summary>Returns the keys that currently have subscriptions.</summary>
    public IReadOnlyCollection<string> GetSubscribedKeys()
    {
        return _subscriberModule.GetSubscribedKeys();
    }

    /// <summary>No-op: the hub owns no disposable resources.</summary>
    public void Dispose()
    {
    }
}


/// <summary>
/// Subscriber that owns its own handler registry and an input processor named
/// "<c>{Id}-InputProcessor</c>"; every envelope reaching the processor is dispatched to the
/// handler registered for its key.
/// </summary>
/// <typeparam name="TInput">Payload type; must be non-null.</typeparam>
public class ServiceSubscriber<TInput> : ISubscriber<TInput>, IDisposable where TInput : notnull
{
    private readonly SelfHandlerModule<TInput> _handlerModule;
    private readonly ILogger _logger;

    /// <summary>Creates the subscriber, lets <paramref name="configureHandlers"/> register handlers, then builds the processor.</summary>
    /// <param name="id">Subscriber identifier.</param>
    /// <param name="logger">Log sink.</param>
    /// <param name="metrics">Metrics sink passed to the processor.</param>
    /// <param name="configureHandlers">Optional callback that receives the handler registry.</param>
    /// <param name="processingOptions">Optional processor settings.</param>
    /// <exception cref="ArgumentNullException"><paramref name="id"/> or <paramref name="logger"/> is null.</exception>
    public ServiceSubscriber(
        string id,
        ILogger logger,
        IMetricsRecorder metrics,
        Action<SelfHandlerModule<TInput>>? configureHandlers = null,
        ProcessingOptions? processingOptions = null)
    {
        Id = id ?? throw new ArgumentNullException(nameof(id));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _handlerModule = new SelfHandlerModule<TInput>(logger);

        if (configureHandlers != null)
        {
            configureHandlers(_handlerModule);
        }

        var processorId = $"{Id}-InputProcessor";
        InputProcessor = new InputProcessor<MessageEnvelope<TInput>>(
            processorId,
            DispatchAsync,
            logger,
            metrics,
            processingOptions);
    }

    /// <inheritdoc />
    public string Id { get; }

    /// <inheritdoc />
    public IInputProcessor<MessageEnvelope<TInput>> InputProcessor { get; }

    /// <summary>Registers <paramref name="handler"/> for <paramref name="key"/> in the subscriber's registry.</summary>
    public void RegisterHandler(string key, Func<TInput, Task> handler)
    {
        _handlerModule.RegisterHandler(key, handler);
    }

    /// <inheritdoc />
    public Task StartAsync(CancellationToken cancellationToken = default)
    {
        return InputProcessor.StartAsync(cancellationToken);
    }

    /// <inheritdoc />
    public Task<bool> StopAsync(TimeSpan timeout)
    {
        return InputProcessor.StopAsync(timeout);
    }

    /// <inheritdoc />
    public Task WhenIdle()
    {
        return InputProcessor.WhenIdle();
    }

    /// <summary>Disposes the input processor.</summary>
    public void Dispose()
    {
        InputProcessor.Dispose();
    }

    // Processor callback: forwards the envelope to the handler registry.
    private async Task DispatchAsync(MessageEnvelope<TInput> envelope)
    {
        await _handlerModule.TryHandleAsync(envelope);
    }
}

------------------------

/// <summary>
/// Console demo: wires an event hub to two subscribers, publishes a few events,
/// exercises the failing-handler path, prints per-subscriber metrics and stops the subscribers.
/// </summary>
class Program
{
    // Simulated duration of Subscriber1's "greeting" handler.
    const int MyDelayMs = 100;

    // Pause after each publish. It is derived from the slowest simulated handler so the
    // pause is always longer than the handler; a pause equal to MyDelayMs let handler
    // output and the metrics snapshot race the next demo step.
    const int SettleDelayMs = MyDelayMs + 100;

    /// <summary>
    /// Entry point: builds the logger, metrics recorder and processing options, runs the
    /// demo, and disposes the metrics recorder even when the demo throws.
    /// </summary>
    static async Task Main(string[] args)
    {
        // Dependency setup.
        var logger = new ConsoleLogger("EventHubDemo");
        var metrics = new MetricsRecorder();

        var processingOptions = new ProcessingOptions
        {
            MaxRetries = 3,
            RetryDelay = TimeSpan.FromMilliseconds(50),
            UseExponentialBackoff = true,
            ErrorPolicy = ErrorHandlingPolicy.Retry,
            ChannelCapacity = 100,
            MessageTimeToLive = TimeSpan.FromSeconds(30),
            ShutdownTimeout = TimeSpan.FromSeconds(10)
        };

        try
        {
            await RunDemoAsync(logger, metrics, processingOptions);
        }
        finally
        {
            metrics.Dispose();
        }
    }

    /// <summary>
    /// Runs the demo scenario end to end.
    /// </summary>
    /// <param name="logger">Logger used by the hub, the subscribers and the demo output.</param>
    /// <param name="metrics">Metrics recorder handed to the hub and the subscribers.</param>
    /// <param name="options">Processing options shared by the hub and both subscribers.</param>
    static async Task RunDemoAsync(ILogger logger, IMetricsRecorder metrics, ProcessingOptions options)
    {
        logger.LogInformation("Starting EventHub demo...");

        // 1. Create the event hub.
        var eventHub = new EventHub<string>(logger, metrics, options);

        // 2. Create the subscribers. ServiceSubscriber is IDisposable, so the using
        //    declarations guarantee disposal even if a later step throws.
        using var subscriber1 = new ServiceSubscriber<string>("Subscriber1", logger, metrics, module =>
        {
            module.RegisterHandler("greeting", async (payload) =>
            {
                await Task.Delay(MyDelayMs); // Simulated asynchronous work.

                if (payload == "fail")
                {
                    throw new InvalidOperationException("Simulated processing failure");
                }

                logger.LogInformation($"[Subscriber1] Received greeting: {payload}");
            });

            module.RegisterHandler("news", async (payload) =>
            {
                await Task.Delay(5);
                logger.LogInformation($"[Subscriber1] Received news: {payload}");
            });
        }, options);

        using var subscriber2 = new ServiceSubscriber<string>("Subscriber2", logger, metrics, module =>
        {
            module.RegisterHandler("greeting", async (payload) =>
            {
                await Task.Delay(15);
                logger.LogInformation($"[Subscriber2] Got greeting: {payload}");
            });

            // subscriber2 registers no handler for "news".
        }, options);

        // 3. Start the subscribers.
        await subscriber1.StartAsync();
        await subscriber2.StartAsync();

        // 4. Subscribe them to events on the hub.
        eventHub.Subscribe("greeting", subscriber1);
        eventHub.Subscribe("news", subscriber1);
        eventHub.Subscribe("greeting", subscriber2);

        // 5. Publish events, pausing after each one so handlers can finish.
        logger.LogInformation("--- Publishing 'greeting' ---");
        await eventHub.PublishAsync("greeting", "Hello, World!");
        await Task.Delay(SettleDelayMs);

        logger.LogInformation("--- Publishing 'news' ---");
        await eventHub.PublishAsync("news", "Breaking: Channels are awesome!");
        await Task.Delay(SettleDelayMs);

        logger.LogInformation("--- Publishing 'unknown_event' ---");
        await eventHub.PublishAsync("unknown_event", "This should be ignored.");
        await Task.Delay(SettleDelayMs);

        // Error-handling test: the "fail" payload makes Subscriber1's handler throw.
        logger.LogInformation("--- Testing error handling ---");
        await eventHub.PublishAsync("greeting", "fail");

        // Wait until the failing message (including all retries) has been worked off.
        logger.LogInformation("Waiting for error processing to complete...");
        await subscriber1.WhenIdle();
        await Task.Delay(SettleDelayMs); // Extra margin.

        // 6. Unsubscribe one subscriber.
        logger.LogInformation("--- Unsubscribing Subscriber2 from 'greeting' ---");
        eventHub.Unsubscribe("greeting", "Subscriber2");

        logger.LogInformation("--- Publishing 'greeting' again ---");
        await eventHub.PublishAsync("greeting", "Hello again!");
        await Task.Delay(SettleDelayMs);

        // 7. Print metrics.
        logger.LogInformation("--- Metrics ---");
        var metrics1 = subscriber1.InputProcessor.GetMetrics();
        var metrics2 = subscriber2.InputProcessor.GetMetrics();

        logger.LogInformation($"Subscriber1 - Processed: {metrics1.MessagesProcessed}, Failed: {metrics1.MessagesFailed}, Queue: {metrics1.CurrentQueueLength}");
        logger.LogInformation($"Subscriber2 - Processed: {metrics2.MessagesProcessed}, Failed: {metrics2.MessagesFailed}, Queue: {metrics2.CurrentQueueLength}");

        // 8. Stop the subscribers (graceful shutdown).
        logger.LogInformation("--- Stopping subscribers ---");
        var stopTasks = new List<Task<bool>>
        {
            subscriber1.StopAsync(options.ShutdownTimeout),
            subscriber2.StopAsync(options.ShutdownTimeout)
        };

        var stopResults = await Task.WhenAll(stopTasks);

        // Results come back in the same order as stopTasks, so index i maps to Subscriber{i + 1}.
        for (int i = 0; i < stopResults.Length; i++)
        {
            if (!stopResults[i])
            {
                logger.LogWarning($"Subscriber{i + 1} did not stop gracefully");
            }
        }

        logger.LogInformation("--- EventHub demo finished ---");
    }
}

Комментариев нет:

Отправить комментарий