Подтвердить что ты не робот

Получить все элементы из очереди потоков

У меня есть один поток, который записывает результаты в очередь.

В другом потоке (GUI) я периодически (в событии IDLE) проверяю, есть ли результаты в очереди, например:

def queue_get_all(q):
    items = []
    while 1:
        try:
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

Это хороший способ сделать это?

Edit:

Я спрашиваю, потому что иногда ожидающая нить застревает в течение нескольких секунд, не вынимая новых Результаты.

"Застрявшая" проблема оказалась в том, что я выполнял обработку в обработчике событий бездействия, не убедившись, что такие события действительно генерируются, вызывая wx.WakeUpIdle, как рекомендуется.

4b9b3361

Ответ 1

Я был бы очень удивлен, если вызов get_nowait() вызвал паузу, не вернувшись, если список пуст.

Может быть, вы отправляете большое количество (может быть больших?) элементов между проверками, что означает, что принимающий поток имеет большой объем данных, чтобы вытащить из Queue? Вы можете попытаться ограничить число, которое вы получаете в одной партии:

def queue_get_all(q):
    items = []
    maxItemsToRetreive = 10
    for numOfItemsRetrieved in range(0, maxItemsToRetreive):
        try:
            if numOfItemsRetrieved == maxItemsToRetreive:
                break
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

Это ограничило бы принимающий поток одновременным вытягиванием до 10 элементов.

Ответ 2

Если вы всегда вытаскиваете все доступные элементы из очереди, есть ли реальная точка в использовании очереди, а не только список с блокировкой? то есть:

from __future__ import with_statement
import threading

class ItemStore(object):
    def __init__(self):
        self.lock = threading.Lock()
        self.items = []

    def add(self, item):
        with self.lock:
            self.items.append(item)

    def getAll(self):
        with self.lock:
            items, self.items = self.items, []
        return items

Если вы также потянете их по отдельности и используете поведение блокировки для пустых очередей, тогда вы должны использовать Queue, но ваш пример использования выглядит намного проще и может быть лучше обслуживаться вышеуказанным подходом.

[Edit2] Я пропустил тот факт, что вы просматриваете очередь из цикла ожидания, и из вашего обновления я вижу, что проблема не связана с утверждением, поэтому ниже подход не имеет отношения к вашей проблеме. Я оставил его на случай, если кто-нибудь найдет вариант блокировки этого полезного:

В тех случаях, когда вы хотите заблокировать, пока не получите хотя бы один результат, вы можете изменить приведенный выше код, чтобы дождаться, когда данные станут доступными, если сигнализировать поток производителя. Например.

class ItemStore(object):
    def __init__(self):
        self.cond = threading.Condition()
        self.items = []

    def add(self, item):
        with self.cond:
            self.items.append(item)
            self.cond.notify() # Wake 1 thread waiting on cond (if any)

    def getAll(self, blocking=False):
        with self.cond:
            # If blocking is true, always return at least 1 item
            while blocking and len(self.items) == 0:
                self.cond.wait()
            items, self.items = self.items, []
        return items

Ответ 3

Я думаю, что самый простой способ получить все элементы из очереди:

def get_all_queue_result(queue):

    result_list = []
    while not queue.empty():
        result_list.append(queue.get())

    return result_list

Ответ 4

Я вижу, что вы используете get_nowait(), который в соответствии с документацией "возвращает [s] элемент, если он доступен немедленно, иначе создайте пустое исключение"

Теперь вы выходите из цикла, когда генерируется пустое исключение. Таким образом, если в очереди нет результата, доступного в очереди, ваша функция возвращает список пустых элементов.

Есть ли причина, по которой вы не используете метод get()? Может случиться так, что get_nowait() терпит неудачу, потому что очередь обслуживает запрос put() в тот же самый момент.

Ответ 5

Если вы закончили запись в очередь, qsize должен сделать трюк, не требуя проверки очереди для каждой итерации.

responseList = []
for items in range(0, q.qsize()):
    responseList.append(q.get_nowait())