Подтвердить что ты не робот

Пул потоков для обработки сообщений параллельно, но сохраняйте порядок в цепочках

Мне нужно обрабатывать сообщения параллельно, но сохраняйте порядок обработки сообщений с тем же идентификатором беседы.

Пример:
Пусть определите такое сообщение:

class Message {
    Message(long id, long conversationId, String someData) {...}
}

Предположим, что сообщения поступают в следующем порядке:
Сообщение (1, 1, "a1" ), Сообщение (2, 2, "a2" ), Сообщение (3, 1, "b1" ), Сообщение (4, 2, "b2" ).

Мне нужно, чтобы сообщение 3 обрабатывалось после сообщения 1, поскольку сообщения 1 и 3 имеют одинаковый идентификатор беседы (аналогично, сообщение 4 должно обрабатываться после 2 по той же причине).
Меня не волнует относительный порядок между, например, 1 и 2, так как они имеют разные идентификаторы разговора.

Я хотел бы как можно больше использовать функциональность java ThreadPoolExecutor, чтобы избежать необходимости замены мертвых потоков вручную в моем коде и т.д.

Обновление: Количество возможных "идентификаторов беседы" не ограничено, и нет времени для разговора. (Я лично не вижу в этом проблемы, так как у меня может быть простое сопоставление с идентификатором conversId с номером рабочего, например, talkId% totalWorkers).

Обновление 2:. Существует одна проблема с решением с несколькими очередями, где номер очереди определяется, например. 'index = Objects.hash(talkId)% total': если для обработки некоторого сообщения требуется много времени, все сообщения с одним и тем же "индексом", но разные "talkId" будут ждать, даже если другие потоки доступны для его обработки. То есть, я считаю, что решения с одной интеллектуальной блокирующей очередью будут лучше, но это всего лишь мнение, я открыт для любого хорошего решения.

Вы видите элегантное решение этой проблемы?

4b9b3361

Ответ 1

Мне нужно было сделать что-то очень похожее некоторое время назад, так что вот адаптация.

(Посмотрите на действие онлайн)

На самом деле это то же самое, что и в базовой базе, но в моем случае ключ был строкой, и, что более важно, набор ключей не увеличивался бесконечно, поэтому мне пришлось добавить "планировщик очистки". Кроме того, это в основном тот же код, поэтому я надеюсь, что я не потерял ничего серьезного в процессе адаптации. Я тестировал его, похоже, что он работает. Это больше, чем другие решения, хотя, возможно, более сложные...

Основная идея:

  • MessageTask обертывает сообщение в Runnable и уведомляет очередь, когда она завершена
  • ConvoQueue: блокировка очереди сообщений для разговора. Действует как предвестник, который гарантирует желаемый порядок. Смотрите это трио в частности: ConvoQueue.runNextIfPossible()MessageTask.run()ConvoQueue.complete() →...
  • MessageProcessor имеет Map<Long, ConvoQueue>, a ExecutorService
  • сообщения обрабатываются любым потоком в исполнителе, ConvoQueue подают ExecutorService и гарантируют порядок сообщений для каждого конвота, но не глобально (поэтому "трудное" сообщение не блокирует обработку других разговоров, в отличие от некоторых другие решения, и это свойство было критически важным в нашем случае - если это не так важно для вас, возможно, более простое решение лучше).
  • очистка с помощью ScheduledExecutorService (занимает 1 поток)

Визуально:

   ConvoQueues              ExecutorService internal queue
                            (shared, but has at most 1 MessageTask per convo)
Convo 1   ########   
Convo 2      #####   
Convo 3    #######                        Thread 1
Convo 4              } →    ####    → {
Convo 5        ###                        Thread 2
Convo 6  #########   
Convo 7      #####   

(Convo 4 is about to be deleted)

Ниже всех классов (MessageProcessorTest можно выполнить непосредственно):

// MessageProcessor.java
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;

public class MessageProcessor {

    private static final long CLEANUP_PERIOD_S = 10;
    private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>();
    private final ExecutorService executorService;

    public MessageProcessor(int nbThreads) {
        executorService = Executors.newFixedThreadPool(nbThreads);
        ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1);
        cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS);
    }

    public void addMessageToProcess(Message message) {
        ConvoQueue queue = getQueue(message.getConversationId());
        queue.addMessage(message);
    }

    private ConvoQueue getQueue(Long convoId) {
        synchronized (queuesByConvo) {
            return queuesByConvo.computeIfAbsent(convoId, p -> new ConvoQueue(executorService));
        }
    }

    private void removeEmptyQueues() {
        synchronized (queuesByConvo) {
            queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty());
        }
    }

}


// ConvoQueue.java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

class ConvoQueue {

    private Queue<MessageTask> queue;
    private MessageTask activeTask;
    private ExecutorService executorService;

    ConvoQueue(ExecutorService executorService) {
        this.executorService = executorService;
        this.queue = new LinkedBlockingQueue<>();
    }

    private void runNextIfPossible() {
        synchronized(this) {
            if (activeTask == null) {
                activeTask = queue.poll();
                if (activeTask != null) {
                    executorService.submit(activeTask);
                }
            }
        }
    }

    void complete(MessageTask task) {
        synchronized(this) {
            if (task == activeTask) {
                activeTask = null;
                runNextIfPossible();
            }
            else {
                throw new IllegalStateException("Attempt to complete task that is not supposed to be active: "+task);
            }
        }
    }

    boolean isEmpty() {
        return queue.isEmpty();
    }

    void addMessage(Message message) {
        add(new MessageTask(this, message));
    }

    private void add(MessageTask task) {
        synchronized(this) {
            queue.add(task);
            runNextIfPossible();
        }
    }

}

// MessageTask.java
public class MessageTask implements Runnable {

    private ConvoQueue convoQueue;
    private Message message;

    MessageTask(ConvoQueue convoQueue, Message message) {
        this.convoQueue = convoQueue;
        this.message = message;
    }

    @Override
    public void run() {
        try {
            processMessage();
        }
        finally {
            convoQueue.complete(this);
        }
    }

    private void processMessage() {
        // Dummy processing with random delay to observe reordered messages & preserved convo order
        try {
            Thread.sleep((long) (50*Math.random()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(message);
    }

}

// Message.java
class Message {

    private long id;
    private long conversationId;
    private String data;

    Message(long id, long conversationId, String someData) {
        this.id = id;
        this.conversationId = conversationId;
        this.data = someData;
    }

    long getConversationId() {
        return conversationId;
    }

    String getData() {
        return data;
    }

    public String toString() {
        return "Message{" + id + "," + conversationId + "," + data + "}";
    }
}

// MessageProcessorTest.java
public class MessageProcessorTest {
    public static void main(String[] args) {
        MessageProcessor test = new MessageProcessor(2);
        for (int i=1; i<100; i++) {
            test.addMessageToProcess(new Message(1000+i,i%7,"hi "+i));
        }
    }
}

Вывод (для каждого символа заказа кода (2-го поля) сохраняется):

Message{1002,2,hi 2}
Message{1001,1,hi 1}
Message{1004,4,hi 4}
Message{1003,3,hi 3}
Message{1005,5,hi 5}
Message{1006,6,hi 6}
Message{1009,2,hi 9}
Message{1007,0,hi 7}
Message{1008,1,hi 8}
Message{1011,4,hi 11}
Message{1010,3,hi 10}
...
Message{1097,6,hi 97}
Message{1095,4,hi 95}
Message{1098,0,hi 98}
Message{1099,1,hi 99}
Message{1096,5,hi 96}

Тест выше дал мне уверенность в том, чтобы поделиться им, но я немного обеспокоен тем, что мог забыть подробности о патологических случаях. Он работает в производстве в течение многих лет без задержек (хотя с большим количеством кода, который позволяет проверять его вживую, когда нам нужно посмотреть, что происходит, почему определенная очередь требует времени и т.д. - никогда не проблема с самой системой сама по себе, но иногда с обработкой конкретной задачи)

Изменить: нажмите здесь, чтобы проверить онлайн. Альтернативный вариант: скопируйте этот gist в там и нажмите "Скомпилировать и выполнить".

Ответ 2

Не знаете, как вы хотите обрабатывать сообщения. Для удобства каждое сообщение имеет тип Runnable, который является местом выполнения.

Решение для всего этого состоит в том, чтобы иметь число Executor, которые подаются в параллель ExecutorService. Используйте операцию modulo для вычисления, к которому Executor необходимо отправить входящее сообщение. Очевидно, что для той же самой беседы id ее те же Executor, следовательно, у вас есть параллельная обработка, но последовательная для одного и того же идентификатора беседы. Это не гарантирует, что сообщения с разным идентификатором разговора всегда будут выполняться параллельно (в общем, вы ограничены, по крайней мере, количеством физических ядер в вашей системе).

public class MessageExecutor {

    public interface Message extends Runnable {

        long getId();

        long getConversationId();

        String getMessage();

    }

    private static class Executor implements Runnable {

        private final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

        private volatile boolean stopped;

        void schedule(Message message) {
            messages.add(message);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    Message message = messages.take();
                    message.run();
                } catch (Exception e) {
                    System.err.println(e.getMessage());
                }
            }
        }
    }

    private final Executor[] executors;
    private final ExecutorService executorService;

    public MessageExecutor(int poolCount) {
        executorService = Executors.newFixedThreadPool(poolCount);
        executors = new Executor[poolCount];

        IntStream.range(0, poolCount).forEach(i -> {
            Executor executor = new Executor();
            executorService.submit(executor);
            executors[i] = executor;
        });
    }

    public void submit(Message message) {
        final int executorNr = Objects.hash(message.getConversationId()) % executors.length;
        executors[executorNr].schedule(message);
    }

    public void stop() {
        Arrays.stream(executors).forEach(Executor::stop);
        executorService.shutdown();
    }
}

Затем вы можете запустить исполнитель сообщений с пулом и отправить ему сообщения.

public static void main(String[] args) {
    MessageExecutor messageExecutor = new MessageExecutor(Runtime.getRuntime().availableProcessors());
    messageExecutor.submit(new Message() {
        @Override
        public long getId() {
            return 1;
        }

        @Override
        public long getConversationId() {
            return 1;
        }

        @Override
        public String getMessage() {
            return "abc1";
        }

        @Override
        public void run() {
            System.out.println(this.getMessage());
        }
    });
    messageExecutor.submit(new Message() {
        @Override
        public long getId() {
            return 1;
        }

        @Override
        public long getConversationId() {
            return 2;
        }

        @Override
        public String getMessage() {
            return "abc2";
        }

        @Override
        public void run() {
            System.out.println(this.getMessage());
        }
    });
    messageExecutor.stop();
}

Когда я запускаю счет пула 2 и отправляю количество сообщений:

Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]

Когда столько же сообщений запускается со счетом пула 3:

Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]

Сообщения распределяются между пулом Executor:).

РЕДАКТИРОВАТЬ: Executor run() захватывает все Исключения, чтобы гарантировать, что он не сломается, когда одно сообщение не работает.

Ответ 3

Вы действительно хотите, чтобы работа выполнялась последовательно в рамках разговора. Одним из решений было бы синхронизировать мьютекс, который является уникальным для этого разговора. Недостатком этого решения является то, что если разговоры недолговечны и частые начинаются новые разговоры, карта "мьютексов" будет быстро расти.

Для краткости я пропустил выключение исполнителя, фактическую обработку сообщений, обработку исключений и т.д.

public class MessageProcessor {

  private final ExecutorService executor;
  private final ConcurrentMap<Long, Object> mutexes = new ConcurrentHashMap<> ();

  public MessageProcessor(int threadCount) {
    executor = Executors.newFixedThreadPool(threadCount);
  }

  public static void main(String[] args) throws InterruptedException {
    MessageProcessor p = new MessageProcessor(10);
    BlockingQueue<Message> queue = new ArrayBlockingQueue<> (1000);

    //some other thread populates the queue

    while (true) {
      Message m = queue.take();
      p.process(m);
    }
  }

  public void process(Message m) {
    Object mutex = mutexes.computeIfAbsent(m.getConversationId(), id -> new Object());
    executor.submit(() -> {
      synchronized(mutex) {
        //That where you actually process the message
      }
    });
  }
}

Ответ 4

У меня была аналогичная проблема в моем приложении. Мое первое решение сортировало их с помощью java.util.ConcurrentHashMap. Таким образом, в вашем случае это будет ConcurrentHashMap с ключом talkId как ключом и списком сообщений в качестве значения. Проблема заключалась в том, что HashMap слишком большой, занимая слишком много места.

Мое текущее решение: Один поток получает сообщения и сохраняет их в java.util.ArrayList. После приема N сообщений он переводит список во второй поток. Этот поток сортирует сообщения, используя метод ArrayList.sort, с помощью talkId и id. Затем поток проходит через отсортированный список и ищет блоки, которые могут быть обработаны. Каждый блок, который может быть обработан, вынимается из списка. Чтобы обработать блок, вы можете создать runnable с этим блоком и нажать его в службу-исполнитель. Сообщения, которые не могут быть обработаны, остаются в списке и будут проверяться в следующем раунде.

Ответ 5

Для чего это стоит, API Kafka Streams предоставляет большую часть этой функции. Перегородки сохраняют порядок. Это более крупный бай-ин, чем ExecutorService, но может быть интересным, особенно если вы уже используете Kafka.

Ответ 6

Я бы использовал три исполнительных сервиса (один для приема сообщений, один для сортировки сообщений, один для обработки сообщений).

Я бы также использовал одну очередь, чтобы поместить все полученные сообщения и другую очередь с сообщениями, отсортированными и сгруппированными (отсортированные по идентификатору беседы, затем создайте группы сообщений, которые используют один и тот же идентификатор беседы).

Наконец: один поток для приема сообщений, один поток для сортировки сообщений и все остальные потоки, используемые для обработки сообщений.

см. ниже:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Collectors;



public class MultipleMessagesExample {


    private static int MAX_ELEMENTS_MESSAGE_QUEUE = 1000;
    private BlockingQueue<Message> receivingBlockingQueue = new LinkedBlockingDeque<>(MAX_ELEMENTS_MESSAGE_QUEUE);
    private BlockingQueue<List<Message>> prioritySortedBlockingQueue = new LinkedBlockingDeque<>(MAX_ELEMENTS_MESSAGE_QUEUE);

    public static void main(String[] args) {


        MultipleMessagesExample multipleMessagesExample = new MultipleMessagesExample();
        multipleMessagesExample.doTheWork();

    }

    private void doTheWork() {
        int totalCores = Runtime.getRuntime().availableProcessors();
        int totalSortingProcesses = 1;
        int totalMessagesReceiverProcess = 1;
        int totalMessagesProcessors = totalCores - totalSortingProcesses - totalMessagesReceiverProcess;

        ExecutorService messagesReceiverExecutorService = Executors.newFixedThreadPool(totalMessagesReceiverProcess);
        ExecutorService sortingExecutorService = Executors.newFixedThreadPool(totalSortingProcesses);
        ExecutorService messageProcessorExecutorService = Executors.newFixedThreadPool(totalMessagesProcessors);

        MessageReceiver messageReceiver = new MessageReceiver(receivingBlockingQueue);
        messagesReceiverExecutorService.submit(messageReceiver);

        MessageSorter messageSorter = new MessageSorter(receivingBlockingQueue, prioritySortedBlockingQueue);
        sortingExecutorService.submit(messageSorter);


        for (int i = 0; i < totalMessagesProcessors; i++) {
            MessageProcessor messageProcessor = new MessageProcessor(prioritySortedBlockingQueue);
            messageProcessorExecutorService.submit(messageProcessor);
        }

    }
}


class Message {
    private Long id;
    private Long conversationId;
    private String someData;

    public Message(Long id, Long conversationId, String someData) {
        this.id = id;
        this.conversationId = conversationId;
        this.someData = someData;
    }


    public Long getId() {
        return id;
    }

    public Long getConversationId() {
        return conversationId;
    }

    public String getSomeData() {
        return someData;
    }
}

class MessageReceiver implements Callable<Void> {
    private BlockingQueue<Message> bloquingQueue;


    public MessageReceiver(BlockingQueue<Message> bloquingQueue) {
        this.bloquingQueue = bloquingQueue;
    }

    @Override
    public Void call() throws Exception {
        System.out.println("receiving messages...");

        bloquingQueue.add(new Message(1L, 1000L, "conversation1 data fragment 1"));
        bloquingQueue.add(new Message(2L, 2000L, "conversation2 data fragment 1"));
        bloquingQueue.add(new Message(3L, 1000L, "conversation1 data fragment 2"));
        bloquingQueue.add(new Message(4L, 2000L, "conversation2 data fragment 2"));

        return null;
    }
}

/**
 * sorts messages. group together same conversation IDs
 */
class MessageSorter implements Callable<Void> {

    private BlockingQueue<Message> receivingBlockingQueue;
    private BlockingQueue<List<Message>> prioritySortedBlockingQueue;
    private List<Message> intermediateList = new ArrayList<>();
    private MessageComparator messageComparator = new MessageComparator();


    private static int BATCH_SIZE = 10;

    public MessageSorter(BlockingQueue<Message> receivingBlockingQueue, BlockingQueue<List<Message>> prioritySortedBlockingQueue) {
        this.receivingBlockingQueue = receivingBlockingQueue;
        this.prioritySortedBlockingQueue = prioritySortedBlockingQueue;

    }

    @Override
    public Void call() throws Exception {

        while (true) {
            boolean messagesReceivedQueueIsEmpty = false;
            intermediateList = new ArrayList<>();
            for (int i = 0; i < BATCH_SIZE; i++) {
                try {
                    Message message = receivingBlockingQueue.remove();
                    intermediateList.add(message);
                } catch (NoSuchElementException e) {
                    // this is expected when queue is empty
                    messagesReceivedQueueIsEmpty = true;
                    break;
                }

            }
            Collections.sort(intermediateList, messageComparator);

            if (intermediateList.size() > 0) {
                Map<Long, List<Message>> map = intermediateList.stream().collect(Collectors.groupingBy(message -> message.getConversationId()));
                map.forEach((k, v) -> prioritySortedBlockingQueue.add(new ArrayList<>(v)));
                System.out.println("new batch of messages was sorted and is ready to be processed");
            }

            if (messagesReceivedQueueIsEmpty) {
                System.out.println("message processor is waiting for messages...");
                Thread.sleep(1000);  // no need to use CPU if there are no messages to process
            }
        }
    }

}


/**
 * process groups of messages with same conversationID
 */
class MessageProcessor implements Callable<Void> {

    private BlockingQueue<List<Message>> prioritySortedBlockingQueue;

    public MessageProcessor(BlockingQueue<List<Message>> prioritySortedBlockingQueue) {
        this.prioritySortedBlockingQueue = prioritySortedBlockingQueue;
    }

    @Override
    public Void call() throws Exception {
        while (true) {
            List<Message> messages = prioritySortedBlockingQueue.take();  // blocks if no message is available
            messages.stream().forEach(m -> processMessage(m));
        }
    }

    private void processMessage(Message message) {
        System.out.println(message.getId() + " - " + message.getConversationId() + " - " + message.getSomeData());
    }
}


class MessageComparator implements Comparator<Message> {

    @Override
    public int compare(Message o1, Message o2) {
        return (int) (o1.getConversationId() - o2.getConversationId());
    }
}

Ответ 7

создать класс исполнителя, расширяющий Executor.On submit, вы можете ввести код, как показано ниже.

public void execute(Runnable command) {

        final int key= command.getKey();
         //Some code to check if it is runing
        final int index = key != Integer.MIN_VALUE ? Math.abs(key) % size : 0;
        workers[index].execute(command);
    }

Создайте рабочего с очередью, чтобы, если вы хотите, чтобы какая-то задача выполнялась последовательно, запустите.

private final AtomicBoolean scheduled = new AtomicBoolean(false);

private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maximumQueueSize);

public void execute(Runnable command) {
    long timeout = 0;
    TimeUnit timeUnit = TimeUnit.SECONDS;
    if (command instanceof TimeoutRunnable) {
        TimeoutRunnable timeoutRunnable = ((TimeoutRunnable) command);
        timeout = timeoutRunnable.getTimeout();
        timeUnit = timeoutRunnable.getTimeUnit();
    }

    boolean offered;
    try {
        if (timeout == 0) {
            offered = workQueue.offer(command);
        } else {
            offered = workQueue.offer(command, timeout, timeUnit);
        }
    } catch (InterruptedException e) {
        throw new RejectedExecutionException("Thread is interrupted while offering work");
    }

    if (!offered) {
        throw new RejectedExecutionException("Worker queue is full!");
    }

    schedule();
}

private void schedule() {
    //if it is already scheduled, we don't need to schedule it again.
    if (scheduled.get()) {
        return;
    }

    if (!workQueue.isEmpty() && scheduled.compareAndSet(false, true)) {
        try {
            executor.execute(this);
        } catch (RejectedExecutionException e) {
            scheduled.set(false);
            throw e;
        }
    }
}

public void run() {
    try {
        Runnable r;
        do {
            r = workQueue.poll();
            if (r != null) {
                r.run();
            }
        }
        while (r != null);
    } finally {
        scheduled.set(false);
        schedule();
    }
}