Подтвердить что ты не робот

Безопасен ли фильтр

У меня есть поток, который обновляет список под названием l. Правильно ли я говорю, что это поточно-безопасное выполнение следующего из другого потока?

filter(lambda x: x[0] == "in", l)

Если это не безопасный поток, то это правильный подход:

import threading
import time
import Queue

class Logger(threading.Thread):
    def __init__(self, log):
        super(Logger, self).__init__()
        self.log = log
        self.data = []
        self.finished = False
        self.data_lock = threading.Lock()

    def run(self):
        while not self.finished:
            try:
                with self.data_lock: 
                    self.data.append(self.log.get(block=True, timeout=0.1))
            except Queue.Empty:
                pass

    def get_data(self, cond):
        with self.data_lock: 
            d = filter(cond, self.data)      
        return d 

    def stop(self):
        self.finished = True
        self.join()  
        print("Logger stopped")

где метод get_data(self, cond) используется для извлечения небольшого подмножества данных в self.data безопасным потоком.

4b9b3361

Ответ 1

Во-первых, чтобы ответить на ваш вопрос в заголовке: filter - это просто функция. Следовательно, его безопасность потока будет опираться на структуру данных, с которой вы ее используете.

Как уже отмечалось в комментариях, сами операции с списками являются потокобезопасными в CPython и защищены GIL, но это, возможно, только деталь реализации CPython, на которую вы не должны положиться. Даже если вы можете положиться на это, безопасность потоков некоторых из их операций, вероятно, не означает вид безопасности потока, который вы имеете в виду:

Проблема состоит в том, что итерация по последовательности с filter вообще не является атомной операцией. Последовательность может быть изменена во время итерации. В зависимости от структуры данных, лежащей в основе вашего итератора, это может вызвать более или менее странные эффекты. Одним из способов преодоления этой проблемы является повторение копии последовательности, созданной с помощью одного атомарного действия. Самый простой способ сделать это для стандартных последовательностей, таких как tuple, list, string, заключается в следующем:

filter(lambda x: x[0] == "in", l[:])

Помимо этого, не обязательно быть потокобезопасным для других типов данных, есть одна проблема с этим: это только мелкая копия. Поскольку элементы списка, похоже, также похожи на список, другой поток может параллельно del l[1000][:] удалять один из внутренних списков (на которые также указывают и ваши мелкие копии). Это приведет к отказу вашего фильтра с помощью IndexError.

Все, что сказал, не стыдно использовать блокировку для защиты доступа к вашему списку, и я определенно рекомендую его. В зависимости от того, как ваши данные изменяются и как вы работаете с возвращенными данными, может быть даже разумно глубоко скопировать элементы, удерживая блокировку и вернуть эти копии. Таким образом, вы можете гарантировать, что после возвращения условие фильтра не будет внезапно изменяться для возвращаемых элементов.

Wrt. ваш код Logger: я не уверен на 100%, как вы планируете использовать это, и если вам необходимо запустить несколько потоков в одной очереди и join их. Мне кажется, что вы никогда не используете Queue.task_done() (при условии, что его self.log является Queue). Также ваш опрос в очереди потенциально расточительный. Если вам не нужен join потока, я бы предложил по крайней мере включить сбор блокировки:

class Logger(threading.Thread):
    def __init__(self, log):
        super(Logger, self).__init__()
        self.daemon = True
        self.log = log
        self.data = []
        self.data_lock = threading.Lock()

    def run(self):
        while True:
            l = self.log.get()  # thread will sleep here indefinitely
            with self.data_lock: 
                self.data.append(l)
            self.log.task_done()

    def get_data(self, cond):
        with self.data_lock: 
            d = filter(cond, self.data)
            # maybe deepcopy d here
        return d

Внешне вы все равно можете сделать log.join(), чтобы убедиться, что все элементы очереди log обработаны.

Ответ 2

Если один поток записывает в список, а другой поток читает этот список, эти два должны быть синхронизированы. Для этого не имеет значения, использует ли читатель filter(), индекс или итерацию или использует ли автор append() или любой другой метод.

В вашем коде вы достигнете необходимой синхронизации с помощью threading.Lock. Поскольку вы получаете доступ только к списку в контексте with self.data_lock, доступ является взаимоисключающим.

Таким образом, ваш код формально корректен в отношении обработки списка между потоками. Но:

  • Вы получаете доступ к self.finished без блокировки, что является проблематичным. Присвоение этому члену изменит self, то есть сопоставление объекта с соответствующими членами, так что это должно быть синхронизировано. Эффективно это не повредит, потому что True и False являются глобальными константами, в худшем случае у вас будет небольшая задержка между настройкой состояния в одном потоке и наблюдением состояния в другом. Он остается плохим, потому что он формирует привычку.
  • Как правило, при использовании блокировки всегда сохраняется документ, защищающий эту блокировку. Кроме того, документ, к которому осуществляется доступ к объекту, через какой поток. Тот факт, что self.finished является общим и требует синхронизации, был бы очевиден. Кроме того, визуальное различие между публичными функциями и данными и частными (начиная с _underscore, см. PEP 8) помогает отслеживать это. Это также помогает другим читателям.
  • Аналогичная проблема - ваш базовый класс. В общем, наследование от threading.Thread - плохая идея. Скорее, включите экземпляр класса потока и дайте ему функцию, подобную self._main_loop для запуска. Причина в том, что вы говорите, что ваш Logger - это Thread, и что все его публичные члены базового класса также являются публичными членами вашего класса, что, вероятно, намного шире интерфейса, чем то, что вы намеревались.
  • Вы не должны блокировать блокировку. В вашем коде вы блокируете в self.log.get(block=True, timeout=0.1) блокировку мьютекса. В то время, даже если ничего не происходит, ни один другой поток не сможет позвонить и завершить вызов get_data(). На самом деле есть только крошечное окно между разблокировкой мьютекса и блокировкой его снова, когда вызывающему абоненту get_data() не нужно ждать, что очень плохо для производительности. Я даже мог предположить, что ваш вопрос мотивирован очень плохим исполнением, которое это вызывает. Вместо этого вызовите log.get(..) без блокировки, он не нужен. Затем, удерживая блокировку, добавьте данные в self.data и проверьте self.finished.