Подтвердить что ты не робот

Включить функции с обратным вызовом в генераторы Python?

Функция минимизации Scipy (просто для использования в качестве примера) имеет возможность добавления функции обратного вызова на каждом шаге. Поэтому я могу сделать что-то вроде

def my_callback(x):
    print x
scipy.optimize.fmin(func, x0, callback=my_callback)

Есть ли способ использовать функцию обратного вызова для создания версии генератора fmin, чтобы я мог это сделать,

for x in my_fmin(func,x0):
    print x

Кажется, что это возможно с некоторой комбинацией доходностей и отправлений, но я могу все придумать.

4b9b3361

Ответ 1

Как указано в комментариях, вы можете сделать это в новом потоке, используя Queue. Недостатком является то, что вам все равно нужно каким-то образом получить доступ к окончательному результату (что fmin возвращает в конце). В моем примере ниже используется дополнительный обратный вызов, чтобы что-то с ним делать (другим вариантом было бы просто дать его, хотя ваш код вызова должен был бы различать результаты итерации конечных результатов):

from thread import start_new_thread
from Queue import Queue

def my_fmin(func, x0, end_callback=(lambda x:x), timeout=None):

    q = Queue() # fmin produces, the generator consumes
    job_done = object() # signals the processing is done

    # Producer
    def my_callback(x):
        q.put(x)
    def task():
        ret = scipy.optimize.fmin(func,x0,callback=my_callback)
        q.put(job_done)
        end_callback(ret) # "Returns" the result of the main call

    # Starts fmin in a new thread
    start_new_thread(task,())

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item

Обновление:, чтобы заблокировать выполнение следующей итерации до тех пор, пока потребитель не завершит обработку последнего, также необходимо использовать task_done и join.

    # Producer
    def my_callback(x):
        q.put(x)
        q.join() # Blocks until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item
        q.task_done() # Unblocks the producer, so a new iteration can start

Обратите внимание, что maxsize=1 не требуется, так как новый элемент не будет добавлен в очередь до тех пор, пока последний не будет использован.

Обновление 2: Также обратите внимание, что, если все элементы в конечном итоге не будут получены этим генератором, созданный поток будет блокирован (он будет блокироваться навсегда, и его ресурсы никогда не будут выпущены). Производитель ждет в очереди, и поскольку он хранит ссылку на эту очередь, он никогда не будет возвращен gc, даже если потребитель. Затем очередь станет недоступной, поэтому никто не сможет освободить блокировку.

Чистое решение для этого неизвестно, если возможно вообще (поскольку оно будет зависеть от конкретной функции, используемой в месте fmin). Обходной путь можно сделать с помощью timeout, если производитель вызывает исключение, если put блокирует слишком долго:

    q = Queue(maxsize=1)

    # Producer
    def my_callback(x):
        q.put(x)
        q.put("dummy",True,timeout) # Blocks until the first result is retrieved
        q.join() # Blocks again until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        q.task_done()                   # (one "task_done" per "get")
        if next_item is job_done:
            break
        yield next_item
        q.get() # Retrieves the "dummy" object (must be after yield)
        q.task_done() # Unblocks the producer, so a new iteration can start

Ответ 2

Генератор в качестве сопрограммы (без резьбы)

Пусть FakeFtp с функцией retrbinary использует callback, вызываемый при каждом успешном чтении фрагмента данных:

class FakeFtp(object):
    def __init__(self):
        self.data = iter(["aaa", "bbb", "ccc", "ddd"])

    def login(self, user, password):
        self.user = user
        self.password = password

    def retrbinary(self, cmd, cb):
        for chunk in self.data:
            cb(chunk)

Недостатком использования простой функции обратного вызова является то, что она вызывается повторно, и функция обратного вызова не может легко сохранять контекст между вызовами.

Следующий код определяет генератор process_chunks, который сможет получать порции данных один за другим и обрабатывать их. В отличие от простого обратного вызова, здесь мы можем сохранить всю обработку внутри одной функции без потери контекста.

from contextlib import closing
from itertools import count


def main():
    processed = []

    def process_chunks():
        for i in count():
            try:
                # (repeatedly) get the chunk to process
                chunk = yield
            except GeneratorExit:
                # finish_up
                print("Finishing up.")
                return
            else:
                # Here process the chunk as you like
                print("inside coroutine, processing chunk:", i, chunk)
                product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
                processed.append(product)

    with closing(process_chunks()) as coroutine:
        # Get the coroutine to the first yield
        coroutine.next()
        ftp = FakeFtp()
        # next line repeatedly calls 'coroutine.send(data)'
        ftp.retrbinary("RETR binary", cb=coroutine.send)
        # each callback "jumps" to 'yield' line in 'process_chunks'

    print("processed result", processed)
    print("DONE")

Чтобы увидеть код в действии, поместите класс FakeFtp, код, показанный выше, и следующую строку:

main()

в один файл и назовите его:

$ python headsandtails.py
('inside coroutine, processing chunk:', 0, 'aaa')
('inside coroutine, processing chunk:', 1, 'bbb')
('inside coroutine, processing chunk:', 2, 'ccc')
('inside coroutine, processing chunk:', 3, 'ddd')
Finishing up.
('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
DONE

Как это устроено

processed = [], чтобы показать, что у генератора process_chunks не должно быть проблем с его внешним контекстом. Все обернуто в def main(): чтобы доказать, нет необходимости использовать глобальные переменные.

def process_chunks() - это ядро решения. Он может иметь входные параметры одним выстрелом (здесь не используются), но основной точкой, где он получает входные данные, является каждая строка yield возвращающая то, что кто-то отправляет через .send(data) в экземпляр этого генератора. Можно coroutine.send(chunk) но в этом примере это делается с помощью обратного вызова, ссылающегося на эту функцию callback.send.

Обратите внимание, что в реальном решении нет проблем с множественным yield в коде, они обрабатываются один за другим. Это может быть использовано, например, для чтения (и игнорирования) заголовка файла CSV, а затем продолжить обработку записей с данными.

Мы могли бы создать экземпляр и использовать генератор следующим образом:

coroutine = process_chunks()
# Get the coroutine to the first yield
coroutine.next()

ftp = FakeFtp()
# next line repeatedly calls 'coroutine.send(data)'
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to 'yield' line in 'process_chunks'

# close the coroutine (will throw the 'GeneratorExit' exception into the
# 'process_chunks' coroutine).
coroutine.close()

Реальный код использует contextlib контекста closing contextlib чтобы всегда вызывать coroutine.close().

Выводы

Это решение не предоставляет своего рода итератор для потребления данных в традиционном стиле "извне". С другой стороны, мы можем:

  • использовать генератор "изнутри"
  • сохранить всю итеративную обработку в одной функции без прерывания между обратными вызовами
  • при желании использовать внешний контекст
  • обеспечить полезные результаты снаружи
  • все это можно сделать без использования потоков

Кредиты: Решение в значительной степени вдохновлено SO ответом. Python FTP "чанк" итератор (без загрузки всего файла в память), написанный user2357112

Ответ 3

Концепция Используйте блокирующую очередь с maxsize=1 и моделью производителя/потребителя.

Выполняется обратный вызов, затем следующий вызов обратного вызова будет блокироваться в полной очереди.

Затем потребитель выдает значение из очереди, пытается получить другое значение и блокирует чтение.

Изготовителю разрешено нажать в очередь, промыть и повторить.

Использование:

def dummy(func, arg, callback=None):
  for i in range(100):
    callback(func(arg+i))

# Dummy example:
for i in Iteratorize(dummy, lambda x: x+1, 0):
  print(i)

# example with scipy:
for i in Iteratorize(scipy.optimize.fmin, func, x0):
   print(i)

Может использоваться как ожидаемый для итератора:

for i in take(5, Iteratorize(dummy, lambda x: x+1, 0)):
  print(i)

Класс итерации:

from thread import start_new_thread
from Queue import Queue

class Iteratorize:
  """ 
  Transforms a function that takes a callback 
  into a lazy iterator (generator).
  """
  def __init__(self, func, ifunc, arg, callback=None):
    self.mfunc=func
    self.ifunc=ifunc
    self.c_callback=callback
    self.q = Queue(maxsize=1)
    self.stored_arg=arg
    self.sentinel = object()

    def _callback(val):
      self.q.put(val)

    def gentask():
      ret = self.mfunc(self.ifunc, self.stored_arg, callback=_callback)
      self.q.put(self.sentinel)
      if self.c_callback:
        self.c_callback(ret)

    start_new_thread(gentask, ())

  def __iter__(self):
    return self

  def next(self):
    obj = self.q.get(True,None)
    if obj is self.sentinel:
     raise StopIteration 
    else:
      return obj

Возможно, с некоторой очисткой можно принять *args и **kwargs для обертываемой функции и/или обратного вызова окончательного результата.

Ответ 4

Решение для обработки неблокирующих обратных вызовов

Решение с использованием threading и queue довольно хорошее, высокопроизводительное и кроссплатформенное, возможно, лучшее.

Здесь я приведу это не слишком плохое решение, которое в основном предназначено для обработки неблокирующих обратных вызовов, например вызывается из родительской функции через threading.Thread(target=callback).start() или другими неблокирующими способами.

import pickle
import select
import subprocess

def my_fmin(func, x0):
    # open a process to use as a pipeline
    proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def my_callback(x):
        # x might be any object, not only str, so we use pickle to dump it
        proc.stdin.write(pickle.dumps(x).replace(b'\n', b'\\n') + b'\n')
        proc.stdin.flush()

    from scipy import optimize
    optimize.fmin(func, x0, callback=my_callback)

    # this is meant to handle non-blocking callbacks, e.g. called somewhere 
    # through 'threading.Thread(target=callback).start()'
    while select.select([proc.stdout], [], [], 0)[0]:
        yield pickle.loads(proc.stdout.readline()[:-1].replace(b'\\n', b'\n'))

    # close the process
    proc.communicate()

Затем вы можете использовать функцию следующим образом:

# unfortunately, 'scipy.optimize.fmin' callback is blocking.
# so this example is just for showing how-to.
for x in my_fmin(lambda x: x**2, 3):
    print(x)

Хотя это решение кажется довольно простым и читабельным, оно не столь высокоэффективно, как решения threading и queue, поскольку:

  • Процессы намного тяжелее, чем потоки.
  • Передача данных через канал вместо памяти выполняется намного медленнее.

Кроме того, он не работает в Windows, поскольку модуль select в Windows может обрабатывать только сокеты, а не каналы и другие файловые дескрипторы.

Ответ 5

Как насчет

data = []
scipy.optimize.fmin(func,x0,callback=data.append)
for line in data:
    print line

Если нет, что именно вы хотите делать с данными генератора?

Ответ 6

Вариант ответа Фрица:

  • Поддерживает send, чтобы выбрать возвращаемое значение для обратного вызова
  • Поддерживает throw, чтобы выбрать исключение для обратного вызова
  • Поддерживает close, чтобы изящно завершить работу
  • Не вычисляет элемент очереди, пока он не будет запрошен

Полный код с тестами можно найти на github

import queue
import threading
import collections.abc

class generator_from_callback(collections.abc.Generator):
    def __init__(self, expr):
        """
        expr: a function that takes a callback
        """ 
        self._expr = expr
        self._done = False
        self._ready_queue = queue.Queue(1)
        self._done_queue = queue.Queue(1)
        self._done_holder = [False]

        # local to avoid reference cycles
        ready_queue = self._ready_queue
        done_queue = self._done_queue
        done_holder = self._done_holder

        def callback(value):
            done_queue.put((False, value))
            cmd, *args = ready_queue.get()
            if cmd == 'close':
                raise GeneratorExit
            elif cmd == 'send':
                return args[0]
            elif cmd == 'throw':
                raise args[0]

        def thread_func():
            try:
                cmd, *args = ready_queue.get()
                if cmd == 'close':
                    raise GeneratorExit
                elif cmd == 'send':
                    if args[0] is not None:
                        raise TypeError("can't send non-None value to a just-started generator")
                elif cmd == 'throw':
                    raise args[0]
                ret = expr(callback)
                raise StopIteration(ret)
            except BaseException as e:
                done_holder[0] = True
                done_queue.put((True, e))
        self._thread = threading.Thread(target=thread_func)
        self._thread.start()

    def __next__(self):
        return self.send(None)

    def send(self, value):
        if self._done_holder[0]:
            raise StopIteration
        self._ready_queue.put(('send', value))
        is_exception, val = self._done_queue.get()
        if is_exception:
            raise val
        else:
            return val

    def throw(self, exc):
        if self._done_holder[0]:
            raise StopIteration
        self._ready_queue.put(('throw', exc))
        is_exception, val = self._done_queue.get()
        if is_exception:
            raise val
        else:
            return val

    def close(self):
        if not self._done_holder[0]:
            self._ready_queue.put(('close',))
        self._thread.join()

    def __del__(self):
        self.close()

Который работает как:

In [3]: def callback(f):
   ...:     ret = f(1)
   ...:     print("gave 1, got {}".format(ret))
   ...:     f(2)
   ...:     print("gave 2")
   ...:     f(3)
   ...:

In [4]: i = generator_from_callback(callback)

In [5]: next(i)
Out[5]: 1

In [6]: i.send(4)
gave 1, got 4
Out[6]: 2

In [7]: next(i)
gave 2, got None
Out[7]: 3

In [8]: next(i)
StopIteration

Для scipy.optimize.fmin вы должны использовать generator_from_callback(lambda c: scipy.optimize.fmin(func, x0, callback=c))