From f5cdb98efb0d67d8a63a8e2da7ad4d207cacc2c4 Mon Sep 17 00:00:00 2001 From: ZappoMan Date: Tue, 11 Mar 2014 20:32:02 -0700 Subject: [PATCH 1/4] switch ReceivedPacketProcessor to using QWaitCondition --- libraries/shared/src/GenericThread.cpp | 2 ++ libraries/shared/src/GenericThread.h | 2 ++ libraries/shared/src/ReceivedPacketProcessor.cpp | 13 ++++++++++--- libraries/shared/src/ReceivedPacketProcessor.h | 6 ++++++ 4 files changed, 20 insertions(+), 3 deletions(-) diff --git a/libraries/shared/src/GenericThread.cpp b/libraries/shared/src/GenericThread.cpp index ff371e0f32..59f2426bdb 100644 --- a/libraries/shared/src/GenericThread.cpp +++ b/libraries/shared/src/GenericThread.cpp @@ -44,6 +44,8 @@ void GenericThread::initialize(bool isThreaded) { void GenericThread::terminate() { if (_isThreaded) { _stopThread = true; + + terminating(); if (_thread) { _thread->wait(); diff --git a/libraries/shared/src/GenericThread.h b/libraries/shared/src/GenericThread.h index 76ceed83cb..1b5b05db5d 100644 --- a/libraries/shared/src/GenericThread.h +++ b/libraries/shared/src/GenericThread.h @@ -33,6 +33,8 @@ public: /// Override this function to do whatever your class actually does, return false to exit thread early. virtual bool process() = 0; + virtual void terminating() { }; // lets your subclass know we're terminating, and it should respond appropriately + bool isThreaded() const { return _isThreaded; } public slots: diff --git a/libraries/shared/src/ReceivedPacketProcessor.cpp b/libraries/shared/src/ReceivedPacketProcessor.cpp index b966109903..419e79881f 100644 --- a/libraries/shared/src/ReceivedPacketProcessor.cpp +++ b/libraries/shared/src/ReceivedPacketProcessor.cpp @@ -16,6 +16,10 @@ ReceivedPacketProcessor::ReceivedPacketProcessor() { _dontSleep = false; } +void ReceivedPacketProcessor::terminating() { + _hasPackets.wakeAll(); +} + void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& destinationNode, const QByteArray& packet) { // Make sure our Node and NodeList knows we've heard from this node. destinationNode->setLastHeardMicrostamp(usecTimestampNow()); @@ -24,6 +28,9 @@ void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& desti lock(); _packets.push_back(networkPacket); unlock(); + + // Make sure to wake our actual processing thread because we now have packets for it to process. + _hasPackets.wakeAll(); } bool ReceivedPacketProcessor::process() { @@ -31,11 +38,11 @@ bool ReceivedPacketProcessor::process() { // If a derived class handles process sleeping, like the JurisdiciontListener, then it can set // this _dontSleep member and we will honor that request. if (_packets.size() == 0 && !_dontSleep) { - const quint64 RECEIVED_THREAD_SLEEP_INTERVAL = (1000 * 1000)/60; // check at 60fps - usleep(RECEIVED_THREAD_SLEEP_INTERVAL); + _waitingOnPacketsMutex.lock(); + _hasPackets.wait(&_waitingOnPacketsMutex); + _waitingOnPacketsMutex.unlock(); } while (_packets.size() > 0) { - lock(); // lock to make sure nothing changes on us NetworkPacket& packet = _packets.front(); // get the oldest packet NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us diff --git a/libraries/shared/src/ReceivedPacketProcessor.h b/libraries/shared/src/ReceivedPacketProcessor.h index 043dd6c6c4..4088422561 100644 --- a/libraries/shared/src/ReceivedPacketProcessor.h +++ b/libraries/shared/src/ReceivedPacketProcessor.h @@ -11,6 +11,8 @@ #ifndef __shared__ReceivedPacketProcessor__ #define __shared__ReceivedPacketProcessor__ +#include + #include "GenericThread.h" #include "NetworkPacket.h" @@ -43,11 +45,15 @@ protected: /// Implements generic processing behavior for this thread. virtual bool process(); + virtual void terminating(); + bool _dontSleep; private: std::vector _packets; + QWaitCondition _hasPackets; + QMutex _waitingOnPacketsMutex; }; #endif // __shared__PacketReceiver__ From 8befefb0542feda2f76bcb7301390b35ea8fe548 Mon Sep 17 00:00:00 2001 From: ZappoMan Date: Tue, 11 Mar 2014 20:45:46 -0700 Subject: [PATCH 2/4] first cut at making PacketSender use QWaitCondition --- libraries/shared/src/PacketSender.cpp | 17 ++++++++++++++++- libraries/shared/src/PacketSender.h | 6 ++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/libraries/shared/src/PacketSender.cpp b/libraries/shared/src/PacketSender.cpp index 9d819b96d7..52f8759c8b 100644 --- a/libraries/shared/src/PacketSender.cpp +++ b/libraries/shared/src/PacketSender.cpp @@ -54,6 +54,10 @@ void PacketSender::queuePacketForSending(const SharedNodePointer& destinationNod unlock(); _totalPacketsQueued++; _totalBytesQueued += packet.size(); + + // Make sure to wake our actual processing thread because we now have packets for it to process. + qDebug() << "PacketSender::queuePacketForSending()... wake up, we need to send packets!."; + _hasPackets.wakeAll(); } void PacketSender::setPacketsPerSecond(int packetsPerSecond) { @@ -68,6 +72,10 @@ bool PacketSender::process() { return nonThreadedProcess(); } +void PacketSender::terminating() { + qDebug() << "PacketSender::terminating()... wake up, we need to die."; + _hasPackets.wakeAll(); +} bool PacketSender::threadedProcess() { bool hasSlept = false; @@ -113,7 +121,14 @@ bool PacketSender::threadedProcess() { // we don't want to sleep too long because how ever much we sleep will delay any future unsent // packets that arrive while we're sleeping. So we sleep 1/2 of our target fps interval if (!hasSlept) { - usleep(MINIMAL_SLEEP_INTERVAL); + //usleep(MINIMAL_SLEEP_INTERVAL); + + // wait till we have packets + _waitingOnPacketsMutex.lock(); + qDebug() << "PacketSender::threadedProcess()... waiting on packets to send..."; + _hasPackets.wait(&_waitingOnPacketsMutex); + qDebug() << "PacketSender::threadedProcess()... YIPEEE we're awake..."; + _waitingOnPacketsMutex.unlock(); } return isStillRunning(); diff --git a/libraries/shared/src/PacketSender.h b/libraries/shared/src/PacketSender.h index 7d00e622b9..bd8de9a1b1 100644 --- a/libraries/shared/src/PacketSender.h +++ b/libraries/shared/src/PacketSender.h @@ -11,6 +11,8 @@ #ifndef __shared__PacketSender__ #define __shared__PacketSender__ +#include + #include "GenericThread.h" #include "NetworkPacket.h" #include "NodeList.h" @@ -44,6 +46,7 @@ public: int getPacketsPerSecond() const { return _packetsPerSecond; } virtual bool process(); + virtual void terminating(); /// are there packets waiting in the send queue to be sent bool hasPacketsToSend() const { return _packets.size() > 0; } @@ -113,6 +116,9 @@ private: quint64 _totalPacketsQueued; quint64 _totalBytesQueued; + + QWaitCondition _hasPackets; + QMutex _waitingOnPacketsMutex; }; #endif // __shared__PacketSender__ From 025da315c6293bc41919995397e5c8cc41825164 Mon Sep 17 00:00:00 2001 From: ZappoMan Date: Tue, 11 Mar 2014 21:11:47 -0700 Subject: [PATCH 3/4] removed dontSleep from JurisdictionListener --- libraries/octree/src/JurisdictionListener.cpp | 2 -- libraries/shared/src/PacketSender.cpp | 4 ---- libraries/shared/src/ReceivedPacketProcessor.cpp | 8 +------- libraries/shared/src/ReceivedPacketProcessor.h | 4 +--- 4 files changed, 2 insertions(+), 16 deletions(-) diff --git a/libraries/octree/src/JurisdictionListener.cpp b/libraries/octree/src/JurisdictionListener.cpp index 49cdfac741..c280b48c99 100644 --- a/libraries/octree/src/JurisdictionListener.cpp +++ b/libraries/octree/src/JurisdictionListener.cpp @@ -19,8 +19,6 @@ JurisdictionListener::JurisdictionListener(NodeType_t type) : _nodeType(type), _packetSender(JurisdictionListener::DEFAULT_PACKETS_PER_SECOND) { - ReceivedPacketProcessor::_dontSleep = true; // we handle sleeping so this class doesn't need to - connect(NodeList::getInstance(), &NodeList::nodeKilled, this, &JurisdictionListener::nodeKilled); // tell our NodeList we want to hear about nodes with our node type diff --git a/libraries/shared/src/PacketSender.cpp b/libraries/shared/src/PacketSender.cpp index 52f8759c8b..062976cc37 100644 --- a/libraries/shared/src/PacketSender.cpp +++ b/libraries/shared/src/PacketSender.cpp @@ -56,7 +56,6 @@ void PacketSender::queuePacketForSending(const SharedNodePointer& destinationNod _totalBytesQueued += packet.size(); // Make sure to wake our actual processing thread because we now have packets for it to process. - qDebug() << "PacketSender::queuePacketForSending()... wake up, we need to send packets!."; _hasPackets.wakeAll(); } @@ -73,7 +72,6 @@ bool PacketSender::process() { } void PacketSender::terminating() { - qDebug() << "PacketSender::terminating()... wake up, we need to die."; _hasPackets.wakeAll(); } @@ -125,9 +123,7 @@ bool PacketSender::threadedProcess() { // wait till we have packets _waitingOnPacketsMutex.lock(); - qDebug() << "PacketSender::threadedProcess()... waiting on packets to send..."; _hasPackets.wait(&_waitingOnPacketsMutex); - qDebug() << "PacketSender::threadedProcess()... YIPEEE we're awake..."; _waitingOnPacketsMutex.unlock(); } diff --git a/libraries/shared/src/ReceivedPacketProcessor.cpp b/libraries/shared/src/ReceivedPacketProcessor.cpp index 419e79881f..df7bfad165 100644 --- a/libraries/shared/src/ReceivedPacketProcessor.cpp +++ b/libraries/shared/src/ReceivedPacketProcessor.cpp @@ -12,10 +12,6 @@ #include "ReceivedPacketProcessor.h" #include "SharedUtil.h" -ReceivedPacketProcessor::ReceivedPacketProcessor() { - _dontSleep = false; -} - void ReceivedPacketProcessor::terminating() { _hasPackets.wakeAll(); } @@ -35,9 +31,7 @@ void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& desti bool ReceivedPacketProcessor::process() { - // If a derived class handles process sleeping, like the JurisdiciontListener, then it can set - // this _dontSleep member and we will honor that request. - if (_packets.size() == 0 && !_dontSleep) { + if (_packets.size() == 0) { _waitingOnPacketsMutex.lock(); _hasPackets.wait(&_waitingOnPacketsMutex); _waitingOnPacketsMutex.unlock(); diff --git a/libraries/shared/src/ReceivedPacketProcessor.h b/libraries/shared/src/ReceivedPacketProcessor.h index 4088422561..7c99218753 100644 --- a/libraries/shared/src/ReceivedPacketProcessor.h +++ b/libraries/shared/src/ReceivedPacketProcessor.h @@ -19,7 +19,7 @@ /// Generalized threaded processor for handling received inbound packets. class ReceivedPacketProcessor : public GenericThread { public: - ReceivedPacketProcessor(); + ReceivedPacketProcessor() { } /// Add packet from network receive thread to the processing queue. /// \param sockaddr& senderAddress the address of the sender @@ -47,8 +47,6 @@ protected: virtual void terminating(); - bool _dontSleep; - private: std::vector _packets; From e72d3127bf85b701ed641e348ce9cc4d5f09decf Mon Sep 17 00:00:00 2001 From: ZappoMan Date: Tue, 11 Mar 2014 21:25:30 -0700 Subject: [PATCH 4/4] fix comment and remove dead code --- libraries/shared/src/PacketSender.cpp | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/libraries/shared/src/PacketSender.cpp b/libraries/shared/src/PacketSender.cpp index 062976cc37..9fac115a39 100644 --- a/libraries/shared/src/PacketSender.cpp +++ b/libraries/shared/src/PacketSender.cpp @@ -115,12 +115,8 @@ bool PacketSender::threadedProcess() { } } - // if threaded and we haven't slept? We want to sleep a little so we don't hog the CPU, but - // we don't want to sleep too long because how ever much we sleep will delay any future unsent - // packets that arrive while we're sleeping. So we sleep 1/2 of our target fps interval + // if threaded and we haven't slept? We want to wait for our consumer to signal us with new packets if (!hasSlept) { - //usleep(MINIMAL_SLEEP_INTERVAL); - // wait till we have packets _waitingOnPacketsMutex.lock(); _hasPackets.wait(&_waitingOnPacketsMutex);