Подтвердить что ты не робот

Python скрученный: итераторы и уроки/inlineCallbacks

Folks, Я полностью смущен, поэтому, возможно, я даже не задаю вопросы правильно, но здесь говорится:

У меня есть скрученное приложение, использующее inlineCallbacks. Теперь мне нужно определить итератор, который будет означать, что генератор возвращается вызывающему. Тем не менее, итератор не может быть inlineCallbacks украшены, не так ли? Если нет, то как я это делаю, я кодирую что-то вроде этого.

Просто чтобы уточнить: цель process_loop должна быть вызвана каждый, скажем, 5 секунд, он может обрабатывать только ОДИН кусок, скажем 10, а затем он должен отпустить. Однако, чтобы знать, что кусок 10 (хранится в кэше, который является dict dict), ему нужно вызвать функцию, которая возвращает отложенную.

@inlineCallbacks ### can\'t have inlineCallbacks here, right?
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield (call func here which returns deferred)
        if result is True:
            for k,v in cachedvalue.items():
                yield cachename, k, v

@inlineCallbacks
def process_chunk(myiter, num):
    try:
        for i in xrange(num):
            nextval = myiter.next()
            yield some_processing(nextval)
        returnValue(False)
    except StopIteration:
        returnValue(True)

@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    result = yield process_chunk(myiter, 10)
    if not result:
        print 'More left'
        reactor.callLater(5, process_loop, cached)
    else:
        print 'All done'
4b9b3361

Ответ 1

Вы правы, что не можете выразить то, что хотите выразить в cacheiter. Декоратор inlineCallbacks не позволит вам иметь функцию, которая возвращает итератор. Если вы украшаете функцию с ней, то результатом будет функция, которая всегда возвращает Deferred. Это для чего.

Отчасти это затрудняет то, что итераторы не работают с асинхронным кодом. Если есть Отсрочка, участвующая в создании элементов вашего итератора, тогда элементы, которые выходят из вашего итератора, сначала будут отложены.

Вы можете сделать что-то подобное, чтобы учесть это:

@inlineCallbacks
def process_work():
    for element_deferred in some_jobs:
        element = yield element_deferred
        work_on(element)

Это может работать, но выглядит особенно странно. Поскольку генераторы могут уступать только своему вызывающему абоненту (не, например, вызывающему абоненту), тотератор some_jobs ничего не может поделать; только код, лексически расположенный в пределах process_work, может привести к отложенному трамплину с трафиком inlineCallbacks для ожидания.

Если вы не возражаете против этого шаблона, тогда мы могли бы визуализировать ваш код, написанный примерно так:

from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor

class cacheiter(object):
    def __init__(self, cached):
        self._cached = iter(cached.items())
        self._remaining = []

    def __iter__(self):
        return self


    @inlineCallbacks
    def next(self):
        # First re-fill the list of synchronously-producable values if it is empty
        if not self._remaining:
            for name, value in self._cached:
                # Wait on this Deferred to determine if this cache item should be included
                if (yield check_condition(name, value)):
                    # If so, put all of its values into the value cache so the next one
                    # can be returned immediately next time this method is called.
                    self._remaining.extend([(name, k, v) for (k, v) in value.items()])

        # Now actually give out a value, if there is one.
        if self._remaining:
            returnValue(self._remaining.pop())

        # Otherwise the entire cache has been visited and the iterator is complete.
        # Sadly we cannot signal completion with StopIteration, because the iterator
        # protocol isn't going to add an errback to this Deferred and check for
        # StopIteration.  So signal completion with a simple None value.
        returnValue(None)


@inlineCallbacks
def process_chunk(myiter, num):
    for i in xrange(num):
        nextval = yield myiter.next()
        if nextval is None:
            # The iterator signaled completion via the special None value.
            # Processing is complete.
            returnValue(True)
        # Otherwise process the value.
        yield some_processing(nextval)

    # Indicate there is more processing to be done.
    returnValue(False)


def sleep(sec):
    # Simple helper to delay asynchronously for some number of seconds.
    return deferLater(reactor, sec, lambda: None)


@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    while True:
        # Loop processing 10 items from myiter at a time, until process_chunk signals
        # there are no values left.
        result = yield process_chunk(myiter, 10)
        if result:
            print 'All done'
            break

        print 'More left'
        # Insert the 5 second delay before starting on the next chunk.
        yield sleep(5)

d = process_loop(cached)

Другой подход, который вы могли бы использовать, заключается в использовании twisted.internet.task.cooperate. cooperate принимает итератор и потребляет его, полагая, что его потребление потенциально дорогостоящее, и расщепление работы на нескольких итерациях реактора. Принимая определение cacheiter сверху:

from twisted.internet.task import cooperate

def process_loop(cached):
    finished = []

    def process_one(value):
        if value is None:
            finished.append(True)
        else:
            return some_processing(value)

    myiter = cacheiter(cached)

    while not finished:
        value_deferred = myiter.next()
        value_deferred.addCallback(process_one)
        yield value_deferred

task = cooperate(process_loop(cached))
d = task.whenDone()

Ответ 2

Я думаю, вы пытаетесь это сделать:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred() # some deferred you'd like evaluated
        if result is True:
            # here you want to return something, so you have to use returnValue
            # the generator you want to return can be written as a generator expression
            gen = ((cachename, k, v) for k,v in cachedvalue.items())
            returnValue(gen)

Когда ген xp не может выразить то, что вы пытаетесь вернуть, вы можете написать закрытие:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred()
        if result is True:
            # define the generator, saving the current values of the cache
            def gen(cachedvalue=cachedvalue, cachename=cachename):
                for k,v in cachedvalue.items():
                    yield cachename, k, v
            returnValue(gen()) # return it

Ответ 3

Попробуйте написать свой итератор как DeferredGenerator.