Подтвердить что ты не робот

Пример python для чтения нескольких сообщений protobuf из потока

Я работаю с данными из spinn3r, который состоит из нескольких разных сообщений protobuf, сериализованных в поток байтов:

http://code.google.com/p/spinn3r-client/wiki/Protostream

"Прототип представляет собой поток протокольных буферов, закодированных на проводе длиной до префикса, согласно спецификации буфера протокола Google. Поток состоит из трех частей: заголовок, полезная нагрузка и маркер хвоста.

Это похоже на довольно стандартный вариант использования protobufs. Фактически, основной дистрибутив protobuf обеспечивает CodedInputStream для С++ и Java. Но, похоже, что protobuf не предоставляет такой инструмент для python - "внутренние" инструменты не настроены для такого внешнего использования:

https://groups.google.com/forum/?fromgroups#!topic/protobuf/xgmUqXVsK-o

Итак... прежде чем я пойду и соберу вместе парсер python varint и инструменты для разбора потока разных типов сообщений: кто-нибудь знает какие-либо инструменты для этого?

Почему он отсутствует в protobuf? (Или я просто не могу его найти?)

Это похоже на большой пробел для protobuf, особенно по сравнению с инструментами эквивалентного финансирования для "транспорта" и "протокола". Правильно ли я просматриваю это?

4b9b3361

Ответ 1

Похоже, что код в другом ответе потенциально снят с здесь. Перед использованием этого файла проверьте лицензию, но мне удалось прочитать ее varint32, используя следующий код:

import sys
import myprotocol_pb2 as proto
import varint # (this is the varint.py file)

data = open("filename.bin", "rb").read() # read file as string
decoder = varint.decodeVarint32          # get a varint32 decoder
                                         # others are available in varint.py

next_pos, pos = 0, 0
while pos < len(data):
    msg = proto.Msg()                    # your message type
    next_pos, pos = decoder(data, pos)
    msg.ParseFromString(data[pos:pos + next_pos])

    # use parsed message

    pos += next_pos
print "done!"

Это очень простой код, предназначенный для загрузки сообщений одного типа с разделителями varint32, которые описывают следующий размер сообщения.


Обновить. Также возможно включить этот файл непосредственно из библиотеки protobuf, используя:

from google.protobuf.internal.decoder import _DecodeVarint32

Ответ 2

Я реализовал пакет small python для сериализации нескольких сообщений protobuf в поток и десериализации их из потока. Вы можете установить его на pip:

pip install pystream-protobuf

Здесь пример кода, в который записываются два списка сообщений protobuf в файл:

import stream

with stream.open("test.gam", "wb") as ostream:
    ostream.write(*objects_list)
    ostream.write(*another_objects_list)

а затем прочесть те же сообщения (например, сообщения выравнивания, определенные в vg_pb2.py) из потока:

import stream
import vg_pb2

alns_list = []
with stream.open("test.gam", "rb") as istream:
    for data in istream:
        aln = vg_pb2.Alignment()
        aln.ParseFromString(data)
        alns_list.append(aln)

Ответ 3

Это достаточно просто, что я вижу, почему, возможно, никто не потрудился сделать инструмент многократного использования:

'''
Parses multiple protobuf messages from a stream of spinn3r data
'''

import sys
sys.path.append('python_proto/src')
import spinn3rApi_pb2
import protoStream_pb2

data = open('8mny44bs6tYqfnofg0ELPg.protostream').read()

def _VarintDecoder(mask):
    '''Like _VarintDecoder() but decodes signed values.'''

    local_ord = ord
    def DecodeVarint(buffer, pos):
        result = 0
        shift = 0
        while 1:
            b = local_ord(buffer[pos])
            result |= ((b & 0x7f) << shift)
            pos += 1
            if not (b & 0x80):
                if result > 0x7fffffffffffffff:
                    result -= (1 << 64)
                    result |= ~mask
                else:
                    result &= mask
                    return (result, pos)
            shift += 7
            if shift >= 64:
                ## need to create (and also catch) this exception class...
                raise _DecodeError('Too many bytes when decoding varint.')
    return DecodeVarint

## get a 64bit varint decoder
decoder = _VarintDecoder((1<<64) - 1)

## get the three types of protobuf messages we expect to see
header    = protoStream_pb2.ProtoStreamHeader()
delimiter = protoStream_pb2.ProtoStreamDelimiter()
entry     = spinn3rApi_pb2.Entry()

## get the header
pos = 0
next_pos, pos = decoder(data, pos)
header.ParseFromString(data[pos:pos + next_pos])
## should check its contents

while 1:
    pos += next_pos
    next_pos, pos = decoder(data, pos)
    delimiter.ParseFromString(data[pos:pos + next_pos])

    if delimiter.delimiter_type == delimiter.END:
        break

    pos += next_pos
    next_pos, pos = decoder(data, pos)
    entry.ParseFromString(data[pos:pos + next_pos])
    print entry