Подтвердить что ты не робот

Почему отдельные сообщения UDP всегда опускаются ниже определенного размера буфера?

3 разных сообщения отправляются на один и тот же порт с разной скоростью:

Message   size (bytes)   Sent every transmit speed
High            232                  10 ms           100Hz                  
Medium      148                  20ms            50Hz                    
Low             20                    60 ms           16.6Hz                 

Я могу обрабатывать только одно сообщение каждые ~ 6 мс.
Однопоточный. Блокировка чтения.


Возникает странная ситуация, и у меня нет объяснений. Когда я устанавливаю свой буфер приема на 4,799 байты, все мои сообщения с низкой скоростью будут удалены.
Я вижу, может быть, один или два будут обработаны, а потом ничего.

Когда я устанавливаю свой буфер приема на 4,800 (или выше!), кажется, что все сообщения с низкой скоростью начинают обрабатываться. Я вижу около 16/17 в секунду.


Это наблюдалось последовательно. Приложение, отправляющее пакеты, всегда запускается перед приложением-получателем. Приложение-получатель всегда имеет длительную задержку после создания сокетов и перед его началом. Таким образом, буфер всегда заполняется, когда начинается обработка, и это не тот же самый начальный буфер каждый раз, когда происходит тест. Это связано с тем, что сокет создается после того, как отправитель уже отправляет сообщения, поэтому приемник может начать прослушивание в середине цикла отправки.

Почему увеличение размера принятого буфера одного байта приводит к огромному изменению обработки сообщений с низкой скоростью?

Я построил таблицу, чтобы лучше визуализировать ожидаемую обработку:
введите описание изображения здесь

Как только некоторые из этих сообщений обрабатываются, больше сообщений, скорее всего, попадают в очередь, а не отбрасываются.

Тем не менее, я ожидал бы, что буфер 4,799 byte будет вести себя так же, как 4,800 байты.

Однако это не то, что я наблюдал.


Я думаю, что проблема связана с тем, что сообщения с низкой скоростью отправляются одновременно с двумя другими сообщениями. Он всегда принимается после сообщений с высокой/средней скоростью. (Это было подтверждено на wirehark).

Например, если предположить, что буфер был пуст для начала, ясно, что сообщение с низкой скоростью будет стоять в очереди дольше, чем другие сообщения.
* 1 сообщение каждые 6 мс - около 5 сообщений каждые 30 мс. введите описание изображения здесь

Это еще не объясняет размер буфера.

Мы запускаем VxWorks и используем их sockLib, который является реализацией сокетов Berkeley. Вот фрагмент того, как выглядит наше сокетное создание:
SOCKET_BUFFER_SIZE - это то, что я меняю.

struct sockaddr_in tSocketAddress;                          // Socket address
int     nSocketAddressSize = sizeof(struct sockaddr_in);    // Size of socket address structure
int     nSocketOption = 0;

// Already created
if (*ptParameters->m_pnIDReference != 0)
    return FALSE;

// Create UDP socket
if ((*ptParameters->m_pnIDReference = socket(AF_INET, SOCK_DGRAM, 0)) == ERROR)
{
    // Error
    CreateSocketMessage(ptParameters, "CreateSocket: Socket create failed with error.");

    // Not successful
    return FALSE;
}

// Valid local address
if (ptParameters->m_szLocalIPAddress != SOCKET_ADDRESS_NONE_STRING && ptParameters->m_usLocalPort != 0)
{
    // Set up the local parameters/port
    bzero((char*)&tSocketAddress, nSocketAddressSize);
    tSocketAddress.sin_len = (u_char)nSocketAddressSize;
    tSocketAddress.sin_family = AF_INET;
    tSocketAddress.sin_port = htons(ptParameters->m_usLocalPort);

    // Check for any address
    if (strcmp(ptParameters->m_szLocalIPAddress, SOCKET_ADDRESS_ANY_STRING) == 0)
        tSocketAddress.sin_addr.s_addr = htonl(INADDR_ANY);
    else
    {
        // Convert IP address for binding
        if ((tSocketAddress.sin_addr.s_addr = inet_addr(ptParameters->m_szLocalIPAddress)) == ERROR)
        {
            // Error
            CreateSocketMessage(ptParameters, "Unknown IP address.");

            // Cleanup socket
            close(*ptParameters->m_pnIDReference);
            *ptParameters->m_pnIDReference = ERROR;

            // Not successful
            return FALSE;
        }
    }

    // Bind the socket to the local address
    if (bind(*ptParameters->m_pnIDReference, (struct sockaddr *)&tSocketAddress, nSocketAddressSize) == ERROR)
    {
        // Error
        CreateSocketMessage(ptParameters, "Socket bind failed.");

        // Cleanup socket
        close(*ptParameters->m_pnIDReference);
        *ptParameters->m_pnIDReference = ERROR;

        // Not successful
        return FALSE;
    }
}

// Receive socket
if (ptParameters->m_eType == SOCKTYPE_RECEIVE || ptParameters->m_eType == SOCKTYPE_RECEIVE_AND_TRANSMIT)
{
    // Set the receive buffer size
    nSocketOption = SOCKET_BUFFER_SIZE;
    if (setsockopt(*ptParameters->m_pnIDReference, SOL_SOCKET, SO_RCVBUF, (char *)&nSocketOption, sizeof(nSocketOption)) == ERROR)
    {
        // Error
        CreateSocketMessage(ptParameters, "Socket buffer size set failed.");

        // Cleanup socket
        close(*ptParameters->m_pnIDReference);
        *ptParameters->m_pnIDReference = ERROR;

        // Not successful
        return FALSE;
    }
}

и сокет принимает вызов, вызываемый в бесконечном цикле:
* Размер буфера определенно достаточно большой

int SocketReceive(int nSocketIndex, char *pBuffer, int nBufferLength)
{
    int nBytesReceived = 0;
    char szError[256];

    // Invalid index or socket
    if (nSocketIndex < 0 || nSocketIndex >= SOCKET_COUNT || g_pnSocketIDs[nSocketIndex] == 0)
    {
        sprintf(szError,"SocketReceive: Invalid socket (%d) or ID (%d)", nSocketIndex, g_pnSocketIDs[nSocketIndex]);
        perror(szError);
        return -1;
    }

    // Invalid buffer length
    if (nBufferLength == 0)
    {
        perror("SocketReceive: zero buffer length");
        return 0;
    }

    // Send data
    nBytesReceived = recv(g_pnSocketIDs[nSocketIndex], pBuffer, nBufferLength, 0);

    // Error in receiving
    if (nBytesReceived == ERROR)
    {
        // Create error string
        sprintf(szError, "SocketReceive: Data Receive Failure: <%d> ", errno);

        // Set error message
        perror(szError);

        // Return error
        return ERROR;
    }

    // Bytes received
    return nBytesReceived;
}

Любые подсказки о том, почему увеличение размера буфера до 4800 приводит к успешному и последовательному чтению сообщений с низкой скоростью?

4b9b3361

Ответ 1

Основной ответ на вопрос о том, почему размер SO_RCVBUF 4799 приводит к потерям сообщений с низкой скоростью и размеру 4800, отлично работает, когда смесь входящих пакетов UDP, скорость, с которой они идут в, скорость обработки входящих пакетов и размер mbuff и номера кластеров в вашем ядре vxWorks позволяют обеспечить достаточную пропускную способность сетевого стека, чтобы сообщения с низкой скоростью не отбрасывались с большим размером.

Описание опции SO_SNDBUF на странице руководства setsockopt() по URL http://www.vxdev.com/docs/vx55man/vxworks/ref/sockLib.html#setsockopt, упомянутой в комментарии выше, имеет это, чтобы сказать о указанный размер и влияние на mbuff использование:

Эффект установки максимального размера буферов (для SO_SNDBUF и SO_RCVBUF, описанные ниже) на самом деле не выделять mbufs из пула mbuf. Вместо этого эффект заключается в том, чтобы установить знак высокой воды в структуре данных протокола, которая используется позже для ограничения количество распределения mbuf.

UDP-пакеты являются дискретными единицами. Если вы отправляете 10 пакетов размером 232, которые не считаются 2320 байтами данных в непрерывной памяти. Вместо этого это 10 буферов памяти в сетевом стеке, потому что UDP является дискретными пакетами, а TCP - непрерывным потоком байтов.

См. Как настроить буферизацию сети в VxWorks 5.4? на веб-сайте сообщества DDS, где дается обсуждение взаимозависимости смеси размеров mbuff и сетевых кластеров.

См. Как решить проблему с буферами VxWorks? на веб-сайте сообщества DDS.

См. этот PDF-документ слайд-презентации, Новый инструмент для изучения истощения сетевых стеков в VxWorks с 2004 года, в котором обсуждается использование различных инструментов, таких как mBufShow и inetStatShow, чтобы увидеть, что происходит в сетевом стеке.

Ответ 2

Без детального анализа реализации каждого сетевого стека по пути, который отправляются ваши сообщения UDP, почти невозможно указать результирующее поведение.

Реализации UDP разрешено отбрасывать любой пакет по своему усмотрению. Обычно это происходит, когда стек приходит к выводу, что ему нужно будет отбросить пакеты, чтобы получать новые. Не существует формального требования о том, чтобы упавшие пакеты были самыми старыми или самыми новыми. Также может быть, что определенный класс размеров больше зависит от стратегий управления внутренней памятью.

Из стеков IP наиболее интересным является тот, который находится на принимающей машине.

Конечно, вы получите лучший опыт приема, если вы измените сторону приема, чтобы иметь буфер приема, который займет несколько секунд, ожидаемых. Я бы начал с по крайней мере 10k.

Наблюдаемое "изменение" в поведении при переходе от 4 799 до 4800 может быть результатом того, что более поздняя версия просто позволяет получать одно из небольших сообщений до того, как его нужно снова отбросить, в то время как меньший размер просто заставляет его слегка отбросить ранее. Если приложение-получатель достаточно быстро для чтения ожидающего сообщения, вы получите небольшие сообщения в одном случае и небольшие сообщения в другом случае.