This commit is contained in:
ZappoMan 2014-01-22 16:22:21 -08:00
commit 6b15033337
12 changed files with 48 additions and 31 deletions

View file

@ -15,7 +15,7 @@
#include "VoxelSystem.h" #include "VoxelSystem.h"
/// Generalized threaded processor for handling received inbound packets. /// Generalized threaded processor for handling received inbound packets.
class VoxelHideShowThread : public virtual GenericThread { class VoxelHideShowThread : public GenericThread {
public: public:
VoxelHideShowThread(VoxelSystem* theSystem); VoxelHideShowThread(VoxelSystem* theSystem);

View file

@ -17,7 +17,7 @@
#include "OctreeServer.h" #include "OctreeServer.h"
/// Threaded processor for sending voxel packets to a single client /// Threaded processor for sending voxel packets to a single client
class OctreeSendThread : public virtual GenericThread { class OctreeSendThread : public GenericThread {
public: public:
OctreeSendThread(const QUuid& nodeUUID, OctreeServer* myServer); OctreeSendThread(const QUuid& nodeUUID, OctreeServer* myServer);

View file

@ -16,13 +16,13 @@
#include "JurisdictionListener.h" #include "JurisdictionListener.h"
JurisdictionListener::JurisdictionListener(NODE_TYPE type, PacketSenderNotify* notify) : JurisdictionListener::JurisdictionListener(NODE_TYPE type, PacketSenderNotify* notify) :
PacketSender(notify, JurisdictionListener::DEFAULT_PACKETS_PER_SECOND) _packetSender(notify, JurisdictionListener::DEFAULT_PACKETS_PER_SECOND)
{ {
_nodeType = type; _nodeType = type;
ReceivedPacketProcessor::_dontSleep = true; // we handle sleeping so this class doesn't need to ReceivedPacketProcessor::_dontSleep = true; // we handle sleeping so this class doesn't need to
// connect(nodeList, &NodeList::nodeKilled, this, &JurisdictionListener::nodeKilled); connect(NodeList::getInstance(), &NodeList::nodeKilled, this, &JurisdictionListener::nodeKilled);
// qDebug("JurisdictionListener::JurisdictionListener(NODE_TYPE type=%c)\n", type); //qDebug("JurisdictionListener::JurisdictionListener(NODE_TYPE type=%c)", type);
} }
void JurisdictionListener::nodeKilled(SharedNodePointer node) { void JurisdictionListener::nodeKilled(SharedNodePointer node) {
@ -32,7 +32,7 @@ void JurisdictionListener::nodeKilled(SharedNodePointer node) {
} }
bool JurisdictionListener::queueJurisdictionRequest() { bool JurisdictionListener::queueJurisdictionRequest() {
//qDebug() << "JurisdictionListener::queueJurisdictionRequest()\n"; //qDebug() << "JurisdictionListener::queueJurisdictionRequest()";
static unsigned char buffer[MAX_PACKET_SIZE]; static unsigned char buffer[MAX_PACKET_SIZE];
unsigned char* bufferOut = &buffer[0]; unsigned char* bufferOut = &buffer[0];
@ -45,15 +45,15 @@ bool JurisdictionListener::queueJurisdictionRequest() {
if (nodeList->getNodeActiveSocketOrPing(node.data()) && if (nodeList->getNodeActiveSocketOrPing(node.data()) &&
node->getType() == getNodeType()) { node->getType() == getNodeType()) {
const HifiSockAddr* nodeAddress = node->getActiveSocket(); const HifiSockAddr* nodeAddress = node->getActiveSocket();
PacketSender::queuePacketForSending(*nodeAddress, bufferOut, sizeOut); _packetSender.queuePacketForSending(*nodeAddress, bufferOut, sizeOut);
nodeCount++; nodeCount++;
} }
} }
if (nodeCount > 0){ if (nodeCount > 0){
setPacketsPerSecond(nodeCount); _packetSender.setPacketsPerSecond(nodeCount);
} else { } else {
setPacketsPerSecond(NO_SERVER_CHECK_RATE); _packetSender.setPacketsPerSecond(NO_SERVER_CHECK_RATE);
} }
// keep going if still running // keep going if still running
@ -61,6 +61,7 @@ bool JurisdictionListener::queueJurisdictionRequest() {
} }
void JurisdictionListener::processPacket(const HifiSockAddr& senderAddress, unsigned char* packetData, ssize_t packetLength) { void JurisdictionListener::processPacket(const HifiSockAddr& senderAddress, unsigned char* packetData, ssize_t packetLength) {
//qDebug() << "JurisdictionListener::processPacket()";
if (packetData[0] == PACKET_TYPE_JURISDICTION) { if (packetData[0] == PACKET_TYPE_JURISDICTION) {
SharedNodePointer node = NodeList::getInstance()->nodeWithAddress(senderAddress); SharedNodePointer node = NodeList::getInstance()->nodeWithAddress(senderAddress);
if (node) { if (node) {
@ -73,12 +74,17 @@ void JurisdictionListener::processPacket(const HifiSockAddr& senderAddress, unsi
} }
bool JurisdictionListener::process() { bool JurisdictionListener::process() {
//qDebug() << "JurisdictionListener::process()";
bool continueProcessing = isStillRunning(); bool continueProcessing = isStillRunning();
// If we're still running, and we don't have any requests waiting to be sent, then queue our jurisdiction requests // If we're still running, and we don't have any requests waiting to be sent, then queue our jurisdiction requests
if (continueProcessing && !hasPacketsToSend()) { if (continueProcessing && !_packetSender.hasPacketsToSend()) {
queueJurisdictionRequest(); queueJurisdictionRequest();
continueProcessing = PacketSender::process(); }
if (continueProcessing) {
//qDebug() << "JurisdictionListener::process() calling _packetSender.process()";
continueProcessing = _packetSender.process();
} }
if (continueProcessing) { if (continueProcessing) {
// NOTE: This will sleep if there are no pending packets to process // NOTE: This will sleep if there are no pending packets to process

View file

@ -22,7 +22,7 @@
/// the PACKET_TYPE_JURISDICTION packets it receives in order to maintain an accurate state of all jurisidictions /// the PACKET_TYPE_JURISDICTION packets it receives in order to maintain an accurate state of all jurisidictions
/// within the domain. As with other ReceivedPacketProcessor classes the user is responsible for reading inbound packets /// within the domain. As with other ReceivedPacketProcessor classes the user is responsible for reading inbound packets
/// and adding them to the processing queue by calling queueReceivedPacket() /// and adding them to the processing queue by calling queueReceivedPacket()
class JurisdictionListener : public PacketSender, public ReceivedPacketProcessor { class JurisdictionListener : public ReceivedPacketProcessor {
public: public:
static const int DEFAULT_PACKETS_PER_SECOND = 1; static const int DEFAULT_PACKETS_PER_SECOND = 1;
static const int NO_SERVER_CHECK_RATE = 60; // if no servers yet detected, keep checking at 60fps static const int NO_SERVER_CHECK_RATE = 60; // if no servers yet detected, keep checking at 60fps
@ -55,5 +55,7 @@ private:
NODE_TYPE _nodeType; NODE_TYPE _nodeType;
bool queueJurisdictionRequest(); bool queueJurisdictionRequest();
PacketSender _packetSender;
}; };
#endif // __shared__JurisdictionListener__ #endif // __shared__JurisdictionListener__

View file

@ -331,16 +331,20 @@ int JurisdictionMap::unpackFromMessage(unsigned char* sourceBuffer, int availabl
// increment to push past the packet header // increment to push past the packet header
int numBytesPacketHeader = numBytesForPacketHeader(sourceBuffer); int numBytesPacketHeader = numBytesForPacketHeader(sourceBuffer);
sourceBuffer += numBytesPacketHeader; sourceBuffer += numBytesPacketHeader;
int remainingBytes = availableBytes - numBytesPacketHeader;
// read the root jurisdiction // read the root jurisdiction
int bytes = 0; int bytes = 0;
memcpy(&bytes, sourceBuffer, sizeof(bytes)); memcpy(&bytes, sourceBuffer, sizeof(bytes));
sourceBuffer += sizeof(bytes); sourceBuffer += sizeof(bytes);
remainingBytes -= sizeof(bytes);
if (bytes > 0) { if (bytes > 0 && bytes <= remainingBytes) {
_rootOctalCode = new unsigned char[bytes]; _rootOctalCode = new unsigned char[bytes];
memcpy(_rootOctalCode, sourceBuffer, bytes); memcpy(_rootOctalCode, sourceBuffer, bytes);
sourceBuffer += bytes; sourceBuffer += bytes;
remainingBytes -= bytes;
// if and only if there's a root jurisdiction, also include the end nodes // if and only if there's a root jurisdiction, also include the end nodes
int endNodeCount = 0; int endNodeCount = 0;
memcpy(&endNodeCount, sourceBuffer, sizeof(endNodeCount)); memcpy(&endNodeCount, sourceBuffer, sizeof(endNodeCount));
@ -349,13 +353,18 @@ int JurisdictionMap::unpackFromMessage(unsigned char* sourceBuffer, int availabl
int bytes = 0; int bytes = 0;
memcpy(&bytes, sourceBuffer, sizeof(bytes)); memcpy(&bytes, sourceBuffer, sizeof(bytes));
sourceBuffer += sizeof(bytes); sourceBuffer += sizeof(bytes);
unsigned char* endNodeCode = new unsigned char[bytes]; remainingBytes -= sizeof(bytes);
memcpy(endNodeCode, sourceBuffer, bytes);
sourceBuffer += bytes;
// if the endNodeCode was 0 length then don't add it if (bytes <= remainingBytes) {
if (bytes > 0) { unsigned char* endNodeCode = new unsigned char[bytes];
_endNodes.push_back(endNodeCode); memcpy(endNodeCode, sourceBuffer, bytes);
sourceBuffer += bytes;
remainingBytes -= bytes;
// if the endNodeCode was 0 length then don't add it
if (bytes > 0) {
_endNodes.push_back(endNodeCode);
}
} }
} }
} }

View file

@ -17,9 +17,9 @@
JurisdictionSender::JurisdictionSender(JurisdictionMap* map, NODE_TYPE type, PacketSenderNotify* notify) : JurisdictionSender::JurisdictionSender(JurisdictionMap* map, NODE_TYPE type, PacketSenderNotify* notify) :
PacketSender(notify, JurisdictionSender::DEFAULT_PACKETS_PER_SECOND),
ReceivedPacketProcessor(), ReceivedPacketProcessor(),
_jurisdictionMap(map) _jurisdictionMap(map),
_packetSender(notify, JurisdictionSender::DEFAULT_PACKETS_PER_SECOND)
{ {
_nodeType = type; _nodeType = type;
} }
@ -66,16 +66,16 @@ bool JurisdictionSender::process() {
if (node->getActiveSocket() != NULL) { if (node->getActiveSocket() != NULL) {
const HifiSockAddr* nodeAddress = node->getActiveSocket(); const HifiSockAddr* nodeAddress = node->getActiveSocket();
queuePacketForSending(*nodeAddress, bufferOut, sizeOut); _packetSender.queuePacketForSending(*nodeAddress, bufferOut, sizeOut);
nodeCount++; nodeCount++;
} }
} }
unlockRequestingNodes(); unlockRequestingNodes();
// set our packets per second to be the number of nodes // set our packets per second to be the number of nodes
setPacketsPerSecond(nodeCount); _packetSender.setPacketsPerSecond(nodeCount);
continueProcessing = PacketSender::process(); continueProcessing = _packetSender.process();
} }
return continueProcessing; return continueProcessing;
} }

View file

@ -21,7 +21,7 @@
/// Will process PACKET_TYPE_JURISDICTION_REQUEST packets and send out PACKET_TYPE_JURISDICTION packets /// Will process PACKET_TYPE_JURISDICTION_REQUEST packets and send out PACKET_TYPE_JURISDICTION packets
/// to requesting parties. As with other ReceivedPacketProcessor classes the user is responsible for reading inbound packets /// to requesting parties. As with other ReceivedPacketProcessor classes the user is responsible for reading inbound packets
/// and adding them to the processing queue by calling queueReceivedPacket() /// and adding them to the processing queue by calling queueReceivedPacket()
class JurisdictionSender : public PacketSender, public ReceivedPacketProcessor { class JurisdictionSender : public ReceivedPacketProcessor {
Q_OBJECT Q_OBJECT
public: public:
static const int DEFAULT_PACKETS_PER_SECOND = 1; static const int DEFAULT_PACKETS_PER_SECOND = 1;
@ -51,5 +51,7 @@ private:
JurisdictionMap* _jurisdictionMap; JurisdictionMap* _jurisdictionMap;
std::queue<QUuid> _nodesRequestingJurisdictions; std::queue<QUuid> _nodesRequestingJurisdictions;
NODE_TYPE _nodeType; NODE_TYPE _nodeType;
PacketSender _packetSender;
}; };
#endif // __shared__JurisdictionSender__ #endif // __shared__JurisdictionSender__

View file

@ -46,7 +46,7 @@ void OctreeScriptingInterface::init() {
} else { } else {
_managedJurisdictionListener = true; _managedJurisdictionListener = true;
_jurisdictionListener = new JurisdictionListener(getServerNodeType()); _jurisdictionListener = new JurisdictionListener(getServerNodeType());
//printf("OctreeScriptingInterface::init() _managedJurisdictionListener=true, creating _jurisdictionListener=%p\n", _jurisdictionListener); //qDebug("OctreeScriptingInterface::init() _managedJurisdictionListener=true, creating _jurisdictionListener=%p", _jurisdictionListener);
_jurisdictionListener->initialize(true); _jurisdictionListener->initialize(true);
} }
@ -55,7 +55,7 @@ void OctreeScriptingInterface::init() {
} else { } else {
_managedPacketSender = true; _managedPacketSender = true;
_packetSender = createPacketSender(); _packetSender = createPacketSender();
//printf("OctreeScriptingInterface::init() _managedPacketSender=true, creating _packetSender=%p\n", _packetSender); //qDebug("OctreeScriptingInterface::init() _managedPacketSender=true, creating _packetSender=%p", _packetSender);
_packetSender->setServerJurisdictions(_jurisdictionListener->getJurisdictions()); _packetSender->setServerJurisdictions(_jurisdictionListener->getJurisdictions());
} }
} }

View file

@ -11,7 +11,6 @@
#include <QtCore/QObject> #include <QtCore/QObject>
#include <JurisdictionListener.h>
#include <OctreeScriptingInterface.h> #include <OctreeScriptingInterface.h>
#include "ParticleEditPacketSender.h" #include "ParticleEditPacketSender.h"

View file

@ -23,7 +23,7 @@ public:
/// Generalized threaded processor for queueing and sending of outbound packets. /// Generalized threaded processor for queueing and sending of outbound packets.
class PacketSender : public virtual GenericThread { class PacketSender : public GenericThread {
public: public:
static const uint64_t USECS_PER_SECOND; static const uint64_t USECS_PER_SECOND;

View file

@ -15,7 +15,7 @@
#include "NetworkPacket.h" #include "NetworkPacket.h"
/// Generalized threaded processor for handling received inbound packets. /// Generalized threaded processor for handling received inbound packets.
class ReceivedPacketProcessor : public virtual GenericThread { class ReceivedPacketProcessor : public GenericThread {
public: public:
ReceivedPacketProcessor(); ReceivedPacketProcessor();

View file

@ -11,7 +11,6 @@
#include <QtCore/QObject> #include <QtCore/QObject>
#include <JurisdictionListener.h>
#include <OctreeScriptingInterface.h> #include <OctreeScriptingInterface.h>
#include "VoxelConstants.h" #include "VoxelConstants.h"