Подтвердить что ты не робот

Проектирование блокировки на основе ключа (или блокировки карты)

Я пытаюсь создать механизм блокировки на основе ключей: что-то вроде обычной блокировки реентера, но вместо блокировки() и unlock() вы блокируете (ключ) и разблокируете (ключ), с контрактом, что нет -one сможет блокировать (key1) одновременно, если key.equals(key1).

Будет ли этот код работать? Существуют ли более эффективные решения? Мне особенно не нравится цикл while, пытаясь поместить замок на карту...

package luca;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

public class KeyedReentrantLock<K> {
    private ConcurrentMap<K, ReentrantLock> lockMap = new ConcurrentHashMap<K, ReentrantLock>();

    public void lock(K key) {
        ReentrantLock oldLock = lockMap.get(key);
        if (oldLock != null && oldLock.isHeldByCurrentThread()){
            // increase lock count and return.
            oldLock.lock();
            return;
        }
        ReentrantLock newLock = new ReentrantLock();
        newLock.lock();
        while ((oldLock = lockMap.putIfAbsent(key, newLock)) != null){
            // wait for the old lock to be released;
            oldLock.lock();
            oldLock.unlock();
        }
        return;
    }

    public void unlock(K key){
        ReentrantLock lock = lockMap.get(key);
        if (lock == null) throw new IllegalMonitorStateException("There was no lock for this key!");
        if (lock.getHoldCount() == 1){
            lockMap.remove(key);
        }
        lock.unlock();
    }

}
4b9b3361

Ответ 1

Почему бы просто не использовать простое полосатое блокирование, например:

/**
 * Striped locks holder, contains array of {@link java.util.concurrent.locks.ReentrantLock}, on which lock/unlock
 * operations are performed. Purpose of this is to decrease lock contention.
 * <p>When client requests lock, it gives an integer argument, from which target lock is derived as follows:
 * index of lock in array equals to <code>id & (locks.length - 1)</code>.
 * Since <code>locks.length</code> is the power of 2, <code>locks.length - 1</code> is string of '1' bits,
 * and this means that all lower bits of argument are taken into account.
 * <p>Number of locks it can hold is bounded: it can be from set {2, 4, 8, 16, 32, 64}.
  */
public class StripedLock {
    private final ReentrantLock[] locks;

    /**
     * Default ctor, creates 16 locks
     */
    public StripedLock() {
        this(4);
    }

    /**
     * Creates array of locks, size of array may be any from set {2, 4, 8, 16, 32, 64} 
     * @param storagePower size of array will be equal to <code>Math.pow(2, storagePower)</code>
     */
    public StripedLock(int storagePower) {
        if (!(storagePower >= 1 && storagePower <= 6)) { throw new IllegalArgumentException("storage power must be in [1..6]"); }

        int lockSize = (int) Math.pow(2, storagePower);
        locks = new ReentrantLock[lockSize];
        for (int i = 0; i < locks.length; i++)
            locks[i] = new ReentrantLock();
    }

    /**
     * Locks lock associated with given id.
     * @param id value, from which lock is derived
     */
    public void lock(int id) {
        getLock(id).lock();
    }

    /**
     * Unlocks lock associated with given id.
     * @param id value, from which lock is derived 
     */
    public void unlock(int id) {
        getLock(id).unlock();
    }

    /**
     * Map function between integer and lock from locks array
     * @param id argument
     * @return lock which is result of function 
     */
    private ReentrantLock getLock(int id) {
        return locks[id & (locks.length - 1)];
    }
}

Ответ 2

Пожалуйста, обратитесь к следующему образцу кода, я создал новую блокировку для каждого потока.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;

public class Processor implements Runnable {

    final static ConcurrentHashMap<Integer, ReentrantReadWriteLock> CONCURRENT_HASH_MAP = new ConcurrentHashMap<Integer, ReentrantReadWriteLock>();

    final private Employee employee;

    public Processor(int id) {
        this.employee = new Employee(id);
    }

    public void run() {
        processDate(employee);
    }

    /**
     * Method to be shared
     * 
     * @param id
     */
    public void processDate(final Employee employee) {

        final int employeeId = employee.getId();
        ReentrantReadWriteLock monitoredObject = new ReentrantReadWriteLock();
        System.out.println("Before taking the lock"
                    + Thread.currentThread().getName());
        while (CONCURRENT_HASH_MAP.putIfAbsent(employeeId, monitoredObject) != null) {
        }
        ReadLock lock = monitoredObject.readLock();
        lock.lock();
        try {
            processXML(employee);
        } catch (Exception e) {
            e.printStackTrace();
        }
        CONCURRENT_HASH_MAP.remove(employeeId);
        lock.unlock();
    }

    /**
     * For similar id object this will run one by one but for different objects
     * this will run parallal.
     * 
     * This method will execute serially if called by multiple threads for
     * employee with same emp id
     * 
     * @throws Exception
     */
    public void processXML(final Employee employee) throws Exception {
        System.out.println("Process XML for " + employee.getId()
                + Thread.currentThread().getName());
        Thread.sleep(2000);
        System.out.println("Done XML Processing for " + employee.getId()
                + Thread.currentThread().getName());
        ReentrantReadWriteLock lock = CONCURRENT_HASH_MAP.get(employee.getId());
        System.out.println("lock object " + lock + "queue length "
                + lock.getQueueLength());
    }

    class Employee {
        private Integer id;

        public Employee(final int id) {
            this.id = id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public Integer getId() {
            return id;
        }
    }

    public static void main(String[] args) {
        final ExecutorService executorService = Executors.newFixedThreadPool(10);
        long startTime = System.currentTimeMillis();
        /**
         * In Processors Constructor instead of i write 1 and see the
         * difference.
         */
        for (int i = 1; i <= 5; i++)
            executorService.submit(new Processor(i));
        executorService.shutdown();
        /*
         * Let the main thread wait till the executor service is terminated to
         * observe the total time taken
         */
        while (executorService.isTerminated() != true) {
        }
        long endTime = System.currentTimeMillis();
        long timeTaken = endTime - startTime;
        System.out.println("time taken.... " + timeTaken + " ms");

    }
}