Подтвердить что ты не робот

Не используйте один и тот же сокет между двумя потоками одновременно.

У меня есть около 60 сокетов и 20 потоков, и я хочу, чтобы каждый поток работал в разных сокетах каждый раз, поэтому я не хочу делиться одним и тем же сокетом между двумя потоками.

В моем классе SocketManager у меня есть фоновый поток, который запускается каждые 60 секунд и вызывает метод updateLiveSockets(). В методе updateLiveSockets() я повторяю все сокеты, которые у меня есть, а затем начинаю их пинговать один за другим, вызывая метод send класса SendToQueue и основываясь на ответе. Я отмечаю их как живых или мертвых. В методе updateLiveSockets() мне всегда нужно перебирать все сокеты и пинговать их, чтобы проверить, являются ли они живыми или мертвыми.

Теперь все потоки чтения вызовут метод getNextSocket() класса SocketManager одновременно, чтобы получить следующий доступный в реальном времени сокет для отправки бизнес-сообщения в этот сокет. Поэтому у меня есть два типа сообщений, которые я отправляю по сокету:

  • Одно сообщение ping в сокете. Это отправляется только из потока таймера, вызывающего метод updateLiveSockets() в классе SocketManager.
  • Другим является сообщение business в сокете. Это делается в классе SendToQueue.

Итак, если поток нити пингует сокет, чтобы проверить, живут ли они или нет, ни один другой бизнес-поток не должен использовать этот сокет. Аналогично, если бизнес-поток использует сокет для отправки данных на него, то нить нити не должен пинговать этот сокет. И это касается всего сокета. Но мне нужно убедиться, что в методе updateLiveSockets мы пинговаем все доступные сокеты, когда мой фоновый поток запускается, чтобы мы могли выяснить, какой сокет жив или мертв.

Ниже мой класс SocketManager:

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
      new ConcurrentHashMap<>();
  private final ZContext ctx = new ZContext();

  // ...

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 60, 60, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
      liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
    }
  }

  private List<SocketHolder> connect(List<String> paddes, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    // ....
    return socketList;
  }

  // this method will be called by multiple threads concurrently to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
      Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        return liveSocket;
      }
    }
    return Optional.absent();
  }

  private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!listOfEndPoints.isEmpty()) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
      for (SocketHolder obj : listOfEndPoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty so we shuffle it an return the first element
        return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
      }
    }
    return Optional.absent();
  }

  // runs every 60 seconds to ping all the available socket to make sure whether they are alive or not
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // pinging to see whether a socket is live or not
        boolean isLive = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }
}

И вот мой класс SendToQueue:

  // this method will be called by multiple reader threads (around 20) concurrently to send the data
  public boolean sendAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m);
  }

  private boolean doSendAsync(final PendingMessage pendingMessage) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    if (!liveSocket.isPresent()) {
      // log error
      return false;
    }       
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // send data on a socket LINE A
      return msg.send(liveSocket.get().getSocket());
    } finally {
      msg.destroy();
    }
  }

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

Заявление о проблемах

Теперь, как вы можете видеть выше, я использую один и тот же сокет между двумя потоками. Кажется, что getNextSocket() в классе SocketManager может вернуть 0MQ socket в Thread A. Одновременно, timer thread может получить доступ к тому же 0MQ socket, чтобы выполнить ping. В этом случае Thread A и timer thread мутируют то же самое 0MQ socket, что может привести к проблемам. Поэтому я пытаюсь найти способ, чтобы я мог запретить разные потоки отправлять данные в один и тот же сокет одновременно и удалять мои данные.

Одним из решений, о котором я могу думать, является использование synchronization в сокете при отправке данных, но если многие потоки используют один и тот же сокет, ресурсы не используются. Более того, если msg.send(socket); заблокирован (технически он не должен), все потоки, ожидающие этого сокета, блокируются. Поэтому я предполагаю, что может быть лучший способ гарантировать, что каждый поток использует другой отдельный роутер одновременно, вместо синхронизации на конкретном сокете.

4b9b3361

Ответ 1

Итак, я пытаюсь найти способ, чтобы я мог запретить разные потоки отправлять данные в один и тот же сокет одновременно и удалять мои данные.

Есть, конечно, несколько разных способов сделать это. Для меня это похоже на BlockingQueue, это правильная вещь для использования. Деловые потоки будут принимать сокет из очереди и будут гарантированы, что никто другой не будет использовать этот сокет.

private final BlockingQueue<SocketHolder> socketHolderQueue = new LinkedBlockingQueue<>();
...
public Optional<SocketHolder> getNextSocket() {
   SocketHolder holder = socketHolderQueue.poll();
   return holder;
}
...
public void finishedWithSocket(SocketHolder holder) {
   socketHolderQueue.put(holder);
}

Я думаю, что синхронизация в сокете не является хорошей идеей по причинам, которые вы упомянули - поток ping будет блокировать бизнес-поток.

Существует несколько способов обработки логики потока ping. Я бы сохранил ваш Socket с последним временем использования, а затем ваш поток ping мог так часто брать каждый из сокетов из одного и того же BlockingQueue, проверять его и помещать каждый обратно в конец очереди после тестирования.

public void testSockets() {
   // one run this for as many sockets as are in the queue
   int numTests = socketHolderQueue.size();
   for (int i = 0; i < numTests; i++) {
      SocketHolder holder = socketHolderQueue.poll();
      if (holder == null) {
         break;
      }
      if (socketIsOk(socketHolder)) {
          socketHolderQueue.put(socketHolder);
      } else {
          // close it here or something
      }
   }
}

Вы также можете иметь код getNextSocket(), который отменяет потоки из очереди, проверять таймер и помещать их в тестовую очередь для используемого потока ping, а затем вывести следующий из очереди. Деловые потоки никогда не будут использовать один и тот же сокет одновременно с пинговым потоком.

В зависимости от того, когда вы хотите протестировать сокеты, вы также можете reset таймер после того, как бизнес-поток вернет его в очередь, чтобы поток ping проверил сокет после X секунд без использования.

Ответ 2

Похоже, вам стоит рассмотреть возможность использования функции try-with-resource. У вас есть класс SocketHolder или Option, реализующий интерфейс AutoCloseable. Например, допустим, что Option реализует этот интерфейс. Метод close close затем добавит экземпляр в контейнер. Я создал простой пример, который показывает, что я имею в виду. Он не является полным, но дает вам представление о том, как реализовать это в вашем коде.

    public class ObjectManager implements AutoCloseable {

    private static class ObjectManagerFactory {
        private static ObjectManager objMgr = new ObjectManager();
    }

    private ObjectManager() {}

    public static ObjectManager getInstance() { return ObjectManagerFactory.objMgr; }

    private static final int SIZE = 10;

    private static BlockingQueue<AutoCloseable> objects = new LinkedBlockingQueue<AutoCloseable>();

    private static ScheduledExecutorService sch;
    static {
        for(int cnt = 0 ; cnt < SIZE ; cnt++) {
            objects.add(new AutoCloseable() {

                @Override
                public void close() throws Exception {
                    System.out.println(Thread.currentThread() + " - Adding object back to pool:" + this + " size: " + objects.size());
                    objects.put(this);
                    System.out.println(Thread.currentThread() + " - Added object back to pool:" + this);
                }

            });
        }
        sch = Executors.newSingleThreadScheduledExecutor();
        sch.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub
                updateObjects();
            }

        }, 10, 10, TimeUnit.MICROSECONDS);
    }

    static void updateObjects() {
        for(int cnt = 0 ; ! objects.isEmpty() && cnt < SIZE ; cnt++ ) {
            try(AutoCloseable object = objects.take()) {
                System.out.println(Thread.currentThread() + " - updateObjects - updated object: " + object + " size: " + objects.size());
            } catch (Throwable t) {
                // TODO Auto-generated catch block
                t.printStackTrace();
            }
        }
    }

    public AutoCloseable getNext() {
        try {
            return objects.take();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            return null;
        }
    }

    public static void main(String[] args) {
        try (ObjectManager mgr = ObjectManager.getInstance()) {

            for (int cnt = 0; cnt < 5; cnt++) {
                try (AutoCloseable o = mgr.getNext()) {
                    System.out.println(Thread.currentThread() + " - Working with " + o);
                    Thread.sleep(1000);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Throwable tt) {
            tt.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        // TODO Auto-generated method stub
        ObjectManager.sch.shutdownNow();
    }
}

Ответ 3

Здесь я остановлюсь. В методе getNextSocket метод getOrderedDatacenters всегда будет возвращать тот же упорядоченный список, поэтому вы всегда будете выбирать из одних и тех же датацентров от начала до конца (это не проблема).

Как вы гарантируете, что два потока не получат одинаковые liveSocket из getNextSocket?

Что вы здесь говорите, это правда:

Одновременно поток таймера может обращаться к одному и тому же гнезду 0MQ к ping он.

Я думаю, что основная проблема здесь заключается в том, что вы не различаете свободные сокеты и зарезервированные сокеты.

Один из вариантов, как вы сказали, - синхронизировать каждый сокет. Другой вариант - сохранить список зарезервированных сокетов и когда вы хотите получить следующий сокет или обновить сокеты, выбрать только из бесплатных сокетов. Вы не хотите обновлять сокет, который уже зарезервирован.

Также вы можете посмотреть здесь, если это соответствует вашим потребностям.

Ответ 4

В разработке программного обеспечения для операционных систем появилась концепция критического раздела. Критический раздел возникает, когда два или более процесса имеют общие данные, и они выполняются одновременно, в этом случае ни один процесс не должен изменять или даже читать эти общие данные, если другой процесс обращается к этим данным. Так как процесс входит в критический раздел, он должен уведомлять обо всех других одновременно выполняемых процессах, которые он в настоящее время модифицирует критический раздел, поэтому все остальные процессы должны быть заблокированы-ждут, чтобы войти в этот критический раздел. вы спросите, кто организует, какой процесс входит, это еще одна проблема, называемая планированием процессов, которая контролирует, какой процесс должен войти в этот критический раздел, и операционная система сделает это за вас.

поэтому наилучшим решением для вас является семафор, где значение семафора - это количество сокетов, в вашем случае, я думаю, у вас есть один сокет, поэтому вы будете использовать семафор-двоичный семафор, инициализированный с помощью значения семафора = 1, тогда ваш код должен быть разделен на четыре основных раздела: запись критического раздела, критический раздел, критический раздел, выходящий из и остальной части.

  • Запись критического раздела:, когда процесс входит в критический раздел и блокирует все остальные процессы. Семафор позволит одному процессу-потоку войти в критический раздел - использовать сокет, а значение семафора будет уменьшаться - равно нулю -.
  • Критический раздел: код критического раздела, который должен выполнять процесс.
  • Критический раздел: процесс, освобождающий критический раздел для ввода другого процесса. Значение семафора будет увеличено - равно 1, что позволит другому процессу войти в
  • Остальная часть: остальная часть вашего кода, исключая предыдущие 3 раздела.

Теперь вам нужно открыть любые обучающие программы Java по семафорам, чтобы знать, как применять семафор в Java, это очень просто.

Ответ 5

Mouhammed Elshaaer прав, но в дополнение вы также можете использовать любую параллельную коллекцию, например ConcurrentHashMap, где вы можете отслеживать, что каждый поток работает в разных сокетах (например, ConcurrentHashMap key: хэш-код сокета, значение: хэш-код потока или smth еще). Что касается меня, это немного глупое решение, но его можно использовать.

Ответ 6

Для проблемы потоков (поток A и поток таймера), получающих доступ к одному и тому же сокету, я бы сохранил 2 списка сокетов для каждого центра данных:

  • список A: Сокеты, которые не используются
  • список B: используемые сокеты

то есть.,

  • вызов synchronisation getNextSocket(), чтобы найти не используемый в сокете из списка A, удалить его из списка A и добавить его в список B;
  • вызывать synchronisation returnSocket (Socket) после получения ответа /ACK для отправленного сообщения (бизнес или пинг) для перемещения сокета из списка B обратно в список A. Поставить попытку {} finally { } блокировать "отправку сообщения", чтобы убедиться, что сокет будет возвращен в список A, даже если есть исключение.

Ответ 7

У меня есть простое решение, возможно, поможет вам. Я не знаю, может ли в Java добавить пользовательский атрибут к каждому socket. В Socket.io вы можете. Поэтому я хочу рассмотреть это (я удалю этот ответ, если нет).

В каждый сокет добавляется атрибут boolean, называемый locked. Итак, когда ваш поток проверяет сокет первый, атрибут locked будет True. Любой другой поток при вызове ping THIS проверяет, не заблокирован ли атрибут False. Если нет, getNextSocket.

Итак, на этом участке ниже...

...
for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
...

Вы проверите, заблокирован ли сокет. Если да, убейте этот поток или прервите его или перейдите к следующему сокету. (Я не знаю, как вы это называете на Java).

Итак, процесс...

  • Поток получает разблокированный сокет
  • Настройте этот socket.locked на True.
  • Сокет для пинга Thread и сделайте все, что хотите.
  • Настройте этот socket.locked на False.
  • Тема переходит к следующему.

Извините, мой плохой английский:)