Подтвердить что ты не робот

Являются ли генераторы Threadsafe?

У меня есть многопоточная программа, где я создаю функцию генератора, а затем передаю ее новым потокам. Я хочу, чтобы он был общим/глобальным по природе, поэтому каждый поток может получить следующее значение от генератора.

Можно ли использовать генератор, как это, или я буду сталкиваться с проблемами/условиями доступа к совместно используемому генератору из нескольких потоков?

Если нет, есть ли лучший способ подойти к проблеме? Мне нужно что-то, что будет циклически перебирать список и вызывать следующее значение для того, что нить вызывает его.

4b9b3361

Ответ 1

Это не потокобезопасное; одновременные вызовы могут чередоваться и помешать локальным переменным.

Общим подходом является использование шаблона Master-Slave (теперь он называется шаблоном фермера-работника на ПК). Создайте третий поток, который генерирует данные, и добавьте очередь между ведущим и подчиненными устройствами, где ведомые будут читать из очереди, и мастер напишет на него. Стандартный модуль очереди обеспечивает необходимую безопасность потока и организует блокировку ведущего устройства, пока ведомые устройства не будут готовы читать больше данных.

Ответ 2

Отредактировано для добавления контрольного показателя ниже.

Вы можете обернуть генератор с помощью блокировки. Например,

import threading
class LockedIterator(object):
    def __init__(self, it):
        self.lock = threading.Lock()
        self.it = it.__iter__()

    def __iter__(self): return self

    def next(self):
        self.lock.acquire()
        try:
            return self.it.next()
        finally:
            self.lock.release()

gen = [x*2 for x in [1,2,3,4]]
g2 = LockedIterator(gen)
print list(g2)

В моей системе блокировка занимает 50 мс, а в очереди - 350 мс. Очередь полезна, когда у вас действительно есть очередь; например, если у вас есть входящие HTTP-запросы, и вы хотите поставить их в очередь для обработки рабочими потоками. (Это не соответствует модели итератора Python - после завершения итератора элементов). Если у вас действительно есть итератор, LockedIterator - это более быстрый и простой способ сделать его потокобезопасным.

from datetime import datetime
import threading
num_worker_threads = 4

class LockedIterator(object):
    def __init__(self, it):
        self.lock = threading.Lock()
        self.it = it.__iter__()

    def __iter__(self): return self

    def next(self):
        self.lock.acquire()
        try:
            return self.it.next()
        finally:
            self.lock.release()

def test_locked(it):
    it = LockedIterator(it)
    def worker():
        try:
            for i in it:
                pass
        except Exception, e:
            print e
            raise

    threads = []
    for i in range(num_worker_threads):
        t = threading.Thread(target=worker)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

def test_queue(it):
    from Queue import Queue
    def worker():
        try:
            while True:
                item = q.get()
                q.task_done()
        except Exception, e:
            print e
            raise

    q = Queue()
    for i in range(num_worker_threads):
         t = threading.Thread(target=worker)
         t.setDaemon(True)
         t.start()

    t1 = datetime.now()

    for item in it:
        q.put(item)

    q.join()

start_time = datetime.now()
it = [x*2 for x in range(1,10000)]

test_locked(it)
#test_queue(it)
end_time = datetime.now()
took = end_time-start_time
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000)

Ответ 3

Нет, они не являются потокобезопасными. Вы можете найти интересную информацию о генераторах и многопоточности в:

http://www.dabeaz.com/generators/Generators.pdf

Ответ 4

Это зависит от используемой реализации python. В CPython GIL делает все операции над объектами python потокобезопасными, поскольку только один поток может выполнять код в любой момент времени.

http://en.wikipedia.org/wiki/Global_Interpreter_Lock