Merge pull request #2271 from ZappoMan/QWaitCondition

Fix runaway JurisdictionListener thread and other improvements
This commit is contained in:
AndrewMeadows 2014-03-12 11:54:48 -07:00
commit b1f4edb3a5
7 changed files with 36 additions and 16 deletions

View file

@ -19,8 +19,6 @@ JurisdictionListener::JurisdictionListener(NodeType_t type) :
_nodeType(type), _nodeType(type),
_packetSender(JurisdictionListener::DEFAULT_PACKETS_PER_SECOND) _packetSender(JurisdictionListener::DEFAULT_PACKETS_PER_SECOND)
{ {
ReceivedPacketProcessor::_dontSleep = true; // we handle sleeping so this class doesn't need to
connect(NodeList::getInstance(), &NodeList::nodeKilled, this, &JurisdictionListener::nodeKilled); connect(NodeList::getInstance(), &NodeList::nodeKilled, this, &JurisdictionListener::nodeKilled);
// tell our NodeList we want to hear about nodes with our node type // tell our NodeList we want to hear about nodes with our node type

View file

@ -45,6 +45,8 @@ void GenericThread::terminate() {
if (_isThreaded) { if (_isThreaded) {
_stopThread = true; _stopThread = true;
terminating();
if (_thread) { if (_thread) {
_thread->wait(); _thread->wait();
_thread->deleteLater(); _thread->deleteLater();

View file

@ -33,6 +33,8 @@ public:
/// Override this function to do whatever your class actually does, return false to exit thread early. /// Override this function to do whatever your class actually does, return false to exit thread early.
virtual bool process() = 0; virtual bool process() = 0;
virtual void terminating() { }; // lets your subclass know we're terminating, and it should respond appropriately
bool isThreaded() const { return _isThreaded; } bool isThreaded() const { return _isThreaded; }
public slots: public slots:

View file

@ -54,6 +54,9 @@ void PacketSender::queuePacketForSending(const SharedNodePointer& destinationNod
unlock(); unlock();
_totalPacketsQueued++; _totalPacketsQueued++;
_totalBytesQueued += packet.size(); _totalBytesQueued += packet.size();
// Make sure to wake our actual processing thread because we now have packets for it to process.
_hasPackets.wakeAll();
} }
void PacketSender::setPacketsPerSecond(int packetsPerSecond) { void PacketSender::setPacketsPerSecond(int packetsPerSecond) {
@ -68,6 +71,9 @@ bool PacketSender::process() {
return nonThreadedProcess(); return nonThreadedProcess();
} }
void PacketSender::terminating() {
_hasPackets.wakeAll();
}
bool PacketSender::threadedProcess() { bool PacketSender::threadedProcess() {
bool hasSlept = false; bool hasSlept = false;
@ -109,11 +115,12 @@ bool PacketSender::threadedProcess() {
} }
} }
// if threaded and we haven't slept? We want to sleep a little so we don't hog the CPU, but // if threaded and we haven't slept? We want to wait for our consumer to signal us with new packets
// we don't want to sleep too long because how ever much we sleep will delay any future unsent
// packets that arrive while we're sleeping. So we sleep 1/2 of our target fps interval
if (!hasSlept) { if (!hasSlept) {
usleep(MINIMAL_SLEEP_INTERVAL); // wait till we have packets
_waitingOnPacketsMutex.lock();
_hasPackets.wait(&_waitingOnPacketsMutex);
_waitingOnPacketsMutex.unlock();
} }
return isStillRunning(); return isStillRunning();

View file

@ -11,6 +11,8 @@
#ifndef __shared__PacketSender__ #ifndef __shared__PacketSender__
#define __shared__PacketSender__ #define __shared__PacketSender__
#include <QWaitCondition>
#include "GenericThread.h" #include "GenericThread.h"
#include "NetworkPacket.h" #include "NetworkPacket.h"
#include "NodeList.h" #include "NodeList.h"
@ -44,6 +46,7 @@ public:
int getPacketsPerSecond() const { return _packetsPerSecond; } int getPacketsPerSecond() const { return _packetsPerSecond; }
virtual bool process(); virtual bool process();
virtual void terminating();
/// are there packets waiting in the send queue to be sent /// are there packets waiting in the send queue to be sent
bool hasPacketsToSend() const { return _packets.size() > 0; } bool hasPacketsToSend() const { return _packets.size() > 0; }
@ -113,6 +116,9 @@ private:
quint64 _totalPacketsQueued; quint64 _totalPacketsQueued;
quint64 _totalBytesQueued; quint64 _totalBytesQueued;
QWaitCondition _hasPackets;
QMutex _waitingOnPacketsMutex;
}; };
#endif // __shared__PacketSender__ #endif // __shared__PacketSender__

View file

@ -12,8 +12,8 @@
#include "ReceivedPacketProcessor.h" #include "ReceivedPacketProcessor.h"
#include "SharedUtil.h" #include "SharedUtil.h"
ReceivedPacketProcessor::ReceivedPacketProcessor() { void ReceivedPacketProcessor::terminating() {
_dontSleep = false; _hasPackets.wakeAll();
} }
void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& destinationNode, const QByteArray& packet) { void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& destinationNode, const QByteArray& packet) {
@ -24,18 +24,19 @@ void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& desti
lock(); lock();
_packets.push_back(networkPacket); _packets.push_back(networkPacket);
unlock(); unlock();
// Make sure to wake our actual processing thread because we now have packets for it to process.
_hasPackets.wakeAll();
} }
bool ReceivedPacketProcessor::process() { bool ReceivedPacketProcessor::process() {
// If a derived class handles process sleeping, like the JurisdiciontListener, then it can set if (_packets.size() == 0) {
// this _dontSleep member and we will honor that request. _waitingOnPacketsMutex.lock();
if (_packets.size() == 0 && !_dontSleep) { _hasPackets.wait(&_waitingOnPacketsMutex);
const quint64 RECEIVED_THREAD_SLEEP_INTERVAL = (1000 * 1000)/60; // check at 60fps _waitingOnPacketsMutex.unlock();
usleep(RECEIVED_THREAD_SLEEP_INTERVAL);
} }
while (_packets.size() > 0) { while (_packets.size() > 0) {
lock(); // lock to make sure nothing changes on us lock(); // lock to make sure nothing changes on us
NetworkPacket& packet = _packets.front(); // get the oldest packet NetworkPacket& packet = _packets.front(); // get the oldest packet
NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us

View file

@ -11,13 +11,15 @@
#ifndef __shared__ReceivedPacketProcessor__ #ifndef __shared__ReceivedPacketProcessor__
#define __shared__ReceivedPacketProcessor__ #define __shared__ReceivedPacketProcessor__
#include <QWaitCondition>
#include "GenericThread.h" #include "GenericThread.h"
#include "NetworkPacket.h" #include "NetworkPacket.h"
/// Generalized threaded processor for handling received inbound packets. /// Generalized threaded processor for handling received inbound packets.
class ReceivedPacketProcessor : public GenericThread { class ReceivedPacketProcessor : public GenericThread {
public: public:
ReceivedPacketProcessor(); ReceivedPacketProcessor() { }
/// Add packet from network receive thread to the processing queue. /// Add packet from network receive thread to the processing queue.
/// \param sockaddr& senderAddress the address of the sender /// \param sockaddr& senderAddress the address of the sender
@ -43,11 +45,13 @@ protected:
/// Implements generic processing behavior for this thread. /// Implements generic processing behavior for this thread.
virtual bool process(); virtual bool process();
bool _dontSleep; virtual void terminating();
private: private:
std::vector<NetworkPacket> _packets; std::vector<NetworkPacket> _packets;
QWaitCondition _hasPackets;
QMutex _waitingOnPacketsMutex;
}; };
#endif // __shared__PacketReceiver__ #endif // __shared__PacketReceiver__