У меня есть куча потоков, которые генерируют события типа A
и тип B
.
Моя программа принимает эти события, обматывает их в сообщении и отправляет их по сети. Сообщение A может содержать одно событие A
, событие B
или одно событие A
и одно событие B
:
SendMessage(new Message(a: 1, b: null));
SendMessage(new Message(a: null, b: 2 ));
SendMessage(new Message(a: 3, b: 4 ));
События типа A
происходят довольно часто, а события типа B
встречаются гораздо реже. Таким образом, когда поток генерирует событие B
, моя программа ждет бит, чтобы узнать, генерирует ли другой поток событие A
и объединяет событие A
и событие B
, если это возможно.
Вот мой код:
object gate = new object();
int? pendingB;
Message WrapA(int a, int millisecondsTimeout)
{
int? b;
lock (gate)
{
b = pendingB;
pendingB = null;
Monitor.Pulse(gate);
}
return new Message(a, b);
}
Message WrapB(int b, int millisecondsTimeout)
{
lock (gate)
{
if (pendingB == null)
{
pendingB = b;
Monitor.Wait(gate, millisecondsTimeout);
if (pendingB != b) return null;
pendingB = null;
}
}
return new Message(null, b);
}
Это работает до сих пор. Однако есть две проблемы:
-
Если есть много событий
A
и много событийB
, алгоритм не очень эффективен: только определенный процент событийB
привязан к событиямA
, даже если там достаточноA
событий. -
Если в течение некоторого времени не генерируются события
A
(необычные, но не невозможные), алгоритм совершенно несправедлив: один поток, генерирующий событияB
, должен ждать каждый раз, тогда как все остальные потоки могут отправьте свои событияB
.
Как я могу повысить эффективность и справедливость алгоритма?
<суб > Ограничения:
& Пули; WrapA
и WrapB
должны заканчиваться в течение короткого, детерминированного количества времени.
& Пули; SendMessage
должен вызываться вне любых замков.
& Пули; Механизм синхронизации отсутствует, кроме gate
.
& Пули; Нет дополнительных потоков, задач, таймеров и т.д.
& Пули; Поскольку события типа A
происходят так часто в обычном случае, ожидание в WrapB
выполняется нормально.
Суб >
Вот тестовая программа, которая может использоваться в качестве эталона:
public static class Program
{
static int counter0 = 0;
static int counterA = 0;
static int counterB = 0;
static int counterAB = 0;
static void SendMessage(Message m)
{
if (m != null)
if (m.a != null)
if (m.b != null)
Interlocked.Increment(ref counterAB);
else
Interlocked.Increment(ref counterA);
else
if (m.b != null)
Interlocked.Increment(ref counterB);
else
Interlocked.Increment(ref counter0);
}
static Thread[] Start(int threadCount, int eventCount,
int eventInterval, int wrapTimeout, Func<int, int, Message> wrap)
{
Thread[] threads = new Thread[threadCount * eventCount];
for (int i = 0; i < threadCount; i++)
{
for (int j = 0; j < eventCount; j++)
{
int k = i * 1000 + j;
int l = j * eventInterval + i;
threads[i * eventCount + j] = new Thread(() =>
{
Thread.Sleep(l);
SendMessage(wrap(k, wrapTimeout));
});
threads[i * eventCount + j].Start();
}
}
return threads;
}
static void Join(params Thread[] threads)
{
for (int i = 0; i < threads.Length; i++)
{
threads[i].Join();
}
}
public static void Main(string[] args)
{
var wrapper = new MessageWrapper();
var sw = Stopwatch.StartNew();
// Only A events
var t0 = Start(10, 40, 7, 1000, wrapper.WrapA);
Join(t0);
// A and B events
var t1 = Start(10, 40, 7, 1000, wrapper.WrapA);
var t2 = Start(10, 10, 19, 1000, wrapper.WrapB);
Join(t1);
Join(t2);
// Only B events
var t3 = Start(10, 20, 7, 1000, wrapper.WrapB);
Join(t3);
Console.WriteLine(sw.Elapsed);
Console.WriteLine("0: {0}", counter0);
Console.WriteLine("A: {0}", counterA);
Console.WriteLine("B: {0}", counterB);
Console.WriteLine("AB: {0}", counterAB);
Console.WriteLine("Generated A: {0}, Sent A: {1}",
10 * 40 + 10 * 40, counterA + counterAB);
Console.WriteLine("Generated B: {0}, Sent B: {1}",
10 * 10 + 10 * 20, counterB + counterAB);
}
}