Подтвердить что ты не робот

Как я могу достоверно воспроизвести условия гонки в этом коде python?

Контекст

Недавно я опубликовал класс таймера для обзора обзора кода. У меня было ощущение, что есть ошибки concurrency, как я когда-то видел 1 unit test, но не смог воспроизвести сбой. Отсюда мой пост для проверки кода.

У меня появилась отличная обратная связь, в которой показаны различные условия гонки в коде. (Я думал) Я понял проблему и решение, но прежде чем делать какие-либо исправления, я хотел разоблачить ошибки с помощью unit test. Когда я попытался, я понял, что это сложно. Различные ответы об обмене стеками предполагали, что мне нужно будет контролировать выполнение потоков, чтобы выявить ошибку (-ы), и любое надуманное время не обязательно будет переносимым на другую машину. Это представляло собой много случайной сложности, кроме проблемы, которую я пытался решить.

Вместо этого я попытался использовать лучший инструмент статического анализа (SA) для python, PyLint, чтобы узнать, не выбрал ли он какую-либо ошибку, но это могло бы "т. Почему человек может найти ошибки в обзоре кода (по существу SA), но инструмент SA не смог?

Боясь попробовать заставить Valgrind работать с python (который звучал как як-бритье), я решил иметь bash для исправления ошибок без их воспроизведения первый. Теперь я в рассоле.

Вот код сейчас.

from threading import Timer, Lock
from time import time

class NotRunningError(Exception): pass
class AlreadyRunningError(Exception): pass


class KitchenTimer(object):
    '''
    Loosely models a clockwork kitchen timer with the following differences:
        You can start the timer with arbitrary duration (e.g. 1.2 seconds).
        The timer calls back a given function when time up.
        Querying the time remaining has 0.1 second accuracy.
    '''

    PRECISION_NUM_DECIMAL_PLACES = 1
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    TIMEUP  = "TIMEUP"

    def __init__(self):
        self._stateLock = Lock()
        with self._stateLock:
            self._state = self.STOPPED
            self._timeRemaining = 0

    def start(self, duration=1, whenTimeup=None):
        '''
        Starts the timer to count down from the given duration and call whenTimeup when time up.
        '''
        with self._stateLock:
            if self.isRunning():
                raise AlreadyRunningError
            else:
                self._state = self.RUNNING
                self.duration = duration
                self._userWhenTimeup = whenTimeup
                self._startTime = time()
                self._timer = Timer(duration, self._whenTimeup)
                self._timer.start()

    def stop(self):
        '''
        Stops the timer, preventing whenTimeup callback.
        '''
        with self._stateLock:
            if self.isRunning():
                self._timer.cancel()
                self._state = self.STOPPED
                self._timeRemaining = self.duration - self._elapsedTime()
            else:
                raise NotRunningError()

    def isRunning(self):
        return self._state == self.RUNNING

    def isStopped(self):
        return self._state == self.STOPPED

    def isTimeup(self):
        return self._state == self.TIMEUP

    @property
    def timeRemaining(self):
        if self.isRunning():
            self._timeRemaining = self.duration - self._elapsedTime()
        return round(self._timeRemaining, self.PRECISION_NUM_DECIMAL_PLACES)

    def _whenTimeup(self):
        with self._stateLock:
            self._state = self.TIMEUP
            self._timeRemaining = 0
            if callable(self._userWhenTimeup):
                self._userWhenTimeup()

    def _elapsedTime(self):
        return time() - self._startTime

Вопрос

В контексте этого примера кода, как я могу разоблачить условия гонки, исправить их и доказать, что они исправлены?

Дополнительные очки

дополнительные точки для рамки тестирования, подходящие для других реализаций и проблем, а не специально для этого кода.

Вынос

Мой взнос заключается в том, что техническое решение для воспроизведения выявленных условий гонки состоит в том, чтобы контролировать синхронность двух потоков, чтобы обеспечить их выполнение в том порядке, в котором будет обнаружена ошибка. Важным моментом здесь является то, что они уже идентифицировали условия гонки. Лучший способ, который я нашел для определения условий гонки, - это перевести код на проверку кода и побудить больше экспертов анализировать его.

4b9b3361

Ответ 1

Традиционно, форсирование условий гонки в многопоточном коде выполняется с помощью семафоров, поэтому вы можете заставить поток ждать, пока другой поток не достигнет какого-либо краевого условия, прежде чем продолжить.

Например, ваш объект имеет некоторый код для проверки того, что start не вызывается, если объект уже запущен. Вы можете заставить это условие убедиться, что он ведет себя так, как ожидалось, делая что-то вроде этого:

  • запуск KitchenTimer
  • с блоком таймера на семафоре в рабочем состоянии
  • запуск одного и того же таймера в другом потоке
  • ловить AlreadyRunningError

Чтобы сделать это, вам может потребоваться расширить класс KitchenTimer. Формальные модульные тесты часто используют макетные объекты, которые определены для блокировки в критические моменты времени. Макетные объекты - это более важная тема, чем я могу здесь, но googling "python mock object" будет отображать много документации и множество реализаций на выбор.

Здесь вы можете заставить ваш код бросить AlreadyRunningError:

import threading

class TestKitchenTimer(KitchenTimer):

    _runningLock = threading.Condition()

    def start(self, duration=1, whenTimeUp=None):
        KitchenTimer.start(self, duration, whenTimeUp)
        with self._runningLock:
            print "waiting on _runningLock"
            self._runningLock.wait()

    def resume(self):
        with self._runningLock:
            self._runningLock.notify()

timer = TestKitchenTimer()

# Start the timer in a subthread. This thread will block as soon as
# it is started.
thread_1 = threading.Thread(target = timer.start, args = (10, None))
thread_1.start()

# Attempt to start the timer in a second thread, causing it to throw
# an AlreadyRunningError.
try:
    thread_2 = threading.Thread(target = timer.start, args = (10, None))
    thread_2.start()
except AlreadyRunningError:
    print "AlreadyRunningError"
    timer.resume()
    timer.stop()

Прочитав код, определите некоторые граничные условия, которые вы хотите проверить, а затем подумайте, где вам нужно будет приостановить таймер, чтобы заставить это условие возникнуть, и добавьте условия, семафоры, события и т.д., чтобы сделать это бывает. например что произойдет, если, как только таймер запускает обратный вызов whenTimeUp, другой поток пытается его остановить? Вы можете заставить это условие заставить таймер ждать, как только он введет _whenTimeUp:

import threading

class TestKitchenTimer(KitchenTimer):

    _runningLock = threading.Condition()

    def _whenTimeup(self):
        with self._runningLock:
            self._runningLock.wait()
        KitchenTimer._whenTimeup(self)

    def resume(self):
        with self._runningLock:
            self._runningLock.notify()

def TimeupCallback():
    print "TimeupCallback was called"

timer = TestKitchenTimer()

# The timer thread will block when the timer expires, but before the callback
# is invoked.
thread_1 = threading.Thread(target = timer.start, args = (1, TimeupCallback))
thread_1.start()
sleep(2)

# The timer is now blocked. In the parent thread, we stop it.
timer.stop()
print "timer is stopped: %r" % timer.isStopped()

# Now allow the countdown thread to resume.
timer.resume()

Подклассификация класса, который вы хотите протестировать, не является прекрасным способом инструментального тестирования: вам придется переопределить в основном все методы, чтобы проверить условия гонки в каждом из них, и в этот момент есть хороший аргумент, который нужно сделать, чтобы вы не тестировали исходный код. Вместо этого вы можете найти более чистым, чтобы семафоры располагались прямо в объекте KitchenTimer, но по умолчанию были инициализированы None, и ваши методы проверяют if testRunningLock is not None: перед тем, как получить или ждать блокировки. Затем вы можете заставить гонки на действительный код, который вы отправляете.

Некоторое чтение фреймворков Python, которые могут быть полезны. На самом деле, я не уверен, что mocks будет полезен при тестировании этого кода: он почти полностью автономный и не полагается на многие внешние объекты. Но макет учебников иногда затрагивает такие проблемы. Я не использовал ни одного из них, но документация на них как хорошее место для начала:

Ответ 2

Самое распространенное решение для проверки безопасного (не) безопасного кода - начать много потоков и надеяться на лучшее. Проблема, которую я, и я могу представить себе других, связана с тем, что она полагается на случайность и делает тесты "тяжелыми".

Как я натолкнулся на это некоторое время назад, я хотел пойти на точность вместо грубой силы. Результатом является кусок тестового кода, вызывающий условия гонки, позволяя нитям гонять шею.

Пример расистского кода

spam = []

def set_spam():
    spam[:] = foo()
    use(spam)

Если set_spam вызывается из нескольких потоков, существует условие гонки между модификацией и использованием spam. Попробуйте воспроизвести его последовательно.

Как вызвать расовые условия

class TriggeredThread(threading.Thread):
    def __init__(self, sequence=None, *args, **kwargs):
        self.sequence = sequence
        self.lock = threading.Condition()
        self.event = threading.Event()
        threading.Thread.__init__(self, *args, **kwargs)

    def __enter__(self):
        self.lock.acquire()
        while not self.event.is_set():
            self.lock.wait()
        self.event.clear()

    def __exit__(self, *args):
        self.lock.release()
        if self.sequence:
            next(self.sequence).trigger()

    def trigger(self):
        with self.lock:
            self.event.set()
            self.lock.notify()

Затем, чтобы продемонстрировать использование этого потока:

spam = []  # Use a list to share values across threads.
results = []  # Register the results.

def set_spam():
    thread = threading.current_thread()
    with thread:  # Acquires the lock.
        # Set 'spam' to thread name
        spam[:] = [thread.name]
    # Thread 'releases' the lock upon exiting the context.
    # The next thread is triggered and this thread waits for a trigger.
    with thread:
        # Since each thread overwrites the content of the 'spam'
        # list, this should only result in True for the last thread.
        results.append(spam == [thread.name])

threads = [
    TriggeredThread(name='a', target=set_spam),
    TriggeredThread(name='b', target=set_spam),
    TriggeredThread(name='c', target=set_spam)]

# Create a shifted sequence of threads and share it among the threads.
thread_sequence = itertools.cycle(threads[1:] + threads[:1])
for thread in threads:
    thread.sequence = thread_sequence

# Start each thread
[thread.start() for thread in threads]
# Trigger first thread.
# That thread will trigger the next thread, and so on.
threads[0].trigger()
# Wait for each thread to finish.
[thread.join() for thread in threads]
# The last thread 'has won the race' overwriting the value
# for 'spam', thus [False, False, True].
# If set_spam were thread-safe, all results would be true.
assert results == [False, False, True], "race condition triggered"
assert results == [True, True, True], "code is thread-safe"

Я думаю, что я достаточно подробно рассказал об этой конструкции, чтобы вы могли реализовать ее для своей собственной ситуации. Я думаю, что это отлично подходит для раздела "дополнительных очков":

дополнительные точки для рамки тестирования, подходящие для других реализаций и проблем, а не специально для этого кода.

Решение условий гонки

Общие переменные

Каждая проблема с резьбой решается в ней определенным образом. В приведенном выше примере я вызвал условие гонки, разделив значение по потокам. Подобные проблемы могут возникать при использовании глобальных переменных, таких как атрибут модуля. Ключом к решению таких проблем может быть использование локального хранилища потоков:

# The thread local storage is a global.
# This may seem weird at first, but it isn't actually shared among threads.
data = threading.local()
data.spam = []  # This list only exists in this thread.
results = []  # Results *are* shared though.

def set_spam():
    thread = threading.current_thread()
    # 'get' or set the 'spam' list. This actually creates a new list.
    # If the list was shared among threads this would cause a race-condition.
    data.spam = getattr(data, 'spam', [])
    with thread:
        data.spam[:] = [thread.name]
    with thread:
        results.append(data.spam == [thread.name])

# Start the threads as in the example above.

assert all(results)  # All results should be True.

Параллельные чтения/записи

Общей проблемой потоковой передачи является проблема одновременного чтения и/или записи нескольких потоков в держатель данных. Эта проблема решается путем реализации блокировки чтения и записи. Фактическая реализация блокировки чтения-записи может отличаться. Вы можете выбрать блокировку с первым чтением, блокировку с записью или просто в случайном порядке.

Я уверен, что есть примеры, описывающие такие методы блокировки. Я могу написать пример позже, поскольку это уже довольно длинный ответ.; -)

Примечания

Посмотрите документацию по потоковому модулю и немного экспериментируйте с ней. Поскольку каждая проблема с потоками различна, применяются различные решения.

В то время как на тему потоки, взгляните на Python GIL (Global Interpreter Lock). Важно отметить, что потоковая передача на самом деле не может быть лучшим подходом к оптимизации производительности (но это не ваша цель). Я нашел эту презентацию довольно неплохой: https://www.youtube.com/watch?v=zEaosS1U5qY

Ответ 3

Вы можете протестировать его, используя много потоков:

import sys, random, thread
def timeup():
    sys.stdout.write("Timer:: Up %f" % time())

def trdfunc(kt, tid):
    while True :
        sleep(1)
        if not kt.isRunning():
            if kt.start(1, timeup):
                sys.stdout.write("[%d]: started\n" % tid)
        else:
            if random.random() < 0.1:
                kt.stop()
                sys.stdout.write("[%d]: stopped\n" % tid)
        sys.stdout.write("[%d] remains %f\n" % ( tid, kt.timeRemaining))

kt = KitchenTimer()
kt.start(1, timeup)
for i in range(1, 100):
    thread.start_new_thread ( trdfunc, (kt, i) )
trdfunc(kt, 0)

Несколько проблемных проблем, которые я вижу:

  • Когда поток видит таймер как не запущенный и пытается его запустить, код обычно вызывает исключение из-за переключения контекста между проверить и начать. Я думаю, что создание исключения слишком велико. Или вы можете имеют атомную функцию testAndStart

  • Аналогичная проблема возникает при остановке. Вы можете реализовать testAndStop функция.

  • Даже этот код из функции timeRemaining:

    if self.isRunning():
       self._timeRemaining = self.duration - self._elapsedTime()
    

    Требуется какая-то атомичность, возможно, вам нужно захватить замок раньше тестирование isRunning.

Если вы планируете делиться этим классом между потоками, вам необходимо решить эти проблемы.

Ответ 4

В общем - это не жизнеспособное решение. Вы можете воспроизвести это условие гонки, используя отладчик (установите точки останова в некоторых местах кода, а затем, когда он попадает на одну из точек останова, - заморозить поток и запустить код до тех пор, пока он не достигнет другой точки останова, а затем заморозить этот поток и разморозить первый thread, вы можете чередовать выполнение потоков любым способом с помощью этой техники).

Проблема в том, что чем больше потоков и кода у вас есть, тем больше способов чередования побочных эффектов у них будет. Фактически - он будет расти экспоненциально. Нет никакого жизнеспособного решения, чтобы проверить его в целом. Это возможно только в некоторых простых случаях.

Решение этой проблемы хорошо известно. Напишите код, который знает о его побочных эффектах, контролирует побочные эффекты с помощью примитивов синхронизации, таких как блокировки, семафоры или очереди, или использует неизменяемые данные, если это возможно.

Возможно, более практичным способом является использование проверок времени выполнения для принудительного упорядочивания заказа. Например (псевдокод):

class RacyObject:
    def __init__(self):
        self.__cnt = 0
        ...

    def isReadyAndLocked(self):
        acquire_object_lock
            if self.__cnt % 2 != 0:
                # another thread is ready to start the Job
                return False
            if self.__is_ready:
                self.__cnt += 1
                return True
            # Job is in progress or doesn't ready yet
            return False
        release_object_lock

    def doJobAndRelease(self):
        acquire_object_lock
            if self.__cnt % 2 != 1:
                raise RaceConditionDetected("Incorrect order")
            self.__cnt += 1
            do_job()
        release_object_lock

Этот код генерирует исключение, если вы не проверяете isReadyAndLock перед вызовом doJobAndRelease. Это можно легко протестировать, используя только один поток.

obj = RacyObject()
...
# correct usage
if obj.isReadyAndLocked()
    obj.doJobAndRelease()