Merge pull request #2950 from wangyix/flighttime_fix_pull

Fix for unreasonable flight times
This commit is contained in:
Brad Hefta-Gaub 2014-05-29 14:43:15 -07:00
commit 2c8da2bda7
18 changed files with 115 additions and 27 deletions

View file

@ -86,7 +86,7 @@ bool ModelServer::hasSpecialPacketToSend(const SharedNodePointer& node) {
return shouldSendDeletedModels;
}
int ModelServer::sendSpecialPacket(const SharedNodePointer& node) {
int ModelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node) {
unsigned char outputBuffer[MAX_PACKET_SIZE];
size_t packetLength = 0;
@ -100,12 +100,13 @@ int ModelServer::sendSpecialPacket(const SharedNodePointer& node) {
// TODO: is it possible to send too many of these packets? what if you deleted 1,000,000 models?
while (hasMoreToSend) {
hasMoreToSend = tree->encodeModelsDeletedSince(deletedModelsSentAt,
hasMoreToSend = tree->encodeModelsDeletedSince(queryNode->getSequenceNumber(), deletedModelsSentAt,
outputBuffer, MAX_PACKET_SIZE, packetLength);
//qDebug() << "sending PacketType_MODEL_ERASE packetLength:" << packetLength;
NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node));
queryNode->incrementSequenceNumber();
}
nodeData->setLastDeletedModelsSentAt(deletePacketSentAt);

View file

@ -37,7 +37,7 @@ public:
// subclass may implement these method
virtual void beforeRun();
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node);
virtual int sendSpecialPacket(const SharedNodePointer& node);
virtual int sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node);
virtual void modelCreated(const ModelItem& newModel, const SharedNodePointer& senderNode);

View file

@ -362,7 +362,3 @@ void OctreeQueryNode::dumpOutOfView() {
}
}
}
void OctreeQueryNode::incrementSequenceNumber() {
_sequenceNumber++;
}

View file

@ -100,7 +100,9 @@ public:
void forceNodeShutdown();
bool isShuttingDown() const { return _isShuttingDown; }
void incrementSequenceNumber();
void incrementSequenceNumber() { _sequenceNumber++; }
OCTREE_PACKET_SEQUENCE getSequenceNumber() const { return _sequenceNumber; }
private slots:
void sendThreadFinished();

View file

@ -142,12 +142,16 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
}
const unsigned char* messageData = nodeData->getPacket();
int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<const char*>(messageData));
const unsigned char* dataAt = messageData + numBytesPacketHeader;
dataAt += sizeof(OCTREE_PACKET_FLAGS);
OCTREE_PACKET_SEQUENCE sequence = (*(OCTREE_PACKET_SEQUENCE*)dataAt);
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
OCTREE_PACKET_SENT_TIME timestamp = (*(OCTREE_PACKET_SENT_TIME*)dataAt);
dataAt += sizeof(OCTREE_PACKET_SENT_TIME);
// If we've got a stats message ready to send, then see if we can piggyback them together
if (nodeData->stats.isReadyToSend() && !nodeData->isShuttingDown()) {
@ -529,7 +533,8 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
// send the environment packet
// TODO: should we turn this into a while loop to better handle sending multiple special packets
if (_myServer->hasSpecialPacketToSend(_node) && !nodeData->isShuttingDown()) {
trueBytesSent += _myServer->sendSpecialPacket(_node);
trueBytesSent += _myServer->sendSpecialPacket(nodeData, _node);
nodeData->resetOctreePacket(); // because nodeData's _sequenceNumber has changed
truePacketsSent++;
packetsSentThisInterval++;
}

View file

@ -72,7 +72,7 @@ public:
// subclass may implement these method
virtual void beforeRun() { };
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node) { return false; }
virtual int sendSpecialPacket(const SharedNodePointer& node) { return 0; }
virtual int sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node) { return 0; }
static void attachQueryNodeToNode(Node* newNode);

View file

@ -86,7 +86,7 @@ bool ParticleServer::hasSpecialPacketToSend(const SharedNodePointer& node) {
return shouldSendDeletedParticles;
}
int ParticleServer::sendSpecialPacket(const SharedNodePointer& node) {
int ParticleServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node) {
unsigned char outputBuffer[MAX_PACKET_SIZE];
size_t packetLength = 0;
@ -100,12 +100,13 @@ int ParticleServer::sendSpecialPacket(const SharedNodePointer& node) {
// TODO: is it possible to send too many of these packets? what if you deleted 1,000,000 particles?
while (hasMoreToSend) {
hasMoreToSend = tree->encodeParticlesDeletedSince(deletedParticlesSentAt,
hasMoreToSend = tree->encodeParticlesDeletedSince(queryNode->getSequenceNumber(), deletedParticlesSentAt,
outputBuffer, MAX_PACKET_SIZE, packetLength);
//qDebug() << "sending PacketType_PARTICLE_ERASE packetLength:" << packetLength;
NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node));
queryNode->incrementSequenceNumber();
}
nodeData->setLastDeletedParticlesSentAt(deletePacketSentAt);

View file

@ -37,7 +37,7 @@ public:
// subclass may implement these method
virtual void beforeRun();
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node);
virtual int sendSpecialPacket(const SharedNodePointer& node);
virtual int sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node);
virtual void particleCreated(const Particle& newParticle, const SharedNodePointer& senderNode);

View file

@ -40,9 +40,34 @@ bool VoxelServer::hasSpecialPacketToSend(const SharedNodePointer& node) {
return shouldSendEnvironments;
}
int VoxelServer::sendSpecialPacket(const SharedNodePointer& node) {
int VoxelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node) {
unsigned char* copyAt = _tempOutputBuffer;
int numBytesPacketHeader = populatePacketHeader(reinterpret_cast<char*>(_tempOutputBuffer), PacketTypeEnvironmentData);
copyAt += numBytesPacketHeader;
int envPacketLength = numBytesPacketHeader;
// pack in flags
OCTREE_PACKET_FLAGS flags = 0;
OCTREE_PACKET_FLAGS* flagsAt = (OCTREE_PACKET_FLAGS*)copyAt;
*flagsAt = flags;
copyAt += sizeof(OCTREE_PACKET_FLAGS);
envPacketLength += sizeof(OCTREE_PACKET_FLAGS);
// pack in sequence number
OCTREE_PACKET_SEQUENCE* sequenceAt = (OCTREE_PACKET_SEQUENCE*)copyAt;
*sequenceAt = queryNode->getSequenceNumber();
copyAt += sizeof(OCTREE_PACKET_SEQUENCE);
envPacketLength += sizeof(OCTREE_PACKET_SEQUENCE);
// pack in timestamp
OCTREE_PACKET_SENT_TIME now = usecTimestampNow();
OCTREE_PACKET_SENT_TIME* timeAt = (OCTREE_PACKET_SENT_TIME*)copyAt;
*timeAt = now;
copyAt += sizeof(OCTREE_PACKET_SENT_TIME);
envPacketLength += sizeof(OCTREE_PACKET_SENT_TIME);
int environmentsToSend = getSendMinimalEnvironment() ? 1 : getEnvironmentDataCount();
for (int i = 0; i < environmentsToSend; i++) {
@ -50,6 +75,8 @@ int VoxelServer::sendSpecialPacket(const SharedNodePointer& node) {
}
NodeList::getInstance()->writeDatagram((char*) _tempOutputBuffer, envPacketLength, SharedNodePointer(node));
queryNode->incrementSequenceNumber();
return envPacketLength;
}

View file

@ -46,7 +46,7 @@ public:
// subclass may implement these method
virtual void beforeRun();
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node);
virtual int sendSpecialPacket(const SharedNodePointer& node);
virtual int sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node);
private:
bool _sendEnvironments;

View file

@ -160,6 +160,11 @@ bool Environment::findCapsulePenetration(const glm::vec3& start, const glm::vec3
int Environment::parseData(const HifiSockAddr& senderAddress, const QByteArray& packet) {
// push past the packet header
int bytesRead = numBytesForPacketHeader(packet);
// push past flags, sequence, timestamp
bytesRead += sizeof(OCTREE_PACKET_FLAGS);
bytesRead += sizeof(OCTREE_PACKET_SEQUENCE);
bytesRead += sizeof(OCTREE_PACKET_SENT_TIME);
// get the lock for the duration of the call
QMutexLocker locker(&_mutex);

View file

@ -39,7 +39,7 @@ void VoxelPacketProcessor::processPacket(const SharedNodePointer& sendingNode, c
}
PacketType voxelPacketType = packetTypeForPacket(mutablePacket);
// note: PacketType_OCTREE_STATS can have PacketType_VOXEL_DATA
// immediately following them inside the same packet. So, we process the PacketType_OCTREE_STATS first
// then process any remaining bytes as if it was another packet
@ -81,6 +81,7 @@ void VoxelPacketProcessor::processPacket(const SharedNodePointer& sendingNode, c
if (Menu::getInstance()->isOptionChecked(MenuOption::Voxels)) {
app->trackIncomingVoxelPacket(mutablePacket, sendingNode, wasStatsPacket);
if (sendingNode) {

View file

@ -545,8 +545,8 @@ bool ModelTree::hasModelsDeletedSince(quint64 sinceTime) {
}
// sinceTime is an in/out parameter - it will be side effected with the last time sent out
bool ModelTree::encodeModelsDeletedSince(quint64& sinceTime, unsigned char* outputBuffer, size_t maxLength,
size_t& outputLength) {
bool ModelTree::encodeModelsDeletedSince(OCTREE_PACKET_SEQUENCE sequenceNumber, quint64& sinceTime, unsigned char* outputBuffer,
size_t maxLength, size_t& outputLength) {
bool hasMoreToSend = true;
@ -555,6 +555,26 @@ bool ModelTree::encodeModelsDeletedSince(quint64& sinceTime, unsigned char* outp
copyAt += numBytesPacketHeader;
outputLength = numBytesPacketHeader;
// pack in flags
OCTREE_PACKET_FLAGS flags = 0;
OCTREE_PACKET_FLAGS* flagsAt = (OCTREE_PACKET_FLAGS*)copyAt;
*flagsAt = flags;
copyAt += sizeof(OCTREE_PACKET_FLAGS);
outputLength += sizeof(OCTREE_PACKET_FLAGS);
// pack in sequence number
OCTREE_PACKET_SEQUENCE* sequenceAt = (OCTREE_PACKET_SEQUENCE*)copyAt;
*sequenceAt = sequenceNumber;
copyAt += sizeof(OCTREE_PACKET_SEQUENCE);
outputLength += sizeof(OCTREE_PACKET_SEQUENCE);
// pack in timestamp
OCTREE_PACKET_SENT_TIME now = usecTimestampNow();
OCTREE_PACKET_SENT_TIME* timeAt = (OCTREE_PACKET_SENT_TIME*)copyAt;
*timeAt = now;
copyAt += sizeof(OCTREE_PACKET_SENT_TIME);
outputLength += sizeof(OCTREE_PACKET_SENT_TIME);
uint16_t numberOfIds = 0; // placeholder for now
unsigned char* numberOfIDsAt = copyAt;
memcpy(copyAt, &numberOfIds, sizeof(numberOfIds));
@ -642,6 +662,10 @@ void ModelTree::processEraseMessage(const QByteArray& dataByteArray, const Share
size_t processedBytes = numBytesPacketHeader;
dataAt += numBytesPacketHeader;
dataAt += sizeof(OCTREE_PACKET_FLAGS);
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
dataAt += sizeof(OCTREE_PACKET_SENT_TIME);
uint16_t numberOfIds = 0; // placeholder for now
memcpy(&numberOfIds, dataAt, sizeof(numberOfIds));
dataAt += sizeof(numberOfIds);

View file

@ -74,7 +74,7 @@ public:
bool hasAnyDeletedModels() const { return _recentlyDeletedModelItemIDs.size() > 0; }
bool hasModelsDeletedSince(quint64 sinceTime);
bool encodeModelsDeletedSince(quint64& sinceTime, unsigned char* packetData, size_t maxLength, size_t& outputLength);
bool encodeModelsDeletedSince(OCTREE_PACKET_SEQUENCE sequenceNumber, quint64& sinceTime, unsigned char* packetData, size_t maxLength, size_t& outputLength);
void forgetModelsDeletedBefore(quint64 sinceTime);
void processEraseMessage(const QByteArray& dataByteArray, const SharedNodePointer& sourceNode);

View file

@ -52,7 +52,7 @@ PacketVersion versionForPacketType(PacketType type) {
case PacketTypeAvatarIdentity:
return 1;
case PacketTypeEnvironmentData:
return 1;
return 2;
case PacketTypeDomainList:
case PacketTypeDomainListRequest:
return 3;
@ -66,8 +66,12 @@ PacketVersion versionForPacketType(PacketType type) {
return 1;
case PacketTypeParticleData:
return 1;
case PacketTypeParticleErase:
return 1;
case PacketTypeModelData:
return 2;
return 2;
case PacketTypeModelErase:
return 1;
default:
return 0;
}

View file

@ -858,7 +858,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
//bool packetIsCompressed = oneAtBit(flags, PACKET_IS_COMPRESSED_BIT);
OCTREE_PACKET_SENT_TIME arrivedAt = usecTimestampNow();
int flightTime = arrivedAt - sentAt + nodeClockSkewUsec;
qint64 flightTime = arrivedAt - sentAt + nodeClockSkewUsec;
if (wantExtraDebugging) {
qDebug() << "sentAt:" << sentAt << " usecs";
@ -866,7 +866,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
qDebug() << "nodeClockSkewUsec:" << nodeClockSkewUsec << " usecs";
qDebug() << "flightTime:" << flightTime << " usecs";
}
// Guard against possible corrupted packets... with bad timestamps
const int MAX_RESONABLE_FLIGHT_TIME = 200 * USECS_PER_SECOND; // 200 seconds is more than enough time for a packet to arrive
const int MIN_RESONABLE_FLIGHT_TIME = 0;
@ -985,6 +985,6 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
}
}
}
}

View file

@ -511,8 +511,8 @@ bool ParticleTree::hasParticlesDeletedSince(quint64 sinceTime) {
}
// sinceTime is an in/out parameter - it will be side effected with the last time sent out
bool ParticleTree::encodeParticlesDeletedSince(quint64& sinceTime, unsigned char* outputBuffer, size_t maxLength,
size_t& outputLength) {
bool ParticleTree::encodeParticlesDeletedSince(OCTREE_PACKET_SEQUENCE sequenceNumber, quint64& sinceTime, unsigned char* outputBuffer,
size_t maxLength, size_t& outputLength) {
bool hasMoreToSend = true;
@ -521,6 +521,24 @@ bool ParticleTree::encodeParticlesDeletedSince(quint64& sinceTime, unsigned char
copyAt += numBytesPacketHeader;
outputLength = numBytesPacketHeader;
// pack in flags
OCTREE_PACKET_FLAGS flags = 0;
memcpy(copyAt, &flags, sizeof(OCTREE_PACKET_FLAGS));
copyAt += sizeof(OCTREE_PACKET_FLAGS);
outputLength += sizeof(OCTREE_PACKET_FLAGS);
// pack in sequence number
memcpy(copyAt, &sequenceNumber, sizeof(OCTREE_PACKET_SEQUENCE));
copyAt += sizeof(OCTREE_PACKET_SEQUENCE);
outputLength += sizeof(OCTREE_PACKET_SEQUENCE);
// pack in timestamp
OCTREE_PACKET_SENT_TIME now = usecTimestampNow();
memcpy(copyAt, &now, sizeof(OCTREE_PACKET_SENT_TIME));
copyAt += sizeof(OCTREE_PACKET_SENT_TIME);
outputLength += sizeof(OCTREE_PACKET_SENT_TIME);
uint16_t numberOfIds = 0; // placeholder for now
unsigned char* numberOfIDsAt = copyAt;
memcpy(copyAt, &numberOfIds, sizeof(numberOfIds));
@ -609,6 +627,10 @@ void ParticleTree::processEraseMessage(const QByteArray& dataByteArray, const Sh
size_t processedBytes = numBytesPacketHeader;
dataAt += numBytesPacketHeader;
dataAt += sizeof(OCTREE_PACKET_FLAGS);
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
dataAt += sizeof(OCTREE_PACKET_SENT_TIME);
uint16_t numberOfIds = 0; // placeholder for now
memcpy(&numberOfIds, dataAt, sizeof(numberOfIds));
dataAt += sizeof(numberOfIds);

View file

@ -67,7 +67,7 @@ public:
bool hasAnyDeletedParticles() const { return _recentlyDeletedParticleIDs.size() > 0; }
bool hasParticlesDeletedSince(quint64 sinceTime);
bool encodeParticlesDeletedSince(quint64& sinceTime, unsigned char* packetData, size_t maxLength, size_t& outputLength);
bool encodeParticlesDeletedSince(OCTREE_PACKET_SEQUENCE sequenceNumber, quint64& sinceTime, unsigned char* packetData, size_t maxLength, size_t& outputLength);
void forgetParticlesDeletedBefore(quint64 sinceTime);
void processEraseMessage(const QByteArray& dataByteArray, const SharedNodePointer& sourceNode);