Подтвердить что ты не робот

Предварительная инициализация пула рабочих потоков для повторного использования объектов подключения (сокетов)

Мне нужно создать пул работников на Java, где каждый рабочий имеет свой собственный подключенный сокет; когда рабочий поток работает, он использует сокет, но сохраняет его открытым для повторного использования позже. Мы решили использовать этот подход, поскольку накладные расходы, связанные с созданием, подключением и уничтожением сокетов на разовой основе, потребовали слишком много накладных расходов, поэтому нам нужен метод, с помощью которого пул работников предварительно инициализируется их сокетным соединением, готовым к приступайте к работе, сохраняя ресурсы сокетов в безопасности от других потоков (сокеты не являются потокобезопасными), поэтому нам нужно что-то в этом направлении...

public class SocketTask implements Runnable {
  Socket socket;
  public SocketTask(){
    //create + connect socket here
  }

  public void run(){
    //use socket here
  }

}

При запуске приложения мы хотим инициализировать рабочих и, надеюсь, сокет-подключения так или иначе...

MyWorkerPool pool = new MyWorkerPool();
for( int i = 0; i < 100; i++)
   pool.addWorker( new WorkerThread());

Поскольку работа запрашивается приложением, мы отправляем задачи в рабочий пул для немедленного выполнения...

pool.queueWork( new SocketTask(..));


Обновлено с помощью рабочего кода
Основываясь на полезных комментариях от Gray и jontejj, у меня есть следующий код, работающий...

SocketTask

public class SocketTask implements Runnable {
    private String workDetails;
    private static final ThreadLocal<Socket> threadLocal = 
           new ThreadLocal<Socket>(){
        @Override
        protected Socket initialValue(){
            return new Socket();
        }           
    };

    public SocketTask(String details){              
        this.workDetails = details;
    }

    public void run(){      
        Socket s = getSocket(); //gets from threadlocal
        //send data on socket based on workDetails, etc.
    }

    public static Socket getSocket(){
        return threadLocal.get();
    }
}

ExecutorService

ExecutorService threadPool = 
    Executors.newFixedThreadPool(5, Executors.defaultThreadFactory());

    int tasks = 15;  
    for( int i = 1; i <= tasks; i++){
        threadPool.execute(new SocketTask("foobar-" + i));
    }   

Мне нравится этот подход по нескольким причинам...

  • Сокеты - это локальные объекты (через ThreadLocal), доступные для выполняемых задач, устраняющие проблемы параллелизма.
  • Сокеты создаются один раз и сохраняются открытыми, повторно используются при постановке новых задач, исключая создание или уничтожение служебных данных сокета.
4b9b3361

Ответ 1

Одной из идей было бы поставить Socket в BlockingQueue. Тогда всякий раз, когда вам нужен Socket ваши потоки могут take() из очереди, а когда они выполняются с помощью Socket они put() обратно в очередь.

public void run() {
    Socket socket = socketQueue.take();
    try {
       // use the socket ...
    } finally {
       socketQueue.put(socket);
    }
}

Это имеет дополнительные преимущества:

  • Вы можете вернуться к использованию кода ExecutorService.
  • Вы можете отделить сокет от обработки результатов.
  • Вам не требуется соответствие 1 к 1 для обработки потоков и сокетов. Но связь сокета может составлять 98% работы, поэтому, возможно, нет выигрыша.
  • Когда все будет готово, а ваш ExecutorService завершен, вы можете отключить свои сокеты, просто отключив их и закрыв их.

Это добавляет дополнительные накладные расходы другого BlockingQueue но если вы используете Socket связь, вы этого не заметите.

мы не считаем, что ThreadFactory удовлетворяет наши потребности...

Я думаю, вы могли бы сделать эту работу, если бы использовали локаторы потоков. Ваша фабрика потоков создаст поток, который сначала откроет сокет, сохранит его в потоковом-локальном, а затем Runnable arg, который выполняет всю работу с сокетом, деинсталлируя задания из внутренней очереди ExecutorService. Как только это будет сделано, метод arg.run() завершится, и вы сможете получить сокет из локального потока и закрыть его.

Что-то вроде следующего. Это немного грязно, но вы должны получить эту идею.

ExecutorService threadPool =
    Executors.newFixedThreadPool(10,
      new ThreadFactory() {
        public Thread newThread(final Runnable r) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    openSocketAndStoreInThreadLocal();
                    // our tasks would then get the socket from the thread-local
                    r.run();
                    getSocketFromThreadLocalAndCloseIt();
                }
            });
            return thread;
        }
      }));

Таким образом, ваши задачи будут реализовывать Runnable и выглядеть так:

public SocketWorker implements Runnable {
    private final ThreadLocal<Socket> threadLocal;
    public SocketWorker(ThreadLocal<Socket> threadLocal) {
       this.threadLocal = threadLocal;
    }
    public void run() {
        Socket socket = threadLocal.get();
        // use the socket ...
    }
}

Ответ 2

Я думаю, вы должны использовать ThreadLocal

package com.stackoverflow.q16680096;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main
{
    public static void main(String[] args)
    {
        ExecutorService pool = Executors.newCachedThreadPool();
        int nrOfConcurrentUsers = 100;
        for(int i = 0; i < nrOfConcurrentUsers; i++)
        {
            pool.submit(new InitSocketTask());
        }

        // do stuff...

        pool.submit(new Task());
    }
}

package com.stackoverflow.q16680096;

import java.net.Socket;

public class InitSocketTask implements Runnable
{
    public void run()
    {
        Socket socket = SocketPool.get();
        // Do initial setup here
    }

}

package com.stackoverflow.q16680096;

import java.net.Socket;

public final class SocketPool
{
    private static final ThreadLocal<Socket> SOCKETS = new ThreadLocal<Socket>(){
        @Override
        protected Socket initialValue()
        {
            return new Socket(); // Pass in suitable arguments here...
        }
    };

    public static Socket get()
    {
        return SOCKETS.get();
    }
}

package com.stackoverflow.q16680096;

import java.net.Socket;

public class Task implements Runnable
{
    public void run()
    {
        Socket socket = SocketPool.get();
        // Do stuff with socket...
    }
}

Где каждый поток получает свой собственный сокет.