diff --git a/animation-server/src/main.cpp b/animation-server/src/main.cpp index b20ebad37f..eb1e3546dd 100644 --- a/animation-server/src/main.cpp +++ b/animation-server/src/main.cpp @@ -17,8 +17,10 @@ #include #include #include +#include #include #include +#include #include #ifdef _WIN32 @@ -49,23 +51,10 @@ bool wantLocalDomain = false; unsigned long packetsSent = 0; unsigned long bytesSent = 0; -static void sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) { - unsigned char* bufferOut; - int sizeOut; - - if (createVoxelEditMessage(type, 0, 1, &detail, bufferOut, sizeOut)){ - ::packetsSent++; - ::bytesSent += sizeOut; +JurisdictionListener* jurisdictionListener = NULL; +VoxelEditPacketSender* voxelEditPacketSender = NULL; - if (::shouldShowPacketsPerSecond) { - printf("sending packet of size=%d\n",sizeOut); - } - - NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1); - delete[] bufferOut; - } -} glm::vec3 rotatePoint(glm::vec3 point, float angle) { // First, create the quaternion based on this angle of rotation @@ -141,8 +130,6 @@ const BugPart bugParts[VOXELS_PER_BUG] = { static void renderMovingBug() { VoxelDetail details[VOXELS_PER_BUG]; - unsigned char* bufferOut; - int sizeOut; // Generate voxels for where bug used to be for (int i = 0; i < VOXELS_PER_BUG; i++) { @@ -163,17 +150,7 @@ static void renderMovingBug() { // send the "erase message" first... PACKET_TYPE message = PACKET_TYPE_ERASE_VOXEL; - if (createVoxelEditMessage(message, 0, VOXELS_PER_BUG, (VoxelDetail*)&details, bufferOut, sizeOut)){ - - ::packetsSent++; - ::bytesSent += sizeOut; - - if (::shouldShowPacketsPerSecond) { - printf("sending packet of size=%d\n", sizeOut); - } - NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1); - delete[] bufferOut; - } + ::voxelEditPacketSender->queueVoxelEditMessages(message, VOXELS_PER_BUG, (VoxelDetail*)&details); // Move the bug... if (moveBugInLine) { @@ -233,17 +210,7 @@ static void renderMovingBug() { // send the "create message" ... message = PACKET_TYPE_SET_VOXEL_DESTRUCTIVE; - if (createVoxelEditMessage(message, 0, VOXELS_PER_BUG, (VoxelDetail*)&details, bufferOut, sizeOut)){ - - ::packetsSent++; - ::bytesSent += sizeOut; - - if (::shouldShowPacketsPerSecond) { - printf("sending packet of size=%d\n", sizeOut); - } - NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1); - delete[] bufferOut; - } + ::voxelEditPacketSender->queueVoxelEditMessages(message, VOXELS_PER_BUG, (VoxelDetail*)&details); } @@ -279,7 +246,7 @@ static void sendVoxelBlinkMessage() { PACKET_TYPE message = PACKET_TYPE_SET_VOXEL_DESTRUCTIVE; - sendVoxelEditMessage(message, detail); + ::voxelEditPacketSender->sendVoxelEditMessage(message, detail); } bool stringOfLightsInitialized = false; @@ -297,8 +264,6 @@ static void sendBlinkingStringOfLights() { PACKET_TYPE message = PACKET_TYPE_SET_VOXEL_DESTRUCTIVE; // we're a bully! float lightScale = STRING_OF_LIGHTS_SIZE; static VoxelDetail details[LIGHTS_PER_SEGMENT]; - unsigned char* bufferOut; - int sizeOut; // first initialized the string of lights if needed... if (!stringOfLightsInitialized) { @@ -338,19 +303,7 @@ static void sendBlinkingStringOfLights() { details[indexInSegment].blue = offColor[2]; } - // send entire segment at once - if (createVoxelEditMessage(message, 0, LIGHTS_PER_SEGMENT, (VoxelDetail*)&details, bufferOut, sizeOut)){ - - ::packetsSent++; - ::bytesSent += sizeOut; - - if (::shouldShowPacketsPerSecond) { - printf("sending packet of size=%d\n",sizeOut); - } - NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1); - delete[] bufferOut; - } - + ::voxelEditPacketSender->queueVoxelEditMessages(message, LIGHTS_PER_SEGMENT, (VoxelDetail*)&details); } stringOfLightsInitialized = true; } else { @@ -381,17 +334,7 @@ static void sendBlinkingStringOfLights() { details[1].blue = onColor[2]; // send both changes in same message - if (createVoxelEditMessage(message, 0, 2, (VoxelDetail*)&details, bufferOut, sizeOut)){ - - ::packetsSent++; - ::bytesSent += sizeOut; - - if (::shouldShowPacketsPerSecond) { - printf("sending packet of size=%d\n",sizeOut); - } - NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1); - delete[] bufferOut; - } + ::voxelEditPacketSender->queueVoxelEditMessages(message, 2, (VoxelDetail*)&details); } } @@ -427,8 +370,6 @@ void sendDanceFloor() { PACKET_TYPE message = PACKET_TYPE_SET_VOXEL_DESTRUCTIVE; // we're a bully! float lightScale = DANCE_FLOOR_LIGHT_SIZE; static VoxelDetail details[DANCE_FLOOR_VOXELS_PER_PACKET]; - unsigned char* bufferOut; - int sizeOut; // first initialized the billboard of lights if needed... if (!::danceFloorInitialized) { @@ -505,16 +446,7 @@ void sendDanceFloor() { } if (item == DANCE_FLOOR_VOXELS_PER_PACKET - 1) { - if (createVoxelEditMessage(message, 0, DANCE_FLOOR_VOXELS_PER_PACKET, - (VoxelDetail*)&details, bufferOut, sizeOut)){ - ::packetsSent++; - ::bytesSent += sizeOut; - if (::shouldShowPacketsPerSecond) { - printf("sending packet of size=%d\n", sizeOut); - } - NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1); - delete[] bufferOut; - } + ::voxelEditPacketSender->queueVoxelEditMessages(message, DANCE_FLOOR_VOXELS_PER_PACKET, (VoxelDetail*)&details); } } } @@ -554,8 +486,6 @@ static void sendBillboard() { PACKET_TYPE message = PACKET_TYPE_SET_VOXEL_DESTRUCTIVE; // we're a bully! float lightScale = BILLBOARD_LIGHT_SIZE; static VoxelDetail details[VOXELS_PER_PACKET]; - unsigned char* bufferOut; - int sizeOut; // first initialized the billboard of lights if needed... if (!billboardInitialized) { @@ -603,15 +533,7 @@ static void sendBillboard() { } if (item == VOXELS_PER_PACKET - 1) { - if (createVoxelEditMessage(message, 0, VOXELS_PER_PACKET, (VoxelDetail*)&details, bufferOut, sizeOut)){ - ::packetsSent++; - ::bytesSent += sizeOut; - if (::shouldShowPacketsPerSecond) { - printf("sending packet of size=%d\n", sizeOut); - } - NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1); - delete[] bufferOut; - } + ::voxelEditPacketSender->queueVoxelEditMessages(message, VOXELS_PER_PACKET, (VoxelDetail*)&details); } } } @@ -634,8 +556,6 @@ void doBuildStreet() { PACKET_TYPE message = PACKET_TYPE_SET_VOXEL_DESTRUCTIVE; // we're a bully! static VoxelDetail details[BRICKS_PER_PACKET]; - unsigned char* bufferOut; - int sizeOut; for (int z = 0; z < ROAD_LENGTH; z++) { for (int x = 0; x < ROAD_WIDTH; x++) { @@ -656,15 +576,7 @@ void doBuildStreet() { details[item].blue = randomTone; if (item == BRICKS_PER_PACKET - 1) { - if (createVoxelEditMessage(message, 0, BRICKS_PER_PACKET, (VoxelDetail*)&details, bufferOut, sizeOut)){ - ::packetsSent++; - ::bytesSent += sizeOut; - if (true || ::shouldShowPacketsPerSecond) { - printf("building road sending packet of size=%d\n", sizeOut); - } - NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1); - delete[] bufferOut; - } + ::voxelEditPacketSender->queueVoxelEditMessages(message, BRICKS_PER_PACKET, (VoxelDetail*)&details); } } } @@ -705,6 +617,10 @@ void* animateVoxels(void* args) { doBuildStreet(); } + if (::voxelEditPacketSender) { + ::voxelEditPacketSender->flushQueue(); + } + uint64_t end = usecTimestampNow(); uint64_t elapsedSeconds = (end - ::start) / 1000000; if (::shouldShowPacketsPerSecond) { @@ -735,21 +651,27 @@ int main(int argc, const char * argv[]) // Handle Local Domain testing with the --local command line const char* NO_BILLBOARD = "--NoBillboard"; ::includeBillboard = !cmdOptionExists(argc, argv, NO_BILLBOARD); + printf("includeBillboard=%s\n", debug::valueOf(::includeBillboard)); const char* NO_BORDER_TRACER = "--NoBorderTracer"; ::includeBorderTracer = !cmdOptionExists(argc, argv, NO_BORDER_TRACER); + printf("includeBorderTracer=%s\n", debug::valueOf(::includeBorderTracer)); const char* NO_MOVING_BUG = "--NoMovingBug"; ::includeMovingBug = !cmdOptionExists(argc, argv, NO_MOVING_BUG); + printf("includeMovingBug=%s\n", debug::valueOf(::includeMovingBug)); const char* INCLUDE_BLINKING_VOXEL = "--includeBlinkingVoxel"; ::includeBlinkingVoxel = cmdOptionExists(argc, argv, INCLUDE_BLINKING_VOXEL); + printf("includeBlinkingVoxel=%s\n", debug::valueOf(::includeBlinkingVoxel)); const char* NO_DANCE_FLOOR = "--NoDanceFloor"; ::includeDanceFloor = !cmdOptionExists(argc, argv, NO_DANCE_FLOOR); + printf("includeDanceFloor=%s\n", debug::valueOf(::includeDanceFloor)); const char* BUILD_STREET = "--BuildStreet"; ::buildStreet = cmdOptionExists(argc, argv, BUILD_STREET); + printf("buildStreet=%s\n", debug::valueOf(::buildStreet)); // Handle Local Domain testing with the --local command line const char* showPPS = "--showPPS"; @@ -771,6 +693,21 @@ int main(int argc, const char * argv[]) nodeList->linkedDataCreateCallback = NULL; // do we need a callback? nodeList->startSilentNodeRemovalThread(); + // Create our JurisdictionListener so we'll know where to send edit packets + ::jurisdictionListener = new JurisdictionListener(); + if (::jurisdictionListener) { + ::jurisdictionListener->initialize(true); + } + + // Create out VoxelEditPacketSender + ::voxelEditPacketSender = new VoxelEditPacketSender; + if (::voxelEditPacketSender) { + ::voxelEditPacketSender->initialize(true); + if (::jurisdictionListener) { + ::voxelEditPacketSender->setVoxelServerJurisdictions(::jurisdictionListener->getJurisdictions()); + } + } + srand((unsigned)time(0)); pthread_t animateVoxelThread; @@ -795,11 +732,27 @@ int main(int argc, const char * argv[]) // Nodes sending messages to us... if (nodeList->getNodeSocket()->receive(&nodePublicAddress, packetData, &receivedBytes) && packetVersionMatch(packetData)) { + + if (packetData[0] == PACKET_TYPE_VOXEL_JURISDICTION) { + if (::jurisdictionListener) { + ::jurisdictionListener->queueReceivedPacket(nodePublicAddress, packetData, receivedBytes); + } + } NodeList::getInstance()->processNodeData(&nodePublicAddress, packetData, receivedBytes); } } pthread_join(animateVoxelThread, NULL); + + if (::jurisdictionListener) { + ::jurisdictionListener->terminate(); + delete ::jurisdictionListener; + } + + if (::voxelEditPacketSender) { + ::voxelEditPacketSender->terminate(); + delete ::voxelEditPacketSender; + } return 0; } diff --git a/domain-server/src/main.cpp b/domain-server/src/main.cpp index 13865cbacc..51eea6b838 100644 --- a/domain-server/src/main.cpp +++ b/domain-server/src/main.cpp @@ -136,7 +136,7 @@ int main(int argc, const char * argv[]) // this is not the node themselves // and this is an node of a type in the passed node types of interest // or the node did not pass us any specific types they are interested in - + if (memchr(SOLO_NODE_TYPES, node->getType(), sizeof(SOLO_NODE_TYPES)) == NULL) { // this is an node of which there can be multiple, just add them to the packet // don't send avatar nodes to other avatars, that will come from avatar mixer diff --git a/interface/src/Application.cpp b/interface/src/Application.cpp index dbb2170150..67fe5caf16 100644 --- a/interface/src/Application.cpp +++ b/interface/src/Application.cpp @@ -123,7 +123,7 @@ Application::Application(int& argc, char** argv, timeval &startup_time) : _audio(&_audioScope, STARTUP_JITTER_SAMPLES), #endif _stopNetworkReceiveThread(false), - _voxelProcessor(this), + _voxelProcessor(), _voxelEditSender(this), _packetCount(0), _packetsPerSecond(0), @@ -229,6 +229,9 @@ Application::Application(int& argc, char** argv, timeval &startup_time) : _glWidget->setMouseTracking(true); // initialization continues in initializeGL when OpenGL context is ready + + // Tell our voxel edit sender about our known jurisdictions + _voxelEditSender.setVoxelServerJurisdictions(&_voxelServerJurisdictions); } Application::~Application() { @@ -1112,6 +1115,7 @@ void Application::setFullscreen(bool fullscreen) { } void Application::setRenderVoxels(bool voxelRender) { + _voxelEditSender.setShouldSend(voxelRender); if (!voxelRender) { doKillLocalVoxels(); } @@ -3154,7 +3158,7 @@ void* Application::networkReceive(void* args) { case PACKET_TYPE_VOXEL_STATS: case PACKET_TYPE_ENVIRONMENT_DATA: { // add this packet to our list of voxel packets and process them on the voxel processing - app->_voxelProcessor.queuePacket(senderAddress, app->_incomingPacket, bytesReceived); + app->_voxelProcessor.queueReceivedPacket(senderAddress, app->_incomingPacket, bytesReceived); break; } case PACKET_TYPE_BULK_AVATAR_DATA: @@ -3184,3 +3188,7 @@ void* Application::networkReceive(void* args) { } return NULL; } + +void Application::packetSentNotification(ssize_t length) { + _bandwidthMeter.outputStream(BandwidthMeter::VOXELS).updateValue(length); +} diff --git a/interface/src/Application.h b/interface/src/Application.h index a4ae01b730..f85e5136f7 100644 --- a/interface/src/Application.h +++ b/interface/src/Application.h @@ -74,7 +74,7 @@ static const float NODE_KILLED_RED = 1.0f; static const float NODE_KILLED_GREEN = 0.0f; static const float NODE_KILLED_BLUE = 0.0f; -class Application : public QApplication, public NodeListHook { +class Application : public QApplication, public NodeListHook, public PacketSenderNotify { Q_OBJECT friend class VoxelPacketProcessor; @@ -105,8 +105,6 @@ public: const glm::vec3 getMouseVoxelWorldCoordinates(const VoxelDetail _mouseVoxel); - void updateParticleSystem(float deltaTime); - QGLWidget* getGLWidget() { return _glWidget; } Avatar* getAvatar() { return &_myAvatar; } Audio* getAudio() { return &_audio; } @@ -134,6 +132,7 @@ public: virtual void nodeAdded(Node* node); virtual void nodeKilled(Node* node); + virtual void packetSentNotification(ssize_t length); public slots: void sendAvatarFaceVideoMessage(int frameCount, const QByteArray& data); @@ -351,7 +350,7 @@ private: VoxelSceneStats _voxelSceneStats; int parseVoxelStats(unsigned char* messageData, ssize_t messageLength, sockaddr senderAddress); - std::map _voxelServerJurisdictions; + NodeToJurisdictionMap _voxelServerJurisdictions; std::vector _voxelFades; }; diff --git a/interface/src/VoxelPacketProcessor.cpp b/interface/src/VoxelPacketProcessor.cpp index 40101b9b77..5d05253e36 100644 --- a/interface/src/VoxelPacketProcessor.cpp +++ b/interface/src/VoxelPacketProcessor.cpp @@ -14,19 +14,17 @@ #include "Menu.h" #include "VoxelPacketProcessor.h" -VoxelPacketProcessor::VoxelPacketProcessor(Application* app) : - _app(app) { -} - void VoxelPacketProcessor::processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength) { PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings), "VoxelPacketProcessor::processPacket()"); ssize_t messageLength = packetLength; + Application* app = Application::getInstance(); + // check to see if the UI thread asked us to kill the voxel tree. since we're the only thread allowed to do that - if (_app->_wantToKillLocalVoxels) { - _app->_voxels.killLocalVoxels(); - _app->_wantToKillLocalVoxels = false; + if (app->_wantToKillLocalVoxels) { + app->_voxels.killLocalVoxels(); + app->_wantToKillLocalVoxels = false; } // note: PACKET_TYPE_VOXEL_STATS can have PACKET_TYPE_VOXEL_DATA or PACKET_TYPE_VOXEL_DATA_MONOCHROME @@ -34,7 +32,7 @@ void VoxelPacketProcessor::processPacket(sockaddr& senderAddress, unsigned char* // then process any remaining bytes as if it was another packet if (packetData[0] == PACKET_TYPE_VOXEL_STATS) { - int statsMessageLength = _app->parseVoxelStats(packetData, messageLength, senderAddress); + int statsMessageLength = app->parseVoxelStats(packetData, messageLength, senderAddress); if (messageLength > statsMessageLength) { packetData += statsMessageLength; messageLength -= statsMessageLength; @@ -51,11 +49,11 @@ void VoxelPacketProcessor::processPacket(sockaddr& senderAddress, unsigned char* if (voxelServer && socketMatch(voxelServer->getActiveSocket(), &senderAddress)) { voxelServer->lock(); if (packetData[0] == PACKET_TYPE_ENVIRONMENT_DATA) { - _app->_environment.parseData(&senderAddress, packetData, messageLength); + app->_environment.parseData(&senderAddress, packetData, messageLength); } else { - _app->_voxels.setDataSourceID(voxelServer->getNodeID()); - _app->_voxels.parseData(packetData, messageLength); - _app->_voxels.setDataSourceID(UNKNOWN_NODE_ID); + app->_voxels.setDataSourceID(voxelServer->getNodeID()); + app->_voxels.parseData(packetData, messageLength); + app->_voxels.setDataSourceID(UNKNOWN_NODE_ID); } voxelServer->unlock(); } diff --git a/interface/src/VoxelPacketProcessor.h b/interface/src/VoxelPacketProcessor.h index f55daf5aba..87aae397d5 100644 --- a/interface/src/VoxelPacketProcessor.h +++ b/interface/src/VoxelPacketProcessor.h @@ -13,17 +13,10 @@ #include -class Application; - -/// Handles processing of incoming voxel packets for the interface application. +/// Handles processing of incoming voxel packets for the interface application. As with other ReceivedPacketProcessor classes +/// the user is responsible for reading inbound packets and adding them to the processing queue by calling queueReceivedPacket() class VoxelPacketProcessor : public ReceivedPacketProcessor { -public: - VoxelPacketProcessor(Application* app); - protected: virtual void processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength); - -private: - Application* _app; }; #endif // __shared__VoxelPacketProcessor__ diff --git a/interface/src/ui/VoxelStatsDialog.cpp b/interface/src/ui/VoxelStatsDialog.cpp index bfe93ec119..52b32ef7a5 100644 --- a/interface/src/ui/VoxelStatsDialog.cpp +++ b/interface/src/ui/VoxelStatsDialog.cpp @@ -30,9 +30,10 @@ VoxelStatsDialog::VoxelStatsDialog(QWidget* parent, VoxelSceneStats* model) : this->QDialog::setLayout(form); // Setup labels - for (int i = 0; i < VoxelSceneStats::ITEM_COUNT; ++i) { - VoxelSceneStats::ItemInfo& itemInfo = _model->getItemInfo(i); - QLabel* label = _labels[i] = new QLabel(); + for (int i = 0; i < (int)VoxelSceneStats::ITEM_COUNT; i++) { + VoxelSceneStats::Item item = (VoxelSceneStats::Item)(i); + VoxelSceneStats::ItemInfo& itemInfo = _model->getItemInfo(item); + QLabel* label = _labels[item] = new QLabel(); label->setAlignment(Qt::AlignRight); // Set foreground color to 62.5% brightness of the meter (otherwise will be hard to read on the bright background) @@ -56,9 +57,10 @@ void VoxelStatsDialog::paintEvent(QPaintEvent* event) { // Update labels char strBuf[256]; - for (int i = 0; i < VoxelSceneStats::ITEM_COUNT; ++i) { - QLabel* label = _labels[i]; - snprintf(strBuf, sizeof(strBuf), "%s", _model->getItemValue(i)); + for (int i = 0; i < (int)VoxelSceneStats::ITEM_COUNT; i++) { + VoxelSceneStats::Item item = (VoxelSceneStats::Item)(i); + QLabel* label = _labels[item]; + snprintf(strBuf, sizeof(strBuf), "%s", _model->getItemValue(item)); label->setText(strBuf); } diff --git a/libraries/shared/src/GenericThread.h b/libraries/shared/src/GenericThread.h index 2d4c90a469..e363d16178 100644 --- a/libraries/shared/src/GenericThread.h +++ b/libraries/shared/src/GenericThread.h @@ -30,16 +30,19 @@ public: /// If you're running in non-threaded mode, you must call this regularly void* threadRoutine(); -protected: /// Override this function to do whatever your class actually does, return false to exit thread early. virtual bool process() = 0; +protected: + /// Locks all the resources of the thread. void lock() { pthread_mutex_lock(&_mutex); } /// Unlocks all the resources of the thread. void unlock() { pthread_mutex_unlock(&_mutex); } + bool isStillRunning() const { return !_stopThread; } + private: pthread_mutex_t _mutex; diff --git a/libraries/shared/src/PacketHeaders.h b/libraries/shared/src/PacketHeaders.h index 3d49b6b066..2f4f8d2196 100644 --- a/libraries/shared/src/PacketHeaders.h +++ b/libraries/shared/src/PacketHeaders.h @@ -38,6 +38,8 @@ const PACKET_TYPE PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY = 'C'; const PACKET_TYPE PACKET_TYPE_REQUEST_ASSIGNMENT = 'r'; const PACKET_TYPE PACKET_TYPE_SEND_ASSIGNMENT = 's'; const PACKET_TYPE PACKET_TYPE_VOXEL_STATS = '#'; +const PACKET_TYPE PACKET_TYPE_VOXEL_JURISDICTION = 'J'; +const PACKET_TYPE PACKET_TYPE_VOXEL_JURISDICTION_REQUEST = 'j'; typedef char PACKET_VERSION; diff --git a/libraries/shared/src/PacketSender.cpp b/libraries/shared/src/PacketSender.cpp index 4c150454a3..511b01541f 100644 --- a/libraries/shared/src/PacketSender.cpp +++ b/libraries/shared/src/PacketSender.cpp @@ -10,18 +10,23 @@ #include -const uint64_t SEND_INTERVAL_USECS = 1000 * 5; // no more than 200pps... should be settable - #include "NodeList.h" #include "PacketSender.h" #include "SharedUtil.h" -PacketSender::PacketSender() { - _lastSendTime = usecTimestampNow(); +const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 200; +const int PacketSender::MINIMUM_PACKETS_PER_SECOND = 1; + + +PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) : + _packetsPerSecond(packetsPerSecond), + _lastSendTime(usecTimestampNow()), + _notify(notify) +{ } -void PacketSender::queuePacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength) { +void PacketSender::queuePacketForSending(sockaddr& address, unsigned char* packetData, ssize_t packetLength) { NetworkPacket packet(address, packetData, packetLength); lock(); _packets.push_back(packet); @@ -29,9 +34,11 @@ void PacketSender::queuePacket(sockaddr& address, unsigned char* packetData, ssi } bool PacketSender::process() { + uint64_t USECS_PER_SECOND = 1000 * 1000; + uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond); + if (_packets.size() == 0) { - const uint64_t SEND_THREAD_SLEEP_INTERVAL = (1000 * 1000)/60; // check at 60fps - usleep(SEND_THREAD_SLEEP_INTERVAL); + usleep(SEND_INTERVAL_USECS); } while (_packets.size() > 0) { NetworkPacket& packet = _packets.front(); @@ -40,6 +47,10 @@ bool PacketSender::process() { UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket(); nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength()); + + if (_notify) { + _notify->packetSentNotification(packet.getLength()); + } lock(); _packets.erase(_packets.begin()); @@ -55,5 +66,5 @@ bool PacketSender::process() { } } - return true; // keep running till they terminate us + return isStillRunning(); // keep running till they terminate us } diff --git a/libraries/shared/src/PacketSender.h b/libraries/shared/src/PacketSender.h index 5a1a63695f..3a07444f2b 100644 --- a/libraries/shared/src/PacketSender.h +++ b/libraries/shared/src/PacketSender.h @@ -14,25 +14,46 @@ #include "GenericThread.h" #include "NetworkPacket.h" -/// Generalized threaded processor for queueing and sending of outbound packets. -class PacketSender : public GenericThread { +/// Notification Hook for packets being sent by a PacketSender +class PacketSenderNotify { public: + virtual void packetSentNotification(ssize_t length) = 0; +}; - PacketSender(); + +/// Generalized threaded processor for queueing and sending of outbound packets. +class PacketSender : public virtual GenericThread { +public: + static const int DEFAULT_PACKETS_PER_SECOND; + static const int MINIMUM_PACKETS_PER_SECOND; + + PacketSender(PacketSenderNotify* notify = NULL, int packetsPerSecond = DEFAULT_PACKETS_PER_SECOND); /// Add packet to outbound queue. /// \param sockaddr& address the destination address /// \param packetData pointer to data /// \param ssize_t packetLength size of data /// \thread any thread, typically the application thread - void queuePacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength); + void queuePacketForSending(sockaddr& address, unsigned char* packetData, ssize_t packetLength); -private: + void setPacketsPerSecond(int packetsPerSecond) { _packetsPerSecond = std::min(MINIMUM_PACKETS_PER_SECOND, packetsPerSecond); } + int getPacketsPerSecond() const { return _packetsPerSecond; } + + void setPacketSenderNotify(PacketSenderNotify* notify) { _notify = notify; } + PacketSenderNotify* getPacketSenderNotify() const { return _notify; } + virtual bool process(); +protected: + int _packetsPerSecond; + + bool hasPacketsToSend() const { return _packets.size() > 0; } + int packetsToSendCount() const { return _packets.size(); } + +private: std::vector _packets; uint64_t _lastSendTime; - + PacketSenderNotify* _notify; }; #endif // __shared__PacketSender__ diff --git a/libraries/shared/src/ReceivedPacketProcessor.cpp b/libraries/shared/src/ReceivedPacketProcessor.cpp index 3b6ccf5a98..dce65b586e 100644 --- a/libraries/shared/src/ReceivedPacketProcessor.cpp +++ b/libraries/shared/src/ReceivedPacketProcessor.cpp @@ -8,9 +8,17 @@ // Threaded or non-threaded packet receiver. // +#include "NodeList.h" #include "ReceivedPacketProcessor.h" +#include "SharedUtil.h" + +void ReceivedPacketProcessor::queueReceivedPacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength) { + // Make sure our Node and NodeList knows we've heard from this node. + Node* node = NodeList::getInstance()->nodeWithAddress(&address); + if (node) { + node->setLastHeardMicrostamp(usecTimestampNow()); + } -void ReceivedPacketProcessor::queuePacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength) { NetworkPacket packet(address, packetData, packetLength); lock(); _packets.push_back(packet); @@ -30,5 +38,5 @@ bool ReceivedPacketProcessor::process() { _packets.erase(_packets.begin()); unlock(); } - return true; // keep running till they terminate us + return isStillRunning(); // keep running till they terminate us } diff --git a/libraries/shared/src/ReceivedPacketProcessor.h b/libraries/shared/src/ReceivedPacketProcessor.h index 0ea1aba2c1..f6d49cbb09 100644 --- a/libraries/shared/src/ReceivedPacketProcessor.h +++ b/libraries/shared/src/ReceivedPacketProcessor.h @@ -15,7 +15,7 @@ #include "NetworkPacket.h" /// Generalized threaded processor for handling received inbound packets. -class ReceivedPacketProcessor : public GenericThread { +class ReceivedPacketProcessor : public virtual GenericThread { public: /// Add packet from network receive thread to the processing queue. @@ -23,7 +23,7 @@ public: /// \param packetData pointer to received data /// \param ssize_t packetLength size of received data /// \thread network receive thread - void queuePacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength); + void queueReceivedPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength); protected: /// Callback for processing of recieved packets. Implement this to process the incoming packets. @@ -35,6 +35,13 @@ protected: /// Implements generic processing behavior for this thread. virtual bool process(); + + /// Are there received packets waiting to be processed + bool hasPacketsToProcess() const { return _packets.size() > 0; } + + /// How many received packets waiting are to be processed + int packetsToProcessCount() const { return _packets.size(); } + private: std::vector _packets; diff --git a/libraries/shared/src/SharedUtil.cpp b/libraries/shared/src/SharedUtil.cpp index cf2374b84e..c34663c077 100644 --- a/libraries/shared/src/SharedUtil.cpp +++ b/libraries/shared/src/SharedUtil.cpp @@ -255,6 +255,38 @@ bool createVoxelEditMessage(unsigned char command, short int sequence, return success; } +/// encodes the voxel details portion of a voxel edit message +bool encodeVoxelEditMessageDetails(unsigned char command, int voxelCount, VoxelDetail* voxelDetails, + unsigned char* bufferOut, int sizeIn, int& sizeOut) { + + bool success = true; // assume the best + unsigned char* copyAt = bufferOut; + sizeOut = 0; + + for (int i = 0; i < voxelCount && success; i++) { + // get the coded voxel + unsigned char* voxelData = pointToVoxel(voxelDetails[i].x,voxelDetails[i].y,voxelDetails[i].z, + voxelDetails[i].s,voxelDetails[i].red,voxelDetails[i].green,voxelDetails[i].blue); + + int lengthOfVoxelData = bytesRequiredForCodeLength(*voxelData)+SIZE_OF_COLOR_DATA; + + // make sure we have room to copy this voxel + if (sizeOut + lengthOfVoxelData > sizeIn) { + success = false; + } else { + // add it to our message + memcpy(copyAt, voxelData, lengthOfVoxelData); + copyAt += lengthOfVoxelData; + sizeOut += lengthOfVoxelData; + } + // cleanup + delete[] voxelData; + } + + return success; +} + + ////////////////////////////////////////////////////////////////////////////////////////// // Function: pointToVoxel() // Description: Given a universal point with location x,y,z this will return the voxel diff --git a/libraries/shared/src/SharedUtil.h b/libraries/shared/src/SharedUtil.h index 49e8ae6f02..32553b967b 100644 --- a/libraries/shared/src/SharedUtil.h +++ b/libraries/shared/src/SharedUtil.h @@ -81,9 +81,15 @@ struct VoxelDetail { }; unsigned char* pointToVoxel(float x, float y, float z, float s, unsigned char r = 0, unsigned char g = 0, unsigned char b = 0); + +// Creates a full Voxel edit message, including command header, sequence, and details bool createVoxelEditMessage(unsigned char command, short int sequence, int voxelCount, VoxelDetail* voxelDetails, unsigned char*& bufferOut, int& sizeOut); +/// encodes the voxel details portion of a voxel edit message +bool encodeVoxelEditMessageDetails(unsigned char command, int voxelCount, VoxelDetail* voxelDetails, + unsigned char* bufferOut, int sizeIn, int& sizeOut); + #ifdef _WIN32 void usleep(int waitTime); #endif diff --git a/libraries/voxels/src/JurisdictionListener.cpp b/libraries/voxels/src/JurisdictionListener.cpp new file mode 100644 index 0000000000..ea8bea3a2d --- /dev/null +++ b/libraries/voxels/src/JurisdictionListener.cpp @@ -0,0 +1,90 @@ +// +// JurisdictionListener.cpp +// shared +// +// Created by Brad Hefta-Gaub on 8/12/13. +// Copyright (c) 2013 High Fidelity, Inc. All rights reserved. +// +// Threaded or non-threaded jurisdiction Sender for the Application +// + +#include + +#include +#include +#include +#include "JurisdictionListener.h" + + +JurisdictionListener::JurisdictionListener(PacketSenderNotify* notify) : + PacketSender(notify, JurisdictionListener::DEFAULT_PACKETS_PER_SECOND) +{ + NodeList* nodeList = NodeList::getInstance(); + nodeList->addHook(this); +} + +JurisdictionListener::~JurisdictionListener() { + NodeList* nodeList = NodeList::getInstance(); + nodeList->removeHook(this); +} + +void JurisdictionListener::nodeAdded(Node* node) { + // nothing to do. But need to implement it. +} + +void JurisdictionListener::nodeKilled(Node* node) { + _jurisdictions.erase(_jurisdictions.find(node->getNodeID())); +} + +bool JurisdictionListener::queueJurisdictionRequest() { + static unsigned char buffer[MAX_PACKET_SIZE]; + unsigned char* bufferOut = &buffer[0]; + ssize_t sizeOut = populateTypeAndVersion(bufferOut, PACKET_TYPE_VOXEL_JURISDICTION_REQUEST); + int nodeCount = 0; + + NodeList* nodeList = NodeList::getInstance(); + for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { + + // only send to the NodeTypes that are interested in our jurisdiction details + const int numNodeTypes = 1; + const NODE_TYPE nodeTypes[numNodeTypes] = { NODE_TYPE_VOXEL_SERVER }; + if (node->getActiveSocket() != NULL && memchr(nodeTypes, node->getType(), numNodeTypes)) { + sockaddr* nodeAddress = node->getActiveSocket(); + PacketSender::queuePacketForSending(*nodeAddress, bufferOut, sizeOut); + nodeCount++; + } + } + + // set our packets per second to be the number of nodes + setPacketsPerSecond(nodeCount); + + // keep going if still running + return isStillRunning(); +} + +void JurisdictionListener::processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength) { + if (packetData[0] == PACKET_TYPE_VOXEL_JURISDICTION) { + Node* node = NodeList::getInstance()->nodeWithAddress(&senderAddress); + if (node) { + uint16_t nodeID = node->getNodeID(); + JurisdictionMap map; + map.unpackFromMessage(packetData, packetLength); + _jurisdictions[nodeID] = map; + } + } +} + +bool JurisdictionListener::process() { + bool continueProcessing = isStillRunning(); + + // If we're still running, and we don't have any requests waiting to be sent, then queue our jurisdiction requests + if (continueProcessing && !hasPacketsToSend()) { + queueJurisdictionRequest(); + continueProcessing = PacketSender::process(); + } + if (continueProcessing) { + // NOTE: This will sleep if there are no pending packets to process + continueProcessing = ReceivedPacketProcessor::process(); + } + return continueProcessing; +} diff --git a/libraries/voxels/src/JurisdictionListener.h b/libraries/voxels/src/JurisdictionListener.h new file mode 100644 index 0000000000..0a614446ed --- /dev/null +++ b/libraries/voxels/src/JurisdictionListener.h @@ -0,0 +1,55 @@ +// +// JurisdictionListener.h +// shared +// +// Created by Brad Hefta-Gaub on 8/12/13. +// Copyright (c) 2013 High Fidelity, Inc. All rights reserved. +// +// Voxel Packet Sender +// + +#ifndef __shared__JurisdictionListener__ +#define __shared__JurisdictionListener__ + +#include +#include +#include + +#include "JurisdictionMap.h" + +/// Sends out PACKET_TYPE_VOXEL_JURISDICTION_REQUEST packets to all voxel servers and then listens for and processes +/// the PACKET_TYPE_VOXEL_JURISDICTION packets it receives in order to maintain an accurate state of all jurisidictions +/// within the domain. As with other ReceivedPacketProcessor classes the user is responsible for reading inbound packets +/// and adding them to the processing queue by calling queueReceivedPacket() +class JurisdictionListener : public NodeListHook, public PacketSender, public ReceivedPacketProcessor { +public: + static const int DEFAULT_PACKETS_PER_SECOND = 1; + + JurisdictionListener(PacketSenderNotify* notify = NULL); + ~JurisdictionListener(); + + virtual bool process(); + + NodeToJurisdictionMap* getJurisdictions() { return &_jurisdictions; }; + + /// Called by NodeList to inform us that a node has been added. + void nodeAdded(Node* node); + /// Called by NodeList to inform us that a node has been killed. + void nodeKilled(Node* node); + +protected: + /// Callback for processing of received packets. Will process any queued PACKET_TYPE_VOXEL_JURISDICTION and update the + /// jurisdiction map member variable + /// \param sockaddr& senderAddress the address of the sender + /// \param packetData pointer to received data + /// \param ssize_t packetLength size of received data + /// \thread "this" individual processing thread + virtual void processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength); + +private: + NodeToJurisdictionMap _jurisdictions; + + bool queueJurisdictionRequest(); + +}; +#endif // __shared__JurisdictionListener__ diff --git a/libraries/voxels/src/JurisdictionMap.cpp b/libraries/voxels/src/JurisdictionMap.cpp index 360f74093d..94e589c801 100644 --- a/libraries/voxels/src/JurisdictionMap.cpp +++ b/libraries/voxels/src/JurisdictionMap.cpp @@ -9,6 +9,9 @@ #include #include #include +#include + +#include #include "JurisdictionMap.h" #include "VoxelNode.h" @@ -179,6 +182,18 @@ bool JurisdictionMap::readFromFile(const char* filename) { return true; } +void JurisdictionMap::displayDebugDetails() { + QString rootNodeValue = octalCodeToHexString(_rootOctalCode); + + qDebug() << "root:" << rootNodeValue << "\n"; + + for (int i = 0; i < _endNodes.size(); i++) { + QString value = octalCodeToHexString(_endNodes[i]); + qDebug() << "End node[" << i << "]: " << rootNodeValue << "\n"; + } +} + + bool JurisdictionMap::writeToFile(const char* filename) { QString settingsFile(filename); QSettings settings(settingsFile, QSettings::IniFormat); @@ -197,3 +212,95 @@ bool JurisdictionMap::writeToFile(const char* filename) { settings.endGroup(); return true; } + +int JurisdictionMap::packEmptyJurisdictionIntoMessage(unsigned char* destinationBuffer, int availableBytes) { + unsigned char* bufferStart = destinationBuffer; + + int headerLength = populateTypeAndVersion(destinationBuffer, PACKET_TYPE_VOXEL_JURISDICTION); + destinationBuffer += headerLength; + + // No root or end node details to pack! + int bytes = 0; + memcpy(destinationBuffer, &bytes, sizeof(bytes)); + destinationBuffer += sizeof(bytes); + + return destinationBuffer - bufferStart; // includes header! +} + +int JurisdictionMap::packIntoMessage(unsigned char* destinationBuffer, int availableBytes) { + unsigned char* bufferStart = destinationBuffer; + + int headerLength = populateTypeAndVersion(destinationBuffer, PACKET_TYPE_VOXEL_JURISDICTION); + destinationBuffer += headerLength; + + // add the root jurisdiction + if (_rootOctalCode) { + int bytes = bytesRequiredForCodeLength(numberOfThreeBitSectionsInCode(_rootOctalCode)); + memcpy(destinationBuffer, &bytes, sizeof(bytes)); + destinationBuffer += sizeof(bytes); + memcpy(destinationBuffer, _rootOctalCode, bytes); + destinationBuffer += bytes; + + // if and only if there's a root jurisdiction, also include the end nodes + int endNodeCount = _endNodes.size(); + memcpy(destinationBuffer, &endNodeCount, sizeof(endNodeCount)); + destinationBuffer += sizeof(endNodeCount); + + for (int i=0; i < endNodeCount; i++) { + unsigned char* endNodeCode = _endNodes[i]; + int bytes = 0; + if (endNodeCode) { + bytes = bytesRequiredForCodeLength(numberOfThreeBitSectionsInCode(endNodeCode)); + } + memcpy(destinationBuffer, &bytes, sizeof(bytes)); + destinationBuffer += sizeof(bytes); + memcpy(destinationBuffer, endNodeCode, bytes); + destinationBuffer += bytes; + } + } else { + int bytes = 0; + memcpy(destinationBuffer, &bytes, sizeof(bytes)); + destinationBuffer += sizeof(bytes); + } + + return destinationBuffer - bufferStart; // includes header! +} + +int JurisdictionMap::unpackFromMessage(unsigned char* sourceBuffer, int availableBytes) { + clear(); + unsigned char* startPosition = sourceBuffer; + + // increment to push past the packet header + int numBytesPacketHeader = numBytesForPacketHeader(sourceBuffer); + sourceBuffer += numBytesPacketHeader; + + // read the root jurisdiction + int bytes = 0; + memcpy(&bytes, sourceBuffer, sizeof(bytes)); + sourceBuffer += sizeof(bytes); + + if (bytes > 0) { + _rootOctalCode = new unsigned char[bytes]; + memcpy(_rootOctalCode, sourceBuffer, bytes); + sourceBuffer += bytes; + // if and only if there's a root jurisdiction, also include the end nodes + int endNodeCount = 0; + memcpy(&endNodeCount, sourceBuffer, sizeof(endNodeCount)); + sourceBuffer += sizeof(endNodeCount); + for (int i=0; i < endNodeCount; i++) { + int bytes = 0; + memcpy(&bytes, sourceBuffer, sizeof(bytes)); + sourceBuffer += sizeof(bytes); + unsigned char* endNodeCode = new unsigned char[bytes]; + memcpy(endNodeCode, sourceBuffer, bytes); + sourceBuffer += bytes; + + // if the endNodeCode was 0 length then don't add it + if (bytes > 0) { + _endNodes.push_back(endNodeCode); + } + } + } + + return sourceBuffer - startPosition; // includes header! +} diff --git a/libraries/voxels/src/JurisdictionMap.h b/libraries/voxels/src/JurisdictionMap.h index 2b72596f76..a3e96933a9 100644 --- a/libraries/voxels/src/JurisdictionMap.h +++ b/libraries/voxels/src/JurisdictionMap.h @@ -9,6 +9,8 @@ #ifndef __hifi__JurisdictionMap__ #define __hifi__JurisdictionMap__ +#include +#include #include #include @@ -49,6 +51,14 @@ public: int getEndNodeCount() const { return _endNodes.size(); } void copyContents(unsigned char* rootCodeIn, const std::vector& endNodesIn); + + int unpackFromMessage(unsigned char* sourceBuffer, int availableBytes); + int packIntoMessage(unsigned char* destinationBuffer, int availableBytes); + + /// Available to pack an empty or unknown jurisdiction into a network packet, used when no JurisdictionMap is available + static int packEmptyJurisdictionIntoMessage(unsigned char* destinationBuffer, int availableBytes); + + void displayDebugDetails(); private: void copyContents(const JurisdictionMap& other); // use assignment instead @@ -59,6 +69,11 @@ private: std::vector _endNodes; }; +/// Map between node IDs and their reported JurisdictionMap. Typically used by classes that need to know which nodes are +/// managing which jurisdictions. +typedef std::map NodeToJurisdictionMap; + + #endif /* defined(__hifi__JurisdictionMap__) */ diff --git a/libraries/voxels/src/JurisdictionSender.cpp b/libraries/voxels/src/JurisdictionSender.cpp new file mode 100644 index 0000000000..df48c27c47 --- /dev/null +++ b/libraries/voxels/src/JurisdictionSender.cpp @@ -0,0 +1,76 @@ +// +// JurisdictionSender.cpp +// shared +// +// Created by Brad Hefta-Gaub on 8/12/13. +// Copyright (c) 2013 High Fidelity, Inc. All rights reserved. +// +// Threaded or non-threaded jurisdiction Sender for the Application +// + +#include + +#include +#include +#include +#include "JurisdictionSender.h" + + +JurisdictionSender::JurisdictionSender(JurisdictionMap* map, PacketSenderNotify* notify) : + PacketSender(notify, JurisdictionSender::DEFAULT_PACKETS_PER_SECOND), + ReceivedPacketProcessor(), + _jurisdictionMap(map) +{ +} + +void JurisdictionSender::processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength) { + if (packetData[0] == PACKET_TYPE_VOXEL_JURISDICTION_REQUEST) { + Node* node = NodeList::getInstance()->nodeWithAddress(&senderAddress); + if (node) { + uint16_t nodeID = node->getNodeID(); + lock(); + _nodesRequestingJurisdictions.insert(nodeID); + unlock(); + } + } +} + +bool JurisdictionSender::process() { + bool continueProcessing = isStillRunning(); + + // call our ReceivedPacketProcessor base class process so we'll get any pending packets + if (continueProcessing && (continueProcessing = ReceivedPacketProcessor::process())) { + // add our packet to our own queue, then let the PacketSender class do the rest of the work. + static unsigned char buffer[MAX_PACKET_SIZE]; + unsigned char* bufferOut = &buffer[0]; + ssize_t sizeOut = 0; + + if (_jurisdictionMap) { + sizeOut = _jurisdictionMap->packIntoMessage(bufferOut, MAX_PACKET_SIZE); + } else { + sizeOut = JurisdictionMap::packEmptyJurisdictionIntoMessage(bufferOut, MAX_PACKET_SIZE); + } + int nodeCount = 0; + + for (std::set::iterator nodeIterator = _nodesRequestingJurisdictions.begin(); + nodeIterator != _nodesRequestingJurisdictions.end(); nodeIterator++) { + + uint16_t nodeID = *nodeIterator; + Node* node = NodeList::getInstance()->nodeWithID(nodeID); + + if (node->getActiveSocket() != NULL) { + sockaddr* nodeAddress = node->getActiveSocket(); + queuePacketForSending(*nodeAddress, bufferOut, sizeOut); + nodeCount++; + // remove it from the set + _nodesRequestingJurisdictions.erase(nodeIterator); + } + } + + // set our packets per second to be the number of nodes + setPacketsPerSecond(nodeCount); + + continueProcessing = PacketSender::process(); + } + return continueProcessing; +} diff --git a/libraries/voxels/src/JurisdictionSender.h b/libraries/voxels/src/JurisdictionSender.h new file mode 100644 index 0000000000..34f5a9f06f --- /dev/null +++ b/libraries/voxels/src/JurisdictionSender.h @@ -0,0 +1,40 @@ +// +// JurisdictionSender.h +// shared +// +// Created by Brad Hefta-Gaub on 8/12/13. +// Copyright (c) 2013 High Fidelity, Inc. All rights reserved. +// +// Voxel Packet Sender +// + +#ifndef __shared__JurisdictionSender__ +#define __shared__JurisdictionSender__ + +#include + +#include +#include +#include "JurisdictionMap.h" + +/// Will process PACKET_TYPE_VOXEL_JURISDICTION_REQUEST packets and send out PACKET_TYPE_VOXEL_JURISDICTION packets +/// to requesting parties. As with other ReceivedPacketProcessor classes the user is responsible for reading inbound packets +/// and adding them to the processing queue by calling queueReceivedPacket() +class JurisdictionSender : public PacketSender, public ReceivedPacketProcessor { +public: + static const int DEFAULT_PACKETS_PER_SECOND = 1; + + JurisdictionSender(JurisdictionMap* map, PacketSenderNotify* notify = NULL); + + void setJurisdiction(JurisdictionMap* map) { _jurisdictionMap = map; } + + virtual bool process(); + +protected: + virtual void processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength); + +private: + JurisdictionMap* _jurisdictionMap; + std::set _nodesRequestingJurisdictions; +}; +#endif // __shared__JurisdictionSender__ diff --git a/interface/src/VoxelEditPacketSender.cpp b/libraries/voxels/src/VoxelEditPacketSender.cpp similarity index 73% rename from interface/src/VoxelEditPacketSender.cpp rename to libraries/voxels/src/VoxelEditPacketSender.cpp index 3716e16208..5853652688 100644 --- a/interface/src/VoxelEditPacketSender.cpp +++ b/libraries/voxels/src/VoxelEditPacketSender.cpp @@ -10,33 +10,30 @@ #include -#include "Application.h" -#include "Menu.h" +#include +#include #include "VoxelEditPacketSender.h" -VoxelEditPacketSender::VoxelEditPacketSender(Application* app) : - _app(app) -{ + +VoxelEditPacketSender::VoxelEditPacketSender(PacketSenderNotify* notify) : + PacketSender(notify), + _shouldSend(true), + _voxelServerJurisdictions(NULL) { } void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) { - - // if the app has Voxels disabled, we don't do any of this... - if (!Menu::getInstance()->isOptionChecked(MenuOption::Voxels)) { + // allows app to disable sending if for example voxels have been disabled + if (!_shouldSend) { return; // bail early } unsigned char* bufferOut; int sizeOut; - int totalBytesSent = 0; if (createVoxelEditMessage(type, 0, 1, &detail, bufferOut, sizeOut)){ actuallySendMessage(UNKNOWN_NODE_ID, bufferOut, sizeOut); // sends to all servers... not ideal! delete[] bufferOut; } - - // Tell the application's bandwidth meters about what we've sent - _app->_bandwidthMeter.outputStream(BandwidthMeter::VOXELS).updateValue(totalBytesSent); } void VoxelEditPacketSender::actuallySendMessage(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) { @@ -46,12 +43,32 @@ void VoxelEditPacketSender::actuallySendMessage(uint16_t nodeID, unsigned char* if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER && ((node->getNodeID() == nodeID) || (nodeID == (uint16_t)UNKNOWN_NODE_ID)) ) { sockaddr* nodeAddress = node->getActiveSocket(); - queuePacket(*nodeAddress, bufferOut, sizeOut); + queuePacketForSending(*nodeAddress, bufferOut, sizeOut); } } } +void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details) { + if (!_shouldSend) { + return; // bail early + } + + for (int i = 0; i < numberOfDetails; i++) { + static unsigned char bufferOut[MAX_PACKET_SIZE]; + int sizeOut = 0; + + if (encodeVoxelEditMessageDetails(type, 1, &details[i], &bufferOut[0], MAX_PACKET_SIZE, sizeOut)) { + queueVoxelEditMessage(type, bufferOut, sizeOut); + } + } +} + void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) { + + if (!_shouldSend) { + return; // bail early + } + // We want to filter out edit messages for voxel servers based on the server's Jurisdiction // But we can't really do that with a packed message, since each edit message could be destined // for a different voxel server... So we need to actually manage multiple queued packets... one @@ -60,12 +77,16 @@ void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned cha for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { // only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER) { - - // we need to get the jurisdiction for this - // here we need to get the "pending packet" for this server uint16_t nodeID = node->getNodeID(); - const JurisdictionMap& map = _app->_voxelServerJurisdictions[nodeID]; - if (map.isMyJurisdiction(codeColorBuffer, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN) { + bool isMyJurisdiction = true; + + if (_voxelServerJurisdictions) { + // we need to get the jurisdiction for this + // here we need to get the "pending packet" for this server + const JurisdictionMap& map = (*_voxelServerJurisdictions)[nodeID]; + isMyJurisdiction = (map.isMyJurisdiction(codeColorBuffer, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN); + } + if (isMyJurisdiction) { EditPacketBuffer& packetBuffer = _pendingEditPackets[nodeID]; packetBuffer._nodeID = nodeID; diff --git a/interface/src/VoxelEditPacketSender.h b/libraries/voxels/src/VoxelEditPacketSender.h similarity index 68% rename from interface/src/VoxelEditPacketSender.h rename to libraries/voxels/src/VoxelEditPacketSender.h index d3058828e3..e07bd11baa 100644 --- a/interface/src/VoxelEditPacketSender.h +++ b/libraries/voxels/src/VoxelEditPacketSender.h @@ -1,6 +1,6 @@ // // VoxelEditPacketSender.h -// interface +// shared // // Created by Brad Hefta-Gaub on 8/12/13. // Copyright (c) 2013 High Fidelity, Inc. All rights reserved. @@ -13,8 +13,7 @@ #include #include // for VoxelDetail - -class Application; +#include "JurisdictionMap.h" /// Used for construction of edit voxel packets class EditPacketBuffer { @@ -29,7 +28,7 @@ public: /// Threaded processor for queueing and sending of outbound edit voxel packets. class VoxelEditPacketSender : public PacketSender { public: - VoxelEditPacketSender(Application* app); + VoxelEditPacketSender(PacketSenderNotify* notify = NULL); /// Send voxel edit message immediately void sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail); @@ -38,15 +37,28 @@ public: /// node or nodes the packet should be sent to. void queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length); + /// Queues an array of several voxel edit messages. Will potentially send a pending multi-command packet. Determines + /// which voxel-server node or nodes the packet should be sent to. + void queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details); + /// flushes all queued packets for all nodes void flushQueue(); + bool getShouldSend() const { return _shouldSend; } + void setShouldSend(bool shouldSend) { _shouldSend = shouldSend; } + + void setVoxelServerJurisdictions(NodeToJurisdictionMap* voxelServerJurisdictions) { + _voxelServerJurisdictions = voxelServerJurisdictions; + } + private: + bool _shouldSend; void actuallySendMessage(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut); void initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type); void flushQueue(EditPacketBuffer& packetBuffer); // flushes specific queued packet - Application* _app; std::map _pendingEditPackets; + + NodeToJurisdictionMap* _voxelServerJurisdictions; }; #endif // __shared__VoxelEditPacketSender__ diff --git a/libraries/voxels/src/VoxelSceneStats.cpp b/libraries/voxels/src/VoxelSceneStats.cpp index 02027a5ee4..1ebc4015e0 100644 --- a/libraries/voxels/src/VoxelSceneStats.cpp +++ b/libraries/voxels/src/VoxelSceneStats.cpp @@ -543,7 +543,7 @@ VoxelSceneStats::ItemInfo VoxelSceneStats::_ITEMS[] = { { "Mode" , greenish }, }; -char* VoxelSceneStats::getItemValue(int item) { +char* VoxelSceneStats::getItemValue(Item item) { const uint64_t USECS_PER_SECOND = 1000 * 1000; int calcFPS, calcAverageFPS, calculatedKBPS; switch(item) { diff --git a/libraries/voxels/src/VoxelSceneStats.h b/libraries/voxels/src/VoxelSceneStats.h index feb8b81edc..0883de45e1 100644 --- a/libraries/voxels/src/VoxelSceneStats.h +++ b/libraries/voxels/src/VoxelSceneStats.h @@ -16,42 +16,83 @@ class VoxelNode; +/// Collects statistics for calculating and sending a scene from a voxel server to an interface client class VoxelSceneStats { public: VoxelSceneStats(); ~VoxelSceneStats(); void reset(); + + /// Call when beginning the computation of a scene. Initializes internal structures void sceneStarted(bool fullScene, bool moving, VoxelNode* root, JurisdictionMap* jurisdictionMap); + + /// Call when the computation of a scene is completed. Finalizes internal structures void sceneCompleted(); void printDebugDetails(); + + /// Track that a packet was sent as part of the scene. void packetSent(int bytes); + /// Tracks the beginning of an encode pass during scene calculation. void encodeStarted(); + + /// Tracks the ending of an encode pass during scene calculation. void encodeStopped(); + /// Track that a node was traversed as part of computation of a scene. void traversed(const VoxelNode* node); + + /// Track that a node was skipped as part of computation of a scene due to being beyond the LOD distance. void skippedDistance(const VoxelNode* node); + + /// Track that a node was skipped as part of computation of a scene due to being out of view. void skippedOutOfView(const VoxelNode* node); + + /// Track that a node was skipped as part of computation of a scene due to previously being in view while in delta sending void skippedWasInView(const VoxelNode* node); + + /// Track that a node was skipped as part of computation of a scene due to not having changed since last full scene sent void skippedNoChange(const VoxelNode* node); + + /// Track that a node was skipped as part of computation of a scene due to being occluded void skippedOccluded(const VoxelNode* node); + + /// Track that a node's color was was sent as part of computation of a scene void colorSent(const VoxelNode* node); + + /// Track that a node was due to be sent, but didn't fit in the packet and was moved to next packet void didntFit(const VoxelNode* node); + + /// Track that the color bitmask was was sent as part of computation of a scene void colorBitsWritten(); + + /// Track that the exists in tree bitmask was was sent as part of computation of a scene void existsBitsWritten(); + + /// Track that the exists in packet bitmask was was sent as part of computation of a scene void existsInPacketBitsWritten(); + + /// Fix up tracking statistics in case where bitmasks were removed for some reason void childBitsRemoved(bool includesExistsBits, bool includesColors); + /// Pack the details of the statistics into a buffer for sending as a network packet int packIntoMessage(unsigned char* destinationBuffer, int availableBytes); + + /// Unpack the details of the statistics from a buffer typically received as a network packet int unpackFromMessage(unsigned char* sourceBuffer, int availableBytes); + /// Indicates that a scene has been completed and the statistics are ready to be sent bool isReadyToSend() const { return _isReadyToSend; } + + /// Mark that the scene statistics have been sent void markAsSent() { _isReadyToSend = false; } + unsigned char* getStatsMessage() { return &_statsMessage[0]; } int getStatsMessageLength() const { return _statsMessageLength; } - enum { + /// List of various items tracked by VoxelSceneStats which can be accessed via getItemInfo() and getItemValue() + enum Item { ITEM_ELAPSED, ITEM_ENCODE, ITEM_PACKETS, @@ -71,16 +112,24 @@ public: ITEM_COUNT }; - // Meta information about each stats item + /// Meta information about each stats item struct ItemInfo { char const* const caption; unsigned colorRGBA; }; - ItemInfo& getItemInfo(int item) { return _ITEMS[item]; }; - char* getItemValue(int item); + /// Returns details about items tracked by VoxelSceneStats + /// \param Item item The item from the stats you're interested in. + ItemInfo& getItemInfo(Item item) { return _ITEMS[item]; }; + + /// Returns a UI formatted value of an item tracked by VoxelSceneStats + /// \param Item item The item from the stats you're interested in. + char* getItemValue(Item item); + /// Returns OctCode for root node of the jurisdiction of this particular voxel server unsigned char* getJurisdictionRoot() const { return _jurisdictionRoot; } + + /// Returns list of OctCodes for end nodes of the jurisdiction of this particular voxel server const std::vector& getJurisdictionEndNodes() const { return _jurisdictionEndNodes; } private: diff --git a/voxel-server/src/main.cpp b/voxel-server/src/main.cpp index c7e266e430..3446ebe9c1 100644 --- a/voxel-server/src/main.cpp +++ b/voxel-server/src/main.cpp @@ -21,6 +21,8 @@ #include #include +#include + #ifdef _WIN32 #include "Syssocket.h" #include "Systime.h" @@ -73,6 +75,7 @@ EnvironmentData environmentData[3]; int receivedPacketCount = 0; JurisdictionMap* jurisdiction = NULL; +JurisdictionSender* jurisdictionSender = NULL; void randomlyFillVoxelTree(int levelsToGo, VoxelNode *currentRootNode) { // randomly generate children for this node @@ -400,7 +403,7 @@ void persistVoxelsWhenDirty() { } } -void *distributeVoxelsToListeners(void *args) { +void* distributeVoxelsToListeners(void* args) { NodeList* nodeList = NodeList::getInstance(); timeval lastSendTime; @@ -408,7 +411,6 @@ void *distributeVoxelsToListeners(void *args) { while (true) { gettimeofday(&lastSendTime, NULL); - // enumerate the nodes to send 3 packets to each for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { VoxelNodeData* nodeData = (VoxelNodeData*) node->getLinkedData(); @@ -482,11 +484,10 @@ int main(int argc, const char * argv[]) { } if (jurisdictionRoot || jurisdictionEndNodes) { - jurisdiction = new JurisdictionMap(jurisdictionRoot, jurisdictionEndNodes); + ::jurisdiction = new JurisdictionMap(jurisdictionRoot, jurisdictionEndNodes); } } - // should we send environments? Default is yes, but this command line suppresses sending const char* DUMP_VOXELS_ON_MOVE = "--dumpVoxelsOnMove"; ::dumpVoxelsOnMove = cmdOptionExists(argc, argv, DUMP_VOXELS_ON_MOVE); @@ -509,6 +510,9 @@ int main(int argc, const char * argv[]) { NodeList* nodeList = NodeList::createInstance(NODE_TYPE_VOXEL_SERVER, listenPort); setvbuf(stdout, NULL, _IOLBF, 0); + const NODE_TYPE nodeTypes[] = { NODE_TYPE_AGENT, NODE_TYPE_ANIMATION_SERVER }; + NodeList::getInstance()->setNodeTypesOfInterest(&nodeTypes[0], sizeof(nodeTypes)); + // Handle Local Domain testing with the --local command line const char* local = "--local"; ::wantLocalDomain = cmdOptionExists(argc, argv,local); @@ -655,8 +659,13 @@ int main(int argc, const char * argv[]) { timeval lastDomainServerCheckIn = {}; + // set up our jurisdiction broadcaster... + ::jurisdictionSender = new JurisdictionSender(::jurisdiction); + if (::jurisdictionSender) { + ::jurisdictionSender->initialize(true); + } + // loop to send to nodes requesting data - while (true) { // send a check in packet to the domain server if DOMAIN_SERVER_CHECK_IN_USECS has elapsed @@ -735,12 +744,25 @@ int main(int argc, const char * argv[]) { voxelData += voxelDataSize; atByte += voxelDataSize; } + + // Make sure our Node and NodeList knows we've heard from this node. + Node* node = NodeList::getInstance()->nodeWithAddress(&nodePublicAddress); + if (node) { + node->setLastHeardMicrostamp(usecTimestampNow()); + } + } else if (packetData[0] == PACKET_TYPE_ERASE_VOXEL) { // Send these bits off to the VoxelTree class to process them pthread_mutex_lock(&::treeLock); serverTree.processRemoveVoxelBitstream((unsigned char*)packetData, receivedBytes); pthread_mutex_unlock(&::treeLock); + + // Make sure our Node and NodeList knows we've heard from this node. + Node* node = NodeList::getInstance()->nodeWithAddress(&nodePublicAddress); + if (node) { + node->setLastHeardMicrostamp(usecTimestampNow()); + } } else if (packetData[0] == PACKET_TYPE_Z_COMMAND) { // the Z command is a special command that allows the sender to send the voxel server high level semantic @@ -774,6 +796,12 @@ int main(int argc, const char * argv[]) { printf("rebroadcasting Z message to connected nodes... nodeList.broadcastToNodes()\n"); nodeList->broadcastToNodes(packetData, receivedBytes, &NODE_TYPE_AGENT, 1); } + + // Make sure our Node and NodeList knows we've heard from this node. + Node* node = NodeList::getInstance()->nodeWithAddress(&nodePublicAddress); + if (node) { + node->setLastHeardMicrostamp(usecTimestampNow()); + } } else if (packetData[0] == PACKET_TYPE_HEAD_DATA) { // If we got a PACKET_TYPE_HEAD_DATA, then we're talking to an NODE_TYPE_AVATAR, and we // need to make sure we have it in our nodeList. @@ -789,6 +817,14 @@ int main(int argc, const char * argv[]) { } else if (packetData[0] == PACKET_TYPE_PING) { // If the packet is a ping, let processNodeData handle it. nodeList->processNodeData(&nodePublicAddress, packetData, receivedBytes); + } else if (packetData[0] == PACKET_TYPE_DOMAIN) { + nodeList->processNodeData(&nodePublicAddress, packetData, receivedBytes); + } else if (packetData[0] == PACKET_TYPE_VOXEL_JURISDICTION_REQUEST) { + if (::jurisdictionSender) { + jurisdictionSender->queueReceivedPacket(nodePublicAddress, packetData, receivedBytes); + } + } else { + printf("unknown packet ignored... packetData[0]=%c\n", packetData[0]); } } } @@ -796,8 +832,13 @@ int main(int argc, const char * argv[]) { pthread_join(sendVoxelThread, NULL); pthread_mutex_destroy(&::treeLock); - if (jurisdiction) { - delete jurisdiction; + if (::jurisdiction) { + delete ::jurisdiction; + } + + if (::jurisdictionSender) { + jurisdictionSender->terminate(); + delete ::jurisdictionSender; } return 0;