Подтвердить что ты не робот

JSON-кодирование очень длинных итераторов

Я пишу веб-службу, которая возвращает объекты, содержащие очень длинные списки, закодированные в JSON. Конечно, мы хотим использовать итераторы, а не списки Python, чтобы мы могли передавать объекты из базы данных; к сожалению, кодер JSON в стандартной библиотеке (json.JSONEncoder) принимает только списки и кортежи, которые должны быть преобразованы в списки JSON (хотя _iterencode_list выглядит так, как будто он действительно работает на любом итерабельном).

Docstrings предлагают переопределить значение по умолчанию для преобразования объекта в список, но это означает, что мы теряем преимущества потоковой передачи. Раньше мы переопределяли частный метод, но (как и следовало ожидать), который прерывался, когда кодер был реорганизован.

Каков наилучший способ сериализации итераторов как списки JSON в Python потоковым способом?

4b9b3361

Ответ 1

Мне нужно именно это. Первый подход был отменен методом JSONEncoder.iterencode(). Однако это не сработает, потому что, как только итератор не заполнен, внутренняя часть какой-либо функции _iterencode() берет верх.

После некоторого изучения кода я нашел очень хакерское решение, но оно работает. Только Python 3, но я уверен, что такая же магия возможна с помощью python 2 (просто других имен магических методов):

import collections.abc
import json
import itertools
import sys
import resource
import time
starttime = time.time()
lasttime = None


def log_memory():
    if "linux" in sys.platform.lower():
        to_MB = 1024
    else:
        to_MB = 1024 * 1024
    print("Memory: %.1f MB, time since start: %.1f sec%s" % (
        resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / to_MB,
        time.time() - starttime,
        "; since last call: %.1f sec" % (time.time() - lasttime) if lasttime
        else "",
    ))
    globals()["lasttime"] = time.time()


class IterEncoder(json.JSONEncoder):
    """
    JSON Encoder that encodes iterators as well.
    Write directly to file to use minimal memory
    """
    class FakeListIterator(list):
        def __init__(self, iterable):
            self.iterable = iter(iterable)
            try:
                self.firstitem = next(self.iterable)
                self.truthy = True
            except StopIteration:
                self.truthy = False

        def __iter__(self):
            if not self.truthy:
                return iter([])
            return itertools.chain([self.firstitem], self.iterable)

        def __len__(self):
            raise NotImplementedError("Fakelist has no length")

        def __getitem__(self, i):
            raise NotImplementedError("Fakelist has no getitem")

        def __setitem__(self, i):
            raise NotImplementedError("Fakelist has no setitem")

        def __bool__(self):
            return self.truthy

    def default(self, o):
        if isinstance(o, collections.abc.Iterable):
            return type(self).FakeListIterator(o)
        return super().default(o)

print(json.dumps((i for i in range(10)), cls=IterEncoder))
print(json.dumps((i for i in range(0)), cls=IterEncoder))
print(json.dumps({"a": (i for i in range(10))}, cls=IterEncoder))
print(json.dumps({"a": (i for i in range(0))}, cls=IterEncoder))


log_memory()
print("dumping 10M numbers as incrementally")
with open("/dev/null", "wt") as fp:
    json.dump(range(10000000), fp, cls=IterEncoder)
log_memory()
print("dumping 10M numbers built in encoder")
with open("/dev/null", "wt") as fp:
    json.dump(list(range(10000000)), fp)
log_memory()

Результаты:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[]
{"a": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]}
{"a": []}
Memory: 8.4 MB, time since start: 0.0 sec
dumping 10M numbers as incrementally
Memory: 9.0 MB, time since start: 8.6 sec; since last call: 8.6 sec
dumping 10M numbers built in encoder
Memory: 395.5 MB, time since start: 17.1 sec; since last call: 8.5 sec

Ясно, что IterEncoder не нуждается в запоминающем устройстве для хранения 10M int, сохраняя при этом одну и ту же скорость кодирования.

(храбрый) трюк состоит в том, что _iterencode_list на самом деле не нуждается в каких-либо элементах списка. Он просто хочет знать, пустой ли список (__bool__), а затем получить его итератор. Однако он попадает только в этот код, когда isinstance(x, (list, tuple)) возвращает True. Поэтому я упаковываю итератор в список-подкласс, а затем отключая весь случайный доступ, получая первый элемент вперед, чтобы я знал, пуст он или нет, и возвращает итератор обратно. Затем метод default возвращает этот поддельный список в случае итератора.

Ответ 2

Сохраните это в файл модуля и импортируйте его или вставьте непосредственно в свой код.

'''
Copied from Python 2.7.8 json.encoder lib, diff follows:
@@ -331,6 +331,8 @@
                     chunks = _iterencode(value, _current_indent_level)
                 for chunk in chunks:
                     yield chunk
+        if first:
+            yield buf
         if newline_indent is not None:
             _current_indent_level -= 1
             yield '\n' + (' ' * (_indent * _current_indent_level))
@@ -427,12 +429,12 @@
             yield str(o)
         elif isinstance(o, float):
             yield _floatstr(o)
-        elif isinstance(o, (list, tuple)):
-            for chunk in _iterencode_list(o, _current_indent_level):
-                yield chunk
         elif isinstance(o, dict):
             for chunk in _iterencode_dict(o, _current_indent_level):
                 yield chunk
+        elif hasattr(o, '__iter__'):
+            for chunk in _iterencode_list(o, _current_indent_level):
+                yield chunk
         else:
             if markers is not None:
                 markerid = id(o)
'''
from json import encoder

def _make_iterencode(markers, _default, _encoder, _indent, _floatstr,
        _key_separator, _item_separator, _sort_keys, _skipkeys, _one_shot,
        ## HACK: hand-optimized bytecode; turn globals into locals
        ValueError=ValueError,
        basestring=basestring,
        dict=dict,
        float=float,
        id=id,
        int=int,
        isinstance=isinstance,
        list=list,
        long=long,
        str=str,
        tuple=tuple,
    ):

    def _iterencode_list(lst, _current_indent_level):
        if not lst:
            yield '[]'
            return
        if markers is not None:
            markerid = id(lst)
            if markerid in markers:
                raise ValueError("Circular reference detected")
            markers[markerid] = lst
        buf = '['
        if _indent is not None:
            _current_indent_level += 1
            newline_indent = '\n' + (' ' * (_indent * _current_indent_level))
            separator = _item_separator + newline_indent
            buf += newline_indent
        else:
            newline_indent = None
            separator = _item_separator
        first = True
        for value in lst:
            if first:
                first = False
            else:
                buf = separator
            if isinstance(value, basestring):
                yield buf + _encoder(value)
            elif value is None:
                yield buf + 'null'
            elif value is True:
                yield buf + 'true'
            elif value is False:
                yield buf + 'false'
            elif isinstance(value, (int, long)):
                yield buf + str(value)
            elif isinstance(value, float):
                yield buf + _floatstr(value)
            else:
                yield buf
                if isinstance(value, (list, tuple)):
                    chunks = _iterencode_list(value, _current_indent_level)
                elif isinstance(value, dict):
                    chunks = _iterencode_dict(value, _current_indent_level)
                else:
                    chunks = _iterencode(value, _current_indent_level)
                for chunk in chunks:
                    yield chunk
        if first:
            yield buf
        if newline_indent is not None:
            _current_indent_level -= 1
            yield '\n' + (' ' * (_indent * _current_indent_level))
        yield ']'
        if markers is not None:
            del markers[markerid]

    def _iterencode_dict(dct, _current_indent_level):
        if not dct:
            yield '{}'
            return
        if markers is not None:
            markerid = id(dct)
            if markerid in markers:
                raise ValueError("Circular reference detected")
            markers[markerid] = dct
        yield '{'
        if _indent is not None:
            _current_indent_level += 1
            newline_indent = '\n' + (' ' * (_indent * _current_indent_level))
            item_separator = _item_separator + newline_indent
            yield newline_indent
        else:
            newline_indent = None
            item_separator = _item_separator
        first = True
        if _sort_keys:
            items = sorted(dct.items(), key=lambda kv: kv[0])
        else:
            items = dct.iteritems()
        for key, value in items:
            if isinstance(key, basestring):
                pass
            # JavaScript is weakly typed for these, so it makes sense to
            # also allow them.  Many encoders seem to do something like this.
            elif isinstance(key, float):
                key = _floatstr(key)
            elif key is True:
                key = 'true'
            elif key is False:
                key = 'false'
            elif key is None:
                key = 'null'
            elif isinstance(key, (int, long)):
                key = str(key)
            elif _skipkeys:
                continue
            else:
                raise TypeError("key " + repr(key) + " is not a string")
            if first:
                first = False
            else:
                yield item_separator
            yield _encoder(key)
            yield _key_separator
            if isinstance(value, basestring):
                yield _encoder(value)
            elif value is None:
                yield 'null'
            elif value is True:
                yield 'true'
            elif value is False:
                yield 'false'
            elif isinstance(value, (int, long)):
                yield str(value)
            elif isinstance(value, float):
                yield _floatstr(value)
            else:
                if isinstance(value, (list, tuple)):
                    chunks = _iterencode_list(value, _current_indent_level)
                elif isinstance(value, dict):
                    chunks = _iterencode_dict(value, _current_indent_level)
                else:
                    chunks = _iterencode(value, _current_indent_level)
                for chunk in chunks:
                    yield chunk
        if newline_indent is not None:
            _current_indent_level -= 1
            yield '\n' + (' ' * (_indent * _current_indent_level))
        yield '}'
        if markers is not None:
            del markers[markerid]

    def _iterencode(o, _current_indent_level):
        if isinstance(o, basestring):
            yield _encoder(o)
        elif o is None:
            yield 'null'
        elif o is True:
            yield 'true'
        elif o is False:
            yield 'false'
        elif isinstance(o, (int, long)):
            yield str(o)
        elif isinstance(o, float):
            yield _floatstr(o)
        elif isinstance(o, dict):
            for chunk in _iterencode_dict(o, _current_indent_level):
                yield chunk
        elif hasattr(o, '__iter__'):
            for chunk in _iterencode_list(o, _current_indent_level):
                yield chunk
        else:
            if markers is not None:
                markerid = id(o)
                if markerid in markers:
                    raise ValueError("Circular reference detected")
                markers[markerid] = o
            o = _default(o)
            for chunk in _iterencode(o, _current_indent_level):
                yield chunk
            if markers is not None:
                del markers[markerid]

    return _iterencode

encoder._make_iterencode = _make_iterencode

Ответ 3

Реальная потоковая передача не поддерживается поддержкой json, так как это также означает, что клиентское приложение также должно поддерживать потоковое вещание. Есть несколько java-библиотек, которые поддерживают чтение потоковых потоков json, но это не очень общий. Существуют также некоторые привязки python для yail, который является библиотекой C, которая поддерживает потоковое вещание.

Возможно, вы можете использовать Yaml вместо json. Yaml является надмножеством json. Он имеет лучшую поддержку для потоковой передачи с обеих сторон, и любое сообщение json будет оставаться в силе Yaml.

Но в вашем случае гораздо проще разделить поток объектов в поток отдельных сообщений json.

Смотрите также эту дискуссию, в которой клиентские библиотеки поддерживают потоковое вещание: Есть ли потоковый API для JSON?

Ответ 4

Не так просто. Протокол WSGI (который используется большинством людей) не поддерживает потоковое вещание. И серверы, которые его поддерживают, нарушают спецификацию.

И даже если вы используете несовместимый сервер, вам нужно использовать что-то вроде ijson. Также взгляните на этого парня, у которого была такая же проблема, как у вас http://www.enricozini.org/2011/tips/python-stream-json/

EDIT: Затем все сводится к клиенту, который, я полагаю, будет написан в Javascript (?). Но я не вижу, как вы могли бы создавать объекты javascript (или любого другого языка) из неполных JSON chuncks. Единственное, что я могу придумать, - это вручную разбить длинный JSON на более мелкие объекты JSON (на стороне сервера), а затем передать его один за другим клиенту. Но это требует веб-узлов, а не для HTTP-запросов и ответов без состояния. И если веб-службой вы имеете в виду REST API, то я думаю, что это не то, что вы хотите.