From e64664c0d18dce2215e6818afc6ad6446f7bcc44 Mon Sep 17 00:00:00 2001 From: ZappoMan Date: Mon, 19 Aug 2013 11:05:29 -0700 Subject: [PATCH] first cut at JurisdictionSender and related changes --- animation-server/src/main.cpp | 6 +++ domain-server/src/main.cpp | 14 +++++- libraries/shared/src/GenericThread.h | 3 +- libraries/shared/src/NodeList.cpp | 1 + libraries/shared/src/PacketSender.cpp | 24 +++++++--- libraries/shared/src/PacketSender.h | 17 +++++-- libraries/voxels/src/JurisdictionMap.cpp | 12 +++-- voxel-server/src/main.cpp | 58 +++++++++++++++++++++--- 8 files changed, 113 insertions(+), 22 deletions(-) diff --git a/animation-server/src/main.cpp b/animation-server/src/main.cpp index b20ebad37f..1031a71479 100644 --- a/animation-server/src/main.cpp +++ b/animation-server/src/main.cpp @@ -735,21 +735,27 @@ int main(int argc, const char * argv[]) // Handle Local Domain testing with the --local command line const char* NO_BILLBOARD = "--NoBillboard"; ::includeBillboard = !cmdOptionExists(argc, argv, NO_BILLBOARD); + printf("includeBillboard=%s\n",debug::valueOf(::includeBillboard)); const char* NO_BORDER_TRACER = "--NoBorderTracer"; ::includeBorderTracer = !cmdOptionExists(argc, argv, NO_BORDER_TRACER); + printf("includeBorderTracer=%s\n",debug::valueOf(::includeBorderTracer)); const char* NO_MOVING_BUG = "--NoMovingBug"; ::includeMovingBug = !cmdOptionExists(argc, argv, NO_MOVING_BUG); + printf("includeMovingBug=%s\n",debug::valueOf(::includeMovingBug)); const char* INCLUDE_BLINKING_VOXEL = "--includeBlinkingVoxel"; ::includeBlinkingVoxel = cmdOptionExists(argc, argv, INCLUDE_BLINKING_VOXEL); + printf("includeBlinkingVoxel=%s\n",debug::valueOf(::includeBlinkingVoxel)); const char* NO_DANCE_FLOOR = "--NoDanceFloor"; ::includeDanceFloor = !cmdOptionExists(argc, argv, NO_DANCE_FLOOR); + printf("includeDanceFloor=%s\n",debug::valueOf(::includeDanceFloor)); const char* BUILD_STREET = "--BuildStreet"; ::buildStreet = cmdOptionExists(argc, argv, BUILD_STREET); + printf("buildStreet=%s\n",debug::valueOf(::buildStreet)); // Handle Local Domain testing with the --local command line const char* showPPS = "--showPPS"; diff --git a/domain-server/src/main.cpp b/domain-server/src/main.cpp index 13865cbacc..054b877488 100644 --- a/domain-server/src/main.cpp +++ b/domain-server/src/main.cpp @@ -127,6 +127,14 @@ int main(int argc, const char * argv[]) unsigned char* nodeTypesOfInterest = packetData + numBytesSenderHeader + sizeof(NODE_TYPE) + numBytesSocket + sizeof(unsigned char); int numInterestTypes = *(nodeTypesOfInterest - 1); + +/* +printf("line 131 newNode=%d type=%c numInterestTypes=%d\n", newNode->getNodeID(), newNode->getType(), numInterestTypes); +for (int t = 0; t < numInterestTypes; t++) { + printf("line 133 t=%d type=%c\n", t, nodeTypesOfInterest[t]); + +} +*/ if (numInterestTypes > 0) { // if the node has sent no types of interest, assume they want nothing but their own ID back @@ -136,15 +144,19 @@ int main(int argc, const char * argv[]) // this is not the node themselves // and this is an node of a type in the passed node types of interest // or the node did not pass us any specific types they are interested in - + +//printf("line 140 node=%d type=%c\n", node->getNodeID(), node->getType()); if (memchr(SOLO_NODE_TYPES, node->getType(), sizeof(SOLO_NODE_TYPES)) == NULL) { // this is an node of which there can be multiple, just add them to the packet // don't send avatar nodes to other avatars, that will come from avatar mixer +//printf("line 144 node=%d type=%c\n", node->getNodeID(), node->getType()); if (nodeType != NODE_TYPE_AGENT || node->getType() != NODE_TYPE_AGENT) { +//printf("line 146 node=%d type=%c\n", node->getNodeID(), node->getType()); currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, &(*node)); } } else { +//printf("line 151 node=%d type=%c\n", node->getNodeID(), node->getType()); // solo node, we need to only send newest if (newestSoloNodes[node->getType()] == NULL || newestSoloNodes[node->getType()]->getWakeMicrostamp() < node->getWakeMicrostamp()) { diff --git a/libraries/shared/src/GenericThread.h b/libraries/shared/src/GenericThread.h index 2d4c90a469..e2dea19ea8 100644 --- a/libraries/shared/src/GenericThread.h +++ b/libraries/shared/src/GenericThread.h @@ -30,10 +30,11 @@ public: /// If you're running in non-threaded mode, you must call this regularly void* threadRoutine(); -protected: /// Override this function to do whatever your class actually does, return false to exit thread early. virtual bool process() = 0; +protected: + /// Locks all the resources of the thread. void lock() { pthread_mutex_lock(&_mutex); } diff --git a/libraries/shared/src/NodeList.cpp b/libraries/shared/src/NodeList.cpp index 23b39df205..0aecf1e52d 100644 --- a/libraries/shared/src/NodeList.cpp +++ b/libraries/shared/src/NodeList.cpp @@ -411,6 +411,7 @@ Node* NodeList::addOrUpdateNode(sockaddr* publicSocket, sockaddr* localSocket, c } else { if (node->getType() == NODE_TYPE_AUDIO_MIXER || + //node->getType() == NODE_TYPE_ANIMATION_SERVER || node->getType() == NODE_TYPE_VOXEL_SERVER) { // until the Audio class also uses our nodeList, we need to update // the lastRecvTimeUsecs for the audio mixer so it doesn't get killed and re-added continously diff --git a/libraries/shared/src/PacketSender.cpp b/libraries/shared/src/PacketSender.cpp index ecbe15359e..eebe0acb24 100644 --- a/libraries/shared/src/PacketSender.cpp +++ b/libraries/shared/src/PacketSender.cpp @@ -10,14 +10,19 @@ #include -const uint64_t SEND_INTERVAL_USECS = 1000 * 5; // no more than 200pps... should be settable - #include "NodeList.h" #include "PacketSender.h" #include "SharedUtil.h" -PacketSender::PacketSender(PacketSenderNotify* notify) : _notify(notify) { - _lastSendTime = usecTimestampNow(); +const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 200; +const int PacketSender::MINIMUM_PACKETS_PER_SECOND = 1; + + +PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) : + _packetsPerSecond(packetsPerSecond), + _lastSendTime(usecTimestampNow()), + _notify(notify) +{ } @@ -29,9 +34,14 @@ void PacketSender::queuePacket(sockaddr& address, unsigned char* packetData, ssi } bool PacketSender::process() { +//printf("PacketSender::process() packets pending=%ld _packetsPerSecond=%d\n",_packets.size(),_packetsPerSecond); + + + uint64_t USECS_PER_SECOND = 1000 * 1000; + uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond); + if (_packets.size() == 0) { - const uint64_t SEND_THREAD_SLEEP_INTERVAL = (1000 * 1000)/60; // check at 60fps - usleep(SEND_THREAD_SLEEP_INTERVAL); + usleep(SEND_INTERVAL_USECS); } while (_packets.size() > 0) { NetworkPacket& packet = _packets.front(); @@ -39,7 +49,9 @@ bool PacketSender::process() { // send the packet through the NodeList... UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket(); +//printf("sending a packet... length=%lu\n",packet.getLength()); nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength()); + if (_notify) { _notify->packetSentNotification(packet.getLength()); } diff --git a/libraries/shared/src/PacketSender.h b/libraries/shared/src/PacketSender.h index bd2ab99407..7b2edacc44 100644 --- a/libraries/shared/src/PacketSender.h +++ b/libraries/shared/src/PacketSender.h @@ -24,8 +24,10 @@ public: /// Generalized threaded processor for queueing and sending of outbound packets. class PacketSender : public GenericThread { public: + static const int DEFAULT_PACKETS_PER_SECOND; + static const int MINIMUM_PACKETS_PER_SECOND; - PacketSender(PacketSenderNotify* notify = NULL); + PacketSender(PacketSenderNotify* notify = NULL, int packetsPerSecond = DEFAULT_PACKETS_PER_SECOND); /// Add packet to outbound queue. /// \param sockaddr& address the destination address @@ -34,14 +36,21 @@ public: /// \thread any thread, typically the application thread void queuePacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength); -private: + void setPacketsPerSecond(int packetsPerSecond) { _packetsPerSecond = std::min(MINIMUM_PACKETS_PER_SECOND, packetsPerSecond); } + int getPacketsPerSecond() const { return _packetsPerSecond; } + + void setPacketSenderNotify(PacketSenderNotify* notify) { _notify = notify; } + PacketSenderNotify* getPacketSenderNotify() const { return _notify; } + virtual bool process(); +protected: + int _packetsPerSecond; + +private: std::vector _packets; uint64_t _lastSendTime; - PacketSenderNotify* _notify; - }; #endif // __shared__PacketSender__ diff --git a/libraries/voxels/src/JurisdictionMap.cpp b/libraries/voxels/src/JurisdictionMap.cpp index 5040f5a857..1f6ce09da5 100644 --- a/libraries/voxels/src/JurisdictionMap.cpp +++ b/libraries/voxels/src/JurisdictionMap.cpp @@ -217,13 +217,15 @@ int JurisdictionMap::packIntoMessage(unsigned char* destinationBuffer, int avail // if and only if there's a root jurisdiction, also include the end nodes int endNodeCount = _endNodes.size(); - memcpy(destinationBuffer, &endNodeCount, sizeof(endNodeCount)); destinationBuffer += sizeof(endNodeCount); for (int i=0; i < endNodeCount; i++) { unsigned char* endNodeCode = _endNodes[i]; - int bytes = bytesRequiredForCodeLength(numberOfThreeBitSectionsInCode(endNodeCode)); + int bytes = 0; + if (endNodeCode) { + bytes = bytesRequiredForCodeLength(numberOfThreeBitSectionsInCode(endNodeCode)); + } memcpy(destinationBuffer, &bytes, sizeof(bytes)); destinationBuffer += sizeof(bytes); memcpy(destinationBuffer, endNodeCode, bytes); @@ -266,7 +268,11 @@ int JurisdictionMap::unpackFromMessage(unsigned char* sourceBuffer, int availabl unsigned char* endNodeCode = new unsigned char[bytes]; memcpy(endNodeCode, sourceBuffer, bytes); sourceBuffer += bytes; - _endNodes.push_back(endNodeCode); + + // if the endNodeCode was 0 length then don't add it + if (bytes > 0) { + _endNodes.push_back(endNodeCode); + } } } diff --git a/voxel-server/src/main.cpp b/voxel-server/src/main.cpp index c7e266e430..2c15a98fa1 100644 --- a/voxel-server/src/main.cpp +++ b/voxel-server/src/main.cpp @@ -21,6 +21,8 @@ #include #include +#include + #ifdef _WIN32 #include "Syssocket.h" #include "Systime.h" @@ -73,6 +75,7 @@ EnvironmentData environmentData[3]; int receivedPacketCount = 0; JurisdictionMap* jurisdiction = NULL; +JurisdictionSender* jurisdictionSender = NULL; void randomlyFillVoxelTree(int levelsToGo, VoxelNode *currentRootNode) { // randomly generate children for this node @@ -400,7 +403,7 @@ void persistVoxelsWhenDirty() { } } -void *distributeVoxelsToListeners(void *args) { +void* distributeVoxelsToListeners(void* args) { NodeList* nodeList = NodeList::getInstance(); timeval lastSendTime; @@ -408,7 +411,6 @@ void *distributeVoxelsToListeners(void *args) { while (true) { gettimeofday(&lastSendTime, NULL); - // enumerate the nodes to send 3 packets to each for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { VoxelNodeData* nodeData = (VoxelNodeData*) node->getLinkedData(); @@ -482,11 +484,10 @@ int main(int argc, const char * argv[]) { } if (jurisdictionRoot || jurisdictionEndNodes) { - jurisdiction = new JurisdictionMap(jurisdictionRoot, jurisdictionEndNodes); + ::jurisdiction = new JurisdictionMap(jurisdictionRoot, jurisdictionEndNodes); } } - // should we send environments? Default is yes, but this command line suppresses sending const char* DUMP_VOXELS_ON_MOVE = "--dumpVoxelsOnMove"; ::dumpVoxelsOnMove = cmdOptionExists(argc, argv, DUMP_VOXELS_ON_MOVE); @@ -509,6 +510,10 @@ int main(int argc, const char * argv[]) { NodeList* nodeList = NodeList::createInstance(NODE_TYPE_VOXEL_SERVER, listenPort); setvbuf(stdout, NULL, _IOLBF, 0); + const int numNodeTypes = 2; + const NODE_TYPE nodeTypes[numNodeTypes] = { NODE_TYPE_AGENT, NODE_TYPE_ANIMATION_SERVER }; + NodeList::getInstance()->setNodeTypesOfInterest(&nodeTypes[0], numNodeTypes); + // Handle Local Domain testing with the --local command line const char* local = "--local"; ::wantLocalDomain = cmdOptionExists(argc, argv,local); @@ -655,8 +660,11 @@ int main(int argc, const char * argv[]) { timeval lastDomainServerCheckIn = {}; + // set up our jurisdiction broadcaster... + ::jurisdictionSender = new JurisdictionSender(::jurisdiction); + jurisdictionSender->initialize(true); + // loop to send to nodes requesting data - while (true) { // send a check in packet to the domain server if DOMAIN_SERVER_CHECK_IN_USECS has elapsed @@ -735,12 +743,30 @@ int main(int argc, const char * argv[]) { voxelData += voxelDataSize; atByte += voxelDataSize; } + + //printf("PACKET_TYPE_SET_VOXEL ...\n"); + + // Make sure our Node and NodeList knows we've heard from this node. + Node* node = NodeList::getInstance()->nodeWithAddress(&nodePublicAddress); + if (node) { + //printf("PACKET_TYPE_SET_VOXEL node->setLastHeardMicrostamp(usecTimestampNow());\n"); + node->setLastHeardMicrostamp(usecTimestampNow()); + } + } else if (packetData[0] == PACKET_TYPE_ERASE_VOXEL) { // Send these bits off to the VoxelTree class to process them pthread_mutex_lock(&::treeLock); serverTree.processRemoveVoxelBitstream((unsigned char*)packetData, receivedBytes); pthread_mutex_unlock(&::treeLock); + + // Make sure our Node and NodeList knows we've heard from this node. + Node* node = NodeList::getInstance()->nodeWithAddress(&nodePublicAddress); + if (node) { + //printf("PACKET_TYPE_ERASE_VOXEL node->setLastHeardMicrostamp(usecTimestampNow());\n"); + node->setLastHeardMicrostamp(usecTimestampNow()); + } + } else if (packetData[0] == PACKET_TYPE_Z_COMMAND) { // the Z command is a special command that allows the sender to send the voxel server high level semantic @@ -774,6 +800,14 @@ int main(int argc, const char * argv[]) { printf("rebroadcasting Z message to connected nodes... nodeList.broadcastToNodes()\n"); nodeList->broadcastToNodes(packetData, receivedBytes, &NODE_TYPE_AGENT, 1); } + + // Make sure our Node and NodeList knows we've heard from this node. + Node* node = NodeList::getInstance()->nodeWithAddress(&nodePublicAddress); + if (node) { + printf("PACKET_TYPE_Z_COMMAND node->setLastHeardMicrostamp(usecTimestampNow());\n"); + node->setLastHeardMicrostamp(usecTimestampNow()); + } + } else if (packetData[0] == PACKET_TYPE_HEAD_DATA) { // If we got a PACKET_TYPE_HEAD_DATA, then we're talking to an NODE_TYPE_AVATAR, and we // need to make sure we have it in our nodeList. @@ -789,6 +823,11 @@ int main(int argc, const char * argv[]) { } else if (packetData[0] == PACKET_TYPE_PING) { // If the packet is a ping, let processNodeData handle it. nodeList->processNodeData(&nodePublicAddress, packetData, receivedBytes); + } else if (packetData[0] == PACKET_TYPE_DOMAIN) { + //printf("PACKET_TYPE_DOMAIN packet\n"); + nodeList->processNodeData(&nodePublicAddress, packetData, receivedBytes); + } else { + printf("unknown packet ignored... packetData[0]=%c\n", packetData[0]); } } } @@ -796,8 +835,13 @@ int main(int argc, const char * argv[]) { pthread_join(sendVoxelThread, NULL); pthread_mutex_destroy(&::treeLock); - if (jurisdiction) { - delete jurisdiction; + if (::jurisdiction) { + delete ::jurisdiction; + } + + if (::jurisdictionSender) { + jurisdictionSender->terminate(); + delete ::jurisdictionSender; } return 0;