Подтвердить что ты не робот

Как эффективно регистрировать асинхронно?

Я использую Enterprise Library 4 в одном из моих проектов для ведения журнала (и других целей). Я заметил, что есть некоторые затраты на регистрацию, которую я делаю, что я могу смягчить, выполнив ведение журнала в отдельном потоке.

То, как я делаю это сейчас, это то, что я создаю объект LogEntry, а затем вызываю BeginInvoke на делегат, который вызывает Logger.Write.

new Action<LogEntry>(Logger.Write).BeginInvoke(le, null, null);

То, что я действительно хотел бы сделать, это добавить сообщение журнала в очередь, а затем создать один поток, который вытягивает экземпляры LogEntry из очереди и выполняет операцию журнала. Преимущество этого заключалось бы в том, что ведение журнала не мешает выполнению операции, и не каждая операция ведения журнала приводит к тому, что задание запускается в пуле потоков.

Как создать общую очередь, которая поддерживает много писателей и одного читателя потоковым безопасным способом? Некоторые примеры реализации очереди, которые предназначены для поддержки многих авторов (без создания синхронизации/блокировки) и одного читателя, будут действительно оценены.

Рекомендация относительно альтернативных подходов также будет оценена, но я не заинтересован в изменении рамок регистрации.

4b9b3361

Ответ 1

Я написал этот код некоторое время назад, не стесняйтесь его использовать.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace MediaBrowser.Library.Logging {
    public abstract class ThreadedLogger : LoggerBase {

        Queue<Action> queue = new Queue<Action>();
        AutoResetEvent hasNewItems = new AutoResetEvent(false);
        volatile bool waiting = false;

        public ThreadedLogger() : base() {
            Thread loggingThread = new Thread(new ThreadStart(ProcessQueue));
            loggingThread.IsBackground = true;
            loggingThread.Start();
        }


        void ProcessQueue() {
            while (true) {
                waiting = true;
                hasNewItems.WaitOne(10000,true);
                waiting = false;

                Queue<Action> queueCopy;
                lock (queue) {
                    queueCopy = new Queue<Action>(queue);
                    queue.Clear();
                }

                foreach (var log in queueCopy) {
                    log();
                }
            }
        }

        public override void LogMessage(LogRow row) {
            lock (queue) {
                queue.Enqueue(() => AsyncLogMessage(row));
            }
            hasNewItems.Set();
        }

        protected abstract void AsyncLogMessage(LogRow row);


        public override void Flush() {
            while (!waiting) {
                Thread.Sleep(1);
            }
        }
    }
}

Некоторые преимущества:

  • Он сохраняет фоновый журнал вживую, поэтому ему не нужно вращаться и вращаться вниз.
  • Он использует один поток для обслуживания очереди, что означает, что никогда не будет ситуации, когда 100 потоков обслуживают очередь.
  • Он копирует очереди, чтобы гарантировать, что очередь не заблокирована во время выполнения операции журнала.
  • Он использует AutoResetEvent, чтобы гарантировать, что поток bg находится в состоянии ожидания.
  • Это, ИМХО, очень легко следовать

Вот немного улучшенная версия, имейте в виду, что я провел очень мало тестирования на ней, но она затрагивает несколько незначительных проблем.

public abstract class ThreadedLogger : IDisposable {

    Queue<Action> queue = new Queue<Action>();
    ManualResetEvent hasNewItems = new ManualResetEvent(false);
    ManualResetEvent terminate = new ManualResetEvent(false);
    ManualResetEvent waiting = new ManualResetEvent(false);

    Thread loggingThread; 

    public ThreadedLogger() {
        loggingThread = new Thread(new ThreadStart(ProcessQueue));
        loggingThread.IsBackground = true;
        // this is performed from a bg thread, to ensure the queue is serviced from a single thread
        loggingThread.Start();
    }


    void ProcessQueue() {
        while (true) {
            waiting.Set();
            int i = ManualResetEvent.WaitAny(new WaitHandle[] { hasNewItems, terminate });
            // terminate was signaled 
            if (i == 1) return; 
            hasNewItems.Reset();
            waiting.Reset();

            Queue<Action> queueCopy;
            lock (queue) {
                queueCopy = new Queue<Action>(queue);
                queue.Clear();
            }

            foreach (var log in queueCopy) {
                log();
            }    
        }
    }

    public void LogMessage(LogRow row) {
        lock (queue) {
            queue.Enqueue(() => AsyncLogMessage(row));
        }
        hasNewItems.Set();
    }

    protected abstract void AsyncLogMessage(LogRow row);


    public void Flush() {
        waiting.WaitOne();
    }


    public void Dispose() {
        terminate.Set();
        loggingThread.Join();
    }
}

Преимущества над оригиналом:

  • Это одноразовый, поэтому вы можете избавиться от асинхронного регистратора
  • Улучшена смысловая семантика
  • Он будет немного лучше реагировать на всплеск, за которым следует молчание.

Ответ 2

Да, вам нужна очередь производителей/потребителей. У меня есть один пример этого в моем учебнике по многопоточности - если вы посмотрите на мою страницу "deadlocks/monitor methods" , вы найдете код во втором половина.

Конечно, есть много других примеров в Интернете, и .NET 4.0 будет поставляться с одним в рамках (скорее, более полно, чем у меня!). В .NET 4.0 вы, вероятно, обернете ConcurrentQueue<T> в BlockingCollection<T>.

Версия на этой странице не является универсальной (она была написана давно), но вы, вероятно, захотите сделать ее обобщенной - было бы тривиально делать.

Вы вызывали бы Produce из каждого "нормального" потока и Consume из одного потока, просто зацикливаясь и регистрируя все, что он потребляет. Это, наверное, проще всего сделать, чтобы поток потребителя стал фоновым потоком, поэтому вам не нужно беспокоиться о "остановке" очереди, когда ваше приложение выходит. Это означает, что удаленная возможность пропускать окончательную запись в журнале (хотя бы на полпути через ее запись при выходе из приложения) - или даже больше, если вы создаете быстрее, чем она может потреблять/регистрировать.

Ответ 3

Я предлагаю начать с измерения фактического воздействия на производительность системы в целом (то есть путем запуска профилировщика) и, возможно, переключения на что-то более быстрое, например log4net (я лично перешел к нему из журнала EntLib давным-давно).

Если это не сработает, вы можете попробовать использовать этот простой метод из .NET Framework:

ThreadPool.QueueUserWorkItem

Очередность метода для выполнения. Метод выполняется, когда поток пула потоков становится доступным.

Сведения о MSDN

Если это не сработает, то вы можете прибегнуть к чему-то вроде того, как Джон Скит предложил и фактически сам кодировал фреймворк асинхронного журналирования.

Ответ 4

Вот что я придумал... также см. ответ Сэма Шаффрона. Этот ответ является вики-сообществом в случае возникновения каких-либо проблем, которые люди видят в коде и хотят обновить.

/// <summary>
/// A singleton queue that manages writing log entries to the different logging sources (Enterprise Library Logging) off the executing thread.
/// This queue ensures that log entries are written in the order that they were executed and that logging is only utilizing one thread (backgroundworker) at any given time.
/// </summary>
public class AsyncLoggerQueue
{
    //create singleton instance of logger queue
    public static AsyncLoggerQueue Current = new AsyncLoggerQueue();

    private static readonly object logEntryQueueLock = new object();

    private Queue<LogEntry> _LogEntryQueue = new Queue<LogEntry>();
    private BackgroundWorker _Logger = new BackgroundWorker();

    private AsyncLoggerQueue()
    {
        //configure background worker
        _Logger.WorkerSupportsCancellation = false;
        _Logger.DoWork += new DoWorkEventHandler(_Logger_DoWork);
    }

    public void Enqueue(LogEntry le)
    {
        //lock during write
        lock (logEntryQueueLock)
        {
            _LogEntryQueue.Enqueue(le);

            //while locked check to see if the BW is running, if not start it
            if (!_Logger.IsBusy)
                _Logger.RunWorkerAsync();
        }
    }

    private void _Logger_DoWork(object sender, DoWorkEventArgs e)
    {
        while (true)
        {
            LogEntry le = null;

            bool skipEmptyCheck = false;
            lock (logEntryQueueLock)
            {
                if (_LogEntryQueue.Count <= 0) //if queue is empty than BW is done
                    return;
                else if (_LogEntryQueue.Count > 1) //if greater than 1 we can skip checking to see if anything has been enqueued during the logging operation
                    skipEmptyCheck = true;

                //dequeue the LogEntry that will be written to the log
                le = _LogEntryQueue.Dequeue();
            }

            //pass LogEntry to Enterprise Library
            Logger.Write(le);

            if (skipEmptyCheck) //if LogEntryQueue.Count was > 1 before we wrote the last LogEntry we know to continue without double checking
            {
                lock (logEntryQueueLock)
                {
                    if (_LogEntryQueue.Count <= 0) //if queue is still empty than BW is done
                        return;
                }
            }
        }
    }
}

Ответ 5

В ответ на сообщение Сэма Сафронса я хотел назвать флеш и убедиться, что все действительно закончено. В моем случае я пишу в базу данных в потоке очереди, и все мои события журнала выходили в очередь, но иногда приложение останавливалось, прежде чем все было закончено, что было неприемлемо в моей ситуации. Я изменил несколько фрагментов кода, но главное, что я хотел разделить, - это флеш:

public static void FlushLogs()
        {   
            bool queueHasValues = true;
            while (queueHasValues)
            {
                //wait for the current iteration to complete
                m_waitingThreadEvent.WaitOne();

                lock (m_loggerQueueSync)
                {
                    queueHasValues = m_loggerQueue.Count > 0;
                }
            }

            //force MEL to flush all its listeners
            foreach (MEL.LogSource logSource in MEL.Logger.Writer.TraceSources.Values)
            {                
                foreach (TraceListener listener in logSource.Listeners)
                {
                    listener.Flush();
                }
            }
        }

Надеюсь, это избавит кого-то от разочарования. Это особенно заметно в параллельных процессах, регистрирующих много данных.

Спасибо, что поделились своим решением, он поставил меня в хорошее направление!

- Джонни S

Ответ 6

Я хотел сказать, что мой предыдущий пост был бесполезным. Вы можете просто установить AutoFlush в true, и вам не придется перебирать все слушатели. Тем не менее, у меня все еще была сумасшедшая проблема с параллельными потоками, пытаясь сбросить регистратор. Мне пришлось создать еще одно логическое значение, которое было установлено в true при копировании очереди и выполнении записей LogEntry, а затем в рутине флеша мне пришлось проверить, что логическое значение, чтобы убедиться, что что-то еще не было в очереди, и ничего не обрабатывалось перед возвратом.

Теперь несколько потоков параллельно могут ударить по этой вещи, и когда я вызываю флеш, я знаю, что это действительно покраснело.

     public static void FlushLogs()
    {
        int queueCount;
        bool isProcessingLogs;
        while (true)
        {
            //wait for the current iteration to complete
            m_waitingThreadEvent.WaitOne();

            //check to see if we are currently processing logs
            lock (m_isProcessingLogsSync)
            {
                isProcessingLogs = m_isProcessingLogs;
            }

            //check to see if more events were added while the logger was processing the last batch
            lock (m_loggerQueueSync)
            {
                queueCount = m_loggerQueue.Count;
            }                

            if (queueCount == 0 && !isProcessingLogs)
                break;

            //since something is in the queue, reset the signal so we will not keep looping

            Thread.Sleep(400);
        }
    }

Ответ 8

Здесь может помочь дополнительный уровень косвенности.

Ваш первый вызов метода async может помещать сообщения в синхронизированную очередь и устанавливать событие - поэтому блокировки происходят в пуле потоков, а не в ваших рабочих потоках, а затем еще один поток вытягивает сообщения из очереди когда событие поднято.

Ответ 9

Если вы регистрируете что-то в отдельном потоке, сообщение может не быть записано, если приложение сработает, что делает его бесполезным.

Причина заключается в том, почему вы должны всегда скрываться после каждой записи.

Ответ 10

Если вы имеете в виду очередь SHARED, то я думаю, вам придется синхронизировать записи с ней, нажатиями и попками.

Но я все же считаю, что стоит сосредоточиться на дизайне общей очереди. По сравнению с IO регистрации и, вероятно, по сравнению с другой работой, выполняемой вашим приложением, краткое количество блокировок для нажатий и всплывающих окон, вероятно, не будет значительным.