25.10.11, DeepSeek, Qwen, EventHub, CodeRevew
Проведу детальный код-ревью этого EventHub. Код имеет интересную архитектуру, но есть несколько важных проблем.
🚨 Критические проблемы
1. Нарушение принципов async/await
public async Task PublishAsync(string key, TInput @event)
{
// ❌ Нет await - метод синхронный по сути
_subscriberModule.Handle(key, @event);
}
public async Task SubscribeAsync(string key, ISubscriber<TInput> subscriber)
{
// ❌ Тоже нет await
_subscriberModule.Subscribe(key, subscriber);
}Исправление:
public Task PublishAsync(string key, TInput @event)
{
_subscriberModule.Handle(key, @event);
return Task.CompletedTask; // или сделать метод void
}
// Или переименовать в синхронные методы
public void Publish(string key, TInput @event)
{
_subscriberModule.Handle(key, @event);
}2. Потенциальная утечка памяти в SubscriberHandlerModule
public void Unsubscribe(string key, string subscriberId)
{
if (_subscribersByKey.TryGetValue(key, out var subscribers))
{
var toRemove = subscribers.FirstOrDefault(s => s.Id == subscriberId);
if (toRemove != null)
{
var newSubs = new ConcurrentBag<ISubscriber<TInput>>();
foreach (var sub in subscribers)
{
if (sub.Id != subscriberId) newSubs.Add(sub);
}
_subscribersByKey[key] = newSubs; // ❌ Дорогая операция
}
}
}Проблемы:
ConcurrentBagне поддерживает эффективное удалениеСоздание нового bag на каждое удаление - дорого
При частой подписке/отписке будет производительность O(n)
Исправление:
public class SubscriberHandlerModule<TInput>
{
private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, ISubscriber<TInput>>> _subscribersByKey;
public void Subscribe(string key, ISubscriber<TInput> subscriber)
{
var subscribers = _subscribersByKey.GetOrAdd(key,
_ => new ConcurrentDictionary<string, ISubscriber<TInput>>());
subscribers[subscriber.Id] = subscriber;
}
public void Unsubscribe(string key, string subscriberId)
{
if (_subscribersByKey.TryGetValue(key, out var subscribers))
{
subscribers.TryRemove(subscriberId, out _);
// Очищаем пустые коллекции
if (subscribers.IsEmpty)
_subscribersByKey.TryRemove(key, out _);
}
}
}3. Отсутствие обработки ошибок в EventHub
public void Handle(string key, TInput payload)
{
if (_subscribersByKey.TryGetValue(key, out var subscribers))
{
var envelope = new MessageEnvelope<TInput>(key, payload);
foreach (var subscriber in subscribers)
{
subscriber.InputProcessor.Enqueue(envelope); // ❌ Если один упадет - остальные не получат
}
}
}Исправление:
public void Handle(string key, TInput payload)
{
if (_subscribersByKey.TryGetValue(key, out var subscribers))
{
var envelope = new MessageEnvelope<TInput>(key, payload);
foreach (var subscriber in subscribers)
{
try
{
subscriber.InputProcessor.Enqueue(envelope);
}
catch (Exception ex)
{
Console.WriteLine($"Failed to enqueue to subscriber {subscriber.Id}: {ex.Message}");
// Можно добавить dead letter queue
}
}
}
}⚠️ Архитектурные проблемы
4. Нарушение SRP в BaseMessageProcessor
public abstract class BaseMessageProcessor<TInput, THandlerModule>
where THandlerModule : IMessageHandlerModule<TInput>
{
// ❌ Смешивает обработку сообщений и управление процессором
}Исправление:
public class MessageProcessor<TInput> : IDisposable
{
private readonly IInputProcessor<MessageEnvelope<TInput>> _processor;
public MessageProcessor(string id, Func<MessageEnvelope<TInput>, Task> handler)
{
_processor = new InputProcessor<MessageEnvelope<TInput>>(id, handler);
}
public Task StartAsync() => _processor.StartAsync();
public Task StopAsync() => _processor.StopAsync();
public void Enqueue(MessageEnvelope<TInput> envelope) => _processor.Enqueue(envelope);
public void Dispose() => _processor.Dispose();
}5. Жесткая связность в ServiceSubscriber
public class ServiceSubscriber<TInput> : ISubscriber<TInput>
{
// ❌ Жестко зашиты обработчики "start", "message", "stop"
handlerModule.RegisterHandler("start", ProcessStartAsync);
handlerModule.RegisterHandler("message", ProcessMessageAsync);
handlerModule.RegisterHandler("stop", ProcessStopAsync);
}Исправление:
public class ServiceSubscriber<TInput> : ISubscriber<TInput>
{
private readonly SelfHandlerModule<TInput> _handlerModule;
public ServiceSubscriber(string id, Action<SelfHandlerModule<TInput>> configureHandlers)
{
Id = id;
_handlerModule = new SelfHandlerModule<TInput>();
configureHandlers?.Invoke(_handlerModule);
InputProcessor = new InputProcessor<MessageEnvelope<TInput>>(
$"{Id}-InputProcessor",
async env => await _handlerModule.TryHandleAsync(env));
}
public void RegisterHandler(string key, Func<TInput, Task> handler)
=> _handlerModule.RegisterHandler(key, handler);
}🐛 Мелкие проблемы
6. Возможность race condition в InputProcessor
private async Task ProcessLoopAsync()
{
while (!token.IsCancellationRequested)
{
if (_queue.TryDequeue(out input))
{
// Обработка
}
else
{
_ = _autoResetEvent.WaitOne(TimeSpan.FromSeconds(1), false);
// ❌ Возможна потеря сигнала между TryDequeue и WaitOne
}
}
}Исправление:
private async Task ProcessLoopAsync()
{
while (!token.IsCancellationRequested)
{
// Ждем сначала, потом проверяем очередь
_autoResetEvent.WaitOne(TimeSpan.FromMilliseconds(100));
while (_queue.TryDequeue(out var input))
{
try
{
await _processAction(input);
}
catch (Exception ex)
{
Console.WriteLine($"Error in processor '{_id}': {ex.Message}");
}
}
}
}7. Отсутствие generic constraints
public class MessageEnvelope<TInput>
{
// ❌ Нет ограничений на TInput
public TInput Payload { get; set; }
}Рекомендация:
public class MessageEnvelope<TInput> where TInput : notnull
{
public TInput Payload { get; set; }
}✅ Положительные моменты
Интерфейсы разделены правильно - хорошая абстракция
Использование CancellationToken - правильная обработка отмены
Concurrent коллекции - потокобезопасность
IDisposable реализован - управление ресурсами
🎯 Итоговые рекомендации
Исправить async методы - убрать misleading async
Заменить ConcurrentBag на ConcurrentDictionary для эффективного удаления
Добавить обработку ошибок в критических местах
Упростить архитектуру - уменьшить наследование
Добавить логирование вместо Console.WriteLine
Рассмотреть использование Channel вместо ConcurrentQueue + AutoResetEvent
Комментариев нет:
Отправить комментарий