moved each client to its own sending thread

This commit is contained in:
ZappoMan 2013-08-22 17:09:25 -07:00
parent 07a2bedfb0
commit 0693d0cbbd
7 changed files with 110 additions and 52 deletions

View file

@ -0,0 +1,26 @@
//
// NodeWatcher.h
// voxel-server
//
// Created by Brad Hefta-Gaub on 8/21/13
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
//
// Threaded or non-threaded object for sending voxels to a client
//
#include <NodeList.h>
#include "NodeWatcher.h"
#include "VoxelNodeData.h"
void NodeWatcher::nodeAdded(Node* node) {
// do nothing
}
void NodeWatcher::nodeKilled(Node* node) {
// Use this to cleanup our node
if (node->getType() == NODE_TYPE_AGENT) {
VoxelNodeData* nodeData = (VoxelNodeData*)node->getLinkedData();
node->setLinkedData(NULL);
delete nodeData;
}
};

View file

@ -0,0 +1,23 @@
//
// NodeWatcher.h
// voxel-server
//
// Created by Brad Hefta-Gaub on 8/21/13
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
//
// Threaded or non-threaded object for sending voxels to a client
//
#ifndef __voxel_server__NodeWatcher__
#define __voxel_server__NodeWatcher__
#include <NodeList.h>
/// Voxel server's node watcher, which watches for nodes being killed and cleans up their data and threads
class NodeWatcher : public virtual NodeListHook {
public:
virtual void nodeAdded(Node* node);
virtual void nodeKilled(Node* node);
};
#endif // __voxel_server__NodeWatcher__

View file

@ -11,6 +11,7 @@
#include "VoxelNodeData.h" #include "VoxelNodeData.h"
#include <cstring> #include <cstring>
#include <cstdio> #include <cstdio>
#include "VoxelSendThread.h"
VoxelNodeData::VoxelNodeData(Node* owningNode) : VoxelNodeData::VoxelNodeData(Node* owningNode) :
AvatarData(owningNode), AvatarData(owningNode),
@ -21,11 +22,19 @@ VoxelNodeData::VoxelNodeData(Node* owningNode) :
_lastTimeBagEmpty(0), _lastTimeBagEmpty(0),
_viewFrustumChanging(false), _viewFrustumChanging(false),
_viewFrustumJustStoppedChanging(true), _viewFrustumJustStoppedChanging(true),
_currentPacketIsColor(true) _currentPacketIsColor(true),
_voxelSendThread(NULL)
{ {
_voxelPacket = new unsigned char[MAX_VOXEL_PACKET_SIZE]; _voxelPacket = new unsigned char[MAX_VOXEL_PACKET_SIZE];
_voxelPacketAt = _voxelPacket; _voxelPacketAt = _voxelPacket;
resetVoxelPacket(); resetVoxelPacket();
// Create voxel sending thread...
uint16_t nodeID = getOwningNode()->getNodeID();
_voxelSendThread = new VoxelSendThread(nodeID);
if (_voxelSendThread) {
_voxelSendThread->initialize(true);
}
} }
@ -49,6 +58,11 @@ void VoxelNodeData::writeToPacket(unsigned char* buffer, int bytes) {
VoxelNodeData::~VoxelNodeData() { VoxelNodeData::~VoxelNodeData() {
delete[] _voxelPacket; delete[] _voxelPacket;
if (_voxelSendThread) {
_voxelSendThread->terminate();
delete _voxelSendThread;
}
} }
bool VoxelNodeData::updateCurrentViewFrustum() { bool VoxelNodeData::updateCurrentViewFrustum() {

View file

@ -18,6 +18,8 @@
#include <VoxelNodeBag.h> #include <VoxelNodeBag.h>
#include <VoxelSceneStats.h> #include <VoxelSceneStats.h>
class VoxelSendThread; // forward declare
class VoxelNodeData : public AvatarData { class VoxelNodeData : public AvatarData {
public: public:
VoxelNodeData(Node* owningNode); VoxelNodeData(Node* owningNode);
@ -80,6 +82,8 @@ private:
bool _viewFrustumChanging; bool _viewFrustumChanging;
bool _viewFrustumJustStoppedChanging; bool _viewFrustumJustStoppedChanging;
bool _currentPacketIsColor; bool _currentPacketIsColor;
VoxelSendThread* _voxelSendThread;
}; };
#endif /* defined(__hifi__VoxelNodeData__) */ #endif /* defined(__hifi__VoxelNodeData__) */

View file

@ -15,31 +15,34 @@
#include "VoxelSendThread.h" #include "VoxelSendThread.h"
#include "VoxelServer.h" #include "VoxelServer.h"
VoxelSendThread::VoxelSendThread() { VoxelSendThread::VoxelSendThread(uint16_t nodeID) :
_nodeID(nodeID) {
}
VoxelSendThread::~VoxelSendThread() {
} }
bool VoxelSendThread::process() { bool VoxelSendThread::process() {
uint64_t lastSendTime = usecTimestampNow();
NodeList* nodeList = NodeList::getInstance();
timeval lastSendTime;
gettimeofday(&lastSendTime, NULL); Node* node = NodeList::getInstance()->nodeWithID(_nodeID);
VoxelNodeData* nodeData = NULL;
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { if (node) {
VoxelNodeData* nodeData = (VoxelNodeData*) node->getLinkedData(); nodeData = (VoxelNodeData*) node->getLinkedData();
}
// Sometimes the node data has not yet been linked, in which case we can't really do anything // Sometimes the node data has not yet been linked, in which case we can't really do anything
if (nodeData) { if (nodeData) {
bool viewFrustumChanged = nodeData->updateCurrentViewFrustum(); bool viewFrustumChanged = nodeData->updateCurrentViewFrustum();
if (::debugVoxelSending) { if (::debugVoxelSending) {
printf("nodeData->updateCurrentViewFrustum() changed=%s\n", debug::valueOf(viewFrustumChanged)); printf("nodeData->updateCurrentViewFrustum() changed=%s\n", debug::valueOf(viewFrustumChanged));
}
deepestLevelVoxelDistributor(nodeList, node, nodeData, viewFrustumChanged);
} }
deepestLevelVoxelDistributor(node, nodeData, viewFrustumChanged);
} }
// dynamically sleep until we need to fire off the next set of voxels // dynamically sleep until we need to fire off the next set of voxels
int usecToSleep = VOXEL_SEND_INTERVAL_USECS - (usecTimestampNow() - usecTimestamp(&lastSendTime)); int usecToSleep = VOXEL_SEND_INTERVAL_USECS - (usecTimestampNow() - lastSendTime);
if (usecToSleep > 0) { if (usecToSleep > 0) {
usleep(usecToSleep); usleep(usecToSleep);
@ -53,10 +56,8 @@ bool VoxelSendThread::process() {
} }
void VoxelSendThread::handlePacketSend(NodeList* nodeList, void VoxelSendThread::handlePacketSend(Node* node, VoxelNodeData* nodeData, int& trueBytesSent, int& truePacketsSent) {
NodeList::iterator& node,
VoxelNodeData* nodeData,
int& trueBytesSent, int& truePacketsSent) {
// If we've got a stats message ready to send, then see if we can piggyback them together // If we've got a stats message ready to send, then see if we can piggyback them together
if (nodeData->stats.isReadyToSend()) { if (nodeData->stats.isReadyToSend()) {
// Send the stats message to the client // Send the stats message to the client
@ -71,16 +72,16 @@ void VoxelSendThread::handlePacketSend(NodeList* nodeList,
statsMessageLength += nodeData->getPacketLength(); statsMessageLength += nodeData->getPacketLength();
// actually send it // actually send it
nodeList->getNodeSocket()->send(node->getActiveSocket(), statsMessage, statsMessageLength); NodeList::getInstance()->getNodeSocket()->send(node->getActiveSocket(), statsMessage, statsMessageLength);
} else { } else {
// not enough room in the packet, send two packets // not enough room in the packet, send two packets
nodeList->getNodeSocket()->send(node->getActiveSocket(), statsMessage, statsMessageLength); NodeList::getInstance()->getNodeSocket()->send(node->getActiveSocket(), statsMessage, statsMessageLength);
nodeList->getNodeSocket()->send(node->getActiveSocket(), NodeList::getInstance()->getNodeSocket()->send(node->getActiveSocket(),
nodeData->getPacket(), nodeData->getPacketLength()); nodeData->getPacket(), nodeData->getPacketLength());
} }
} else { } else {
// just send the voxel packet // just send the voxel packet
nodeList->getNodeSocket()->send(node->getActiveSocket(), NodeList::getInstance()->getNodeSocket()->send(node->getActiveSocket(),
nodeData->getPacket(), nodeData->getPacketLength()); nodeData->getPacket(), nodeData->getPacketLength());
} }
// remember to track our stats // remember to track our stats
@ -91,10 +92,7 @@ void VoxelSendThread::handlePacketSend(NodeList* nodeList,
} }
/// Version of voxel distributor that sends the deepest LOD level at once /// Version of voxel distributor that sends the deepest LOD level at once
void VoxelSendThread::deepestLevelVoxelDistributor(NodeList* nodeList, void VoxelSendThread::deepestLevelVoxelDistributor(Node* node, VoxelNodeData* nodeData, bool viewFrustumChanged) {
NodeList::iterator& node,
VoxelNodeData* nodeData,
bool viewFrustumChanged) {
pthread_mutex_lock(&::treeLock); pthread_mutex_lock(&::treeLock);
@ -120,7 +118,7 @@ void VoxelSendThread::deepestLevelVoxelDistributor(NodeList* nodeList,
debug::valueOf(wantColor), debug::valueOf(nodeData->getCurrentPacketIsColor())); debug::valueOf(wantColor), debug::valueOf(nodeData->getCurrentPacketIsColor()));
} }
handlePacketSend(nodeList, node, nodeData, trueBytesSent, truePacketsSent); handlePacketSend(node, nodeData, trueBytesSent, truePacketsSent);
} else { } else {
if (::debugVoxelSending) { if (::debugVoxelSending) {
@ -247,14 +245,14 @@ void VoxelSendThread::deepestLevelVoxelDistributor(NodeList* nodeList,
if (nodeData->getAvailable() >= bytesWritten) { if (nodeData->getAvailable() >= bytesWritten) {
nodeData->writeToPacket(&tempOutputBuffer[0], bytesWritten); nodeData->writeToPacket(&tempOutputBuffer[0], bytesWritten);
} else { } else {
handlePacketSend(nodeList, node, nodeData, trueBytesSent, truePacketsSent); handlePacketSend(node, nodeData, trueBytesSent, truePacketsSent);
packetsSentThisInterval++; packetsSentThisInterval++;
nodeData->resetVoxelPacket(); nodeData->resetVoxelPacket();
nodeData->writeToPacket(&tempOutputBuffer[0], bytesWritten); nodeData->writeToPacket(&tempOutputBuffer[0], bytesWritten);
} }
} else { } else {
if (nodeData->isPacketWaiting()) { if (nodeData->isPacketWaiting()) {
handlePacketSend(nodeList, node, nodeData, trueBytesSent, truePacketsSent); handlePacketSend(node, nodeData, trueBytesSent, truePacketsSent);
nodeData->resetVoxelPacket(); nodeData->resetVoxelPacket();
} }
packetsSentThisInterval = PACKETS_PER_CLIENT_PER_INTERVAL; // done for now, no nodes left packetsSentThisInterval = PACKETS_PER_CLIENT_PER_INTERVAL; // done for now, no nodes left
@ -270,7 +268,7 @@ void VoxelSendThread::deepestLevelVoxelDistributor(NodeList* nodeList,
envPacketLength += environmentData[i].getBroadcastData(tempOutputBuffer + envPacketLength); envPacketLength += environmentData[i].getBroadcastData(tempOutputBuffer + envPacketLength);
} }
nodeList->getNodeSocket()->send(node->getActiveSocket(), tempOutputBuffer, envPacketLength); NodeList::getInstance()->getNodeSocket()->send(node->getActiveSocket(), tempOutputBuffer, envPacketLength);
trueBytesSent += envPacketLength; trueBytesSent += envPacketLength;
truePacketsSent++; truePacketsSent++;
} }

View file

@ -20,19 +20,17 @@
/// Threaded processor for sending voxel packets to a single client /// Threaded processor for sending voxel packets to a single client
class VoxelSendThread : public virtual GenericThread { class VoxelSendThread : public virtual GenericThread {
public: public:
VoxelSendThread(); VoxelSendThread(uint16_t nodeID);
~VoxelSendThread();
protected: protected:
/// Implements generic processing behavior for this thread. /// Implements generic processing behavior for this thread.
virtual bool process(); virtual bool process();
private: private:
uint16_t _nodeID;
void handlePacketSend(NodeList* nodeList, NodeList::iterator& node, VoxelNodeData* nodeData, void handlePacketSend(Node* node, VoxelNodeData* nodeData, int& trueBytesSent, int& truePacketsSent);
int& trueBytesSent, int& truePacketsSent); void deepestLevelVoxelDistributor(Node* node, VoxelNodeData* nodeData, bool viewFrustumChanged);
void deepestLevelVoxelDistributor(NodeList* nodeList, NodeList::iterator& node, VoxelNodeData* nodeData,
bool viewFrustumChanged);
}; };
#endif // __voxel_server__VoxelSendThread__ #endif // __voxel_server__VoxelSendThread__

View file

@ -23,6 +23,7 @@
#include <PerfStat.h> #include <PerfStat.h>
#include <JurisdictionSender.h> #include <JurisdictionSender.h>
#include "NodeWatcher.h"
#include "VoxelPersistThread.h" #include "VoxelPersistThread.h"
#include "VoxelSendThread.h" #include "VoxelSendThread.h"
#include "VoxelServerPacketProcessor.h" #include "VoxelServerPacketProcessor.h"
@ -58,9 +59,8 @@ JurisdictionMap* jurisdiction = NULL;
JurisdictionSender* jurisdictionSender = NULL; JurisdictionSender* jurisdictionSender = NULL;
VoxelServerPacketProcessor* voxelServerPacketProcessor = NULL; VoxelServerPacketProcessor* voxelServerPacketProcessor = NULL;
VoxelPersistThread* voxelPersistThread = NULL; VoxelPersistThread* voxelPersistThread = NULL;
VoxelSendThread* voxelSendThread = NULL;
pthread_mutex_t treeLock; pthread_mutex_t treeLock;
NodeWatcher nodeWatcher; // used to cleanup AGENT data when agents are killed
void attachVoxelNodeDataToNode(Node* newNode) { void attachVoxelNodeDataToNode(Node* newNode) {
if (newNode->getLinkedData() == NULL) { if (newNode->getLinkedData() == NULL) {
@ -132,6 +132,9 @@ int main(int argc, const char * argv[]) {
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_VOXEL_SERVER, listenPort); NodeList* nodeList = NodeList::createInstance(NODE_TYPE_VOXEL_SERVER, listenPort);
setvbuf(stdout, NULL, _IOLBF, 0); setvbuf(stdout, NULL, _IOLBF, 0);
// tell our NodeList about our desire to get notifications
nodeList->addHook(&nodeWatcher);
// Handle Local Domain testing with the --local command line // Handle Local Domain testing with the --local command line
const char* local = "--local"; const char* local = "--local";
@ -245,12 +248,6 @@ int main(int argc, const char * argv[]) {
environmentData[2].setAtmosphereOuterRadius(0.1875f * TREE_SCALE * 1.05f); environmentData[2].setAtmosphereOuterRadius(0.1875f * TREE_SCALE * 1.05f);
environmentData[2].setScatteringWavelengths(glm::vec3(0.475f, 0.570f, 0.650f)); // swaps red and blue environmentData[2].setScatteringWavelengths(glm::vec3(0.475f, 0.570f, 0.650f)); // swaps red and blue
// Create voxel sending thread...
::voxelSendThread = new VoxelSendThread();
if (::voxelSendThread) {
::voxelSendThread->initialize(true);
}
sockaddr senderAddress; sockaddr senderAddress;
unsigned char *packetData = new unsigned char[MAX_PACKET_SIZE]; unsigned char *packetData = new unsigned char[MAX_PACKET_SIZE];
@ -331,11 +328,9 @@ int main(int argc, const char * argv[]) {
delete ::voxelPersistThread; delete ::voxelPersistThread;
} }
if (::voxelSendThread) { // tell our NodeList we're done with notifications
::voxelSendThread->terminate(); nodeList->removeHook(&nodeWatcher);
delete ::voxelSendThread;
}
pthread_mutex_destroy(&::treeLock); pthread_mutex_destroy(&::treeLock);
return 0; return 0;