Подтвердить что ты не робот

Повышайте эффективность и справедливость при объединении событий, близких к времени

У меня есть куча потоков, которые генерируют события типа A и тип B.

Моя программа принимает эти события, обматывает их в сообщении и отправляет их по сети. Сообщение A может содержать одно событие A, событие B или одно событие A и одно событие B:

SendMessage(new Message(a: 1,    b: null));
SendMessage(new Message(a: null, b: 2   ));
SendMessage(new Message(a: 3,    b: 4   ));

События типа A происходят довольно часто, а события типа B встречаются гораздо реже. Таким образом, когда поток генерирует событие B, моя программа ждет бит, чтобы узнать, генерирует ли другой поток событие A и объединяет событие A и событие B, если это возможно.

Вот мой код:

object gate = new object();
int? pendingB;

Message WrapA(int a, int millisecondsTimeout)
{
    int? b;

    lock (gate)
    {
        b = pendingB;
        pendingB = null;
        Monitor.Pulse(gate);
    }

    return new Message(a, b);
}

Message WrapB(int b, int millisecondsTimeout)
{
    lock (gate)
    {
        if (pendingB == null)
        {
            pendingB = b;
            Monitor.Wait(gate, millisecondsTimeout);
            if (pendingB != b) return null;
            pendingB = null;
        }
    }

    return new Message(null, b);
}

Это работает до сих пор. Однако есть две проблемы:

  • Если есть много событий A и много событий B, алгоритм не очень эффективен: только определенный процент событий B привязан к событиям A, даже если там достаточно A событий.

  • Если в течение некоторого времени не генерируются события A (необычные, но не невозможные), алгоритм совершенно несправедлив: один поток, генерирующий события B, должен ждать каждый раз, тогда как все остальные потоки могут отправьте свои события B.

Как я могу повысить эффективность и справедливость алгоритма?

<суб > Ограничения:
& Пули; WrapA и WrapB должны заканчиваться в течение короткого, детерминированного количества времени.
& Пули; SendMessage должен вызываться вне любых замков.
& Пули; Механизм синхронизации отсутствует, кроме gate.
& Пули; Нет дополнительных потоков, задач, таймеров и т.д. & Пули; Поскольку события типа A происходят так часто в обычном случае, ожидание в WrapB выполняется нормально. Суб >


Вот тестовая программа, которая может использоваться в качестве эталона:

public static class Program
{
    static int counter0 = 0;
    static int counterA = 0;
    static int counterB = 0;
    static int counterAB = 0;

    static void SendMessage(Message m)
    {
        if (m != null)
            if (m.a != null)
                if (m.b != null)
                    Interlocked.Increment(ref counterAB);
                else
                    Interlocked.Increment(ref counterA);
            else
                if (m.b != null)
                    Interlocked.Increment(ref counterB);
                else
                    Interlocked.Increment(ref counter0);
    }

    static Thread[] Start(int threadCount, int eventCount,
        int eventInterval, int wrapTimeout, Func<int, int, Message> wrap)
    {
        Thread[] threads = new Thread[threadCount * eventCount];
        for (int i = 0; i < threadCount; i++)
        {
            for (int j = 0; j < eventCount; j++)
            {
                int k = i * 1000 + j;
                int l = j * eventInterval + i;
                threads[i * eventCount + j] = new Thread(() =>
                {
                    Thread.Sleep(l);
                    SendMessage(wrap(k, wrapTimeout));
                });
                threads[i * eventCount + j].Start();
            }
        }
        return threads;
    }

    static void Join(params Thread[] threads)
    {
        for (int i = 0; i < threads.Length; i++)
        {
            threads[i].Join();
        }
    }

    public static void Main(string[] args)
    {
        var wrapper = new MessageWrapper();
        var sw = Stopwatch.StartNew();

        // Only A events
        var t0 = Start(10, 40, 7, 1000, wrapper.WrapA);
        Join(t0);

        // A and B events
        var t1 = Start(10, 40, 7, 1000, wrapper.WrapA);
        var t2 = Start(10, 10, 19, 1000, wrapper.WrapB);
        Join(t1);
        Join(t2);

        // Only B events
        var t3 = Start(10, 20, 7, 1000, wrapper.WrapB);
        Join(t3);

        Console.WriteLine(sw.Elapsed);

        Console.WriteLine("0:  {0}", counter0);
        Console.WriteLine("A:  {0}", counterA);
        Console.WriteLine("B:  {0}", counterB);
        Console.WriteLine("AB: {0}", counterAB);

        Console.WriteLine("Generated A: {0}, Sent A: {1}",
            10 * 40 + 10 * 40, counterA + counterAB);
        Console.WriteLine("Generated B: {0}, Sent B: {1}",
            10 * 10 + 10 * 20, counterB + counterAB);
    }
}
4b9b3361

Ответ 1

Для удовольствия от этого здесь реализована блокировка:

public sealed class MessageWrapper
{
    private int pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int b = Interlocked.Exchange(ref pendingB, -1);
        return new Message(a, b == -1 ? null : b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        var sw = new SpinWait();
        while (Interlocked.CompareExchange(ref pendingB, b, -1) != -1)
        {
            // Spin
            sw.SpinOnce();

            if (sw.NextSpinWillYield)
            {
                // Let us make progress instead of yielding the processor
                // (avoid context switch)
                return new Message(null, b);
            }
        }

        return null;
    }
}

Результаты

Исходная реализация:

00:00:02.0433298
0:  0
A:  733
B:  233
AB: 67
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

Непрерывная реализация:

00:00:01.2546310
0:  0
A:  717
B:  217
AB: 83
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

Обновление

К сожалению, выше реализации есть ошибка и некоторый недостаток. Вот улучшенная версия:

public class MessageWrapper
{
    private int pendingB = EMPTY;
    private const int EMPTY = -1;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int? b;
        int count = 0;
        while ((b = Interlocked.Exchange(ref pendingB, EMPTY)) == EMPTY)
        {
            if (count % 7 == 0)
            {
                Thread.Sleep(0);
            }
            else if (count % 23 == 0)
            {
                Thread.Sleep(1);
            }
            else
            {
                Thread.Yield();
            }
            if (++count == 480)
            {
                return new Message(a, null);
            }
        }
        return new Message(a, b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        int count = 0;
        while (Interlocked.CompareExchange(ref pendingB, b, EMPTY) != EMPTY)
        {
            // Spin
            Thread.SpinWait((4 << count++));
            if (count > 10)
            {
                // We didn't manage to place our payload.
                // Let send it ourselves:
                return new Message(null, b);
            }
        }

        // We placed our payload. 
        // Wait some more to see if some WrapA snatches it.
        while (Interlocked.CompareExchange(ref pendingB, EMPTY, EMPTY) == b)
        {
            Thread.SpinWait((4 << count++));
            if (count > 20)
            {
                // No WrapA came along. Pity, we will have to send it ourselves
                int payload = Interlocked.CompareExchange(ref pendingB, EMPTY, b);
                return payload == b ? new Message(null, b) : null;
            }
        }
        return null;
    }
}

Результаты:

Реализация OP

00:00:02.1389474
0:  0
A:  722
B:  222
AB: 78
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

Вторая реализация:

00:00:01.2752425
0:  0
A:  700
B:  200
AB: 100
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

Ответ 2

Для разнообразия я попробовал подход, основанный на параллельных коллекциях. Мне не ясно из опубликованных ограничений, все ли в порядке, но я все равно запишу свой ответ:

Это типичный результат вашего исходного кода на моей машине:

00:00:01.7835426
0:  0
A:  723
B:  223
AB: 77
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

Это типичный результат моего предложения, примерно на 20% медленнее исходного кода, но он захватывает больше сообщений "AB":

00:00:02.1322512
0:  0
A:  701
B:  201
AB: 99
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

Реализация MessageWrapper:

public class MessageWrapper
{
    private BlockingCollection<int?> messageA = new BlockingCollection<int?>();
    private BlockingCollection<int?> messageB = new BlockingCollection<int?>();

    public Message WrapA(int a, int millisecondsTimeout)
    {
        messageA.Add(a);
        return CreateMessage(0);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        messageB.Add(b);
        return CreateMessage(millisecondsTimeout);
    }

    private Message CreateMessage(int timeout)
    {
        int? a, b;

        if (messageB.TryTake(out b) | messageA.TryTake(out a, timeout))
        {
            return new Message(a, b);
        }
        else
        {
            return null;
        }
    }
}

Ответ 3

Кажется, это идеальный кандидат для Reactive Extesions. Вы можете использовать метод Buffer для группировки событий или других подобных расширений для фильтрации и объединения событий.

Возможно, это решение не соответствует одному из ваших ограничений, но, на мой взгляд, это лучшее решение. Реактивные расширения очень мощные.

Ответ 4

Я дам еще одно предложение, которое следует за приведенными ограничениями немного более строго; на моей машине эта реализация последовательно захватывает 97 или более сообщений "AB" при запуске тестовой программы с примерно 5% ухудшением производительности от исходного кода:

class MessageWrapper
{
    object gate = new object();
    int? pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        Message returnMessage = null;
        bool lockTaken = false;

        Monitor.TryEnter(gate, 100, ref lockTaken);

        if (lockTaken)
        {
            returnMessage = new Message(a, pendingB);

            pendingB = null;
            Monitor.Pulse(gate);

            Monitor.Exit(gate);
        }
        else
        {
            returnMessage = new Message(a, null);
        }

        return returnMessage;
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        Message returnMessage = null;
        bool lockTaken = false;

        Monitor.TryEnter(gate, 100, ref lockTaken);

        if (lockTaken)
        {
            if (pendingB != null)
            {
                Monitor.Wait(gate, 100);
            }

            if (pendingB != null)
            {
                returnMessage = new Message(null, b);
            }
            else
            {
                pendingB = b;

                if (!Monitor.Wait(gate, millisecondsTimeout))
                {
                    pendingB = null;
                    Monitor.Pulse(gate);
                    returnMessage = new Message(null, b);
                }
            }

            Monitor.Exit(gate);
        }
        else
        {
            returnMessage = new Message(null, b);
        }

        return returnMessage;
    }
}

То, что происходит здесь, в основном такое же, как и в исходном коде, но мы также ждем, когда уже есть объект pendingB, а не просто возвращает сообщение "B". Это улучшает количество сообщений "AB", которые мы можем найти, с небольшой стоимостью.

Это выглядит немного грязно, но в основном потому, что я решил использовать более удобную конструкцию Monitor.TryTake в режиме реального времени вместо сырой блокировки. Кроме того, наличие единственного оператора возврата - это аккуратный трюк, чтобы избежать случайного возврата блокировок до вызова Monitor.Exit.

С помощью различных тайм-аутов можно повысить производительность за счет точности, или наоборот. 100 мс был моим первоначальным догадкой для всех, и он по-прежнему выглядит достойно на моей машине.


Как последнее замечание, в этой реализации WrapB мы могли бы изменить строки

            if (pendingB != null)
            {
                Monitor.Wait(gate, 100);
            }

к

            while (pendingB != null)
            {
                Monitor.Wait(gate, 100);
            }

чтобы получить 100% -ную точность, но это сильно помешает метрикам из тестовой программы, так как синхронизирует события "В", которые, очевидно, выполняются крайне плохо, когда есть поток только сообщений "В".

Если я удалю тест t3, он будет работать на 5% быстрее, чем исходный код, последовательно обнаруживая 100 из 100 сообщений "AB". Но тогда время выполнения, конечно, уже не является детерминированным, поскольку мы не можем сказать, сколько раз мы будем вращаться вокруг цикла.

Edit:

Хорошо, если мы не сделаем что-то вроде

            int spinCount = 0;

            while (pendingB != null && spinCount < 5)
            {
                spinCount++;
                Monitor.Wait(gate, 100);
            }

что даст нам верхнюю оценку времени ожидания. Он решает проблемы с производительностью, когда у нас есть поток только сообщений "B", и работает примерно в то же время, что и ваш исходный код, в то время как он последовательно обнаруживает 100 из 100 сообщений "AB".

Ответ 5

Хорошо, поэтому я попытался создать Fast A и AB, а затем медленный B. Это означает, что мое общее время медленнее (в основном из-за потока только b), но комбинированное время и только время Быстрее. Вот результаты:

A only: 00:00:00.3975499
Combine: 00:00:00.4234934
B only: 00:00:02.0079422
Total: 00:00:02.8314751
0:  0
A:  700
B:  200
AB: 100
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

Здесь код:

    class MessageWrapper
    {
        object bMessageLock = new object();
        object pendingBLock = new object();
        int? pendingB;

        ManualResetEvent gateOpen = new ManualResetEvent(true); // Gate is open initially.


        private bool IsGateOpen()
        {
            return gateOpen.WaitOne(0);
        }

        private void OpenGate()
        {
            gateOpen.Set();
        }

        private void CloseGate()
        {
            gateOpen.Reset();
        }


        public Message WrapA(int a, int millisecondsTimeout)
        {
            // check if the gate is open. Use WaitOne(0) to return immediately.
            if (IsGateOpen())
            {
                return new Message(a, null);
            }
            else
            {
                // This extra lock is to make sure that we don't get stale b's.
                lock (pendingBLock)
                {
                    // and reopen the gate.
                    OpenGate();

                    // there is a waiting b
                    // Send combined message
                    var message = new Message(a, pendingB);

                    pendingB = null;

                    return message;
                }
            }
        }

        public Message WrapB(int b, int millisecondsTimeout)
        {

            // Remove this if you don't have overlapping B's
            var timespentInLock = Stopwatch.StartNew();

            lock (bMessageLock) // Only one B message can be sent at a time.... may need to fix this.
            {
                pendingB = b;

                // Close gate
                CloseGate();


                // Wait for the gate to be opened again (meaning that the message has been sent)
                if (timespentInLock.ElapsedMilliseconds < millisecondsTimeout && 
                    gateOpen.WaitOne(millisecondsTimeout - (int)timespentInLock.ElapsedMilliseconds)) 
                // If you don't have overlapping b use this clause instead.
                //if (gateOpen.WaitOne(millisecondsTimeout)) 
                {
                    lock (pendingBLock)
                    {
                        // Gate was opened, so combined message was sent.
                        return null;
                    }
                }
                else
                {
                    // Timeout expired, so send b-only message.
                    lock (pendingBLock)
                    {
                        // reopen gate.
                        OpenGate();
                        pendingB = null;
                        return new Message(null, b);
                    }
                }
            }
        }


    }

Основная работа выполняется с использованием руководства reset. Идея состоит в том, что если ворота открыты, вы можете свободно отправить A. Когда приходит "b", вы закрываете ворота и заставляете A комбинировать его. Должен сказать, что наличие одного поля pendingB несколько ограничивает эту операцию. Наличие только одной переменной означает, что только один поток может хранить ее b в pendingB. Вот почему у меня есть дополнительный bMessageLock.

Кроме того, необходимо контролировать доступ к этой переменной, поэтому pendingBLock.

В этом коде могут быть ошибки, но насколько я его тестирую, я все равно получаю все 100 сообщений.

Наконец, я включил проверку против времени ожидания WrapB. Первоначально WrapB просто стоял в очереди, занимая в общей сложности 200 секунд. Если у вас есть перекрывающиеся вызовы, вы можете добавить чек. Если вы не возражаете против их очереди, используйте вместо этого более простой код.

Ответ 6

Ну, моя первая идее будет иметь семафор, который также относится к приоритету, но, возможно, этот пост даст вам более глубокое понимание .Net Mutex Question

В принципе, идее будет иметь некоторый способ определить приоритеты двух типов событий, чтобы события типа B могли работать как можно быстрее, если не были получены события типа A.

Я понимаю, что это может быть неправильным решением для вас из-за вашего третьего ограничения, что никакой механизм синхронизации, отличный от Gate, доступен, но я надеюсь, что я могу указать вам в правильном направлении.

Ответ 7

Это эскиз подхода, который улучшает справедливость - это будет означать, что все B -передачи подвержены задержке до 100 мс. Я не знаю, однако, если он соответствует вашим ограничениям.

  • В глобальном контексте используйте один объект MessageSender типа IMessageSender
  • Существуют две реализации IMessageSender, а именно DefaultMessageSender и BWrappingMessageSender (которая сохраняет значение B)

Послание отправителей сообщений выглядит следующим образом:

  • DefaultMessageSender при попытке отправить A: просто отправляет его
  • DefaultMessageSender при попытке отправить B: переключает глобальный MessageSender на новый BWrappingMessageSender, который знает только что переданное значение B

  • BWrappingMessageSender при попытке отправить A: отправляет AB с переданным A и его собственным B и переключает глобальный MessageSender на DefaultMessageSender

  • BWrappingMessageSender при попытке отправить B: отправляет B со своим собственным B и переключает глобальный MessageSender на новый BWrappingMessageSender, который знает только что переданное значение B

То, что я не приколол, - это способ, который недавно созданный BWrappingMessageSender знает, чтобы отправить обычную B 100 мс после создания, если ей не было в это время сказано что-либо сделать.

Ответ 8

Вот мое решение после некоторых экспериментов:

  • Если одноэлементная очередь пуста, мы берем ее.
  • Если место уже занято, мы вежливо подталкиваем пассажира двигаться дальше, немного подождать и повторить попытку.
  • Если кто-то грубит и захватывает место, пока мы ждем, мы переходим в очередь и двигаемся дальше.

код:

Message WrapA(int a, int millisecondsTimeout)
{
    bool lockTaken = false;
    int? b = null;

    try
    {
        Monitor.TryEnter(gate, millisecondsTimeout, ref lockTaken);
        if (lockTaken)
        {
            if (pendingB != null)
            {
                b = pendingB;
                pendingB = null;
                Monitor.Pulse(gate);
            }
        }
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(gate);
        }
    }

    return new Message(a, b);
}

Message WrapB(int b, int millisecondsTimeout)
{
    bool lockTaken = false;

    try
    {
        TimeoutHelper timeout = new TimeoutHelper(millisecondsTimeout);
        Monitor.TryEnter(gate, timeout.RemainingTime(), ref lockTaken);
        if (lockTaken)
        {
            if (pendingB == null)
            {
                pendingB = b;
                Monitor.Wait(gate, timeout.RemainingTime());
                if (pendingB == null) return null;
                pendingB = null;
            }
            else
            {
                Monitor.Pulse(gate);
                try { }
                finally { lockTaken = false; Monitor.Exit(gate); }
                Thread.Sleep(1);
                Monitor.TryEnter(gate, timeout.RemainingTime(), ref lockTaken);
                if (lockTaken)
                {
                    if (pendingB == null)
                    {
                        pendingB = b;
                        Monitor.Wait(gate, timeout.RemainingTime());
                        if (pendingB == null) return null;
                        pendingB = null;
                    }
                }
            }
        }
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(gate);
        }
    }

    return new Message(null, b);
}

Ответ 9

Не уверен, что он делает то, что вы хотите, но вот мое предложение. Он в принципе передает все сообщения B в A, когда это возможно, и проверяет, что сообщение было отправлено в конце концов:

class MessageWrapper
{
    object gate = new object();
    int? pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int? b;

        lock (gate)
        {
            b = pendingB;
            pendingB = null;
            Thread.Sleep(1); // yield. 1 seems the best value after some testing
        }

        return new Message(a, b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        int? bb = b;

        lock (gate)
        {
            if (pendingB == null)
            {
                pendingB = b;
                bb = null;
            }
        }

        Thread.Sleep(3);

        if (bb == null)
        {
            lock (gate)
            {
                if (pendingB != null)
                {
                    bb = pendingB;
                    pendingB = null;
                }
            }
        }
        return new Message(null, bb);
    }
}

Ответ 10

Вот еще одна попытка. Подходом является ожидание генерации события A для присоединения к событию B вместо ожидания события B для привязки к событию A.

object gate = new object();
int? pendingA;

public Message WrapA(int a, int millisecondsTimeout)
{
    bool queued = false;

    lock (gate)
    {
        if (pendingA == null)
        {
            queued = true;
            pendingA = a;
            Monitor.Pulse(gate);
        }
    }

    if (queued)
    {
        Thread.Sleep(3);
        lock (gate)
        {
            if (pendingA == null)
                return null;

            a = pendingA.Value;
            pendingA = null;
        }
    }

    return new Message(a, null);
}

public Message WrapB(int b, int millisecondsTimeout)
{
    int? a;

    lock (gate)
    {
        if (pendingA == null)
            Monitor.Wait(gate, millisecondsTimeout);

        a = pendingA;
        pendingA = null;
    }

    return new Message(a, b);
}

Ответ 11

После трех часов попыток мне удалось получить следующие результаты:

00:00:01.8577304
0:  0
A:  741
B:  241
AB: 59
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Total: 1100

Мой метод:

(1) Всякий раз, когда есть сообщение B (теперь называется B), и пока еще нет ожидания B, он будет помещен в очередь. Если в течение данного тайм-аута нет другого пакета, он отправит сообщение. (2) Когда в очереди есть B, он будет отбрасывать первый B в очереди и отправит это сообщение. Это необходимо для обеспечения справедливости. Новый B, который отправляется, будет следовать той же ситуации, что и ситуация 1 (он будет поставлен в очередь и отправлен в течение заданного промежутка времени). (3) Когда есть сообщение A (теперь называется A), и там нет ожидающего B, A будет отправлено немедленно. Никакого фактического ожидания не выполняется. (4) При отправке A и там B в очереди, он "украдет" его из очереди. Оба сообщения обернуты и отправляются вместе. Поскольку B ждет отправки в другой поток, и A украл его, нам нужна нулевая проверка. A уведомит B, но B замечает, что ему нечего посылать. B вернет значение null.

Чтобы выполнить это в коде:

public class MessageWrapper
{
    readonly object _gate = new object();
    int? _pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int? currentB;

        lock (_gate)
        {
            currentB = _pendingB;
            _pendingB = null;

            Monitor.Pulse(_gate); // B stolen, get rid of waiting threads
        }

        return new Message(a, currentB);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        lock (_gate)
        {
            if (_pendingB != null)
            {
                var currentB = _pendingB;
                _pendingB = b;

                Monitor.Pulse(_gate); // release for fairness
                Monitor.Wait(_gate, millisecondsTimeout); // wait for fairness

                return new Message(null, currentB);
            }
            else
            {
                _pendingB = b;

                Monitor.Pulse(_gate); // release for fairness
                Monitor.Wait(_gate, millisecondsTimeout); // wait for A

                if (_pendingB == null) return null;

                var currentB = _pendingB;
                _pendingB = null;
                return new Message(null, currentB);
            }
        }
    }
}

Ответ 12

Большая проблема. Мне очень понравилось проводить некоторое время на этом. Решение, которое я использовал, в 4 раза превышало количество совпадений, с которыми столкнулась ваша оригинальная проблема на моем компьютерном оборудовании.

Возможно, кто-то, кто более осведомлен, чем я с монитором и замками, может улучшить это.

  • Отпустите другой поток, когда выполняется совпадение, вместо того, чтобы этот поток выполнял полный сон, чтобы вернуть нуль в конце. Возможно, это действительно не так дорого. Чтобы решить эту проблему, я ввел AutoResetEvent, но по причинам, которые я не понимаю, AutoResetEvent не действует так, как я предполагал, и уменьшает совпадения от 100 до 70.

  • Окончательный тайм-аут потоков может быть улучшен с тех пор, как с его помощью он все равно должен пройти оспариваемую блокировку.

Он полностью соответствует требованиям:

  • Все процессы будут завершены в течение указанного периода времени (последняя блокировка может добавить несколько циклов в зависимости от того, насколько оспаривается блокировка).
  • Отправка выполняется вне замков.
  • Синхронизирует с использованием gate
  • Дополнительные таймеры
  • Предпочтения и потоки обрабатываются одинаково.

Результаты исходных вопросов:

  • Время: 4,5 секунды
  • A: 773
  • B: 273
  • AB: 27

Результаты этого класса:

  • Время: 5.4 секунды
  • A: 700
  • B: 300
  • AB: 100

    class MessageWrapper
    {
    object gate = new object();
    int EmptyThreadsToReleaseA = 0;
    int EmptyThreadsToReleaseB = 0;
    Queue<int> queueA = new Queue<int>();
    Queue<int> queueB = new Queue<int>();
    AutoResetEvent EmptyThreadEventA = new AutoResetEvent(false);
    AutoResetEvent EmptyThreadEventB = new AutoResetEvent(false);
    
    public Message WrapA(int a, int millisecondsTimeout)
    {
        lock (gate)
        {
            if (queueB.Count > 0)
            {
                Interlocked.Increment(ref EmptyThreadsToReleaseB);
                EmptyThreadEventB.Set();
                return new Message(a, queueB.Dequeue());
            }
            else
            {
                queueA.Enqueue(a);
            }
        }
    
        System.Threading.Thread.Sleep(millisecondsTimeout);
        //EmptyThreadEventA.WaitOne(millisecondsTimeout);
    
        lock (gate)
        {
            if (EmptyThreadsToReleaseA > 0)
            {
                Interlocked.Decrement(ref EmptyThreadsToReleaseA);
                return null;
            }
    
            return new Message(queueA.Dequeue(), null);
        }
    }
    
    public Message WrapB(int b, int millisecondsTimeout)
    {
        lock (gate)
        {
            if (queueA.Count > 0)
            {
                Interlocked.Increment(ref EmptyThreadsToReleaseA);
                EmptyThreadEventA.Set();
                return new Message(queueA.Dequeue(), b);
            }
            else
            {
                queueB.Enqueue(b);
            }
        }
    
        System.Threading.Thread.Sleep(millisecondsTimeout);
        //EmptyThreadEventB.WaitOne(millisecondsTimeout);
    
        lock (gate)
        {
            if (EmptyThreadsToReleaseB > 0)
            {
                Interlocked.Decrement(ref EmptyThreadsToReleaseB);
                return null;
            }
    
            return new Message(null, queueB.Dequeue());
        }
    }
    }
    

Ответ 13

Я пытался избежать ненужных блокировок, особенно для событий типа A. Также я внес некоторые изменения в логику класса-оболочки. Я обнаружил, что было бы удобнее отправлять сообщения непосредственно из этого класса, а не просто возвращать сообщения, потому что в моей реализации один вызов SendB мог бы отправить два сообщения B. Я добавил несколько пояснительных комментариев в код

public class MessageWrapper
{
    private readonly object _gate = new object();
    private object _pendingB;

    public void SendA(int a, int millisecondsTimeout, Action<Message> send)
    {
        var b = Interlocked.Exchange<object>(ref _pendingB, null);

        send(new Message(a, (int?)b));

        // this code will just release any pending "assure that B was sent" threads.
        // but everything works fine even without it
        lock (_gate)
        {
            Monitor.PulseAll(_gate);
        }
    }

    public void SendB(int b, int millisecondsTimeout, Action<Message> send)
    {
        // needed for Interlocked to function properly and to be able to chack that exatly this b event was sent.
        var boxedB = (object)(int?)b;

        // excange currently pending B event with newly arrived one
        var enqueuedB = Interlocked.Exchange(ref _pendingB, boxedB);

        if (enqueuedB != null)
        {
            // if there was some pending B event then just send it.
            send(new Message(null, (int?)enqueuedB));
        }

        // now we have to wait up to millisecondsTimeout to ensure that our message B was sent
        lock (_gate)
        {
            // release any currently waiting threads.
            Monitor.PulseAll(_gate);

            if (Monitor.Wait(_gate, millisecondsTimeout))
            {
                // if we there pulsed, then we have nothing to do, as our event was already sent 
                return;
            }
        }

        // check whether our event is still pending 
        enqueuedB = Interlocked.CompareExchange(ref _pendingB, null, boxedB);

        if (ReferenceEquals(enqueuedB, boxedB))
        {
            // if so, then just send it.
            send(new Message(null, (int?)enqueuedB));
        }
    }
}

Также я исправил некоторые изменения в вашем тестовом классе, по одной из причин, о которых я упомянул в комментариях, - я добавил событие синхронизации ко всем тестовым потокам для случая, когда мы тестируем предложение AB. Также я уменьшил количество одновременно работающих потоков от 500 в вашей версии до 20 (это все для предложения AB). Тем не менее вызовы во всех этих потоках смещаются на число потоков (переданных как параметр в методе начала потока), поэтому я надеюсь, что тест по-прежнему весьма уместен.

public static class Program
{
    private static int _counter0 = 0;
    private static int _counterA = 0;
    private static int _counterB = 0;
    private static int _counterAb = 0;
    private static object _lastA;
    private static object _lastB;

    private static object _firstA;
    private static object _firstB;

    public static void Main(string[] args)
    {
        var wrapper = new MessageWrapper();
        var sw = Stopwatch.StartNew();

        var threadsCount = 10;
        var a0called = 40;

        // Only A events
        var t0 = Start(threadsCount, a0called, 7, 1000, wrapper.SendA);
        Join(t0);

        var aJointCalled = 40;
        var bJointCalled = 10;

        var syncEvent = new CountdownEvent(threadsCount + threadsCount);
        _firstA = null;
        _firstB = null;
        // A and B events
        var t1 = Start(threadsCount, aJointCalled, 7, 1000, wrapper.SendA, syncEvent);
        var t2 = Start(threadsCount, bJointCalled, 19, 1000, wrapper.SendB, syncEvent);
        Join(t1);
        Join(t2);
        var lastA = _lastA;
        var lastB = _lastB;

        var b0called = 20;

        // Only B events
        var t3 = Start(threadsCount, b0called, 7, 1000, wrapper.SendB);
        Join(t3);

        Console.WriteLine(sw.Elapsed);

        Console.WriteLine("0:  {0}", _counter0);
        Console.WriteLine("A:  {0}", _counterA);
        Console.WriteLine("B:  {0}", _counterB);
        Console.WriteLine("AB: {0}", _counterAb);

        Console.WriteLine(
            "Generated A: {0}, Sent A: {1}",
            (threadsCount * a0called) + (threadsCount * aJointCalled),
            _counterA + _counterAb);
        Console.WriteLine(
            "Generated B: {0}, Sent B: {1}",
            (threadsCount * bJointCalled) + (threadsCount * b0called),
            _counterB + _counterAb);

        Console.WriteLine("First A was sent on {0: MM:hh:ss ffff}", _firstA);
        Console.WriteLine("Last A was sent on {0: MM:hh:ss ffff}", lastA);
        Console.WriteLine("First B was sent on {0: MM:hh:ss ffff}", _firstB);
        Console.WriteLine("Last B was sent on {0: MM:hh:ss ffff}", lastB);

        Console.ReadLine();
    }

    private static void SendMessage(Message m)
    {
        if (m != null)
        {
            if (m.A != null)
            {
                if (m.B != null)
                {
                    Interlocked.Increment(ref _counterAb);
                }
                else
                {
                    Interlocked.Increment(ref _counterA);
                    Interlocked.Exchange(ref _lastA, DateTime.Now);
                    Interlocked.CompareExchange(ref _firstA, DateTime.Now, null);
                }
            }
            else if (m.B != null)
            {
                Interlocked.Increment(ref _counterB);
                Interlocked.Exchange(ref _lastB, DateTime.Now);
                Interlocked.CompareExchange(ref _firstB, DateTime.Now, null);
            }
            else
            {
                Interlocked.Increment(ref _counter0);
            }
        }
    }

    private static Thread[] Start(
        int threadCount, 
        int eventCount, 
        int eventInterval, 
        int wrapTimeout, 
        Action<int, int, Action<Message>> wrap,
        CountdownEvent syncEvent = null)
    {
        var threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            threads[i] = new Thread(
                (p) =>
                    {
                        if (syncEvent != null)
                        {
                            syncEvent.Signal();
                            syncEvent.Wait();
                        }

                        Thread.Sleep((int)p);

                        for (int j = 0; j < eventCount; j++)
                        {
                            int k = (((int)p) * 1000) + j;
                            Thread.Sleep(eventInterval);
                            wrap(k, wrapTimeout, SendMessage);
                        }
                    });

            threads[i].Start(i);
        }

        return threads;
    }

    private static void Join(params Thread[] threads)
    {
        foreach (Thread t in threads)
        {
            t.Join();
        }
    }
}

P.S. Кроме того, спасибо за действительно интересный вопрос.

Ответ 14

Ограничивающий фактор на этом действительно является ограничениями, в частности, необходимость использования gate для синхронизации и невозможности порождать любые другие таймеры/потоки/задачи и т.д. Это в конечном итоге связывает программное решение с использованием Monitor объектов. Например, решение Christoffer, хотя и элегантно, технически использует синхронизацию, отличную от gate, поскольку она завернута внутри внутренних элементов BlockingCollection. Другое очень инновационное решение, указанное ранее afrischke, также использует синхронизацию, отличную от gate.

После большого количества экспериментов и чтения и исследований я должен сказать, что я не думаю, что эта проблема имеет лучшее (более быстрое?) решение, которое точно соответствует ограничениям. Мне удалось получить предельное усиление производительности, используя следующий механизм. Это не очень хорошо, но оно соответствует требованиям и примерно на 1-5% быстрее, по крайней мере, на моей машине;

object gate = new object();
ConcurrentDictionary<Guid, int> _bBag = new ConcurrentDictionary<Guid, int>();

public Message WrapA(int a, int millisecondsTimeout)
{
    Message message = null;
    int? b = null;
    lock (gate)
    {
        if (!_bBag.IsEmpty)
        {
            Guid key = _bBag.Keys.FirstOrDefault();
            int gotB = 0;
            if (_bBag.TryRemove(key, out gotB))
            {
                b = gotB;
                Monitor.PulseAll(gate);
            }
        }
    }

    message = new Message(a, b);
    return message;
}

public Message WrapB(int b, int millisecondsTimeout)
{
    Guid key = Guid.NewGuid();
    _bBag.TryAdd(key, b);
    lock (gate) { Monitor.Wait(gate, millisecondsTimeout); }
    int storedB = 0;
    if (_bBag.TryRemove(key, out storedB))
    {
        return new Message(null, b);
    }
    return null;    
}

Расслабление требования gate улучшает скорость еще немного, особенно когда он не работает в режиме отладки;

object gate = new object();
ManualResetEvent mre = new ManualResetEvent(false /*initialState*/);
ConcurrentDictionary<Guid, int> _bBag = new ConcurrentDictionary<Guid, int>();

public Message WrapA(int a, int millisecondsTimeout)
{
    Message message = null;
    int? b = null;
    lock (gate)
    {
        if (!_bBag.IsEmpty)
        {
            Guid key = _bBag.Keys.FirstOrDefault();
            int gotB = 0;
            if (_bBag.TryRemove(key, out gotB))
            {
                b = gotB;
                Monitor.PulseAll(gate);
            }
        }
    }

    message = new Message(a, b);
    return message;
}

public Message WrapB(int b, int millisecondsTimeout)
{
    Guid key = Guid.NewGuid();
    _bBag.TryAdd(key, b);
    mre.WaitOne(millisecondsTimeout);    // use a manual reset instead of Monitor
    int storedB = 0;
    if (_bBag.TryRemove(key, out storedB))
    {
        return new Message(null, b);
    }
    return null;
}

В целом, я бы сказал, что у вас уже есть очень тонкое решение, учитывая жесткие требования. Я действительно надеюсь, что я ошибаюсь, и кто-то находит лучшее решение - это будет очень информативно!