Подтвердить что ты не робот

Как реализовать блокировку с тайм-аутом в Python 2.7

Есть ли способ реализовать блокировку в Python для многопоточности, метод acquire может иметь произвольный тайм-аут? Единственные рабочие решения, которые я нашел до сих пор, используют опрос, который

  • Я считаю неэлегантным и неэффективным
  • Не сохраняет ограниченную гарантию ожидания/выполнения блокировки как решение проблемы критического сечения

Есть ли лучший способ реализовать это?

4b9b3361

Ответ 1

чтобы уточнить предложение Стивена:

import threading
import time

lock = threading.Lock()
cond = threading.Condition(threading.Lock())

def waitLock(timeout):
    with cond:
        current_time = start_time = time.time()
        while current_time < start_time + timeout:
            if lock.acquire(False):
                return True
            else:
                cond.wait(timeout - current_time + start_time)
                current_time = time.time()
    return False

Замечания:

  • существует два объекта threading.Lock(), один из которых является внутренним для threading.Condition().
  • при манипулировании cond он фиксируется; однако операция wait() разблокирует его, поэтому любое количество потоков может наблюдать за ним.
  • ожидание встроено в цикл for, который отслеживает время. threading.Condition может быть уведомлен по причинам, отличным от тайм-аутов, поэтому вам все равно нужно отслеживать время, если вы действительно хотите, чтобы он истекал.
  • даже с условием, вы по-прежнему "опросите" реальный замок, потому что он позволяет более чем одному потоку просыпаться и гоняться за блокировкой. если lock.acquire терпит неудачу, цикл возвращается к ожиданию.
  • вызывающие функции waitLock должны следовать за lock.release() с cond.notify(), чтобы другие ожидающие его потоки уведомлялись о том, что они должны повторить попытку блокировки. Это не показано в примере.

Ответ 2

Моя версия использует поточные безопасные очереди http://docs.python.org/2/library/queue.html и их методы put/get, которые поддерживают тайм-аут.

До сих пор работает нормально, но если кто-то может сделать рецензирование на нем, я буду благодарен.

"""
Thread-safe lock mechanism with timeout support module.
"""

from threading import ThreadError, current_thread
from Queue import Queue, Full, Empty


class TimeoutLock(object):
    """
    Thread-safe lock mechanism with timeout support.
    """

    def __init__(self, mutex=True):
        """
        Constructor.
        Mutex parameter specifies if the lock should behave like a Mutex, and
        thus use the concept of thread ownership.
        """
        self._queue = Queue(maxsize=1)
        self._owner = None
        self._mutex = mutex

    def acquire(self, timeout=0):
        """
        Acquire the lock.
        Returns True if the lock was succesfully acquired, False otherwise.

        Timeout:
        - < 0 : Wait forever.
        -   0 : No wait.
        - > 0 : Wait x seconds.
        """
        th = current_thread()
        try:
            self._queue.put(
                th, block=(timeout != 0),
                timeout=(None if timeout < 0 else timeout)
            )
        except Full:
            return False

        self._owner = th
        return True

    def release(self):
        """
        Release the lock.
        If the lock is configured as a Mutex, only the owner thread can release
        the lock. If another thread attempts to release the lock a
        ThreadException is raised.
        """
        th = current_thread()
        if self._mutex and th != self._owner:
            raise ThreadError('This lock isn\'t owned by this thread.')

        self._owner = None
        try:
            self._queue.get(False)
            return True
        except Empty:
            raise ThreadError('This lock was released already.')

Ответ 3

Я сомневаюсь, что это можно сделать.

Если вы хотите реализовать это без какого-либо опроса, вам нужно, чтобы ОС знала, что поток заблокирован, и OS должна знать о тайм-ауте, чтобы разблокировать поток через некоторое время. Для этого в ОС требуется поддержка; вы не можете реализовать это на уровне Python.

(Вы могли бы заблокировать поток на уровне ОС или на уровне приложения и иметь механизм, благодаря которому его можно разбудить другим потоком в соответствующее время, но тогда вам понадобится этот другой поток для эффективного опроса )

Как правило, у вас нет действительно ограниченной гарантии ожидания/выполнения блокировки, так как вашему потоку придется ждать неограниченное время для переключения контекста, чтобы он заметил, что он был разблокирован. Поэтому, если вы не можете установить верхнюю границу количества соперников по процессору, вы не сможете использовать таймаут для достижения каких-либо жестких сроков реального времени. Но вам, вероятно, это не нужно, иначе вы не мечтали бы использовать блокировки, реализованные в Python.


Из-за Python GIL (Global Interpreter Lock) эти решения на основе опроса, вероятно, не так неэффективны или так сильно не ограничены, как вы думаете (в зависимости от того, как они реализованы) (и предполагая, что вы используете либо CPython или PyPy).

Там только один поток работает за раз, и по определению есть другой поток, который вы хотите запустить (тот, который удерживает блокировку, которую вы ожидаете). GIL некоторое время удерживается одним потоком, чтобы выполнить кучу байт-кодов, затем отбрасывается и снова появляется, чтобы дать кому-то шанс на него. Поэтому, если поток с заблокированным временем с тайм-аутом находится только в цикле, проверяя время и уступая другим потокам, он будет только просыпаться каждый раз, когда он получает GIL, а затем почти сразу бросает его обратно кому-то другому и блокирует GIL снова. Поскольку этот поток может только когда-либо проснуться, когда он все равно превратится в GIL, он также выполнит эту проверку, как только истечет время ожидания, так как он сможет возобновить выполнение, даже если таймаут был волшебным совершенством.

Единственный раз, когда это приведет к большой неэффективности, - это если ваш поток заблокирован, ожидая поток блокировки, который заблокирован, ожидая чего-то, что не может быть вызвано другим потоком Python (например, заблокировано в IO), и нет других запущенных потоков Python. Тогда ваш тайм-аут опроса действительно будет просто сидеть там, проверяя время неоднократно, что может быть плохо, если вы ожидаете, что эта ситуация произойдет в течение длительных периодов времени.

Ответ 4

Я принял SingleNegationElimination ответ и создал класс, который может быть использован в with -statement следующим образом:

global_lock = timeout_lock()
...

with timeout_lock(owner='task_name', lock=global_lock):
    do()
    some.stuff()

Таким образом он будет < предупреждать, если истекло время ожидания (по умолчанию = 1 с), и покажите владельцу блокировки для расследования.

Используйте его таким образом, и после таймаута будет выбрано исключение:

with timeout_lock(owner='task_name', lock=global_lock, raise_on_timeout=True):
    do()
    some.stuff()

Экземпляр timeout_lock.lock() должен быть создан один раз и может использоваться по потокам.

Вот класс - он работает для меня, но не стесняйтесь комментировать и улучшать:

class timeout_lock:
    ''' taken from https://stackoverflow.com/a/8393033/1668622
    '''
    class lock:
        def __init__(self):
            self.owner = None
            self.lock = threading.Lock()
            self.cond = threading.Condition()

        def _release(self):
            self.owner = None
            self.lock.release()
            with self.cond:
                self.cond.notify()

    def __init__(self, owner, lock, timeout=1, raise_on_timeout=False):
        self._owner = owner
        self._lock = lock
        self._timeout = timeout
        self._raise_on_timeout = raise_on_timeout

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, type, value, tb):
        ''' will only be called if __enter__ did not raise '''
        self.release()

    def acquire(self):
        if self._raise_on_timeout:
            if not self._waitLock():
                raise RuntimeError('"%s" could not aquire lock within %d sec'
                                   % (self._owner, self._timeout))
        else:
            while True:
                if self._waitLock():
                    break
                print('"%s" is waiting for "%s" and is getting bored...'
                      % (self._owner, self._lock.owner))
        self._lock.owner = self._owner

    def release(self):
        self._lock._release()

    def _waitLock(self):
        with self._lock.cond:
            _current_t = _start_t = time.time()
            while _current_t < _start_t + self._timeout:
                if self._lock.lock.acquire(False):
                    return True
                else:
                    self._lock.cond.wait(self._timeout - _current_t + _start_t)
                    _current_t = time.time()
        return False

Чтобы потоки действительно не мешали и не ожидали получить уведомление как можно скорее, я написал небольшой многопоточный тест, в котором суммируется время, необходимое для запуска всех потоков:

def test_lock_guard():
    import random

    def locking_thread_fn(name, lock, duration, timeout):
        with timeout_lock(name, lock, timeout=timeout):
            print('%x: "%s" begins to work..' % (threading.get_ident(), name))
            time.sleep(duration)
            print('%x: "%s" finished' % (threading.get_ident(), name))

    _lock = timeout_lock.lock()

    _threads = []
    _total_d = 0
    for i in range(3):
        _d = random.random() * 3
        _to = random.random() * 2
        _threads.append(threading.Thread(
            target=locking_thread_fn, args=('thread%d' % i, _lock, _d, _to)))
        _total_d += _d

    _t = time.time()

    for t in _threads: t.start()
    for t in _threads: t.join()

    _t = time.time() - _t

    print('duration: %.2f sec / expected: %.2f (%.1f%%)'
          % (_t, _total_d, 100 / _total_d * _t))

Выход:

7f940fc2d700: "thread0" begins to work..
"thread2" is waiting for "thread0" and is getting bored...
"thread2" is waiting for "thread0" and is getting bored...
"thread2" is waiting for "thread0" and is getting bored...
7f940fc2d700: "thread0" finished
7f940f42c700: "thread1" begins to work..
"thread2" is waiting for "thread1" and is getting bored...
"thread2" is waiting for "thread1" and is getting bored...
7f940f42c700: "thread1" finished
"thread2" is waiting for "None" and is getting bored...
7f940ec2b700: "thread2" begins to work..
7f940ec2b700: "thread2" finished
duration: 5.20 sec / expected: 5.20 (100.1%)