Подтвердить что ты не робот

Протокол Buffer over socket в С++

Я пытаюсь изучить протокол Buffer (PB) на платформе Linux, а мой язык кодирования - С++. Я нашел примеры в онлайн-документах буфера протокола, но ничего особенного для сокета не посылал и не получал (или я пропустил его полностью:)). Поэтому я решил добавить сообщение "Длина до фактического сообщения" и отправить его через сокет. Я был бы признателен, если бы кто-нибудь мог предложить лучшее решение, чем то, что я планирую сделать, а также есть ли что-нибудь готовое в PB для создания таких пакетов.

Но у меня все еще возникает проблема на стороне сервера, где я должен декодировать пакет. Скажем, если клиент отправляет пакет из 10 байт, в котором первый 4 байта является длиной пакета; Но до декодирования пакета невозможно узнать длину. Так что даже если я прочитаю первые 4 байта, как я могу вывести значение с половинным считанным пакетом с использованием протокола Buffer.

4b9b3361

Ответ 1

Наконец-то я смог заставить его работать. Я размещаю здесь код, чтобы его можно было просмотреть и прокомментировать, а также, если кто-то хочет реализовать его в С++, этот фрагмент кода может помочь. Его потрепанный код, который я собирался сделать, чтобы заставить Protobuf работать в префиксном порядке. Я взял код клиентского сервера с какого-то сайта, который я не помню, и я его модифицировал для размещения protobuf. Здесь сервер сначала заглядывает в сокет и получает длину всего пакета, а затем выполняется фактическое чтение сокета, чтобы прочитать весь пакет. Могут быть миллионы способов сделать это, но для быстрого решения я сделал это таким образом. Но мне нужно найти лучший способ избежать 2 recv на пакеты, но в моем состоянии все сообщения имеют разный размер, так что это единственный способ, который я предполагаю.

Проточный файл

  message log_packet {
  required fixed64 log_time =1;
  required fixed32 log_micro_sec =2;
  required fixed32 sequence_no =3;
  required fixed32 shm_app_id =4;
  required string packet_id =5;
  required string log_level=6;
  required string log_msg=7;
  }

Протокол буфера Код клиента

#include <unistd.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>


using namespace google::protobuf::io;

using namespace std;
int main(int argv, char** argc){

/* Coded output stram */

log_packet payload ;

payload.set_log_time(10);
payload.set_log_micro_sec(10);
payload.set_sequence_no(1);
payload.set_shm_app_id(101);
payload.set_packet_id("TST");
payload.set_log_level("DEBUG");
payload.set_log_msg("What shall we say then");

cout<<"size after serilizing is "<<payload.ByteSize()<<endl;
int siz = payload.ByteSize()+4;
char *pkt = new char [siz];
google::protobuf::io::ArrayOutputStream aos(pkt,siz);
CodedOutputStream *coded_output = new CodedOutputStream(&aos);
coded_output->WriteVarint32(payload.ByteSize());
payload.SerializeToCodedStream(coded_output);

        int host_port= 1101;
        char* host_name="127.0.0.1";

        struct sockaddr_in my_addr;

        char buffer[1024];
        int bytecount;
        int buffer_len=0;

        int hsock;
        int * p_int;
        int err;

        hsock = socket(AF_INET, SOCK_STREAM, 0);
        if(hsock == -1){
                printf("Error initializing socket %d\n",errno);
                goto FINISH;
        }

        p_int = (int*)malloc(sizeof(int));
        *p_int = 1;

        if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||
                (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
                printf("Error setting options %d\n",errno);
                free(p_int);
                goto FINISH;
        }
        free(p_int);

        my_addr.sin_family = AF_INET ;
        my_addr.sin_port = htons(host_port);

        memset(&(my_addr.sin_zero), 0, 8);
        my_addr.sin_addr.s_addr = inet_addr(host_name);
        if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
                if((err = errno) != EINPROGRESS){
                        fprintf(stderr, "Error connecting socket %d\n", errno);
                        goto FINISH;
                }
        }




       for (int i =0;i<10000;i++){
          for (int j = 0 ;j<10;j++) {

                if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) {
                        fprintf(stderr, "Error sending data %d\n", errno);
                        goto FINISH;
                }
                printf("Sent bytes %d\n", bytecount);
                usleep(1);
         }
        }
        delete pkt;

FINISH:
        close(hsock);

}

Протокол буфера Код сервера

#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <netinet/in.h>
#include <resolv.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>

using namespace std;
using namespace google::protobuf::io;



void* SocketHandler(void*);

int main(int argv, char** argc){

        int host_port= 1101;

        struct sockaddr_in my_addr;

        int hsock;
        int * p_int ;
        int err;

        socklen_t addr_size = 0;
        int* csock;
        sockaddr_in sadr;
        pthread_t thread_id=0;

        hsock = socket(AF_INET, SOCK_STREAM, 0);
        if(hsock == -1){
                printf("Error initializing socket %d\n", errno);
                goto FINISH;
        }

        p_int = (int*)malloc(sizeof(int));
        *p_int = 1;

        if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||
                (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
                printf("Error setting options %d\n", errno);
                free(p_int);
                goto FINISH;
        }
        free(p_int);

        my_addr.sin_family = AF_INET ;
        my_addr.sin_port = htons(host_port);

        memset(&(my_addr.sin_zero), 0, 8);
        my_addr.sin_addr.s_addr = INADDR_ANY ;

        if( bind( hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
                fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno);
                goto FINISH;
        }
        if(listen( hsock, 10) == -1 ){
                fprintf(stderr, "Error listening %d\n",errno);
                goto FINISH;
        }

        //Now lets do the server stuff

        addr_size = sizeof(sockaddr_in);

        while(true){
                printf("waiting for a connection\n");
                csock = (int*)malloc(sizeof(int));
                if((*csock = accept( hsock, (sockaddr*)&sadr, &addr_size))!= -1){
                        printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr));
                        pthread_create(&thread_id,0,&SocketHandler, (void*)csock );
                        pthread_detach(thread_id);
                }
                else{
                        fprintf(stderr, "Error accepting %d\n", errno);
                }
        }

FINISH:
;//oops
}

google::protobuf::uint32 readHdr(char *buf)
{
  google::protobuf::uint32 size;
  google::protobuf::io::ArrayInputStream ais(buf,4);
  CodedInputStream coded_input(&ais);
  coded_input.ReadVarint32(&size);//Decode the HDR and get the size
  cout<<"size of payload is "<<size<<endl;
  return size;
}

void readBody(int csock,google::protobuf::uint32 siz)
{
  int bytecount;
  log_packet payload;
  char buffer [siz+4];//size of the payload and hdr
  //Read the entire buffer including the hdr
  if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){
                fprintf(stderr, "Error receiving data %d\n", errno);
        }
  cout<<"Second read byte count is "<<bytecount<<endl;
  //Assign ArrayInputStream with enough memory 
  google::protobuf::io::ArrayInputStream ais(buffer,siz+4);
  CodedInputStream coded_input(&ais);
  //Read an unsigned integer with Varint encoding, truncating to 32 bits.
  coded_input.ReadVarint32(&siz);
  //After the message length is read, PushLimit() is used to prevent the CodedInputStream 
  //from reading beyond that length.Limits are used when parsing length-delimited 
  //embedded messages
  google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz);
  //De-Serialize
  payload.ParseFromCodedStream(&coded_input);
  //Once the embedded message has been parsed, PopLimit() is called to undo the limit
  coded_input.PopLimit(msgLimit);
  //Print the message
  cout<<"Message is "<<payload.DebugString();

}

void* SocketHandler(void* lp){
    int *csock = (int*)lp;

        char buffer[4];
        int bytecount=0;
        string output,pl;
        log_packet logp;

        memset(buffer, '\0', 4);

        while (1) {
        //Peek into the socket and get the packet size
        if((bytecount = recv(*csock,
                         buffer,
                                 4, MSG_PEEK))== -1){
                fprintf(stderr, "Error receiving data %d\n", errno);
        }else if (bytecount == 0)
                break;
        cout<<"First read byte count is "<<bytecount<<endl;
        readBody(*csock,readHdr(buffer));
        }

FINISH:
        free(csock);
    return 0;
}

Ответ 2

К сожалению, protobuf не предоставляет способ для "упаковки" (разграничения) ваших сообщений:

Если вы хотите записать несколько сообщений в один файл или поток, зависит от вас, чтобы отслеживать, где заканчивается одно сообщение, и следующее начинается. Формат протокола буфера протокола не является самограничным, поэтому синтаксические анализаторы протокола не могут определить, где заканчивается сообщение своя. Самый простой способ решить эту проблему - записать размер каждое сообщение, прежде чем вы сами напишите сообщение.

(из документации )

Итак, они в основном рекомендуют то же решение, к которому вы пришли.