diff --git a/libraries/octree/src/JurisdictionListener.cpp b/libraries/octree/src/JurisdictionListener.cpp index 49cdfac741..c280b48c99 100644 --- a/libraries/octree/src/JurisdictionListener.cpp +++ b/libraries/octree/src/JurisdictionListener.cpp @@ -19,8 +19,6 @@ JurisdictionListener::JurisdictionListener(NodeType_t type) : _nodeType(type), _packetSender(JurisdictionListener::DEFAULT_PACKETS_PER_SECOND) { - ReceivedPacketProcessor::_dontSleep = true; // we handle sleeping so this class doesn't need to - connect(NodeList::getInstance(), &NodeList::nodeKilled, this, &JurisdictionListener::nodeKilled); // tell our NodeList we want to hear about nodes with our node type diff --git a/libraries/shared/src/GenericThread.cpp b/libraries/shared/src/GenericThread.cpp index ff371e0f32..59f2426bdb 100644 --- a/libraries/shared/src/GenericThread.cpp +++ b/libraries/shared/src/GenericThread.cpp @@ -44,6 +44,8 @@ void GenericThread::initialize(bool isThreaded) { void GenericThread::terminate() { if (_isThreaded) { _stopThread = true; + + terminating(); if (_thread) { _thread->wait(); diff --git a/libraries/shared/src/GenericThread.h b/libraries/shared/src/GenericThread.h index 76ceed83cb..1b5b05db5d 100644 --- a/libraries/shared/src/GenericThread.h +++ b/libraries/shared/src/GenericThread.h @@ -33,6 +33,8 @@ public: /// Override this function to do whatever your class actually does, return false to exit thread early. virtual bool process() = 0; + virtual void terminating() { }; // lets your subclass know we're terminating, and it should respond appropriately + bool isThreaded() const { return _isThreaded; } public slots: diff --git a/libraries/shared/src/PacketSender.cpp b/libraries/shared/src/PacketSender.cpp index 9d819b96d7..9fac115a39 100644 --- a/libraries/shared/src/PacketSender.cpp +++ b/libraries/shared/src/PacketSender.cpp @@ -54,6 +54,9 @@ void PacketSender::queuePacketForSending(const SharedNodePointer& destinationNod unlock(); _totalPacketsQueued++; _totalBytesQueued += packet.size(); + + // Make sure to wake our actual processing thread because we now have packets for it to process. + _hasPackets.wakeAll(); } void PacketSender::setPacketsPerSecond(int packetsPerSecond) { @@ -68,6 +71,9 @@ bool PacketSender::process() { return nonThreadedProcess(); } +void PacketSender::terminating() { + _hasPackets.wakeAll(); +} bool PacketSender::threadedProcess() { bool hasSlept = false; @@ -109,11 +115,12 @@ bool PacketSender::threadedProcess() { } } - // if threaded and we haven't slept? We want to sleep a little so we don't hog the CPU, but - // we don't want to sleep too long because how ever much we sleep will delay any future unsent - // packets that arrive while we're sleeping. So we sleep 1/2 of our target fps interval + // if threaded and we haven't slept? We want to wait for our consumer to signal us with new packets if (!hasSlept) { - usleep(MINIMAL_SLEEP_INTERVAL); + // wait till we have packets + _waitingOnPacketsMutex.lock(); + _hasPackets.wait(&_waitingOnPacketsMutex); + _waitingOnPacketsMutex.unlock(); } return isStillRunning(); diff --git a/libraries/shared/src/PacketSender.h b/libraries/shared/src/PacketSender.h index 7d00e622b9..bd8de9a1b1 100644 --- a/libraries/shared/src/PacketSender.h +++ b/libraries/shared/src/PacketSender.h @@ -11,6 +11,8 @@ #ifndef __shared__PacketSender__ #define __shared__PacketSender__ +#include + #include "GenericThread.h" #include "NetworkPacket.h" #include "NodeList.h" @@ -44,6 +46,7 @@ public: int getPacketsPerSecond() const { return _packetsPerSecond; } virtual bool process(); + virtual void terminating(); /// are there packets waiting in the send queue to be sent bool hasPacketsToSend() const { return _packets.size() > 0; } @@ -113,6 +116,9 @@ private: quint64 _totalPacketsQueued; quint64 _totalBytesQueued; + + QWaitCondition _hasPackets; + QMutex _waitingOnPacketsMutex; }; #endif // __shared__PacketSender__ diff --git a/libraries/shared/src/ReceivedPacketProcessor.cpp b/libraries/shared/src/ReceivedPacketProcessor.cpp index b966109903..df7bfad165 100644 --- a/libraries/shared/src/ReceivedPacketProcessor.cpp +++ b/libraries/shared/src/ReceivedPacketProcessor.cpp @@ -12,8 +12,8 @@ #include "ReceivedPacketProcessor.h" #include "SharedUtil.h" -ReceivedPacketProcessor::ReceivedPacketProcessor() { - _dontSleep = false; +void ReceivedPacketProcessor::terminating() { + _hasPackets.wakeAll(); } void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& destinationNode, const QByteArray& packet) { @@ -24,18 +24,19 @@ void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& desti lock(); _packets.push_back(networkPacket); unlock(); + + // Make sure to wake our actual processing thread because we now have packets for it to process. + _hasPackets.wakeAll(); } bool ReceivedPacketProcessor::process() { - // If a derived class handles process sleeping, like the JurisdiciontListener, then it can set - // this _dontSleep member and we will honor that request. - if (_packets.size() == 0 && !_dontSleep) { - const quint64 RECEIVED_THREAD_SLEEP_INTERVAL = (1000 * 1000)/60; // check at 60fps - usleep(RECEIVED_THREAD_SLEEP_INTERVAL); + if (_packets.size() == 0) { + _waitingOnPacketsMutex.lock(); + _hasPackets.wait(&_waitingOnPacketsMutex); + _waitingOnPacketsMutex.unlock(); } while (_packets.size() > 0) { - lock(); // lock to make sure nothing changes on us NetworkPacket& packet = _packets.front(); // get the oldest packet NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us diff --git a/libraries/shared/src/ReceivedPacketProcessor.h b/libraries/shared/src/ReceivedPacketProcessor.h index 043dd6c6c4..7c99218753 100644 --- a/libraries/shared/src/ReceivedPacketProcessor.h +++ b/libraries/shared/src/ReceivedPacketProcessor.h @@ -11,13 +11,15 @@ #ifndef __shared__ReceivedPacketProcessor__ #define __shared__ReceivedPacketProcessor__ +#include + #include "GenericThread.h" #include "NetworkPacket.h" /// Generalized threaded processor for handling received inbound packets. class ReceivedPacketProcessor : public GenericThread { public: - ReceivedPacketProcessor(); + ReceivedPacketProcessor() { } /// Add packet from network receive thread to the processing queue. /// \param sockaddr& senderAddress the address of the sender @@ -43,11 +45,13 @@ protected: /// Implements generic processing behavior for this thread. virtual bool process(); - bool _dontSleep; + virtual void terminating(); private: std::vector _packets; + QWaitCondition _hasPackets; + QMutex _waitingOnPacketsMutex; }; #endif // __shared__PacketReceiver__