Подтвердить что ты не робот

Java concurrency шаблон для внешнего общего ресурса (смарт-карты)

У меня есть служба веб-сервера, где клиенты запрашивают вычисления смарт-карт и получают их результат. Доступный номер смарт-карты может уменьшаться или увеличиваться во время работы сервера, например, я могу добавить или удалить физически смарт-карту от читателя (или многие другие события... например, исключение и т.д.).

введите описание изображения здесь

Вычисление смарт-карт может занять некоторое время, поэтому мне нужно оптимизировать эти задания, чтобы использовать все доступные смарт-карты, если на веб-сервере есть параллельные запросы.

Я думал работать с пулом смарт-карт. Необычная вещь, по крайней мере для меня, заключается в том, что пул должен менять свой размер не в зависимости от клиентских запросов, а только от наличия смарт-карты.

введите описание изображения здесь

Я изучил множество примеров:

  • BlockingQueue. Хорошо выглядит, чтобы хранить запрос и останавливать поток, ожидая чего-то.
  • FutureTask. Я могу использовать этот класс, чтобы клиент ожидал ответа, но какой excecutor должен выполнить эту задачу?
  • ThreadPoolExecutor: Мне кажется, что мне нужно, но с этим я не могу изменить размер пула, причем каждый поток должен быть связан с одним слотом для смарт-карт. Это может быть решением, если я могу изменить размер пула (добавив поток при вставленной смарт-карте и удалив поток при удалении смарт-карты), и если я могу назначить конкретную смарт-карту для каждого потока.

Это управление смарт-картами, у меня есть один смарт-карт на смарт-карте, каждая смарт-карта имеет свой собственный номер слота.

public class SmartcardWrapper{

    private int slot;

    public SmartcardWrapper(int slot) {
        this.slot=slot;
    }   

    public byte[] compute(byte[] input) {
        byte[] out=new byte[];
        SmartcardApi.computerInput(slot,input,out); //Native method
        return out;
    }
}

Я попытался создать пул потоков с одним потоком на смарт-карту:

private class SmartcardThread extends Thread{

    protected SmartcardWrapper sw;

    public SmartcardThread(SmartcardWrapper sw){
        this.sw=sw;
    }

    @Override
    public void run() {
        while(true){
            byte[] input=queue.take();
            byte output=sw.compute(input);
            // I have to return back the output to the client
        }           
    }
}

Все ждут чего-то в одной очереди ввода:

BlockingQueue<byte[]> queue=new BlockingQueue<byte[]>();

Но как вернуть обратный вывод из smartcard-thread на веб-сервер-клиент? Это позволяет мне думать, что BlockingQueue не является моим решением.

Как подойти к этой проблеме? Какую модель concurrency следует придерживаться? правильно ли назначить один поток на смарт-карту или я просто могу использовать семафоры?

4b9b3361

Ответ 1

Ваше предположение:

ThreadPoolExecutor: Кажется, что мне нужно, но с этим я не могу изменить размер пула, причем каждый поток должен быть связан с одним слотом смарт-карты.

неверно.

You can set thread pool size dynamically.

Посмотрите ниже ThreadPoolExecutor API

public void setMaximumPoolSize(int maximumPoolSize)

Устанавливает максимально допустимое количество потоков. Это переопределяет любое значение, заданное в конструкторе. Если новое значение меньше текущего значения, избыточные существующие потоки будут прекращены, когда они станут свободными.

public void setCorePoolSize(int corePoolSize)

Устанавливает базовое число потоков. Это переопределяет любое значение, заданное в конструкторе. Если новое значение меньше текущего значения, избыточные существующие потоки будут прекращены, когда они станут свободными. Если больше, новые потоки при необходимости будут запущены для выполнения любых задач в очереди.

Core and maximum pool sizes:

A ThreadPoolExecutor автоматически отрегулирует размер пула в соответствии с границами, установленными corePoolSize и maximumPoolSize.

Когда новая задача отправляется в методе execute(java.lang.Runnable) и выполняется меньше потоков corePoolSize, для обработки запроса создается новый поток, даже если другие рабочие потоки простаивают.

Если существует более чем corePoolSize, но меньше, чем maximumPoolSize потоков, новый поток будет создан только в том случае, если очередь заполнена.

Установив maximumPoolSize на существенно неограниченное значение, такое как Integer.MAX_VALUE, вы позволяете пулу вмещать произвольное количество одновременных задач. Но я бы не рекомендовал иметь такое количество потоков. Установите это значение с осторожностью.

Как правило, размеры ядра и максимального пула устанавливаются только при построении, но их также можно динамически изменять с помощью setCorePoolSize(int) и setMaximumPoolSize(int).

EDIT:

Для лучшего использования пула потоков, если вы знаете, что максимальное количество карт составляет 6, вы можете использовать

 ExecutorService executor = Executors.newFixedThreadPool(6);

ИЛИ

Ответ 2

Рассматривали ли вы вообще Apache Commons Pool?

Вам необходимо поддерживать пул объектов SmartcardWrapper, где каждый SmartcardWrapper будет представлять собой физическую смарт-карту. Всякий раз, когда вам нужно сделать новое вычисление, вы берете объект из пула, делаете расчет и возвращаете объект в пул, чтобы его можно было повторно использовать в следующем потоке.

Сам пул является потокобезопасным и блокирует, когда нет доступных объектов. Все, что вам нужно сделать, это реализовать api для добавления/удаления объектов SmartcardWrapper в пул.

Ответ 3

Возможно, я нашел разумное простое решение, основанное на следующих предположениях:

  • отдельный процесс обрабатывает (системные события) уведомления для смарт-карт, которые становятся доступными или удаляются.
  • Клиент не заботится о том, какую смарт-карту он может использовать, если он может использовать один без помех.

Эти два предположения фактически облегчают создание решения для объединения (разделяемые ресурсы), так как обычно сам пул отвечает за создание и удаление ресурсов, когда это необходимо. Без этой функции упрощается решение для объединения. Я предполагаю, что клиент, который получает смарт-карту из пула для использования, может выполнять необходимые функции смарт-карты в своем собственном потоке выполнения (подобно тому, как соединение с базой данных используется из пула соединений с базой данных для запроса данных из базы данных).

Я выполнил минимальное тестирование для двух классов, показанных ниже, и я боюсь, что основная часть работы заключается в письменных (единичных) тестах, которые доказывают, что пул работает правильно с одновременными клиентскими запросами в сочетании с добавлением и удалением смарт-карты Ресурсы. Если вы не хотите этого делать, то ответ от пользователя769771, вероятно, является лучшим решением. Но если вы это сделаете, попробуйте, посмотрите, подходит ли она. Идея заключается в том, что только один экземпляр пула ресурсов создается и используется всеми клиентами и обновляется отдельным процессом, который управляет доступностью смарт-карт.

import java.util.*;
import java.util.concurrent.*;

/**
 * A resource pool that expects shared resources 
 * to be added and removed from the pool by an external process
 * (i.e. not done by the pool itself, see {@link #add(Object)} and {@link #remove(Object)}.
 * <br>A {@link ResourcePoolValidator} can optionally be used. 
 * @param <T> resource type handed out by the pool.
 */
public class ResourcePool<T> {

    private final Set<T> registered = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>()); 
    /* Use a linked list as FIFO queue for resources to lease. */
    private final List<T> available = Collections.synchronizedList(new LinkedList<T>()); 
    private final Semaphore availableLock = new Semaphore(0, true); 

    private final ResourcePoolValidator<T> validator;

    public ResourcePool() {
        this(null);
    }

    public ResourcePool(ResourcePoolValidator<T> validator) {
        super();
        this.validator = validator;
    }

    /**
     * Add a resource to the pool.
     * @return true if resource is not already in the pool.
     */
    public synchronized boolean add(T resource) {

        boolean added = false;
        if (!registered.contains(resource)) {
            registered.add(resource);
            available.add(resource);
            availableLock.release();
            added = true;
        }
        return added;
    }

    /**
     * Removes a resource from the pool.
     * The resource might be in use (see {@link #isLeased(Object)})
     * in which case {@link ResourcePoolValidator#abandoned(Object)} will be called 
     * when the resource is no longer used (i.e. released). 
     * @return true if resource was part of the pool and removed from the pool.
     */
    public synchronized boolean remove(T resource) {

        // method is synchronized to prevent multiple threads calling add and remove at the same time 
        // which could in turn bring the pool in an invalid state.
        return registered.remove(resource);
    }

    /**
     * If the given resource is (or was, see also {@link #remove(Object)} part of the pool,
     * a returned value true indicates the resource is in use / checked out.
     * <br>This is a relative expensive method, do not call it frequently.
     */
    public boolean isLeased(T resource) {
        return !available.contains(resource);
    }

    /**
     * Try to get a shared resource for usage. 
     * If a resource is acquired, it must be {@link #release(Object)}d in a finally-block.
     * @return A resource that can be exclusively used by the caller.
     * @throws InterruptedException When acquiring a resource is interrupted.
     * @throws TimeoutException When a resource is not available within the given timeout period.
     */
    public T tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException, TimeoutException {

        T resource = null;
        long timeRemaining = tunit.toMillis(timeout);
        final long tend = System.currentTimeMillis() + timeRemaining;
        do {
            if (availableLock.tryAcquire(timeRemaining, TimeUnit.MILLISECONDS)) {
                resource = available.remove(0);
                if (registered.contains(resource)) {
                    boolean valid = false;
                    try {
                        valid = (validator == null ? true : validator.isValid(resource));
                    } catch (Exception e) {
                        // TODO: log exception
                        e.printStackTrace();
                    }
                    if (valid) {
                        break; // return the "checked out" resource
                    } else {
                        // remove invalid resource from pool
                        registered.remove(resource);
                        if (validator != null) {
                            validator.abandoned(resource);
                        }
                    }
                }
                // resource was removed from pool, try acquire again
                // note that this implicitly lowers the maximum available resources
                // (an acquired permit from availableLock goes unused).
                // TODO: retry puts us at the back of availableLock queue but should put us at the front of the queue
                resource = null;
            }
            timeRemaining = tend - System.currentTimeMillis();
        } while (timeRemaining > 0L);
        if (resource == null) {
            throw new TimeoutException("Unable to acquire a resource within " + tunit.toMillis(timeout) + " ms.");
        }
        return resource;
    }

    /**
     * This method must be called by the caller / client whenever {@link #tryAcquire(long, TimeUnit)}
     * has returned a resource. If the caller has determined the resource is no longer valid,
     * the caller should call {@link #remove(Object)} before calling this method.
     * @param resource no longer used.
     */
    public void release(T resource) {

        if (resource == null) {
            return;
        }
        if (registered.contains(resource)) {
            available.add(resource);
            availableLock.release();
        } else {
            if (validator != null) {
                validator.abandoned(resource);
            }
        }
    }

    /** An array (copy) of all resources registered in the pool. */
    @SuppressWarnings("unchecked")
    public T[] getRegisteredResources() {
        return (T[]) registered.toArray(new Object[registered.size()]);
    }

}

И отдельный класс с функциями, связанными с отдельным процессом, который управляет доступностью smarcard.

import java.util.concurrent.TimeUnit;

/**
 * Used by a {@link ResourcePool} to validate a resource before handing it out for lease
 * (see {@link #isValid(Object)} and signal a resource is no longer used (see {@link #abandoned(Object)}). 
 */
public class ResourcePoolValidator<T> {

    /**
     * Overload this method (this method does nothing by default) 
     * to validate a resource before handing it out for lease.
     * If this method returns false or throws an exception (which it preferably should not do), 
     * the resource is removed from the pool.
     * @return true if the resource is valid for leasing
     */
    public boolean isValid(T resource) {
        return true;
    }

    /**
     * Called by the {@link ResourcePool#release(Object)} method when a resource is released by a caller 
     * but the resource was previously removed from the pool and in use.
     * <br>Called by {@link ResourcePool#tryAcquire(long, TimeUnit)} if a resource if not valid 
     * (see {@link #isValid(Object)}.
     * <br>Overload this method (this method does nothing by default) to create a notification of an unused resource,
     * do NOT do any long period of processing as this method is called from a caller (client) thread.
     */
    public void abandoned(T resource) {
        // NO-OP
    }

}

Ответ 4

Изучив требования, лучшая архитектура будет заключаться в том, чтобы отделить вычисление смарт-карты от ваших веб-сервисов.

Полагаться на веб-службы, чтобы ждать от задач с интенсивным процессором, приведет к таймаутам.

Лучшим решением является предварительная интеллектуальная карта, использующая периодическое задание и сохраняющая эти слоты, пар вычислений на сервере кэширования, например Redis.

введите описание изображения здесь

Задание синхронизатора смарт-карт представляет собой отдельное приложение J2SE Stand Alone, которое периодически проверяет, какая смарт-карта доступна и активна (без ошибок), и обновляет Redis Cache слотом и вычисляет в качестве пары "ключ/значение". Если смарт-карта недоступна, она будет удалена из кеша.

Веб-служба просто проверит кеш Redis для определенного ключа слота, и если он найдет значение, оно вернет его или вернет не найденное для этого слота (недоступно или ошибка)

Эта конструкция масштабируется как в конце смарт-карты, так и в конце клиентских запросов.

Ответ 5

В ответ на ваш вопрос о том, как вернуть результат обратно вызывающему:

Все ждут чего-то в одной очереди ввода:

BlockingQueue queue = new BlockingQueue();

Но как вернуть обратный вывод из smartcard-thread в Веб-сервер-клиент? Это позволяет мне думать, что BlockingQueue не мой Решение.

Идея очереди отправки в основном прекрасна, но вам также нужна очередь в потоке, чтобы вернуть результат в адрес отправителя задания...

Измените очередь отправки на:

BlockingQueue<JobSubmitRec> queue=new BlockingQueue<JobSubmitRec>();

и JobSubmitRec будет иметь байт [] и одноразовую очередь для возврата результата:

class JobSubmitRec
{
  byte[] data;
  BlockingQueue<JobSubmitResult> result=new LinkedBlockingQueue<JobSubmitResult>();
}

и ваш рабочий поток Thread будет выглядеть примерно так:

public void run() {
 while(true){
  JobSubmitRec submitrec = queue.take();
  byte[] input = submitrec.data;
  byte output = sw.compute(input);
  submitrec.result.put( new JobSubmitResult(output) );
 }           
}

и клиент, который отправит задание, будет выглядеть так:

JobSubmitRec jsr = new JobSubmitRec( data );
queue.put( jsr );
JobSubmitResult result = jsr.result.take();
// use result here