пятница, 10 октября 2025 г.

25.10.11, DeepSeek, Qwen, EventHub, CodeRevew

25.10.11, DeepSeek, Qwen, EventHub, CodeRevew

Проведу детальный код-ревью этого EventHub. Код имеет интересную архитектуру, но есть несколько важных проблем.

🚨 Критические проблемы

1. Нарушение принципов async/await

csharp
public async Task PublishAsync(string key, TInput @event)
{
    // ❌ Нет await - метод синхронный по сути
    _subscriberModule.Handle(key, @event);
}

public async Task SubscribeAsync(string key, ISubscriber<TInput> subscriber)
{
    // ❌ Тоже нет await
    _subscriberModule.Subscribe(key, subscriber);
}

Исправление:

csharp
public Task PublishAsync(string key, TInput @event)
{
    _subscriberModule.Handle(key, @event);
    return Task.CompletedTask; // или сделать метод void
}

// Или переименовать в синхронные методы
public void Publish(string key, TInput @event)
{
    _subscriberModule.Handle(key, @event);
}

2. Потенциальная утечка памяти в SubscriberHandlerModule

csharp
public void Unsubscribe(string key, string subscriberId)
{
    if (_subscribersByKey.TryGetValue(key, out var subscribers))
    {
        var toRemove = subscribers.FirstOrDefault(s => s.Id == subscriberId);
        if (toRemove != null)
        {
            var newSubs = new ConcurrentBag<ISubscriber<TInput>>();
            foreach (var sub in subscribers)
            {
                if (sub.Id != subscriberId) newSubs.Add(sub);
            }
            _subscribersByKey[key] = newSubs; // ❌ Дорогая операция
        }
    }
}

Проблемы:

  • ConcurrentBag не поддерживает эффективное удаление

  • Создание нового bag на каждое удаление - дорого

  • При частой подписке/отписке будет производительность O(n)

Исправление:

csharp
public class SubscriberHandlerModule<TInput>
{
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, ISubscriber<TInput>>> _subscribersByKey;

    public void Subscribe(string key, ISubscriber<TInput> subscriber)
    {
        var subscribers = _subscribersByKey.GetOrAdd(key, 
            _ => new ConcurrentDictionary<string, ISubscriber<TInput>>());
        subscribers[subscriber.Id] = subscriber;
    }

    public void Unsubscribe(string key, string subscriberId)
    {
        if (_subscribersByKey.TryGetValue(key, out var subscribers))
        {
            subscribers.TryRemove(subscriberId, out _);
            
            // Очищаем пустые коллекции
            if (subscribers.IsEmpty)
                _subscribersByKey.TryRemove(key, out _);
        }
    }
}

3. Отсутствие обработки ошибок в EventHub

csharp
public void Handle(string key, TInput payload)
{
    if (_subscribersByKey.TryGetValue(key, out var subscribers))
    {
        var envelope = new MessageEnvelope<TInput>(key, payload);
        foreach (var subscriber in subscribers)
        {
            subscriber.InputProcessor.Enqueue(envelope); // ❌ Если один упадет - остальные не получат
        }
    }
}

Исправление:

csharp
public void Handle(string key, TInput payload)
{
    if (_subscribersByKey.TryGetValue(key, out var subscribers))
    {
        var envelope = new MessageEnvelope<TInput>(key, payload);
        foreach (var subscriber in subscribers)
        {
            try
            {
                subscriber.InputProcessor.Enqueue(envelope);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Failed to enqueue to subscriber {subscriber.Id}: {ex.Message}");
                // Можно добавить dead letter queue
            }
        }
    }
}

⚠️ Архитектурные проблемы

4. Нарушение SRP в BaseMessageProcessor

csharp
public abstract class BaseMessageProcessor<TInput, THandlerModule>
    where THandlerModule : IMessageHandlerModule<TInput>
{
    // ❌ Смешивает обработку сообщений и управление процессором
}

Исправление:

csharp
public class MessageProcessor<TInput> : IDisposable
{
    private readonly IInputProcessor<MessageEnvelope<TInput>> _processor;
    
    public MessageProcessor(string id, Func<MessageEnvelope<TInput>, Task> handler)
    {
        _processor = new InputProcessor<MessageEnvelope<TInput>>(id, handler);
    }
    
    public Task StartAsync() => _processor.StartAsync();
    public Task StopAsync() => _processor.StopAsync();
    public void Enqueue(MessageEnvelope<TInput> envelope) => _processor.Enqueue(envelope);
    public void Dispose() => _processor.Dispose();
}

5. Жесткая связность в ServiceSubscriber

csharp
public class ServiceSubscriber<TInput> : ISubscriber<TInput>
{
    // ❌ Жестко зашиты обработчики "start", "message", "stop"
    handlerModule.RegisterHandler("start", ProcessStartAsync);
    handlerModule.RegisterHandler("message", ProcessMessageAsync); 
    handlerModule.RegisterHandler("stop", ProcessStopAsync);
}

Исправление:

csharp
public class ServiceSubscriber<TInput> : ISubscriber<TInput>
{
    private readonly SelfHandlerModule<TInput> _handlerModule;
    
    public ServiceSubscriber(string id, Action<SelfHandlerModule<TInput>> configureHandlers)
    {
        Id = id;
        _handlerModule = new SelfHandlerModule<TInput>();
        configureHandlers?.Invoke(_handlerModule);
        
        InputProcessor = new InputProcessor<MessageEnvelope<TInput>>(
            $"{Id}-InputProcessor", 
            async env => await _handlerModule.TryHandleAsync(env));
    }
    
    public void RegisterHandler(string key, Func<TInput, Task> handler)
        => _handlerModule.RegisterHandler(key, handler);
}

🐛 Мелкие проблемы

6. Возможность race condition в InputProcessor

csharp
private async Task ProcessLoopAsync()
{
    while (!token.IsCancellationRequested)
    {
        if (_queue.TryDequeue(out input))
        {
            // Обработка
        }
        else 
        { 
            _ = _autoResetEvent.WaitOne(TimeSpan.FromSeconds(1), false); 
            // ❌ Возможна потеря сигнала между TryDequeue и WaitOne
        }
    }
}

Исправление:

csharp
private async Task ProcessLoopAsync()
{
    while (!token.IsCancellationRequested)
    {
        // Ждем сначала, потом проверяем очередь
        _autoResetEvent.WaitOne(TimeSpan.FromMilliseconds(100));
        
        while (_queue.TryDequeue(out var input))
        {
            try 
            { 
                await _processAction(input); 
            }
            catch (Exception ex) 
            { 
                Console.WriteLine($"Error in processor '{_id}': {ex.Message}"); 
            }
        }
    }
}

7. Отсутствие generic constraints

csharp
public class MessageEnvelope<TInput>
{
    // ❌ Нет ограничений на TInput
    public TInput Payload { get; set; }
}

Рекомендация:

csharp
public class MessageEnvelope<TInput> where TInput : notnull
{
    public TInput Payload { get; set; }
}

✅ Положительные моменты

  1. Интерфейсы разделены правильно - хорошая абстракция

  2. Использование CancellationToken - правильная обработка отмены

  3. Concurrent коллекции - потокобезопасность

  4. IDisposable реализован - управление ресурсами

🎯 Итоговые рекомендации

  1. Исправить async методы - убрать misleading async

  2. Заменить ConcurrentBag на ConcurrentDictionary для эффективного удаления

  3. Добавить обработку ошибок в критических местах

  4. Упростить архитектуру - уменьшить наследование

  5. Добавить логирование вместо Console.WriteLine

  6. Рассмотреть использование Channel вместо ConcurrentQueue + AutoResetEvent

Комментариев нет:

Отправить комментарий