Merge pull request #10632 from Atlante45/fix/node-leak

Fix SharedNodePointer leak
This commit is contained in:
Andrew Meadows 2017-06-14 17:03:52 -07:00 committed by GitHub
commit 9c928f16b2
8 changed files with 23 additions and 16 deletions

View file

@ -47,7 +47,7 @@ void OctreeInboundPacketProcessor::resetStats() {
_singleSenderStats.clear(); _singleSenderStats.clear();
} }
unsigned long OctreeInboundPacketProcessor::getMaxWait() const { uint32_t OctreeInboundPacketProcessor::getMaxWait() const {
// calculate time until next sendNackPackets() // calculate time until next sendNackPackets()
quint64 nextNackTime = _lastNackTime + TOO_LONG_SINCE_LAST_NACK; quint64 nextNackTime = _lastNackTime + TOO_LONG_SINCE_LAST_NACK;
quint64 now = usecTimestampNow(); quint64 now = usecTimestampNow();

View file

@ -80,7 +80,7 @@ protected:
virtual void processPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) override; virtual void processPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) override;
virtual unsigned long getMaxWait() const override; virtual uint32_t getMaxWait() const override;
virtual void preProcess() override; virtual void preProcess() override;
virtual void midProcess() override; virtual void midProcess() override;

View file

@ -584,7 +584,7 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t
return matchingNode; return matchingNode;
} else { } else {
// we didn't have this node, so add them // we didn't have this node, so add them
Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket, permissions, connectionSecret, this); Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket, permissions, connectionSecret);
if (nodeType == NodeType::AudioMixer) { if (nodeType == NodeType::AudioMixer) {
LimitedNodeList::flagTimeForConnectionStep(LimitedNodeList::AddedAudioMixer); LimitedNodeList::flagTimeForConnectionStep(LimitedNodeList::AddedAudioMixer);
@ -617,24 +617,28 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t
} }
// insert the new node and release our read lock // insert the new node and release our read lock
_nodeHash.insert(UUIDNodePair(newNode->getUUID(), newNodePointer)); _nodeHash.emplace(newNode->getUUID(), newNodePointer);
readLocker.unlock(); readLocker.unlock();
qCDebug(networking) << "Added" << *newNode; qCDebug(networking) << "Added" << *newNode;
auto weakPtr = newNodePointer.toWeakRef(); // We don't want the lambdas to hold a strong ref
emit nodeAdded(newNodePointer); emit nodeAdded(newNodePointer);
if (newNodePointer->getActiveSocket()) { if (newNodePointer->getActiveSocket()) {
emit nodeActivated(newNodePointer); emit nodeActivated(newNodePointer);
} else { } else {
connect(newNodePointer.data(), &NetworkPeer::socketActivated, this, [=] { connect(newNodePointer.data(), &NetworkPeer::socketActivated, this, [this, weakPtr] {
emit nodeActivated(newNodePointer); auto sharedPtr = weakPtr.lock();
disconnect(newNodePointer.data(), &NetworkPeer::socketActivated, this, 0); if (sharedPtr) {
emit nodeActivated(sharedPtr);
disconnect(sharedPtr.data(), &NetworkPeer::socketActivated, this, 0);
}
}); });
} }
// Signal when a socket changes, so we can start the hole punch over. // Signal when a socket changes, so we can start the hole punch over.
auto weakPtr = newNodePointer.toWeakRef(); // We don't want the lambda to hold a strong ref connect(newNodePointer.data(), &NetworkPeer::socketUpdated, this, [this, weakPtr] {
connect(newNodePointer.data(), &NetworkPeer::socketUpdated, this, [=] {
emit nodeSocketUpdated(weakPtr); emit nodeSocketUpdated(weakPtr);
}); });

View file

@ -40,7 +40,7 @@ public:
Node(const QUuid& uuid, NodeType_t type, Node(const QUuid& uuid, NodeType_t type,
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket, const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket,
const NodePermissions& permissions, const QUuid& connectionSecret = QUuid(), const NodePermissions& permissions, const QUuid& connectionSecret = QUuid(),
QObject* parent = 0); QObject* parent = nullptr);
bool operator==(const Node& otherNode) const { return _uuid == otherNode._uuid; } bool operator==(const Node& otherNode) const { return _uuid == otherNode._uuid; }
bool operator!=(const Node& otherNode) const { return !(*this == otherNode); } bool operator!=(const Node& otherNode) const { return !(*this == otherNode); }

View file

@ -20,6 +20,8 @@
class ReceivedPacketProcessor : public GenericThread { class ReceivedPacketProcessor : public GenericThread {
Q_OBJECT Q_OBJECT
public: public:
static const uint64_t MAX_WAIT_TIME { 100 }; // Max wait time in ms
ReceivedPacketProcessor(); ReceivedPacketProcessor();
/// Add packet from network receive thread to the processing queue. /// Add packet from network receive thread to the processing queue.
@ -63,8 +65,8 @@ protected:
/// Implements generic processing behavior for this thread. /// Implements generic processing behavior for this thread.
virtual bool process() override; virtual bool process() override;
/// Determines the timeout of the wait when there are no packets to process. Default value means no timeout /// Determines the timeout of the wait when there are no packets to process. Default value is 100ms to allow for regular event processing.
virtual unsigned long getMaxWait() const { return ULONG_MAX; } virtual uint32_t getMaxWait() const { return MAX_WAIT_TIME; }
/// Override to do work before the packets processing loop. Default does nothing. /// Override to do work before the packets processing loop. Default does nothing.
virtual void preProcess() { } virtual void preProcess() { }

View file

@ -35,14 +35,13 @@ void JurisdictionListener::nodeKilled(SharedNodePointer node) {
} }
bool JurisdictionListener::queueJurisdictionRequest() { bool JurisdictionListener::queueJurisdictionRequest() {
auto packet = NLPacket::create(PacketType::JurisdictionRequest, 0);
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
int nodeCount = 0; int nodeCount = 0;
nodeList->eachNode([&](const SharedNodePointer& node) { nodeList->eachNode([&](const SharedNodePointer& node) {
if (node->getType() == getNodeType() && node->getActiveSocket()) { if (node->getType() == getNodeType() && node->getActiveSocket()) {
auto packet = NLPacket::create(PacketType::JurisdictionRequest, 0);
_packetSender.queuePacketForSending(node, std::move(packet)); _packetSender.queuePacketForSending(node, std::move(packet));
nodeCount++; nodeCount++;
} }

View file

@ -41,8 +41,6 @@ bool JurisdictionSender::process() {
// call our ReceivedPacketProcessor base class process so we'll get any pending packets // call our ReceivedPacketProcessor base class process so we'll get any pending packets
if (continueProcessing && (continueProcessing = ReceivedPacketProcessor::process())) { if (continueProcessing && (continueProcessing = ReceivedPacketProcessor::process())) {
auto packet = (_jurisdictionMap) ? _jurisdictionMap->packIntoPacket()
: JurisdictionMap::packEmptyJurisdictionIntoMessage(getNodeType());
int nodeCount = 0; int nodeCount = 0;
lockRequestingNodes(); lockRequestingNodes();
@ -53,6 +51,8 @@ bool JurisdictionSender::process() {
SharedNodePointer node = DependencyManager::get<NodeList>()->nodeWithUUID(nodeUUID); SharedNodePointer node = DependencyManager::get<NodeList>()->nodeWithUUID(nodeUUID);
if (node && node->getActiveSocket()) { if (node && node->getActiveSocket()) {
auto packet = (_jurisdictionMap) ? _jurisdictionMap->packIntoPacket()
: JurisdictionMap::packEmptyJurisdictionIntoMessage(getNodeType());
_packetSender.queuePacketForSending(node, std::move(packet)); _packetSender.queuePacketForSending(node, std::move(packet));
nodeCount++; nodeCount++;
} }

View file

@ -10,6 +10,7 @@
// //
#include <QDebug> #include <QDebug>
#include <QtCore/QCoreApplication>
#include "GenericThread.h" #include "GenericThread.h"
@ -73,6 +74,7 @@ void GenericThread::threadRoutine() {
} }
while (!_stopThread) { while (!_stopThread) {
QCoreApplication::processEvents();
// override this function to do whatever your class actually does, return false to exit thread early // override this function to do whatever your class actually does, return false to exit thread early
if (!process()) { if (!process()) {