Подтвердить что ты не робот

Отправка большого количества данных между потоками Qt

У меня есть QThread, который регулярно генерирует довольно большой объем данных (пара мегабайт в секунду), и ему необходимо передать его потоку родительского (GUI).

Я боюсь, что я не настолько уверен во внутренней работе QThread, поэтому я хотел бы попросить наилучшую практику.

Очевидно, что самый прямой способ передачи данных - просто emit массив. Однако насколько это эффективно? Знает ли Qt, где он используется, и избегает его глубокого копирования при отправке и получении?

Если нет, я могу с радостью просто выделить память в основном потоке и дать указатель на дочерний поток, где он будет записывать данные (и только emit короткие сообщения о ходе). Это, кажется, не самое элегантное решение для меня, вот почему я спрашиваю.

Если Qt избегает копирования данных в нескольких буферах при испускании и приеме, гарантируется ли это во всех системах? У меня нет ресурсов, чтобы попробовать сравнить его в разных ОС.

4b9b3361

Ответ 1

QThread Внутренние работы не имеют значения: они не играют никакой роли в том, как работают циклы событий. Когда вы emit сигнал в QObject, который живет в потоке, отличном от объекта слота, сигнал будет помещен как QMetaCallEvent в очередь событий принимающего потока. Затем цикл событий, запущенный в принимающем потоке, будет действовать на это событие и выполнить вызов в слот, который был подключен к испускаемому сигналу.

Итак, независимо от того, что произойдет, любые данные, которые вы отправляете через сигнал, в конечном итоге окажутся полезной нагрузкой в ​​экземпляре класса, полученного из QEvent.

Мягкость проблемы заключается в том, когда QMetaCallEvent достигает цикла события, и контейнер передается в слот в качестве аргумента. Конечно, конструкторы копирования можно было назвать много раз на этом пути. Ниже приведен какой-то простой код, демонстрирующий, сколько раз конструктор копирования и конструктор по умолчанию на самом деле называются

  • для элементов элементов данных неявно разделяемого контейнера copy-on-write (QVector),

  • в пользовательском классе, который находится в контейнере.

Вы будете приятно удивлены:)

Поскольку контейнеры Qt неявно разделяются копированием на запись, их построение копий имеет незначительную стоимость: все, что сделано, является счетчиком ссылок, увеличивается при построении атомарно. Например, ни один из элементов данных не копируется.

Увы, pre-11 С++ показывает свою уродливую сторону: если код слота каким-либо образом изменяет контейнер, то нет возможности передавать ссылки на слот таким образом, чтобы компилятор знал, что исходный контейнер не является нужно больше. Таким образом: если слот получает ссылку const на контейнер, вы гарантируете, что копии не будут сделаны. Если слот получает записываемую копию контейнера и вы его модифицируете, будет сделана совершенно ненужная копия, так как экземпляр, живой на сайте вызова, больше не нужен. В С++ - 11 вы передадите ссылку rvalue в качестве параметра. Передача ссылки rvalue в вызове функции завершает время жизни переданного объекта в вызывающем.

Пример вывода кода:

"Started" copies: 0 assignments: 0 default instances: 0 
"Created Foo" copies: 0 assignments: 0 default instances: 100 
"Created Bar" copies: 0 assignments: 0 default instances: 100 
"Received signal w/const container" copies: 0 assignments: 0 default instances: 100 
"Received signal w/copy of the container" copies: 0 assignments: 0 default instances: 100 
"Made a copy" copies: 100 assignments: 1 default instances: 101 
"Reset" copies: 0 assignments: 0 default instances: 0 
"Received signal w/const class" copies: 2 assignments: 0 default instances: 1 
"Received signal w/copy of the class" copies: 3 assignments: 0 default instances: 1 
//main.cpp
#include <QtCore>

class Class {
    static QAtomicInt m_copies;
    static QAtomicInt m_assignments;
    static QAtomicInt m_instances;
public:
    Class() { m_instances.fetchAndAddOrdered(1); }
    Class(const Class &) { m_copies.fetchAndAddOrdered(1); }
    Class & operator=(const Class &) { m_assignments.fetchAndAddOrdered(1); return *this; }
    static void dump(const QString & s = QString()) {
        qDebug() << s << "copies:" << m_copies << "assignments:" << m_assignments << "default instances:" << m_instances;
    }
    static void reset() {
        m_copies = 0;
        m_assignments = 0;
        m_instances = 0;
    }
};

QAtomicInt Class::m_instances;
QAtomicInt Class::m_copies;
QAtomicInt Class::m_assignments;

typedef QVector<Class> Vector;

Q_DECLARE_METATYPE(Vector)

class Foo : public QObject
{
    Q_OBJECT
    Vector v;
public:
    Foo() : v(100) {}
signals:
    void containerSignal(const Vector &);
    void classSignal(const Class &);
public slots:
    void sendContainer() { emit containerSignal(v); }
    void sendClass() { emit classSignal(Class()); }
};

class Bar : public QObject
{
    Q_OBJECT
public:
    Bar() {}
signals:
    void containerDone();
    void classDone();
public slots:
    void containerSlotConst(const Vector &) {
        Class::dump("Received signal w/const container");
    }
    void containerSlot(Vector v) {
        Class::dump("Received signal w/copy of the container");
        v[99] = Class();
        Class::dump("Made a copy");
        Class::reset();
        Class::dump("Reset");
        emit containerDone();
    }
    void classSlotConst(const Class &) {
        Class::dump("Received signal w/const class");
    }
    void classSlot(Class) {
        Class::dump("Received signal w/copy of the class");
        emit classDone();
        //QThread::currentThread()->quit();
    }
};

int main(int argc, char ** argv)
{
    QCoreApplication a(argc, argv);
    qRegisterMetaType<Vector>("Vector");
    qRegisterMetaType<Class>("Class");

    Class::dump("Started");
    QThread thread;
    Foo foo;
    Bar bar;
    Class::dump("Created Foo");
    bar.moveToThread(&thread);
    Class::dump("Created Bar");
    QObject::connect(&thread, SIGNAL(started()), &foo, SLOT(sendContainer()));
    QObject::connect(&foo, SIGNAL(containerSignal(Vector)), &bar, SLOT(containerSlotConst(Vector)));
    QObject::connect(&foo, SIGNAL(containerSignal(Vector)), &bar, SLOT(containerSlot(Vector)));
    QObject::connect(&bar, SIGNAL(containerDone()), &foo, SLOT(sendClass()));
    QObject::connect(&foo, SIGNAL(classSignal(Class)), &bar, SLOT(classSlotConst(Class)));
    QObject::connect(&foo, SIGNAL(classSignal(Class)), &bar, SLOT(classSlot(Class)));
    QObject::connect(&bar, SIGNAL(classDone()), &thread, SLOT(quit()));
    QObject::connect(&thread, SIGNAL(finished()), &a, SLOT(quit()));
    thread.start();
    a.exec();
    thread.wait();
}

#include "main.moc"

Ответ 2

При передаче больших буферов он "традиционный" к новым() буферным объектам в потоке производителя и, при загрузке, очереди/испускает/независимо от * буфера в потребительский поток и сразу новый() другой, (в тот же * buffer var), для следующей загрузки данных.

Проблема: если ваш поток GUI не может быть в курсе, вы получите память, если вы не возьмете некоторую меру контроля потока (например, предварительно распределите пул * буферов и "распространите" их).

То, что я обычно делаю, это предварительно выделить некоторые экземпляры буфера в цикле (до тысяч на большом сервере) и направить их экземпляры в очередь пула производителей-потребителей. Если дочерний поток хочет загрузить данные из некоторого сетевого соединения в буфер, он должен вытащить один из пула и загрузить его. Затем он может помещать/выделять/независимо от буфера в потребительский поток и выталкивать другой буфер для получения любых дополнительных данных. Потребительский поток получает буфер, обрабатывает данные и перетаскивает "использованный" буфер обратно в очередь пула для повторное использование. Это обеспечивает управление потоком: если дочерний поток загружает буферы быстрее, чем потребительский поток может их обработать, он найдет пул пустым и блокирует его до тех пор, пока потребительский поток не вернет некоторые используемые буферы, поэтому использование кэппинга/использования памяти (а также избегая постоянного нового/распоряжения или GC на тех языках, которые его поддерживают).

Мне нравится сбрасывать счет очереди пула на строку состояния GUI на таймер 1 с - это позволяет мне наблюдать за использованием буфера (и быстро обнаруживать, если какая-либо утечка:).