drop packets on floor when shutting down

This commit is contained in:
Stephen Birarda 2015-05-08 11:57:25 -07:00
parent fe1b8cc52b
commit 3d06a86670
5 changed files with 53 additions and 30 deletions

View file

@ -9,11 +9,15 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "PacketHeaders.h"
#include "SharedUtil.h"
#include "OctreeQueryNode.h"
#include <cstring>
#include <cstdio>
#include <PacketHeaders.h>
#include <SharedUtil.h>
#include <UUID.h>
#include "OctreeSendThread.h"
OctreeQueryNode::OctreeQueryNode() :
@ -92,7 +96,7 @@ void OctreeQueryNode::sendThreadFinished() {
}
void OctreeQueryNode::initializeOctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node) {
_octreeSendThread = new OctreeSendThread(myServer, node);
_octreeSendThread = new OctreeSendThread(myServer, node);
// we want to be notified when the thread finishes
connect(_octreeSendThread, &GenericThread::finished, this, &OctreeQueryNode::sendThreadFinished);

View file

@ -30,9 +30,14 @@ OctreeSendThread::OctreeSendThread(OctreeServer* myServer, const SharedNodePoint
_isShuttingDown(false)
{
QString safeServerName("Octree");
// set our QThread object name so we can identify this thread while debugging
setObjectName(QString("Octree Send Thread (%1)").arg(uuidStringWithoutCurlyBraces(node->getUUID())));
if (_myServer) {
safeServerName = _myServer->getMyServerName();
}
qDebug() << qPrintable(safeServerName) << "server [" << _myServer << "]: client connected "
"- starting sending thread [" << this << "]";

View file

@ -836,36 +836,42 @@ void OctreeServer::parsePayload() {
void OctreeServer::readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr) {
auto nodeList = DependencyManager::get<NodeList>();
// If we know we're shutting down we just drop these packets on the floor.
// This stops us from initializing send threads we just shut down.
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
PacketType packetType = packetTypeForPacket(receivedPacket);
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
if (packetType == getMyQueryMessageType()) {
// If we got a query packet, then we're talking to an agent, and we
// need to make sure we have it in our nodeList.
if (matchingNode) {
nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket);
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
nodeData->initializeOctreeSendThread(this, matchingNode);
if (!_isShuttingDown) {
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
PacketType packetType = packetTypeForPacket(receivedPacket);
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
if (packetType == getMyQueryMessageType()) {
// If we got a query packet, then we're talking to an agent, and we
// need to make sure we have it in our nodeList.
if (matchingNode) {
nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket);
OctreeQueryNode* nodeData = (OctreeQueryNode*) matchingNode->getLinkedData();
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
nodeData->initializeOctreeSendThread(this, matchingNode);
}
}
}
} else if (packetType == PacketTypeOctreeDataNack) {
// If we got a nack packet, then we're talking to an agent, and we
// need to make sure we have it in our nodeList.
if (matchingNode) {
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
if (nodeData) {
nodeData->parseNackPacket(receivedPacket);
} else if (packetType == PacketTypeOctreeDataNack) {
// If we got a nack packet, then we're talking to an agent, and we
// need to make sure we have it in our nodeList.
if (matchingNode) {
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
if (nodeData) {
nodeData->parseNackPacket(receivedPacket);
}
}
} else if (packetType == PacketTypeJurisdictionRequest) {
_jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket);
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {
_octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, receivedPacket);
} else {
// let processNodeData handle it.
DependencyManager::get<NodeList>()->processNodeData(senderSockAddr, receivedPacket);
}
} else if (packetType == PacketTypeJurisdictionRequest) {
_jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket);
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {
_octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, receivedPacket);
} else {
// let processNodeData handle it.
DependencyManager::get<NodeList>()->processNodeData(senderSockAddr, receivedPacket);
}
}
}
@ -1214,6 +1220,9 @@ void OctreeServer::forceNodeShutdown(SharedNodePointer node) {
void OctreeServer::aboutToFinish() {
qDebug() << qPrintable(_safeServerName) << "server STARTING about to finish...";
_isShuttingDown = true;
qDebug() << qPrintable(_safeServerName) << "inform Octree Inbound Packet Processor that we are shutting down...";
// we're going down - set the NodeList linkedDataCallback to NULL so we do not create any more OctreeSendThreads

View file

@ -146,12 +146,14 @@ protected:
QString getStatusLink();
void setupDatagramProcessingThread();
int _argc;
const char** _argv;
char** _parsedArgV;
QJsonObject _settings;
bool _isShuttingDown = false;
HTTPManager* _httpManager;
int _statusPort;
QString _statusHost;

View file

@ -32,6 +32,9 @@ void GenericThread::initialize(bool isThreaded) {
if (_isThreaded) {
_thread = new QThread(this);
// match the thread name to our object name
_thread->setObjectName(objectName());
// when the worker thread is started, call our engine's run..
connect(_thread, SIGNAL(started()), this, SLOT(threadRoutine()));