From 0693d0cbbdc055824d85697e1f3765053c4a52e1 Mon Sep 17 00:00:00 2001 From: ZappoMan Date: Thu, 22 Aug 2013 17:09:25 -0700 Subject: [PATCH] moved each client to its own sending thread --- voxel-server/src/NodeWatcher.cpp | 26 ++++++++++++ voxel-server/src/NodeWatcher.h | 23 +++++++++++ voxel-server/src/VoxelNodeData.cpp | 16 +++++++- voxel-server/src/VoxelNodeData.h | 4 ++ voxel-server/src/VoxelSendThread.cpp | 60 ++++++++++++++-------------- voxel-server/src/VoxelSendThread.h | 12 +++--- voxel-server/src/main.cpp | 21 ++++------ 7 files changed, 110 insertions(+), 52 deletions(-) create mode 100644 voxel-server/src/NodeWatcher.cpp create mode 100644 voxel-server/src/NodeWatcher.h diff --git a/voxel-server/src/NodeWatcher.cpp b/voxel-server/src/NodeWatcher.cpp new file mode 100644 index 0000000000..dd3244326a --- /dev/null +++ b/voxel-server/src/NodeWatcher.cpp @@ -0,0 +1,26 @@ +// +// NodeWatcher.h +// voxel-server +// +// Created by Brad Hefta-Gaub on 8/21/13 +// Copyright (c) 2013 High Fidelity, Inc. All rights reserved. +// +// Threaded or non-threaded object for sending voxels to a client +// + +#include +#include "NodeWatcher.h" +#include "VoxelNodeData.h" + +void NodeWatcher::nodeAdded(Node* node) { + // do nothing +} + +void NodeWatcher::nodeKilled(Node* node) { + // Use this to cleanup our node + if (node->getType() == NODE_TYPE_AGENT) { + VoxelNodeData* nodeData = (VoxelNodeData*)node->getLinkedData(); + node->setLinkedData(NULL); + delete nodeData; + } +}; diff --git a/voxel-server/src/NodeWatcher.h b/voxel-server/src/NodeWatcher.h new file mode 100644 index 0000000000..7d67397faa --- /dev/null +++ b/voxel-server/src/NodeWatcher.h @@ -0,0 +1,23 @@ +// +// NodeWatcher.h +// voxel-server +// +// Created by Brad Hefta-Gaub on 8/21/13 +// Copyright (c) 2013 High Fidelity, Inc. All rights reserved. +// +// Threaded or non-threaded object for sending voxels to a client +// + +#ifndef __voxel_server__NodeWatcher__ +#define __voxel_server__NodeWatcher__ + +#include + +/// Voxel server's node watcher, which watches for nodes being killed and cleans up their data and threads +class NodeWatcher : public virtual NodeListHook { +public: + virtual void nodeAdded(Node* node); + virtual void nodeKilled(Node* node); +}; + +#endif // __voxel_server__NodeWatcher__ diff --git a/voxel-server/src/VoxelNodeData.cpp b/voxel-server/src/VoxelNodeData.cpp index c4b8ee8b79..6d0d7d23cf 100644 --- a/voxel-server/src/VoxelNodeData.cpp +++ b/voxel-server/src/VoxelNodeData.cpp @@ -11,6 +11,7 @@ #include "VoxelNodeData.h" #include #include +#include "VoxelSendThread.h" VoxelNodeData::VoxelNodeData(Node* owningNode) : AvatarData(owningNode), @@ -21,11 +22,19 @@ VoxelNodeData::VoxelNodeData(Node* owningNode) : _lastTimeBagEmpty(0), _viewFrustumChanging(false), _viewFrustumJustStoppedChanging(true), - _currentPacketIsColor(true) + _currentPacketIsColor(true), + _voxelSendThread(NULL) { _voxelPacket = new unsigned char[MAX_VOXEL_PACKET_SIZE]; _voxelPacketAt = _voxelPacket; resetVoxelPacket(); + + // Create voxel sending thread... + uint16_t nodeID = getOwningNode()->getNodeID(); + _voxelSendThread = new VoxelSendThread(nodeID); + if (_voxelSendThread) { + _voxelSendThread->initialize(true); + } } @@ -49,6 +58,11 @@ void VoxelNodeData::writeToPacket(unsigned char* buffer, int bytes) { VoxelNodeData::~VoxelNodeData() { delete[] _voxelPacket; + + if (_voxelSendThread) { + _voxelSendThread->terminate(); + delete _voxelSendThread; + } } bool VoxelNodeData::updateCurrentViewFrustum() { diff --git a/voxel-server/src/VoxelNodeData.h b/voxel-server/src/VoxelNodeData.h index 746db6da93..efe28243a7 100644 --- a/voxel-server/src/VoxelNodeData.h +++ b/voxel-server/src/VoxelNodeData.h @@ -18,6 +18,8 @@ #include #include +class VoxelSendThread; // forward declare + class VoxelNodeData : public AvatarData { public: VoxelNodeData(Node* owningNode); @@ -80,6 +82,8 @@ private: bool _viewFrustumChanging; bool _viewFrustumJustStoppedChanging; bool _currentPacketIsColor; + + VoxelSendThread* _voxelSendThread; }; #endif /* defined(__hifi__VoxelNodeData__) */ diff --git a/voxel-server/src/VoxelSendThread.cpp b/voxel-server/src/VoxelSendThread.cpp index d0c9503be7..4c69c7f035 100644 --- a/voxel-server/src/VoxelSendThread.cpp +++ b/voxel-server/src/VoxelSendThread.cpp @@ -15,31 +15,34 @@ #include "VoxelSendThread.h" #include "VoxelServer.h" -VoxelSendThread::VoxelSendThread() { +VoxelSendThread::VoxelSendThread(uint16_t nodeID) : + _nodeID(nodeID) { +} + +VoxelSendThread::~VoxelSendThread() { } bool VoxelSendThread::process() { - - NodeList* nodeList = NodeList::getInstance(); - timeval lastSendTime; + uint64_t lastSendTime = usecTimestampNow(); - gettimeofday(&lastSendTime, NULL); + Node* node = NodeList::getInstance()->nodeWithID(_nodeID); + VoxelNodeData* nodeData = NULL; - for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { - VoxelNodeData* nodeData = (VoxelNodeData*) node->getLinkedData(); + if (node) { + nodeData = (VoxelNodeData*) node->getLinkedData(); + } - // Sometimes the node data has not yet been linked, in which case we can't really do anything - if (nodeData) { - bool viewFrustumChanged = nodeData->updateCurrentViewFrustum(); - if (::debugVoxelSending) { - printf("nodeData->updateCurrentViewFrustum() changed=%s\n", debug::valueOf(viewFrustumChanged)); - } - deepestLevelVoxelDistributor(nodeList, node, nodeData, viewFrustumChanged); + // Sometimes the node data has not yet been linked, in which case we can't really do anything + if (nodeData) { + bool viewFrustumChanged = nodeData->updateCurrentViewFrustum(); + if (::debugVoxelSending) { + printf("nodeData->updateCurrentViewFrustum() changed=%s\n", debug::valueOf(viewFrustumChanged)); } + deepestLevelVoxelDistributor(node, nodeData, viewFrustumChanged); } // dynamically sleep until we need to fire off the next set of voxels - int usecToSleep = VOXEL_SEND_INTERVAL_USECS - (usecTimestampNow() - usecTimestamp(&lastSendTime)); + int usecToSleep = VOXEL_SEND_INTERVAL_USECS - (usecTimestampNow() - lastSendTime); if (usecToSleep > 0) { usleep(usecToSleep); @@ -53,10 +56,8 @@ bool VoxelSendThread::process() { } -void VoxelSendThread::handlePacketSend(NodeList* nodeList, - NodeList::iterator& node, - VoxelNodeData* nodeData, - int& trueBytesSent, int& truePacketsSent) { +void VoxelSendThread::handlePacketSend(Node* node, VoxelNodeData* nodeData, int& trueBytesSent, int& truePacketsSent) { + // If we've got a stats message ready to send, then see if we can piggyback them together if (nodeData->stats.isReadyToSend()) { // Send the stats message to the client @@ -71,16 +72,16 @@ void VoxelSendThread::handlePacketSend(NodeList* nodeList, statsMessageLength += nodeData->getPacketLength(); // actually send it - nodeList->getNodeSocket()->send(node->getActiveSocket(), statsMessage, statsMessageLength); + NodeList::getInstance()->getNodeSocket()->send(node->getActiveSocket(), statsMessage, statsMessageLength); } else { // not enough room in the packet, send two packets - nodeList->getNodeSocket()->send(node->getActiveSocket(), statsMessage, statsMessageLength); - nodeList->getNodeSocket()->send(node->getActiveSocket(), + NodeList::getInstance()->getNodeSocket()->send(node->getActiveSocket(), statsMessage, statsMessageLength); + NodeList::getInstance()->getNodeSocket()->send(node->getActiveSocket(), nodeData->getPacket(), nodeData->getPacketLength()); } } else { // just send the voxel packet - nodeList->getNodeSocket()->send(node->getActiveSocket(), + NodeList::getInstance()->getNodeSocket()->send(node->getActiveSocket(), nodeData->getPacket(), nodeData->getPacketLength()); } // remember to track our stats @@ -91,10 +92,7 @@ void VoxelSendThread::handlePacketSend(NodeList* nodeList, } /// Version of voxel distributor that sends the deepest LOD level at once -void VoxelSendThread::deepestLevelVoxelDistributor(NodeList* nodeList, - NodeList::iterator& node, - VoxelNodeData* nodeData, - bool viewFrustumChanged) { +void VoxelSendThread::deepestLevelVoxelDistributor(Node* node, VoxelNodeData* nodeData, bool viewFrustumChanged) { pthread_mutex_lock(&::treeLock); @@ -120,7 +118,7 @@ void VoxelSendThread::deepestLevelVoxelDistributor(NodeList* nodeList, debug::valueOf(wantColor), debug::valueOf(nodeData->getCurrentPacketIsColor())); } - handlePacketSend(nodeList, node, nodeData, trueBytesSent, truePacketsSent); + handlePacketSend(node, nodeData, trueBytesSent, truePacketsSent); } else { if (::debugVoxelSending) { @@ -247,14 +245,14 @@ void VoxelSendThread::deepestLevelVoxelDistributor(NodeList* nodeList, if (nodeData->getAvailable() >= bytesWritten) { nodeData->writeToPacket(&tempOutputBuffer[0], bytesWritten); } else { - handlePacketSend(nodeList, node, nodeData, trueBytesSent, truePacketsSent); + handlePacketSend(node, nodeData, trueBytesSent, truePacketsSent); packetsSentThisInterval++; nodeData->resetVoxelPacket(); nodeData->writeToPacket(&tempOutputBuffer[0], bytesWritten); } } else { if (nodeData->isPacketWaiting()) { - handlePacketSend(nodeList, node, nodeData, trueBytesSent, truePacketsSent); + handlePacketSend(node, nodeData, trueBytesSent, truePacketsSent); nodeData->resetVoxelPacket(); } packetsSentThisInterval = PACKETS_PER_CLIENT_PER_INTERVAL; // done for now, no nodes left @@ -270,7 +268,7 @@ void VoxelSendThread::deepestLevelVoxelDistributor(NodeList* nodeList, envPacketLength += environmentData[i].getBroadcastData(tempOutputBuffer + envPacketLength); } - nodeList->getNodeSocket()->send(node->getActiveSocket(), tempOutputBuffer, envPacketLength); + NodeList::getInstance()->getNodeSocket()->send(node->getActiveSocket(), tempOutputBuffer, envPacketLength); trueBytesSent += envPacketLength; truePacketsSent++; } diff --git a/voxel-server/src/VoxelSendThread.h b/voxel-server/src/VoxelSendThread.h index 568659982b..1d5af7b012 100644 --- a/voxel-server/src/VoxelSendThread.h +++ b/voxel-server/src/VoxelSendThread.h @@ -20,19 +20,17 @@ /// Threaded processor for sending voxel packets to a single client class VoxelSendThread : public virtual GenericThread { public: - VoxelSendThread(); + VoxelSendThread(uint16_t nodeID); + ~VoxelSendThread(); protected: /// Implements generic processing behavior for this thread. virtual bool process(); private: + uint16_t _nodeID; - void handlePacketSend(NodeList* nodeList, NodeList::iterator& node, VoxelNodeData* nodeData, - int& trueBytesSent, int& truePacketsSent); - - void deepestLevelVoxelDistributor(NodeList* nodeList, NodeList::iterator& node, VoxelNodeData* nodeData, - bool viewFrustumChanged); - + void handlePacketSend(Node* node, VoxelNodeData* nodeData, int& trueBytesSent, int& truePacketsSent); + void deepestLevelVoxelDistributor(Node* node, VoxelNodeData* nodeData, bool viewFrustumChanged); }; #endif // __voxel_server__VoxelSendThread__ diff --git a/voxel-server/src/main.cpp b/voxel-server/src/main.cpp index 493c52845f..131ac9b7a2 100644 --- a/voxel-server/src/main.cpp +++ b/voxel-server/src/main.cpp @@ -23,6 +23,7 @@ #include #include +#include "NodeWatcher.h" #include "VoxelPersistThread.h" #include "VoxelSendThread.h" #include "VoxelServerPacketProcessor.h" @@ -58,9 +59,8 @@ JurisdictionMap* jurisdiction = NULL; JurisdictionSender* jurisdictionSender = NULL; VoxelServerPacketProcessor* voxelServerPacketProcessor = NULL; VoxelPersistThread* voxelPersistThread = NULL; -VoxelSendThread* voxelSendThread = NULL; pthread_mutex_t treeLock; - +NodeWatcher nodeWatcher; // used to cleanup AGENT data when agents are killed void attachVoxelNodeDataToNode(Node* newNode) { if (newNode->getLinkedData() == NULL) { @@ -132,6 +132,9 @@ int main(int argc, const char * argv[]) { NodeList* nodeList = NodeList::createInstance(NODE_TYPE_VOXEL_SERVER, listenPort); setvbuf(stdout, NULL, _IOLBF, 0); + + // tell our NodeList about our desire to get notifications + nodeList->addHook(&nodeWatcher); // Handle Local Domain testing with the --local command line const char* local = "--local"; @@ -245,12 +248,6 @@ int main(int argc, const char * argv[]) { environmentData[2].setAtmosphereOuterRadius(0.1875f * TREE_SCALE * 1.05f); environmentData[2].setScatteringWavelengths(glm::vec3(0.475f, 0.570f, 0.650f)); // swaps red and blue - // Create voxel sending thread... - ::voxelSendThread = new VoxelSendThread(); - if (::voxelSendThread) { - ::voxelSendThread->initialize(true); - } - sockaddr senderAddress; unsigned char *packetData = new unsigned char[MAX_PACKET_SIZE]; @@ -331,11 +328,9 @@ int main(int argc, const char * argv[]) { delete ::voxelPersistThread; } - if (::voxelSendThread) { - ::voxelSendThread->terminate(); - delete ::voxelSendThread; - } - + // tell our NodeList we're done with notifications + nodeList->removeHook(&nodeWatcher); + pthread_mutex_destroy(&::treeLock); return 0;