У меня есть около 60 сокетов и 20 потоков, и я хочу, чтобы каждый поток работал в разных сокетах каждый раз, поэтому я не хочу делиться одним и тем же сокетом между двумя потоками.
В моем классе SocketManager
у меня есть фоновый поток, который запускается каждые 60 секунд и вызывает метод updateLiveSockets()
. В методе updateLiveSockets()
я повторяю все сокеты, которые у меня есть, а затем начинаю их пинговать один за другим, вызывая метод send
класса SendToQueue
и основываясь на ответе. Я отмечаю их как живых или мертвых. В методе updateLiveSockets()
мне всегда нужно перебирать все сокеты и пинговать их, чтобы проверить, являются ли они живыми или мертвыми.
Теперь все потоки чтения вызовут метод getNextSocket()
класса SocketManager
одновременно, чтобы получить следующий доступный в реальном времени сокет для отправки бизнес-сообщения в этот сокет. Поэтому у меня есть два типа сообщений, которые я отправляю по сокету:
- Одно сообщение
ping
в сокете. Это отправляется только из потока таймера, вызывающего методupdateLiveSockets()
в классеSocketManager
. - Другим является сообщение
business
в сокете. Это делается в классеSendToQueue
.
Итак, если поток нити пингует сокет, чтобы проверить, живут ли они или нет, ни один другой бизнес-поток не должен использовать этот сокет. Аналогично, если бизнес-поток использует сокет для отправки данных на него, то нить нити не должен пинговать этот сокет. И это касается всего сокета. Но мне нужно убедиться, что в методе updateLiveSockets
мы пинговаем все доступные сокеты, когда мой фоновый поток запускается, чтобы мы могли выяснить, какой сокет жив или мертв.
Ниже мой класс SocketManager
:
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 60, 60, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(List<String> paddes, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
// ....
return socketList;
}
// this method will be called by multiple threads concurrently to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!listOfEndPoints.isEmpty()) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// runs every 60 seconds to ping all the available socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean isLive = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
}
И вот мой класс SendToQueue
:
// this method will be called by multiple reader threads (around 20) concurrently to send the data
public boolean sendAsync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, true);
cache.put(address, m);
return doSendAsync(m);
}
private boolean doSendAsync(final PendingMessage pendingMessage) {
Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
if (!liveSocket.isPresent()) {
// log error
return false;
}
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A
return msg.send(liveSocket.get().getSocket());
} finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
} finally {
cache.invalidate(address);
}
}
Заявление о проблемах
Теперь, как вы можете видеть выше, я использую один и тот же сокет между двумя потоками. Кажется, что getNextSocket()
в классе SocketManager
может вернуть 0MQ socket
в Thread A
. Одновременно, timer thread
может получить доступ к тому же 0MQ socket
, чтобы выполнить ping. В этом случае Thread A
и timer thread
мутируют то же самое 0MQ socket
, что может привести к проблемам. Поэтому я пытаюсь найти способ, чтобы я мог запретить разные потоки отправлять данные в один и тот же сокет одновременно и удалять мои данные.
Одним из решений, о котором я могу думать, является использование synchronization
в сокете при отправке данных, но если многие потоки используют один и тот же сокет, ресурсы не используются. Более того, если msg.send(socket);
заблокирован (технически он не должен), все потоки, ожидающие этого сокета, блокируются. Поэтому я предполагаю, что может быть лучший способ гарантировать, что каждый поток использует другой отдельный роутер одновременно, вместо синхронизации на конкретном сокете.