Подтвердить что ты не робот

Как сжимать вызовы слотов при использовании очереди в Qt?

После прочтения некоторых статей как это о связях Qt Signal-Slot у меня все еще возникает вопрос о соединении в очереди.

Если у меня есть потоки, которые все время передают сигналы друг другу и говорят, что один thread_slow работает медленный метод в этом цикле событий, а другой thread_fast работает с быстрым, который посылает несколько сигналов, а другой поток все еще работает медленный метод..... когда медленный метод из thread_slow возвращается в цикл событий, будет ли он обрабатывать все сигналы, которые были отправлены ранее, на thread_fast или только последний (все сигналы являются такой же тип)?

Если он обработает все сигналы, есть ли способ сделать thread_slow обработать только последний? (Рассмотрение "последнего" в многопоточном приложении может быть неопределенным, пусть рассмотрим последний сигнал перед тем, как поток попросит последний сигнал, для простоты, поэтому новые отправляются, когда поток ищет последний, может быть потерян).

(Я спрашиваю об этом, потому что у меня есть несколько потоков, получающих данные из нескольких потоков, и я не хочу, чтобы они обрабатывали старые данные, только последний, который был отправлен)

Я провел несколько тестов, и, похоже, Qt обработает все сигналы. Я сделал один поток:

while(true)
{
    QThread::msleep(500);
    emit testQueue(test);
    test++;
}

и слот в другом будет:

void test::testQueue(int test)
{
    test.store(private_test.load() + test);
    emit testText(QString("Test Queue: ") + QString::number(private_test.load()));
}

и поток будет запущен:

while(true)
{
    QThread::msleep(3000);
    QCoreApplication::processEvents();
    private_test.store(private_test.load() + 1000);
}

Я посылаю сигнал от одного потока к другому каждые 500 миллисекунд, а другой поток спящий за 3000 миллисекунд (3 секунды), а затем просыпается и увеличивает внутреннюю переменную на 100. каждый раз, когда слот выполняется, он испускает текст с полученным значением + внутренняя переменная. Результатом, который я получаю, является то, что каждый раз, когда вызывается QCoreApplication::processEvents();, все сигналы выполняются.... (я отредактировал эту часть, потому что обнаружил ошибку в моем предыдущем коде)

4b9b3361

Ответ 1

Я пытаюсь сформировать свой комментарий в ответ. Я согласен с вами в том, что в документации отсутствует эта информация, или, по крайней мере, это не ясно для меня, и, по-видимому, для вас тоже.

Было бы два варианта получения дополнительной информации:

1) Испытание

Поместите инструкцию qDebug() или printf()/fprintf() в свой слот в "медленном" потоке и посмотрите, что он распечатает. Запустите это несколько раз и сделайте вывод.

2) Убедитесь, что

Вам нужно будет прочитать исходный код для этого, как компилятор метаобъекта, иначе. moc получает это из исходного файла. Это немного более активное исследование, но это может привести к определенности.

Насколько я знаю, каждое излучение сигнала публикует соответствующее событие. Затем событие будет помещено в очередь для отдельного потока в классе потоков. Здесь вы можете найти соответствующие два файла исходного кода:

void QCoreApplication:: postEvent (приемник QObject *, событие QEvent *, приоритет int)

и

класс QPostEventList: public QVector

Существует два подхода к их компромиссам:

Очередь в занятом слоте из слота мутатора данных

Основное преимущество заключается в том, что во время операции "занято" сигналы не могут быть потеряны. Однако это может быть по своей сути медленнее, поскольку оно может потенциально обрабатывать гораздо больше операций, чем необходимо.

Идея состоит в том, что данные переустанавливаются для каждого обработанного события, но реальная операция занятости ставится в очередь для выполнения только один раз. Это не обязательно должно быть для первого события, если их больше, но это самая простая реализация.

Foo::Foo(QObject *parent) : QObject(parent)
{
    ...
    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(dataUpdateSlot(const QByteArray&)));
    connect(this, SIGNAL(queueBusyOperationSignal()), SLOT(busyOperation()));
    ...
}

void Foo::dataUpdateSlot(const QByteArray &data)
{
    m_data = data;

    if (busyOperationQueued);
        emit queueBusyOperationSignal();
        m_busyOperationQueued = true;
    }
}

void MyClass::busyOperationSlot()
{

    // Do the busy work with m_data here

    m_busyOperationQueued = false;    
}

Connect/Disconnect

Идея состоит в том, чтобы отключить слот из соответствующего сигнала при запуске обработки. Это гарантирует, что новое излучение сигнала не будет захвачено, и снова подключите слот к сигналу, когда поток будет свободным для обработки следующих событий.

У этого было бы некоторое время простоя в потоке, хотя между соединением и очередным даже обработанным, но, по крайней мере, это был бы простой способ имплантирования. На самом деле это может быть даже незначительным разницу в производительности в зависимости от большего количества контекста, который здесь не предусмотрен.

Основной недостаток заключается в том, что это приведет к потере сигналов во время операции занятости.

Foo::Foo(QObject *parent) : QObject(parent)
{
    ...
    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(busyOperationSlot(const QByteArray&)));
    ...
}

void MyClass::busyOperationSlot(const QByteArray &data)
{
    disconnect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), this, SLOT(dataUpdateSlot(const QByteArray&)));

    // Do the busy work with data here

    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(dataUpdateSlot(const QByteArray&)));
}

Будущие мысли

Я думал, есть ли удобный API - например. метод processEvents(), но с аргументом для обработки только последнего отправленного события - для фактического указания системе событий явно обработать последнее, а не обойти проблему. Это похоже на такой API, однако он является частным.

Возможно, кто-то представит запрос функции, чтобы публиковать что-то подобное.

/*!
\internal
Returns \c true if \a event was compressed away (possibly deleted) and should not be added to the list.
*/
bool QCoreApplication::compressEvent(QEvent *event, QObject *receiver, QPostEventList *postedEvents)

Соответствующий исходный код можно найти здесь.

Он также имеет переопределенную версию в QGuiApplication и QApplication.

Что касается полноты, существует также такой способ:

void QCoreApplication:: removePostedEvents (приемник QObject *, int eventType = 0) [статический]

Удаляет все события данного типа событий, которые были отправлены с помощью postEvent() для получателя.

События не отправляются, вместо этого они удаляются из очереди. Вам не нужно будет вызывать эту функцию. Если вы вызываете это, имейте в виду, что события убийства могут привести к тому, что приемник сломает один или несколько инвариантов.

Если получатель имеет значение null, события eventType удаляются для всех объектов. Если eventType равен 0, все события удаляются для ресивера. Вы никогда не должны вызывать эту функцию с eventType равным 0. Если вы вызываете ее таким образом, имейте в виду, что события убийства могут привести к тому, что приемник разорвет один или несколько инвариантов.

Но это не совсем то, что вы хотели бы иметь здесь в соответствии с документацией.

Ответ 2

QCoreApplication Сжатие QMetaCallEvent

Каждый вызов в очереди в очереди заканчивается при отправке QMetaCallEvent целевому объекту. Событие содержит объект отправителя, идентификатор сигнала, индекс слота и параметры пакетного вызова. На Qt 5 идентификатор сигнала обычно не равен значению, возвращаемому QMetaObject::signalIndex(): это индекс, рассчитанный так, как если бы объект имел только сигнальные методы и никаких других методов.

Цель состоит в том, чтобы сжать такие вызовы, чтобы в очереди событий существовал только один уникальный вызов для данного кортежа (объект-отправитель, сигнал отправителя, объект-приемник, слот-приемник).

Это единственный разумный способ сделать это, без необходимости внесения изменений в исходные или целевые объекты и при сохранении минимальных издержек. Методы рекурсивного цикла событий в моих других ответах имеют серьезную нагрузку на стек для каждого события, порядка 1 кбайт, когда Qt построен для архитектуры с 64-разрядными указателями.

Доступ к очереди событий возможен, когда новые события отправляются на объект, у которого есть одно или несколько событий, уже отправленных на него. В этом случае QCoreApplication::postEvent вызывает QCoreApplication::compressEvent. compressEvent не вызывается, когда первое событие отправляется объекту. При повторной реализации этого метода содержимое QMetaCallEvent, отправленное на целевой объект, может быть проверено для вызова вашего слота, а устаревший дубликат должен быть удален. Частные заголовки Qt должны быть включены для получения определений QMetaCallEvent, QPostEvent и QPostEventList.

Плюсы: ни отправитель, ни получатель не должны ничего знать. Сигналы и слоты работают как есть, включая вызовы-указатели метода в Qt 5. Qt сам использует этот способ сжатия событий.

Минусы: требуется включение закрытых заголовков Qt и принудительная очистка флага QEvent::posted.

Вместо взлома флага QEvent::posted удаляемые события могут быть поставлены в очередь в отдельный список и удалены вне вызова compressEvent при запуске таймера с нулевой продолжительностью. У этого есть накладные расходы на дополнительный список событий, и каждый удаление события повторяется через опубликованный список событий.

Другие подходы

Точка, делающая это другим способом, заключается не в использовании внутренних элементов Qt.

L1 Первое ограничение не имеет доступа к содержимому конфиденциально определенного QMetaCallEvent. Его можно решить следующим образом:

  • Объект-посредник с сигналами и слотами с теми же сигнатурами, что и объекты цели, может быть связан между исходным и целевым объектами.

  • Запуск QMetaCallEvent в прокси-объекте позволяет извлечь тип вызова, идентификатор вызываемого слота и аргументы.

  • Вместо соединений с сигнальным слотом события могут быть явно отправлены на целевой объект. Целевой объект или фильтр событий должен явно повторно синтезировать вызов слота из данных события.

  • Вместо QObject::connect может использоваться пользовательская реализация compressedConnect. Это полностью раскрывает детали сигнала и слота. Прокси-объект может использоваться для выполнения эквивалентного для сжатия эквивалента queued_activate на стороне объекта-отправителя.

L2 Второе ограничение не в состоянии полностью переопределить QCoreApplication::compressEvent, так как список событий определен конфиденциально. У нас все еще есть доступ к сжатому событию, и мы все еще можем решить, удалять его или нет, но нет способа перечислить список событий. Таким образом:

  • Доступ к очереди событий может осуществляться неявно путем рекурсивного вызова sendPostedEvents из notify (таким образом, также из eventFilter(), event() или из слотов). Это не вызывает тупик, поскольку QCoreApplication::sendPostedEvents не может (и не имеет) удерживать мьютез цикла событий, пока событие доставляется через sendEvent. События можно фильтровать следующим образом:

    • глобально в переопределенном QCoreApplication::notify,
    • глобально, зарегистрировав QInternal::EventNotifyCallback,
    • локально, присоединяя фильтр событий к объектам,
    • явно локально путем переопределения QObject::event() в целевом классе.

    Повторяющиеся события по-прежнему отправляются в очередь событий. Рекурсивные вызовы notify из sendPostedEvents потребляют довольно много пространства стека (бюджет 1kb на архитектурах с 64-разрядными указателями).

  • Существующие события можно удалить, вызвав QCoreApplication::removePostedEvents, прежде чем отправлять новое событие объекту. К сожалению, выполнение этого в QCoreApplication::compressEvent вызывает тупик, поскольку мьютез очереди событий уже удерживается.

    Пользовательский класс событий, содержащий указатель на объект-получатель, может автоматически вызывать removePostedEvents в конструкторе.

  • Существующие сжатые события, такие как QEvent::Exit, могут быть повторно присвоены.

    Набор этих событий является детальностью реализации и может меняться. Qt не различает эти события, кроме указателя приемника QObject. Для реализации требуется накладные расходы на QObject прокси-сервера для каждого типа (типа события, получателя).

Реализация

Код ниже работает как на Qt 4, так и на Qt 5. В последнем случае обязательно добавьте QT += core-private в файл проекта qmake, чтобы включить в него частные заголовки Qt.

Реализации, не использующие внутренние заголовки Qt, приведены в других ответах:

Существует два пути для удаления событий, выбранных if (true). Введенный путь кода сохраняет последнее событие и, как правило, имеет смысл. Кроме того, вы можете захотеть сохранить самое старое событие - то, что делает код с отключенным кодом.

screenshot

#include <QApplication>
#include <QMap>
#include <QSet>
#include <QMetaMethod>
#include <QMetaObject>
#include <private/qcoreapplication_p.h>
#include <private/qthread_p.h>
#include <private/qobject_p.h>

#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>

// Works on both Qt 4 and Qt 5.

//
// Common Code

/*! Keeps a list of singal indices for one or more meatobject classes.
 * The indices are signal indices as given by QMetaCallEvent.signalId.
 * On Qt 5, those do *not* match QMetaObject::methodIndex since they
 * exclude non-signal methods. */
class SignalList {
    Q_DISABLE_COPY(SignalList)
    typedef QMap<const QMetaObject *, QSet<int> > T;
    T m_data;
    /*! Returns a signal index that is can be compared to QMetaCallEvent.signalId. */
    static int signalIndex(const QMetaMethod & method) {
        Q_ASSERT(method.methodType() == QMetaMethod::Signal);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        int index = -1;
        const QMetaObject * mobj = method.enclosingMetaObject();
        for (int i = 0; i <= method.methodIndex(); ++i) {
            if (mobj->method(i).methodType() != QMetaMethod::Signal) continue;
            ++ index;
        }
        return index;
#else
        return method.methodIndex();
#endif
    }
public:
    SignalList() {}
    void add(const QMetaMethod & method) {
        m_data[method.enclosingMetaObject()].insert(signalIndex(method));
    }
    void remove(const QMetaMethod & method) {
        T::iterator it = m_data.find(method.enclosingMetaObject());
        if (it != m_data.end()) {
            it->remove(signalIndex(method));
            if (it->empty()) m_data.erase(it);
        }
    }
    bool contains(const QMetaObject * metaObject, int signalId) {
        T::const_iterator it = m_data.find(metaObject);
        return it != m_data.end() && it.value().contains(signalId);
    }
};

//
// Implementation Using Event Compression With Access to Private Qt Headers

struct EventHelper : private QEvent {
    static void clearPostedFlag(QEvent * ev) {
        (&static_cast<EventHelper*>(ev)->t)[1] &= ~0x8001; // Hack to clear QEvent::posted
    }
};

template <class Base> class CompressorApplication : public Base {
    SignalList m_compressedSignals;
public:
    CompressorApplication(int & argc, char ** argv) : Base(argc, argv) {}
    void addCompressedSignal(const QMetaMethod & method) { m_compressedSignals.add(method); }
    void removeCompressedSignal(const QMetaMethod & method) { m_compressedSignals.remove(method); }
protected:
    bool compressEvent(QEvent *event, QObject *receiver, QPostEventList *postedEvents) {
        if (event->type() != QEvent::MetaCall)
            return Base::compressEvent(event, receiver, postedEvents);

        QMetaCallEvent *mce = static_cast<QMetaCallEvent*>(event);
        if (! m_compressedSignals.contains(mce->sender()->metaObject(), mce->signalId())) return false;
        for (QPostEventList::iterator it = postedEvents->begin(); it != postedEvents->end(); ++it) {
            QPostEvent &cur = *it;
            if (cur.receiver != receiver || cur.event == 0 || cur.event->type() != event->type())
                continue;
            QMetaCallEvent *cur_mce = static_cast<QMetaCallEvent*>(cur.event);
            if (cur_mce->sender() != mce->sender() || cur_mce->signalId() != mce->signalId() ||
                    cur_mce->id() != mce->id())
                continue;
            if (true) {
              /* Keep The Newest Call */              
              // We can't merely qSwap the existing posted event with the new one, since QEvent
              // keeps track of whether it has been posted. Deletion of a formerly posted event
              // takes the posted event list mutex and does a useless search of the posted event
              // list upon deletion. We thus clear the QEvent::posted flag before deletion.
              EventHelper::clearPostedFlag(cur.event);
              delete cur.event;
              cur.event = event;
            } else {
              /* Keep the Oldest Call */
              delete event;
            }
            return true;
        }
        return false;
    }
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        connect(invoke, &QPushButton::clicked, this, &Widget::sendSignals);
        connect(&m_signaller, &Signaller::emptySignal, this, &Widget::emptySlot, Qt::QueuedConnection);
        connect(&m_signaller, &Signaller::dataSignal, this, &Widget::dataSlot, Qt::QueuedConnection);
#else
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        connect(&m_signaller, SIGNAL(emptySignal()), SLOT(emptySlot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), SLOT(dataSlot(int)), Qt::QueuedConnection);
#endif
    }
};

int main(int argc, char *argv[])
{
    CompressorApplication<QApplication> a(argc, argv);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::emptySignal));
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::dataSignal));
#else
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("emptySignal()")));
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("dataSignal(int)")));
#endif
    Widget w;
    w.show();
    return a.exec();
}

#include "main.moc"

Ответ 3

Это другой подход. Он не требует изменений для отправителя или объектов-получателей, но требует пользовательский объект CompressorProxy. Это портативно для Qt 4 и Qt 5 и не требует доступа к внутренним компонентам Qt.

Объект компрессора должен быть дочерним объектом целевого объекта - с одним слотом. Таким образом, он отслеживает поток целевого объекта. Поскольку сигналы компрессора прикреплены к целевым слотам, когда они находятся в одном потоке, нет накладных расходов на очереди для вызовов целевого слота.

Магия происходит в методе emitCheck: она вызывает себя рекурсивно.

  • Вызов слота заканчивается на emitCheck.
  • Другие отправленные события отправляются по вызову sendPostedEvents.
  • Если в очереди событий есть дублированные вызовы слотов, они снова будут в emitCheck.
  • После того, как последнее событие в очереди подобрано, а sendPostedEvents больше не перезаписывается, для данного слота флаг reset, так что его прокси-сигнал не будет излучаться более одного раза. Это желаемое поведение сжатия.

Для любого заданного набора вызовов в очереди в очереди на экземпляр CompressorProxy, emitCheck вернет true только ровно один раз для слота, который был вызван несколько раз в проходе через список размещенных событий.

Обратите внимание, что использование стека для рекурсивного вызова в режиме деблокирования составляет около 600 байт на 32-битных архитектурах и вдвое больше, чем в 64-битных архитектурах. В режиме отладки в OS X, используя 64-битную сборку, использование стека для каждой рекурсии составляет ~ 4kb.

screenshot

#include <QApplication>
#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>

class CompressorProxy : public QObject {
    Q_OBJECT
    bool emitCheck(bool & flag) {
        flag = true;
        QCoreApplication::sendPostedEvents(this, QEvent::MetaCall); // recurse
        bool result = flag;
        flag = false;
        return result;
    }

    bool m_slot;
    Q_SLOT void slot() {
        if (emitCheck(m_slot)) emit signal();
    }
    Q_SIGNAL void signal();

    bool m_slot_int;
    Q_SLOT void slot_int(int arg1) {
        if (emitCheck(m_slot_int)) emit signal_int(arg1);
    }
    Q_SIGNAL void signal_int(int);
public:
    // No default constructor, since the proxy must be a child of the
    // target object.
    explicit CompressorProxy(QObject * parent) : QObject(parent) {}
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        m_edit->appendPlainText(QString("Qt %1").arg(qVersion()));
        CompressorProxy * proxy = new CompressorProxy(this);
        connect(&m_signaller, SIGNAL(emptySignal()), proxy, SLOT(slot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), proxy, SLOT(slot_int(int)), Qt::QueuedConnection);
        connect(proxy, SIGNAL(signal()), this, SLOT(emptySlot()));
        connect(proxy, SIGNAL(signal_int(int)), this, SLOT(dataSlot(int)));
    }
};

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    Widget w;
    w.show();
    return a.exec();
}

#include "main.moc"

Ответ 4

Это еще один подход, переносимый как Qt 4, так и Qt 5, и не требующий доступа к внутренним компонентам Qt (кроме того, что доступно через публичные заголовки). На Qt 5 поддерживаются только соединения Qt 4-стиля. Сжатые объекты (пары-получатель, слот). Это отличается от кортежа (отправителя, получателя, сигнала, слота), когда имеется полный доступ к QMetaCallEvent.

Он использует QObject::qt_metacall, чтобы отследить детали вызова из черного ящика QMetaCallEvent. Рекурсия в sendPostedEvents используется, как и в моем другом не-внутреннем ответе.

Стоит отметить, что API QObject::qt_metacall остался неизменным с по крайней мере Qt 4.0.

screenshot

#include <QApplication>
#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>
#include <QSet>
#include <QMetaMethod>

// Common Code

/*! Keeps a list of method indices for one or more meatobject classes. */
class MethodList {
    Q_DISABLE_COPY(MethodList)
    typedef QMap<const QMetaObject *, QSet<int> > T;
    T m_data;
public:
    MethodList() {}
    template <class T> void add(const char * slot) {
        add(T::staticMetaObject.method(T::staticMetaObject.indexOfSlot(slot)));
    }
    void add(const QMetaMethod & method) {
        Q_ASSERT(method.methodIndex() >= 0);
        m_data[method.enclosingMetaObject()].insert(method.methodIndex());
    }
    void remove(const QMetaMethod & method) {
        T::iterator it = m_data.find(method.enclosingMetaObject());
        if (it != m_data.end()) {
            it->remove(method.methodIndex());
            if (it->empty()) m_data.erase(it);
        }
    }
    bool contains(const QMetaObject * metaObject, int methodId) {
        T::const_iterator it = m_data.find(metaObject);
        return it != m_data.end() && it.value().contains(methodId);
    }
};
Q_GLOBAL_STATIC(MethodList, compressedSlots)

// Compressor

class Compressor : public QObject {
    enum { Idle, Armed, Valid } m_state;
    QMetaObject::Call m_call;
    int m_methodIndex;
    QSet<int> m_armed; // armed method IDs

    int qt_metacall(QMetaObject::Call call, int id, void ** args) {
        if (m_state != Armed) return QObject::qt_metacall(call, id, args);
        m_state = Valid;
        m_call = call;
        m_methodIndex = id;
        return 0;
    }
    bool eventFilter(QObject * target, QEvent * ev) {
        Q_ASSERT(target == parent());
        if (ev->type() == QEvent::MetaCall) {
            m_state = Armed;
            if (QT_VERSION < QT_VERSION_CHECK(5,0,0) || ! *(void**)(ev+1)) {
                // On Qt5, we ensure null QMetaCallEvent::slotObj_ since we can't handle Qt5-style member pointer calls
                Compressor::event(ev); // Use QObject::event() and qt_metacall to extract metacall data
            }
            if (m_state == Armed) m_state = Idle;
            // Only intercept compressed slot calls
            if (m_state != Valid || m_call != QMetaObject::InvokeMetaMethod ||
                    ! compressedSlots()->contains(target->metaObject(), m_methodIndex)) return false;
            int methodIndex = m_methodIndex;
            m_armed.insert(methodIndex);
            QCoreApplication::sendPostedEvents(target, QEvent::MetaCall); // recurse
            if (! m_armed.contains(methodIndex)) return true; // Compress the call
            m_armed.remove(methodIndex);
        }
        return false;
    }
public:
    Compressor(QObject * parent) : QObject(parent), m_state(Idle) {
        parent->installEventFilter(this);
    }
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        m_edit->appendPlainText(QString("Qt %1").arg(qVersion()));
        connect(&m_signaller, SIGNAL(emptySignal()), SLOT(emptySlot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), SLOT(dataSlot(int)), Qt::QueuedConnection);
    }
};

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    compressedSlots()->add<Widget>("emptySlot()");
    compressedSlots()->add<Widget>("dataSlot(int)");
    Widget w;
    new Compressor(&w);
    w.show();
    return a.exec();
}

#include "main.moc"

Ответ 5

thread_slow 

будет обрабатывать все сигналы, отправленные в цикле событий, если вы использовали соединение с очередью или postEvent

Источник:

Очередное соединение Слот вызывается, когда управление возвращается к циклу событий потока приемника. Слот выполняется в потоке приемника.

QtDoc

Если вы хотите получить более подробную информацию о том, как обрабатывается событие, вы можете посмотреть здесь:

https://qt.gitorious.org/qt/qtbase/source/631c3dbc800bb9b2e3b227c0a09523f0f7eef0b7:src/corelib/thread/qthread_p.h#L127

Как вы можете видеть, события сортируются в порядке приоритета, поэтому, если все ваши события имеют одинаковый приоритет, он сначала вначале.

Это не тривиальная задача, здесь грубая попытка сказать мне, работает ли она.

Я предлагаю в основном хранить события самостоятельно и обрабатывать только последний.

thread_slow.h

int current_val;
bool m_isRunning;

thread_slow.cpp

void enqueue_slot( int val /*or whatever you value is*/ ) {
     // You'll enventually need a a QMutex here if your slot is not call in the thread
     m_current_val = val;
     if( !m_isRunning )
         slowRun();
}

void checkHasReceivedEventSlot() {
    if( m_current_val != -1 ) // Invalid value or a test condition
        slowRun();
}

void slowRun() {
    m_isRunning = true;
    int v = m_current_val;
    m_current_val = -1; // Invalid value

   // Do stuff with v

   // Let the queue fill itself with enqueue_slot calls
   QTimer::singleShot(kTIMEOUT, this, SLOT(checkHasReceivedEventSlot()));
}

При первом вызове enqueue_slot начинается медленный запуск

EDIT:

Чтобы убедиться, что это последнее событие, вы можете сделать следующее:

void checkHasReceivedEventSlot() {
    // Runs enqueue_slot until no more events are in the loop
    while( m_thread->eventDispatcher()->hasPendingEvents() )
         m_thread->eventDispatcher()->processEvents(QEventLoop::AllEvents);

    // m_current_val should hold the last event
    if( m_current_val != -1 ) // Invalid value or a test condition
        slowRun();
}

Ответ 6

Из вопроса: "Если он обработает все сигналы, есть ли способ сделать thread_slow обработать только последний?"

Если вы просто хотите всегда обработать последний сигнал и не возражаете, если обработать несколько дополнительных сигналов, если это не замедлит работу, тогда вы можете попробовать очень простой подход, например, используя обычный QThread::exec() цикл событий. Поместите эти методы слотов в подкласс QObject, который затем переместится в поток:

//slot
void MyClass::publicReceiverSlotForQueuedSignals(QString data)
{
    // Update data every time
    mReceivedData = data;

    // Allow worker method to be queued just once
    if (!mWorkerSlotInvoked) {
        mWorkerSlotInvoked = true;
        QMetaObject::invokeMethod(this, "workerSlot", Qt::QueuedConnection);
        qDebug() << "publicReceiverSlotForQueuedSignals: invoked workerSlot!"
                 << "New data:" << mReceivedData;
    } else {
        qDebug() << "publicReceiverSlotForQueuedSignals: workerSlot already invoked."
                 << "New data:" << mReceivedData;
    }
}

//slot
void MyClass::privateWorkerSlot()
{
    mWorkerSlotInvoked = false;
    qDebug() << "workerSlot for data:" << mReceivedData;
    QThread::msleep(3000);
    qDebug() << "workerSlot returning.";
}

publicReceiverSlotForQueuedSignals проходит очень быстро (qDebug in else, вероятно, самая трудоемкая часть для быстрых вызовов), поэтому на самом деле не важно, сколько сигналов поставлено в очередь. И тогда privateWorkerSlot будет вызываться только один раз за цикл цикла потока этого потока, независимо от того, как медленно он идет.

Также было бы тривиально добавить мьютекс для защиты mReceivedData и mWorkerSlotInvoked в обоих методах слотов (и везде, где вы могли бы их использовать). Затем вы можете сделать прямое подключение к слоту, потому что invokeMethod является потокобезопасным, а мьютекс также будет обрабатывать личные элементы данных MyClass потокобезопасными. Просто убедитесь, что вы скопировали содержимое mReceivedData в локальную переменную и разблокировали мьютекс, прежде чем выполнять трудоемкую обработку.

Примечание: непроверенный код, вероятно, имеет несколько ошибок.

Ответ 7

Вы можете использовать комбинацию DirectConnection и QueueConnection:

  • На стороне вашего работника (thread_slow):

    • Общий слот, предназначенный для вызова вашим поставщиком задач (thread_fast)

      void Working::process()
      {
         if (working)
         {
           printf("Drop a task %p\n", QThread::currentThread()); 
           return;
         }
      
        working = true;        
        emit realWork();
      }
      
    • Функция обработки (медленная): realProcess()

      void Working::realProcess()
      {
          printf("      **** Working ... %p\n",QThread::currentThread()); fflush(stdout);
      
          // Emulate a big processing ...
          usleep(3000*1000);
      
          printf("      **** Done. %p\n",QThread::currentThread());fflush(stdout);
          working = false;
          emit done();
      }
      
    • A QueueConnection от realWork до realProcess

      Working::Working()
      {
          working = false;
          connect(this,SIGNAL(realWork()),this,SLOT(realProcess()),Qt::QueuedConnection);
      }
      
  • На стороне поставщика задач (thread_fast)

    • A startWork() сигнал

      void TaskProv::orderWork()
      {
          emit startWork();
      }
      
    • A DirectConnection в слот рабочего процесса

      QObject::connect(&taskProvider,SIGNAL(startWork()),&worker,SLOT(process()),Qt::DirectConnection);
      

Некоторые примечания:

  • Функция Working::process() будет запущена в thread_fast (даже если она является функцией рабочего члена), но она просто проверяет флаг, поэтому он не должен влиять на время обработки

  • Если учесть возможность возможного дополнительного снижения задачи, вы можете защитить рабочий флаг Workter с помощью мьютекса для более жесткого управления.

  • Это очень похоже на lpapp "Очередь в загруженном слоте из слота мутатора данных", за исключением того, что тип соединения должен быть правильной комбинацией Direct и Queue.