// 25.10.11, DeepSeek, EventHub, Code
// F:\Projects\VS\QwenSln\GS26.EventHub.sln
// F:\Projects\VS\QwenSln\EventHub083\EventHub083.csproj
// F:\Projects\VS\QwenSln\CaEventHub083\CaEventHub083.csproj
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
// =============================================
// ЛОГГИРОВАНИЕ
// =============================================
/// <summary>
/// Minimal logging abstraction: one method per level, each taking a ready-made message string.
/// </summary>
public interface ILogger
{
    /// <summary>Logs <paramref name="message"/> at the trace level.</summary>
    void LogTrace(string message);

    /// <summary>Logs <paramref name="message"/> at the debug level.</summary>
    void LogDebug(string message);

    /// <summary>Logs <paramref name="message"/> at the information level.</summary>
    void LogInformation(string message);

    /// <summary>Logs <paramref name="message"/> at the warning level.</summary>
    void LogWarning(string message);

    /// <summary>Logs <paramref name="message"/> at the error level.</summary>
    void LogError(string message);

    /// <summary>Logs <paramref name="message"/> at the critical level.</summary>
    void LogCritical(string message);
}
/// <summary>
/// <see cref="ILogger"/> implementation that writes every message to the console as
/// "<c>HH:mm:ss.fff [LEVEL] category: message</c>", using the local clock.
/// </summary>
public class ConsoleLogger : ILogger
{
    private readonly string _categoryName;

    /// <summary>Creates a logger whose lines are tagged with <paramref name="categoryName"/>.</summary>
    /// <param name="categoryName">Label printed before each message; defaults to "EventHub".</param>
    public ConsoleLogger(string categoryName = "EventHub") => _categoryName = categoryName;

    /// <inheritdoc />
    public void LogTrace(string message)
    {
        Write("TRACE", message);
    }

    /// <inheritdoc />
    public void LogDebug(string message)
    {
        Write("DEBUG", message);
    }

    /// <inheritdoc />
    public void LogInformation(string message)
    {
        Write("INFO", message);
    }

    /// <inheritdoc />
    public void LogWarning(string message)
    {
        Write("WARN", message);
    }

    /// <inheritdoc />
    public void LogError(string message)
    {
        Write("ERROR", message);
    }

    /// <inheritdoc />
    public void LogCritical(string message)
    {
        Write("CRITICAL", message);
    }

    // Single formatting point shared by all levels.
    private void Write(string level, string message)
    {
        var now = DateTime.Now;
        var line = $"{now:HH:mm:ss.fff} [{level}] {_categoryName}: {message}";
        Console.WriteLine(line);
    }
}
// =============================================
// МЕТРИКИ И МОНИТОРИНГ
// =============================================
/// <summary>
/// Sink for per-processor pipeline measurements. Every call identifies the reporting processor by id.
/// </summary>
public interface IMetricsRecorder
{
    /// <summary>Reports that one message was accepted into the queue of <paramref name="processorId"/>.</summary>
    void RecordMessageEnqueued(string processorId);

    /// <summary>Reports the outcome and duration of handling one message.</summary>
    /// <param name="processorId">Id of the reporting processor.</param>
    /// <param name="success">Whether handling succeeded.</param>
    /// <param name="processingTimeMs">Elapsed handling time in milliseconds.</param>
    void RecordMessageProcessed(string processorId, bool success, long processingTimeMs);

    /// <summary>Reports that one message was discarded, with a short <paramref name="reason"/> label.</summary>
    void RecordMessageDropped(string processorId, string reason);

    /// <summary>Reports one retry; <paramref name="attempt"/> is the retry's ordinal number.</summary>
    void RecordRetryAttempt(string processorId, int attempt);

    /// <summary>Reports the current number of queued messages for <paramref name="processorId"/>.</summary>
    void RecordQueueLength(string processorId, int length);
}
/// <summary>
/// <see cref="IMetricsRecorder"/> backed by a <see cref="Meter"/>: counters for enqueued, processed,
/// dropped and retried messages, a histogram for processing time and an observable gauge that
/// reports the last recorded queue length of every processor.
/// </summary>
public class MetricsRecorder : IMetricsRecorder, IDisposable
{
    private readonly Meter _meter;
    private readonly Counter<long> _messagesEnqueued;
    private readonly Counter<long> _messagesProcessed;
    private readonly Counter<long> _messagesDropped;
    private readonly Counter<long> _retryAttempts;
    private readonly Histogram<long> _processingTime;
    private readonly ObservableGauge<int> _queueLengths;

    // Latest queue length per processor id; read by the gauge callback.
    private readonly ConcurrentDictionary<string, int> _currentQueueLengths = new();

    /// <summary>Creates the meter (version "1.0.0") and all of its instruments.</summary>
    /// <param name="meterName">Name of the meter; defaults to "EventHub".</param>
    public MetricsRecorder(string meterName = "EventHub")
    {
        _meter = new Meter(meterName, "1.0.0");

        _messagesEnqueued = _meter.CreateCounter<long>("messages.enqueued");
        _messagesProcessed = _meter.CreateCounter<long>("messages.processed");
        _messagesDropped = _meter.CreateCounter<long>("messages.dropped");
        _retryAttempts = _meter.CreateCounter<long>("retry.attempts");
        _processingTime = _meter.CreateHistogram<long>("processing.time.ms");
        _queueLengths = _meter.CreateObservableGauge<int>("queue.length", () => ObserveQueueLengths());
    }

    /// <inheritdoc />
    public void RecordMessageEnqueued(string processorId)
    {
        _messagesEnqueued.Add(1, Tag("processor", processorId));
    }

    /// <inheritdoc />
    public void RecordMessageProcessed(string processorId, bool success, long processingTimeMs)
    {
        var processorTag = Tag("processor", processorId);
        _messagesProcessed.Add(1, processorTag, Tag("success", success));
        _processingTime.Record(processingTimeMs, processorTag);
    }

    /// <inheritdoc />
    public void RecordMessageDropped(string processorId, string reason)
    {
        _messagesDropped.Add(1, Tag("processor", processorId), Tag("reason", reason));
    }

    /// <inheritdoc />
    public void RecordRetryAttempt(string processorId, int attempt)
    {
        _retryAttempts.Add(1, Tag("processor", processorId), Tag("attempt", attempt));
    }

    /// <inheritdoc />
    public void RecordQueueLength(string processorId, int length)
    {
        // Insert-or-overwrite: only the most recent value per processor is kept.
        _currentQueueLengths[processorId] = length;
    }

    /// <summary>Disposes the underlying meter.</summary>
    public void Dispose()
    {
        _meter?.Dispose();
    }

    // Lazily yields one measurement per known processor, tagged with its id.
    private IEnumerable<Measurement<int>> ObserveQueueLengths()
    {
        foreach (var entry in _currentQueueLengths)
        {
            yield return new Measurement<int>(entry.Value, Tag("processor", entry.Key));
        }
    }

    private static KeyValuePair<string, object?> Tag(string key, object? value)
        => new KeyValuePair<string, object?>(key, value);
}
// =============================================
// ПОЛИТИКИ ПОВТОРНЫХ ПОПЫТОК И ОШИБОК
// =============================================
/// <summary>
/// Selects what should happen to a message whose handling failed.
/// NOTE(review): in this file only <see cref="DeadLetter"/> is consulted (it adds a log line after the
/// retries are exhausted); the intended semantics of the other members should be confirmed.
/// </summary>
public enum ErrorHandlingPolicy
{
    /// <summary>Presumably: discard the failure without further action — confirm.</summary>
    Ignore = 0,

    /// <summary>Default policy of <c>ProcessingOptions.ErrorPolicy</c>.</summary>
    Retry = 1,

    /// <summary>Report the failed message as moved to a dead-letter queue.</summary>
    DeadLetter = 2
}
/// <summary>
/// Tunable settings for a message processor: retry behaviour, queue capacity, message expiry
/// and shutdown timeout.
/// </summary>
public class ProcessingOptions
{
    /// <summary>How many times a failed message is retried after the first attempt. Default: 3.</summary>
    public int MaxRetries { get; set; } = 3;

    /// <summary>Base pause before a retry. Default: 100 ms.</summary>
    public TimeSpan RetryDelay { get; set; } = TimeSpan.FromMilliseconds(100);

    /// <summary>When true, the pause doubles with every further retry. Default: true.</summary>
    public bool UseExponentialBackoff { get; set; } = true;

    /// <summary>Policy applied to failed messages. Default: <see cref="ErrorHandlingPolicy.Retry"/>.</summary>
    public ErrorHandlingPolicy ErrorPolicy { get; set; } = ErrorHandlingPolicy.Retry;

    /// <summary>Capacity of the processor's bounded queue. Default: 100.</summary>
    public int ChannelCapacity { get; set; } = 100;

    /// <summary>Maximum age of a message; <c>null</c> (the default) means messages never expire.</summary>
    public TimeSpan? MessageTimeToLive { get; set; }

    /// <summary>Time budget a caller may pass to <c>StopAsync</c>. Default: 10 s.</summary>
    public TimeSpan ShutdownTimeout { get; set; } = TimeSpan.FromSeconds(10);
}
// =============================================
// ОСНОВНЫЕ ТИПЫ ДАННЫХ
// =============================================
/// <summary>
/// Wraps a payload with its routing key, an optional target id, a creation timestamp (UTC),
/// a retry counter and a freshly generated message id.
/// </summary>
/// <typeparam name="TInput">Payload type; must not be null.</typeparam>
public class MessageEnvelope<TInput> where TInput : notnull
{
    /// <summary>Creates an envelope stamped with the current UTC time and a new GUID-based id.</summary>
    /// <param name="key">Routing key.</param>
    /// <param name="payload">Message body.</param>
    /// <param name="targetId">Optional addressee id.</param>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> or <paramref name="payload"/> is null.</exception>
    public MessageEnvelope(string key, TInput payload, string? targetId = null)
    {
        if (key is null)
        {
            throw new ArgumentNullException(nameof(key));
        }

        if (payload is null)
        {
            throw new ArgumentNullException(nameof(payload));
        }

        Key = key;
        Payload = payload;
        TargetId = targetId;
        CreatedAt = DateTime.UtcNow;
        MessageId = Guid.NewGuid().ToString();
    }

    /// <summary>Routing key.</summary>
    public string Key { get; }

    /// <summary>Message body.</summary>
    public TInput Payload { get; }

    /// <summary>Optional addressee id.</summary>
    public string? TargetId { get; }

    /// <summary>UTC time at which the envelope was constructed.</summary>
    public DateTime CreatedAt { get; }

    /// <summary>Mutable retry counter, initially 0.</summary>
    public int RetryCount { get; set; }

    /// <summary>Unique id generated at construction.</summary>
    public string MessageId { get; }

    /// <summary>
    /// Tells whether the envelope is older than <paramref name="timeToLive"/>.
    /// A null time-to-live means the envelope never expires.
    /// </summary>
    public bool IsExpired(TimeSpan? timeToLive)
    {
        return timeToLive is { } limit && DateTime.UtcNow - CreatedAt > limit;
    }
}
// =============================================
// ИНТЕРФЕЙСЫ ОСНОВНЫХ КОМПОНЕНТОВ
// =============================================
/// <summary>
/// A disposable, identifiable processing unit that accepts items of <typeparamref name="TInput"/>,
/// can be started and stopped, and exposes its processing statistics.
/// </summary>
public interface IInputProcessor<TInput> : IDisposable
{
    /// <summary>Identifier of this processor.</summary>
    string Id { get; }

    /// <summary>Offers <paramref name="input"/> to the processor; the result tells whether it was accepted.</summary>
    Task<bool> EnqueueAsync(TInput input, CancellationToken cancellationToken = default);

    /// <summary>Starts processing.</summary>
    Task StartAsync(CancellationToken cancellationToken = default);

    /// <summary>Stops processing; the result tells whether the stop completed within <paramref name="timeout"/>.</summary>
    Task<bool> StopAsync(TimeSpan timeout);

    /// <summary>Returns the processor's current statistics.</summary>
    ProcessingMetrics GetMetrics();

    /// <summary>Returns a task that completes once the processor has no outstanding work.</summary>
    Task WhenIdle();
}
/// <summary>
/// Registry of payload handlers addressed by message key.
/// </summary>
public interface IMessageHandlerModule<TInput>
{
    /// <summary>Associates <paramref name="handler"/> with <paramref name="key"/>.</summary>
    void RegisterHandler(string key, Func<TInput, Task> handler);

    /// <summary>
    /// Dispatches <paramref name="envelope"/> to the handler registered for its key;
    /// the result tells whether a handler was found and run.
    /// </summary>
    Task<bool> TryHandleAsync(MessageEnvelope<TInput> envelope, CancellationToken cancellationToken = default);
}
/// <summary>
/// An event consumer: it has an id and owns the input processor into which published envelopes are enqueued.
/// </summary>
public interface ISubscriber<TInput>
{
    /// <summary>Identifier of the subscriber.</summary>
    string Id { get; }

    /// <summary>Processor that receives the envelopes addressed to this subscriber.</summary>
    IInputProcessor<MessageEnvelope<TInput>> InputProcessor { get; }

    /// <summary>Starts the subscriber.</summary>
    Task StartAsync(CancellationToken cancellationToken = default);

    /// <summary>Stops the subscriber; the result tells whether the stop completed within <paramref name="timeout"/>.</summary>
    Task<bool> StopAsync(TimeSpan timeout);

    /// <summary>Returns a task that completes once the subscriber has no outstanding work.</summary>
    Task WhenIdle();
}
// =============================================
// ОСНОВНЫЕ КОМПОНЕНТЫ С ОБРАБОТКОЙ ОШИБОК И МЕТРИКАМИ
// =============================================
/// <summary>
/// Plain data holder for a processor's statistics; all members default to zero / false.
/// </summary>
public class ProcessingMetrics
{
    /// <summary>Number of messages handled successfully.</summary>
    public long MessagesProcessed { get; set; }

    /// <summary>Number of messages whose handling failed.</summary>
    public long MessagesFailed { get; set; }

    /// <summary>Number of messages discarded without being handled.</summary>
    public long MessagesDropped { get; set; }

    /// <summary>Number of retry attempts made.</summary>
    public long RetryAttempts { get; set; }

    /// <summary>Number of messages waiting in the queue.</summary>
    public int CurrentQueueLength { get; set; }

    /// <summary>Average handling time in milliseconds.</summary>
    public double AverageProcessingTimeMs { get; set; }

    /// <summary>Whether a message is being handled right now.</summary>
    public bool IsProcessing { get; set; }
}
/// <summary>
/// Single-consumer message pump. Items accepted by <see cref="EnqueueAsync"/> are buffered in a bounded
/// channel and passed one at a time to the processing delegate, with retries on failure.
/// </summary>
/// <remarks>
/// Once stopped or disposed the channel is completed, so the instance cannot be restarted.
/// </remarks>
/// <typeparam name="TInput">Type of the queued items.</typeparam>
public class InputProcessor<TInput> : IInputProcessor<TInput>
{
    private const string ShutdownDropReason = "Shutdown";

    // Resolved once per closed generic type. Non-null only when TInput is exactly a MessageEnvelope<T>,
    // in which case it calls that envelope's IsExpired(TimeSpan?). A direct
    // "input is MessageEnvelope<TInput>" test can never match, because TInput is the envelope itself.
    private static readonly Func<TInput, TimeSpan?, bool>? s_expiryCheck = CreateExpiryCheck();

    private readonly Channel<TInput> _channel;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly string _id;
    private readonly Func<TInput, Task> _processAction;
    private readonly ILogger _logger;
    private readonly IMetricsRecorder _metrics;
    private readonly ProcessingOptions _options;
    private readonly ProcessingMetrics _processingMetrics = new();

    // Guards _processingMetrics and the two accumulators below; taken by writers and by GetMetrics.
    private readonly object _metricsGate = new();
    private double _totalProcessingTimeMs;
    private long _timedMessageCount;

    private Task? _processingTask;

    // Items that were offered and have not yet been finished, dropped or rejected.
    private int _pendingCount;
    private int _disposed;

    /// <summary>Creates a processor with a bounded queue of <c>options.ChannelCapacity</c> items.</summary>
    /// <param name="id">Identifier used in logs and metrics.</param>
    /// <param name="processAction">Delegate invoked for every dequeued item.</param>
    /// <param name="logger">Log sink.</param>
    /// <param name="metrics">Metrics sink.</param>
    /// <param name="options">Settings; defaults are used when null.</param>
    /// <exception cref="ArgumentNullException">A required argument is null.</exception>
    public InputProcessor(
        string id,
        Func<TInput, Task> processAction,
        ILogger logger,
        IMetricsRecorder metrics,
        ProcessingOptions? options = null)
    {
        _id = id ?? throw new ArgumentNullException(nameof(id));
        _processAction = processAction ?? throw new ArgumentNullException(nameof(processAction));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
        _options = options ?? new ProcessingOptions();
        _cancellationTokenSource = new CancellationTokenSource();
        var channelOptions = new BoundedChannelOptions(_options.ChannelCapacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = false
        };
        _channel = Channel.CreateBounded<TInput>(channelOptions);
    }

    /// <inheritdoc />
    public string Id => _id;

    /// <summary>
    /// Writes <paramref name="input"/> to the queue, waiting while the queue is full.
    /// </summary>
    /// <returns>
    /// True when the item was queued; false when the channel is closed, the wait was cancelled,
    /// or an unexpected error occurred (all of which are logged).
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
    public async Task<bool> EnqueueAsync(TInput input, CancellationToken cancellationToken = default)
    {
        if (input == null) throw new ArgumentNullException(nameof(input));

        // Count the item before it becomes visible to the reader, so WhenIdle can never observe
        // a message that has been dequeued but is not yet accounted for.
        Interlocked.Increment(ref _pendingCount);
        var accepted = false;
        try
        {
            await _channel.Writer.WriteAsync(input, cancellationToken).ConfigureAwait(false);
            accepted = true;
            _metrics.RecordMessageEnqueued(_id);
            UpdateQueueMetrics();
            return true;
        }
        catch (ChannelClosedException)
        {
            _logger.LogError($"Failed to enqueue message: Channel is closed for processor '{_id}'");
            _metrics.RecordMessageDropped(_id, "ChannelClosed");
            return false;
        }
        catch (OperationCanceledException)
        {
            _logger.LogDebug($"Enqueue operation was canceled for processor '{_id}'");
            return false;
        }
        catch (Exception ex)
        {
            _logger.LogError($"Unexpected error while enqueuing message for processor '{_id}': {ex.Message}");
            _metrics.RecordMessageDropped(_id, "UnexpectedError");
            return false;
        }
        finally
        {
            if (!accepted)
            {
                Interlocked.Decrement(ref _pendingCount);
            }
        }
    }

    /// <summary>Starts the processing loop.</summary>
    /// <exception cref="InvalidOperationException">The processor is already running.</exception>
    public Task StartAsync(CancellationToken cancellationToken = default)
    {
        if (_processingTask != null)
            throw new InvalidOperationException($"Processor '{_id}' is already running.");
        _processingTask = ProcessLoopAsync();
        _logger.LogInformation($"Processor '{_id}' started");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Stops the processor: waits for outstanding work, then closes the queue, cancels the loop
    /// and waits for it to end. Both waits share the single <paramref name="timeout"/> budget.
    /// </summary>
    /// <returns>True when the loop ended within the timeout (or was never started); otherwise false.</returns>
    public async Task<bool> StopAsync(TimeSpan timeout)
    {
        var processingTask = _processingTask;
        if (processingTask == null) return true;
        _logger.LogInformation($"Stopping processor '{_id}'...");
        try
        {
            using var timeoutCts = new CancellationTokenSource(timeout);

            // Give queued and in-flight work a chance to finish, but never longer than the caller allowed.
            try
            {
                await WaitForIdleAsync(timeoutCts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                _logger.LogWarning($"Processor '{_id}' still had pending work when the stop timeout elapsed");
            }

            // Close the channel first, then cancel.
            SignalShutdown();

            var timeoutTask = Task.Delay(Timeout.Infinite, timeoutCts.Token);
            var completedTask = await Task.WhenAny(processingTask, timeoutTask).ConfigureAwait(false);
            if (completedTask == processingTask)
            {
                await processingTask.ConfigureAwait(false); // surface exceptions from the loop
                _processingTask = null;
                _logger.LogInformation($"Processor '{_id}' stopped successfully");
                return true;
            }

            // The loop is still running (e.g. a handler has not returned); keep the task reference
            // so the processor is not reported as stopped and a later StopAsync can wait again.
            _logger.LogWarning($"Processor '{_id}' stop timeout after {timeout.TotalSeconds}s");
            return false;
        }
        catch (OperationCanceledException)
        {
            _processingTask = null;
            _logger.LogInformation($"Processor '{_id}' stopped (operation canceled)");
            return true;
        }
        catch (Exception ex)
        {
            SignalShutdown();
            _processingTask = null;
            _logger.LogError($"Error while stopping processor '{_id}': {ex.Message}");
            return false;
        }
    }

    /// <summary>
    /// Completes once every offered item has been processed, dropped or rejected.
    /// Polls every 10 ms; does not complete while items sit in the queue of a processor that was never started.
    /// </summary>
    public Task WhenIdle() => WaitForIdleAsync(CancellationToken.None);

    /// <summary>Returns a consistent copy of the current statistics.</summary>
    public ProcessingMetrics GetMetrics()
    {
        var queueLength = _channel.Reader.Count;
        lock (_metricsGate)
        {
            return new ProcessingMetrics
            {
                MessagesProcessed = _processingMetrics.MessagesProcessed,
                MessagesFailed = _processingMetrics.MessagesFailed,
                MessagesDropped = _processingMetrics.MessagesDropped,
                RetryAttempts = _processingMetrics.RetryAttempts,
                CurrentQueueLength = queueLength,
                AverageProcessingTimeMs = _processingMetrics.AverageProcessingTimeMs,
                IsProcessing = _processingMetrics.IsProcessing
            };
        }
    }

    /// <summary>
    /// Closes the queue, cancels the loop and releases the cancellation source.
    /// Safe to call more than once and after <see cref="StopAsync"/>.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

        // TryComplete instead of Complete: the channel may already have been completed by StopAsync.
        _channel.Writer.TryComplete();
        _cancellationTokenSource.Cancel();
        _cancellationTokenSource.Dispose();
    }

    // Builds the expiry delegate when TInput is a closed MessageEnvelope<T>; otherwise returns null.
    private static Func<TInput, TimeSpan?, bool>? CreateExpiryCheck()
    {
        var inputType = typeof(TInput);
        if (!inputType.IsGenericType || inputType.GetGenericTypeDefinition() != typeof(MessageEnvelope<>))
        {
            return null;
        }

        var method = inputType.GetMethod(nameof(MessageEnvelope<string>.IsExpired), new[] { typeof(TimeSpan?) });
        if (method == null)
        {
            return null;
        }

        // Open-instance delegate: the first argument is the envelope the method is invoked on.
        return (Func<TInput, TimeSpan?, bool>)method.CreateDelegate(typeof(Func<TInput, TimeSpan?, bool>));
    }

    // Polls until nothing is pending or the token is cancelled (throws OperationCanceledException).
    private async Task WaitForIdleAsync(CancellationToken cancellationToken)
    {
        var announced = false;
        while (Volatile.Read(ref _pendingCount) > 0)
        {
            if (!announced)
            {
                // Logged once per wait rather than on every poll to keep the log readable.
                _logger.LogDebug($"Waiting for processor '{_id}' to become idle. Queue: {_channel.Reader.Count}, Pending: {Volatile.Read(ref _pendingCount)}");
                announced = true;
            }
            await Task.Delay(10, cancellationToken).ConfigureAwait(false);
        }
    }

    // Idempotent: closes the writer and requests cancellation of the loop.
    private void SignalShutdown()
    {
        _channel.Writer.TryComplete();
        try
        {
            _cancellationTokenSource.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Dispose already cancelled and released the source.
        }
    }

    // Reads the channel until it is completed or cancellation is requested.
    private async Task ProcessLoopAsync()
    {
        var token = _cancellationTokenSource.Token;
        try
        {
            await foreach (var input in _channel.Reader.ReadAllAsync(token).ConfigureAwait(false))
            {
                try
                {
                    if (token.IsCancellationRequested)
                    {
                        RecordDropped(ShutdownDropReason);
                        break;
                    }
                    if (s_expiryCheck != null && s_expiryCheck(input, _options.MessageTimeToLive))
                    {
                        _logger.LogWarning($"Message expired and will be skipped for processor '{_id}'");
                        RecordDropped("Expired");
                        continue;
                    }
                    await ProcessWithRetryAsync(input, token).ConfigureAwait(false);
                    UpdateQueueMetrics();
                }
                finally
                {
                    // The dequeued item is finished, whatever the outcome.
                    Interlocked.Decrement(ref _pendingCount);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected during shutdown.
        }
        catch (Exception ex)
        {
            _logger.LogCritical($"Fatal error in processing loop for processor '{_id}': {ex.Message}");
        }
        finally
        {
            // Nothing reads the channel after this point: reject new items and account for the
            // ones left behind so that WhenIdle can complete.
            _channel.Writer.TryComplete();
            DiscardRemaining();
        }
    }

    // Empties the queue after the loop has ended, recording every leftover item as dropped.
    private void DiscardRemaining()
    {
        var discarded = 0;
        while (_channel.Reader.TryRead(out _))
        {
            Interlocked.Decrement(ref _pendingCount);
            RecordDropped(ShutdownDropReason);
            discarded++;
        }
        if (discarded > 0)
        {
            _logger.LogWarning($"Processor '{_id}' discarded {discarded} unprocessed message(s) on shutdown");
        }
    }

    // Runs the processing delegate, retrying up to MaxRetries times, and records the outcome.
    private async Task ProcessWithRetryAsync(TInput input, CancellationToken cancellationToken)
    {
        SetProcessing(true);
        try
        {
            var startTime = Stopwatch.GetTimestamp();
            var success = false;
            var attemptsMade = 0;
            Exception? lastException = null;
            for (int attempt = 0; attempt <= _options.MaxRetries && !cancellationToken.IsCancellationRequested; attempt++)
            {
                try
                {
                    if (attempt > 0)
                    {
                        lock (_metricsGate)
                        {
                            _processingMetrics.RetryAttempts++;
                        }
                        _metrics.RecordRetryAttempt(_id, attempt);
                        var delay = _options.UseExponentialBackoff
                            ? TimeSpan.FromMilliseconds(_options.RetryDelay.TotalMilliseconds * Math.Pow(2, attempt - 1))
                            : _options.RetryDelay;
                        _logger.LogDebug($"Retry attempt {attempt} for processor '{_id}' after {delay.TotalMilliseconds}ms");
                        await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
                    }
                    attemptsMade++;
                    await _processAction(input).ConfigureAwait(false);
                    success = true;
                    break;
                }
                // A cancellation exception is treated as shutdown only when OUR token was cancelled;
                // one raised by the handler itself (e.g. its own timeout) is an ordinary failure
                // and must not terminate the processing loop.
                catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
                {
                    lastException = ex;
                    _logger.LogWarning($"Processing attempt {attempt} failed for processor '{_id}': {ex.Message}");
                    if (cancellationToken.IsCancellationRequested)
                        break;
                }
            }

            var processingTimeMs = (Stopwatch.GetTimestamp() - startTime) * 1000.0 / Stopwatch.Frequency;
            var cancelled = cancellationToken.IsCancellationRequested;
            lock (_metricsGate)
            {
                if (success)
                {
                    _processingMetrics.MessagesProcessed++;
                }
                else if (!cancelled)
                {
                    _processingMetrics.MessagesFailed++;
                }

                // Average over every timed message, in floating point.
                _totalProcessingTimeMs += processingTimeMs;
                _timedMessageCount++;
                _processingMetrics.AverageProcessingTimeMs = _totalProcessingTimeMs / _timedMessageCount;
            }

            if (success)
            {
                _logger.LogDebug($"Message processed successfully by processor '{_id}' in {processingTimeMs:F2}ms");
            }
            else if (!cancelled)
            {
                _logger.LogError($"Message processing failed after {attemptsMade} attempt(s) for processor '{_id}': {lastException?.Message}");
                if (_options.ErrorPolicy == ErrorHandlingPolicy.DeadLetter)
                {
                    _logger.LogWarning($"Message moved to dead letter queue for processor '{_id}'");
                }
            }

            if (!cancelled)
            {
                _metrics.RecordMessageProcessed(_id, success, (long)processingTimeMs);
            }
        }
        finally
        {
            SetProcessing(false);
        }
    }

    private void SetProcessing(bool value)
    {
        lock (_metricsGate)
        {
            _processingMetrics.IsProcessing = value;
        }
    }

    private void RecordDropped(string reason)
    {
        lock (_metricsGate)
        {
            _processingMetrics.MessagesDropped++;
        }
        _metrics.RecordMessageDropped(_id, reason);
    }

    private void UpdateQueueMetrics()
    {
        var length = _channel.Reader.Count;
        lock (_metricsGate)
        {
            _processingMetrics.CurrentQueueLength = length;
        }
        _metrics.RecordQueueLength(_id, length);
    }
}
// =============================================
// МОДУЛИ ОБРАБОТКИ СООБЩЕНИЙ
// =============================================
/// <summary>
/// Keeps one payload handler per message key (case-insensitive) and dispatches envelopes to it.
/// </summary>
public class SelfHandlerModule<TInput> : IMessageHandlerModule<TInput>
{
    private readonly ConcurrentDictionary<string, Func<TInput, Task>> _handlers;
    private readonly ILogger _logger;

    /// <summary>Creates an empty handler registry.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
    public SelfHandlerModule(ILogger logger)
    {
        _handlers = new ConcurrentDictionary<string, Func<TInput, Task>>(StringComparer.OrdinalIgnoreCase);
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Registers <paramref name="handler"/> for <paramref name="key"/>, replacing any previous one.</summary>
    /// <exception cref="ArgumentException"><paramref name="key"/> is null, empty or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public void RegisterHandler(string key, Func<TInput, Task> handler)
    {
        if (string.IsNullOrWhiteSpace(key))
            throw new ArgumentException("Key cannot be null or empty", nameof(key));
        if (handler == null)
            throw new ArgumentNullException(nameof(handler));
        _handlers[key] = handler;
        _logger.LogDebug($"Handler registered for key '{key}'");
    }

    /// <summary>
    /// Runs the handler registered for the envelope's key with the envelope's payload.
    /// </summary>
    /// <returns>True when a handler was found and completed; false when no handler is registered.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="envelope"/> is null.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was already cancelled.</exception>
    /// <remarks>Exceptions thrown by the handler are logged and rethrown.</remarks>
    public async Task<bool> TryHandleAsync(MessageEnvelope<TInput> envelope, CancellationToken cancellationToken = default)
    {
        // Fail with a descriptive exception instead of a NullReferenceException further down.
        if (envelope == null)
            throw new ArgumentNullException(nameof(envelope));

        // Handlers take no token, so cancellation can only be honoured before dispatch.
        cancellationToken.ThrowIfCancellationRequested();

        if (!_handlers.TryGetValue(envelope.Key, out var handler))
        {
            _logger.LogWarning($"No handler found for key '{envelope.Key}'");
            return false;
        }

        try
        {
            await handler(envelope.Payload).ConfigureAwait(false);
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError($"Error executing handler for key '{envelope.Key}': {ex.Message}");
            throw;
        }
    }
}
/// <summary>
/// Tracks which subscribers listen to which key (case-insensitive) and fans published payloads out
/// to their input processors.
/// </summary>
public class SubscriberHandlerModule<TInput>
{
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, ISubscriber<TInput>>> _subscribersByKey;
    private readonly ILogger _logger;
    private readonly IMetricsRecorder _metrics;

    // Serialises Subscribe/Unsubscribe. Without it, Unsubscribe could remove a per-key dictionary
    // that a concurrent Subscribe had just obtained and was about to add to, losing the subscription.
    // Notification stays lock-free and works on a snapshot.
    private readonly object _subscriptionGate = new();

    /// <summary>Creates an empty subscription table.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> or <paramref name="metrics"/> is null.</exception>
    public SubscriberHandlerModule(ILogger logger, IMetricsRecorder metrics)
    {
        _subscribersByKey = new ConcurrentDictionary<string, ConcurrentDictionary<string, ISubscriber<TInput>>>(StringComparer.OrdinalIgnoreCase);
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
    }

    /// <summary>Adds <paramref name="subscriber"/> to the listeners of <paramref name="key"/>.</summary>
    /// <returns>True when added; false when a subscriber with the same id is already registered for the key.</returns>
    /// <exception cref="ArgumentException"><paramref name="key"/> is null, empty or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="subscriber"/> is null.</exception>
    public bool Subscribe(string key, ISubscriber<TInput> subscriber)
    {
        if (string.IsNullOrWhiteSpace(key))
            throw new ArgumentException("Key cannot be null or empty", nameof(key));
        if (subscriber == null)
            throw new ArgumentNullException(nameof(subscriber));

        bool added;
        lock (_subscriptionGate)
        {
            var subscribers = _subscribersByKey.GetOrAdd(key, _ => new ConcurrentDictionary<string, ISubscriber<TInput>>());
            added = subscribers.TryAdd(subscriber.Id, subscriber);
        }

        if (added)
        {
            _logger.LogInformation($"Subscriber '{subscriber.Id}' subscribed to key '{key}'");
            return true;
        }
        _logger.LogWarning($"Subscriber '{subscriber.Id}' is already subscribed to key '{key}'");
        return false;
    }

    /// <summary>Removes the subscriber with <paramref name="subscriberId"/> from <paramref name="key"/>.</summary>
    /// <returns>True when removed; false when it was not subscribed.</returns>
    /// <exception cref="ArgumentException">An argument is null, empty or whitespace.</exception>
    public bool Unsubscribe(string key, string subscriberId)
    {
        if (string.IsNullOrWhiteSpace(key))
            throw new ArgumentException("Key cannot be null or empty", nameof(key));
        if (string.IsNullOrWhiteSpace(subscriberId))
            throw new ArgumentException("SubscriberId cannot be null or empty", nameof(subscriberId));

        var removed = false;
        lock (_subscriptionGate)
        {
            if (_subscribersByKey.TryGetValue(key, out var subscribers)
                && subscribers.TryRemove(subscriberId, out _))
            {
                removed = true;
                // Drop empty per-key collections.
                if (subscribers.IsEmpty)
                    _subscribersByKey.TryRemove(key, out _);
            }
        }

        if (removed)
        {
            _logger.LogInformation($"Subscriber '{subscriberId}' unsubscribed from key '{key}'");
            return true;
        }
        _logger.LogWarning($"Subscriber '{subscriberId}' was not subscribed to key '{key}'");
        return false;
    }

    /// <summary>
    /// Wraps <paramref name="payload"/> in one envelope and enqueues it to every subscriber of
    /// <paramref name="key"/> concurrently. Individual enqueue failures are logged and recorded, not thrown.
    /// </summary>
    /// <returns>True when at least one subscriber was registered for the key; otherwise false.</returns>
    public async Task<bool> NotifySubscribersAsync(string key, TInput payload, CancellationToken cancellationToken = default)
    {
        if (!_subscribersByKey.TryGetValue(key, out var subscribers))
        {
            _logger.LogDebug($"No subscribers found for key '{key}'");
            return false;
        }

        // Snapshot so that concurrent (un)subscriptions cannot affect this fan-out.
        var subscribersSnapshot = subscribers.Values.ToArray();
        var envelope = new MessageEnvelope<TInput>(key, payload);
        var tasks = subscribersSnapshot.Select(async subscriber =>
        {
            try
            {
                var enqueued = await subscriber.InputProcessor.EnqueueAsync(envelope, cancellationToken).ConfigureAwait(false);
                if (!enqueued)
                {
                    _logger.LogWarning($"Failed to enqueue message to subscriber '{subscriber.Id}'");
                    _metrics.RecordMessageDropped(subscriber.Id, "EnqueueFailed");
                }
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                _logger.LogError($"Failed to notify subscriber '{subscriber.Id}': {ex.Message}");
                _metrics.RecordMessageDropped(subscriber.Id, "NotificationFailed");
            }
        });
        await Task.WhenAll(tasks).ConfigureAwait(false);
        _logger.LogDebug($"Notified {subscribersSnapshot.Length} subscribers for key '{key}'");
        return subscribersSnapshot.Length > 0;
    }

    /// <summary>Returns a snapshot of the keys that currently have a subscriber collection.</summary>
    public IReadOnlyCollection<string> GetSubscribedKeys() => _subscribersByKey.Keys.ToArray();
}
// =============================================
// ОСНОВНЫЕ СЕРВИСЫ
// =============================================
/// <summary>
/// Thin facade over an <see cref="InputProcessor{TInput}"/> specialised for message envelopes;
/// every member forwards to the wrapped processor.
/// </summary>
public class MessageProcessor<TInput> : IDisposable
{
    private readonly IInputProcessor<MessageEnvelope<TInput>> _processor;
    private readonly ILogger _logger;

    /// <summary>Creates the wrapped processor from the given arguments.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
    public MessageProcessor(
        string id,
        Func<MessageEnvelope<TInput>, Task> handler,
        ILogger logger,
        IMetricsRecorder metrics,
        ProcessingOptions? options = null)
    {
        if (logger is null)
        {
            throw new ArgumentNullException(nameof(logger));
        }

        _logger = logger;
        _processor = new InputProcessor<MessageEnvelope<TInput>>(id, handler, logger, metrics, options);
    }

    /// <summary>Starts the wrapped processor.</summary>
    public Task StartAsync(CancellationToken cancellationToken = default)
    {
        return _processor.StartAsync(cancellationToken);
    }

    /// <summary>Stops the wrapped processor.</summary>
    public Task<bool> StopAsync(TimeSpan timeout)
    {
        return _processor.StopAsync(timeout);
    }

    /// <summary>Enqueues <paramref name="envelope"/> into the wrapped processor.</summary>
    public Task<bool> EnqueueAsync(MessageEnvelope<TInput> envelope, CancellationToken cancellationToken = default)
    {
        return _processor.EnqueueAsync(envelope, cancellationToken);
    }

    /// <summary>Returns the wrapped processor's statistics.</summary>
    public ProcessingMetrics GetMetrics()
    {
        return _processor.GetMetrics();
    }

    /// <summary>Completes when the wrapped processor is idle.</summary>
    public Task WhenIdle()
    {
        return _processor.WhenIdle();
    }

    /// <summary>Disposes the wrapped processor.</summary>
    public void Dispose()
    {
        _processor?.Dispose();
    }
}
/// <summary>
/// A self-contained service: messages sent to it are queued in its own processor and dispatched
/// to the handlers registered on it by key.
/// </summary>
public class RegularService<TInput> : IDisposable where TInput : notnull
{
    private readonly MessageProcessor<TInput> _messageProcessor;
    private readonly SelfHandlerModule<TInput> _handlerModule;
    private readonly ILogger _logger;

    /// <summary>Creates the service; its processor is named "<c>{id}-InputProcessor</c>".</summary>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
    public RegularService(
        string id,
        ILogger logger,
        IMetricsRecorder metrics,
        ProcessingOptions? processingOptions = null)
    {
        if (logger is null)
        {
            throw new ArgumentNullException(nameof(logger));
        }

        _logger = logger;
        _handlerModule = new SelfHandlerModule<TInput>(logger);

        var processorId = $"{id}-InputProcessor";
        _messageProcessor = new MessageProcessor<TInput>(processorId, ProcessMessageAsync, logger, metrics, processingOptions);
    }

    /// <summary>Registers <paramref name="handler"/> for messages with <paramref name="key"/>.</summary>
    public void RegisterHandler(string key, Func<TInput, Task> handler)
        => _handlerModule.RegisterHandler(key, handler);

    /// <summary>Wraps the payload in an envelope and enqueues it; the result tells whether it was queued.</summary>
    public async Task<bool> SendMessageAsync(string key, TInput payload, CancellationToken cancellationToken = default)
    {
        var envelope = new MessageEnvelope<TInput>(key, payload);
        var queued = await _messageProcessor.EnqueueAsync(envelope, cancellationToken).ConfigureAwait(false);
        return queued;
    }

    /// <summary>Starts the underlying processor.</summary>
    public Task StartAsync(CancellationToken cancellationToken = default)
    {
        return _messageProcessor.StartAsync(cancellationToken);
    }

    /// <summary>Stops the underlying processor.</summary>
    public Task<bool> StopAsync(TimeSpan timeout)
    {
        return _messageProcessor.StopAsync(timeout);
    }

    /// <summary>Returns the underlying processor's statistics.</summary>
    public ProcessingMetrics GetMetrics()
    {
        return _messageProcessor.GetMetrics();
    }

    /// <summary>Completes when the underlying processor is idle.</summary>
    public Task WhenIdle()
    {
        return _messageProcessor.WhenIdle();
    }

    /// <summary>Disposes the underlying processor.</summary>
    public void Dispose()
    {
        _messageProcessor?.Dispose();
    }

    // Processor callback: dispatch to the registered handler; the "handler found" flag is not used.
    private async Task ProcessMessageAsync(MessageEnvelope<TInput> envelope)
    {
        _ = await _handlerModule.TryHandleAsync(envelope).ConfigureAwait(false);
    }
}
/// <summary>
/// Publish/subscribe entry point: forwards subscription management and event publication to a
/// <see cref="SubscriberHandlerModule{TInput}"/>.
/// </summary>
public class EventHub<TInput> : IDisposable where TInput : notnull
{
    private readonly SubscriberHandlerModule<TInput> _subscriberModule;
    private readonly ILogger _logger;
    private readonly IMetricsRecorder _metrics;
    private readonly ProcessingOptions _options;

    /// <summary>Creates a hub with no subscriptions.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> or <paramref name="metrics"/> is null.</exception>
    public EventHub(
        ILogger logger,
        IMetricsRecorder metrics,
        ProcessingOptions? options = null)
    {
        if (logger is null)
        {
            throw new ArgumentNullException(nameof(logger));
        }

        if (metrics is null)
        {
            throw new ArgumentNullException(nameof(metrics));
        }

        _logger = logger;
        _metrics = metrics;
        _options = options ?? new ProcessingOptions();
        _subscriberModule = new SubscriberHandlerModule<TInput>(logger, metrics);
    }

    /// <summary>Delivers <paramref name="event"/> to every subscriber of <paramref name="key"/>.</summary>
    /// <exception cref="ArgumentException"><paramref name="key"/> is null, empty or whitespace.</exception>
    public async Task PublishAsync(string key, TInput @event, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(key))
        {
            throw new ArgumentException("Key cannot be null or empty", nameof(key));
        }

        _logger.LogDebug($"Publishing event '{key}'");

        var notified = await _subscriberModule
            .NotifySubscribersAsync(key, @event, cancellationToken)
            .ConfigureAwait(false);

        if (notified)
        {
            return;
        }

        _logger.LogDebug($"Event '{key}' published but no subscribers were notified");
    }

    /// <summary>Subscribes <paramref name="subscriber"/> to <paramref name="key"/>.</summary>
    public bool Subscribe(string key, ISubscriber<TInput> subscriber)
    {
        return _subscriberModule.Subscribe(key, subscriber);
    }

    /// <summary>Removes the subscriber with <paramref name="subscriberId"/> from <paramref name="key"/>.</summary>
    public bool Unsubscribe(string key, string subscriberId)
    {
        return _subscriberModule.Unsubscribe(key, subscriberId);
    }

    /// <summary>Returns the keys that currently have subscribers registered.</summary>
    public IReadOnlyCollection<string> GetSubscribedKeys()
    {
        return _subscriberModule.GetSubscribedKeys();
    }

    /// <summary>No-op: the hub holds nothing that needs releasing.</summary>
    public void Dispose()
    {
    }
}
/// <summary>
/// <see cref="ISubscriber{TInput}"/> that owns an input processor and dispatches each received
/// envelope to the handler registered for its key.
/// </summary>
public class ServiceSubscriber<TInput> : ISubscriber<TInput>, IDisposable where TInput : notnull
{
    private readonly SelfHandlerModule<TInput> _handlerModule;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates the subscriber, lets <paramref name="configureHandlers"/> register handlers, then
    /// builds the processor named "<c>{id}-InputProcessor</c>".
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="id"/> or <paramref name="logger"/> is null.</exception>
    public ServiceSubscriber(
        string id,
        ILogger logger,
        IMetricsRecorder metrics,
        Action<SelfHandlerModule<TInput>>? configureHandlers = null,
        ProcessingOptions? processingOptions = null)
    {
        if (id is null)
        {
            throw new ArgumentNullException(nameof(id));
        }

        Id = id;

        if (logger is null)
        {
            throw new ArgumentNullException(nameof(logger));
        }

        _logger = logger;
        _handlerModule = new SelfHandlerModule<TInput>(logger);

        if (configureHandlers != null)
        {
            configureHandlers(_handlerModule);
        }

        InputProcessor = new InputProcessor<MessageEnvelope<TInput>>(
            $"{Id}-InputProcessor",
            HandleEnvelopeAsync,
            logger,
            metrics,
            processingOptions);
    }

    /// <inheritdoc />
    public string Id { get; }

    /// <inheritdoc />
    public IInputProcessor<MessageEnvelope<TInput>> InputProcessor { get; }

    /// <summary>Registers <paramref name="handler"/> for envelopes with <paramref name="key"/>.</summary>
    public void RegisterHandler(string key, Func<TInput, Task> handler)
    {
        _handlerModule.RegisterHandler(key, handler);
    }

    /// <inheritdoc />
    public Task StartAsync(CancellationToken cancellationToken = default)
    {
        return InputProcessor.StartAsync(cancellationToken);
    }

    /// <inheritdoc />
    public Task<bool> StopAsync(TimeSpan timeout)
    {
        return InputProcessor.StopAsync(timeout);
    }

    /// <inheritdoc />
    public Task WhenIdle()
    {
        return InputProcessor.WhenIdle();
    }

    /// <summary>Disposes the owned input processor.</summary>
    public void Dispose()
    {
        (InputProcessor as IDisposable)?.Dispose();
    }

    // Processor callback: dispatch to the registered handler; the "handler found" flag is not used.
    private async Task HandleEnvelopeAsync(MessageEnvelope<TInput> env)
    {
        await _handlerModule.TryHandleAsync(env);
    }
}
// ------------------------
/// <summary>
/// Console demo: two subscribers are attached to an event hub, several events are published
/// (including one that makes a handler fail), metrics are printed and the subscribers are stopped.
/// </summary>
class Program
{
    // Simulated duration of Subscriber1's "greeting" handler.
    const int MyDelayMs = 100;

    static async Task Main(string[] args)
    {
        // Set up dependencies.
        var logger = new ConsoleLogger("EventHubDemo");
        var metrics = new MetricsRecorder();
        var processingOptions = new ProcessingOptions
        {
            MaxRetries = 3,
            RetryDelay = TimeSpan.FromMilliseconds(50),
            UseExponentialBackoff = true,
            ErrorPolicy = ErrorHandlingPolicy.Retry,
            ChannelCapacity = 100,
            MessageTimeToLive = TimeSpan.FromSeconds(30),
            ShutdownTimeout = TimeSpan.FromSeconds(10)
        };
        try
        {
            await RunDemoAsync(logger, metrics, processingOptions);
        }
        finally
        {
            metrics.Dispose();
        }
    }

    static async Task RunDemoAsync(ILogger logger, IMetricsRecorder metrics, ProcessingOptions options)
    {
        logger.LogInformation("Starting EventHub demo...");

        // 1. Create the event hub.
        var eventHub = new EventHub<string>(logger, metrics, options);

        // 2. Create the subscribers.
        var subscriber1 = new ServiceSubscriber<string>("Subscriber1", logger, metrics, module =>
        {
            module.RegisterHandler("greeting", async (payload) =>
            {
                await Task.Delay(MyDelayMs); // simulate asynchronous work
                if (payload == "fail")
                    throw new InvalidOperationException("Simulated processing failure");
                logger.LogInformation($"[Subscriber1] Received greeting: {payload}");
            });
            module.RegisterHandler("news", async (payload) =>
            {
                await Task.Delay(5);
                logger.LogInformation($"[Subscriber1] Received news: {payload}");
            });
        }, options);
        var subscriber2 = new ServiceSubscriber<string>("Subscriber2", logger, metrics, module =>
        {
            module.RegisterHandler("greeting", async (payload) =>
            {
                await Task.Delay(15);
                logger.LogInformation($"[Subscriber2] Got greeting: {payload}");
            });
            // subscriber2 registers no handler for "news"
        }, options);

        // 3. Start the subscribers.
        await subscriber1.StartAsync();
        await subscriber2.StartAsync();

        // 4. Subscribe them to events in the hub.
        eventHub.Subscribe("greeting", subscriber1);
        eventHub.Subscribe("news", subscriber1);
        eventHub.Subscribe("greeting", subscriber2);

        // 5. Publish events. After each publication wait until the subscribers are actually idle:
        //    a fixed delay is not enough because a handler may itself take MyDelayMs or longer.
        logger.LogInformation("--- Publishing 'greeting' ---");
        await eventHub.PublishAsync("greeting", "Hello, World!");
        await WaitForIdleAsync(subscriber1, subscriber2);

        logger.LogInformation("--- Publishing 'news' ---");
        await eventHub.PublishAsync("news", "Breaking: Channels are awesome!");
        await WaitForIdleAsync(subscriber1, subscriber2);

        logger.LogInformation("--- Publishing 'unknown_event' ---");
        await eventHub.PublishAsync("unknown_event", "This should be ignored.");
        await WaitForIdleAsync(subscriber1, subscriber2);

        // Exercise error handling.
        logger.LogInformation("--- Testing error handling ---");
        await eventHub.PublishAsync("greeting", "fail");

        // Wait for the failing message to run through all of its retries.
        logger.LogInformation("Waiting for error processing to complete...");
        await WaitForIdleAsync(subscriber1, subscriber2);

        // 6. Unsubscribe one subscriber.
        logger.LogInformation("--- Unsubscribing Subscriber2 from 'greeting' ---");
        eventHub.Unsubscribe("greeting", "Subscriber2");
        logger.LogInformation("--- Publishing 'greeting' again ---");
        await eventHub.PublishAsync("greeting", "Hello again!");
        await WaitForIdleAsync(subscriber1, subscriber2);

        // 7. Print metrics (taken only after all work has finished, so the numbers are stable).
        logger.LogInformation("--- Metrics ---");
        var metrics1 = subscriber1.InputProcessor.GetMetrics();
        var metrics2 = subscriber2.InputProcessor.GetMetrics();
        logger.LogInformation($"Subscriber1 - Processed: {metrics1.MessagesProcessed}, Failed: {metrics1.MessagesFailed}, Queue: {metrics1.CurrentQueueLength}");
        logger.LogInformation($"Subscriber2 - Processed: {metrics2.MessagesProcessed}, Failed: {metrics2.MessagesFailed}, Queue: {metrics2.CurrentQueueLength}");

        // 8. Stop the subscribers (graceful shutdown).
        logger.LogInformation("--- Stopping subscribers ---");
        var stopTasks = new List<Task<bool>>
        {
            subscriber1.StopAsync(options.ShutdownTimeout),
            subscriber2.StopAsync(options.ShutdownTimeout)
        };
        var stopResults = await Task.WhenAll(stopTasks);
        for (int i = 0; i < stopResults.Length; i++)
        {
            if (!stopResults[i])
            {
                logger.LogWarning($"Subscriber{i + 1} did not stop gracefully");
            }
        }
        logger.LogInformation("--- EventHub demo finished ---");
    }

    // Completes when every given subscriber reports that it has no outstanding work.
    static Task WaitForIdleAsync(params ISubscriber<string>[] subscribers)
        => Task.WhenAll(subscribers.Select(subscriber => subscriber.WhenIdle()));
}
// No comments:
// Post a comment