added flag,seq,ts to ParticleErase and ModelErase

... this fixes unreasonable flight-time issue
This commit is contained in:
wangyix 2014-05-28 17:20:10 -07:00
parent f361b9a8a7
commit c799077379
18 changed files with 106 additions and 52 deletions

View file

@ -86,7 +86,7 @@ bool ModelServer::hasSpecialPacketToSend(const SharedNodePointer& node) {
return shouldSendDeletedModels;
}
int ModelServer::sendSpecialPacket(const SharedNodePointer& node) {
int ModelServer::sendSpecialPacket(OCTREE_PACKET_SEQUENCE& sequenceNumber, const SharedNodePointer& node) {
unsigned char outputBuffer[MAX_PACKET_SIZE];
size_t packetLength = 0;
@ -100,12 +100,13 @@ int ModelServer::sendSpecialPacket(const SharedNodePointer& node) {
// TODO: is it possible to send too many of these packets? what if you deleted 1,000,000 models?
while (hasMoreToSend) {
hasMoreToSend = tree->encodeModelsDeletedSince(deletedModelsSentAt,
hasMoreToSend = tree->encodeModelsDeletedSince(sequenceNumber, deletedModelsSentAt,
outputBuffer, MAX_PACKET_SIZE, packetLength);
//qDebug() << "sending PacketType_MODEL_ERASE packetLength:" << packetLength;
NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node));
sequenceNumber++;
}
nodeData->setLastDeletedModelsSentAt(deletePacketSentAt);

View file

@ -37,7 +37,7 @@ public:
// subclass may implement these method
virtual void beforeRun();
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node);
virtual int sendSpecialPacket(const SharedNodePointer& node);
virtual int sendSpecialPacket(OCTREE_PACKET_SEQUENCE& sequenceNumber, const SharedNodePointer& node);
virtual void modelCreated(const ModelItem& newModel, const SharedNodePointer& senderNode);

View file

@ -38,7 +38,7 @@ OctreeQueryNode::OctreeQueryNode() :
_lastClientOctreeSizeScale(DEFAULT_OCTREE_SIZE_SCALE),
_lodChanged(false),
_lodInitialized(false),
_sequenceNumber(0),
//_sequenceNumber(0),
_lastRootTimestamp(0),
_myPacketType(PacketTypeUnknown),
_isShuttingDown(false)
@ -158,11 +158,11 @@ bool OctreeQueryNode::shouldSuppressDuplicatePacket() {
void OctreeQueryNode::init() {
_myPacketType = getMyPacketType();
resetOctreePacket(); // don't bump sequence
resetOctreePacket(0); // don't bump sequence
}
void OctreeQueryNode::resetOctreePacket() {
void OctreeQueryNode::resetOctreePacket(OCTREE_PACKET_SEQUENCE sequenceNumber) {
// if shutting down, return immediately
if (_isShuttingDown) {
return;
@ -200,7 +200,7 @@ void OctreeQueryNode::resetOctreePacket() {
// pack in sequence number
OCTREE_PACKET_SEQUENCE* sequenceAt = (OCTREE_PACKET_SEQUENCE*)_octreePacketAt;
*sequenceAt = _sequenceNumber;
*sequenceAt = sequenceNumber;
_octreePacketAt += sizeof(OCTREE_PACKET_SEQUENCE);
_octreePacketAvailableBytes -= sizeof(OCTREE_PACKET_SEQUENCE);
@ -362,7 +362,8 @@ void OctreeQueryNode::dumpOutOfView() {
}
}
}
/*
void OctreeQueryNode::incrementSequenceNumber() {
_sequenceNumber++;
}
*/

View file

@ -35,7 +35,7 @@ public:
void init(); // called after creation to set up some virtual items
virtual PacketType getMyPacketType() const = 0;
void resetOctreePacket(); // resets octree packet to after "V" header
void resetOctreePacket(OCTREE_PACKET_SEQUENCE sequenceNumber); // resets octree packet to after "V" header
void writeToPacket(const unsigned char* buffer, unsigned int bytes); // writes to end of packet
@ -100,7 +100,7 @@ public:
void forceNodeShutdown();
bool isShuttingDown() const { return _isShuttingDown; }
void incrementSequenceNumber();
//void incrementSequenceNumber();
private slots:
void sendThreadFinished();
@ -138,7 +138,7 @@ private:
bool _lodChanged;
bool _lodInitialized;
OCTREE_PACKET_SEQUENCE _sequenceNumber;
//OCTREE_PACKET_SEQUENCE _sequenceNumber;
quint64 _lastRootTimestamp;

View file

@ -28,7 +28,8 @@ OctreeSendThread::OctreeSendThread(const SharedAssignmentPointer& myAssignment,
_nodeUUID(node->getUUID()),
_packetData(),
_nodeMissingCount(0),
_isShuttingDown(false)
_isShuttingDown(false),
_sequenceNumber(0)
{
QString safeServerName("Octree");
if (_myServer) {
@ -137,7 +138,7 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
// obscure the packet and not send it. This allows the callers and upper level logic to not need to know about
// this rate control savings.
if (nodeData->shouldSuppressDuplicatePacket()) {
nodeData->resetOctreePacket(); // we still need to reset it though!
nodeData->resetOctreePacket(_sequenceNumber); // we still need to reset it though!
return packetsSent; // without sending...
}
@ -163,11 +164,10 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
int piggyBackSize = nodeData->getPacketLength() + statsMessageLength;
// If the size of the stats message and the voxel message will fit in a packet, then piggyback them
if (piggyBackSize < MAX_PACKET_SIZE) {
if (piggyBackSize < MAX_PACKET_SIZE && false) {
// copy voxel message to back of stats message
memcpy(statsMessage + statsMessageLength, nodeData->getPacket(), nodeData->getPacketLength());
//memcpy(statsMessage + statsMessageLength, messageData, nodeData->getPacketLength());
statsMessageLength += nodeData->getPacketLength();
// since a stats message is only included on end of scene, don't consider any of these bytes "wasted", since
@ -210,7 +210,6 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
OctreeServer::didCallWriteDatagram(this);
NodeList::getInstance()->writeDatagram((char*) nodeData->getPacket(), nodeData->getPacketLength(), _node);
//NodeList::getInstance()->writeDatagram((char*)messageData, nodeData->getPacketLength(), _node);
packetSent = true;
@ -231,7 +230,6 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
// just send the voxel packet
OctreeServer::didCallWriteDatagram(this);
NodeList::getInstance()->writeDatagram((char*) nodeData->getPacket(), nodeData->getPacketLength(), _node);
//NodeList::getInstance()->writeDatagram((char*)messageData, nodeData->getPacketLength(), _node);
packetSent = true;
int thisWastedBytes = MAX_PACKET_SIZE - nodeData->getPacketLength();
@ -251,8 +249,8 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
trueBytesSent += nodeData->getPacketLength();
truePacketsSent++;
packetsSent++;
nodeData->incrementSequenceNumber();
nodeData->resetOctreePacket();
_sequenceNumber++;
nodeData->resetOctreePacket(_sequenceNumber);
}
return packetsSent;
}
@ -291,7 +289,7 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
if (nodeData->isPacketWaiting()) {
packetsSentThisInterval += handlePacketSend(nodeData, trueBytesSent, truePacketsSent);
} else {
nodeData->resetOctreePacket();
nodeData->resetOctreePacket(_sequenceNumber);
}
int targetSize = MAX_OCTREE_PACKET_DATA_SIZE;
if (wantCompression) {
@ -537,7 +535,9 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
// send the environment packet
// TODO: should we turn this into a while loop to better handle sending multiple special packets
if (_myServer->hasSpecialPacketToSend(_node) && !nodeData->isShuttingDown()) {
trueBytesSent += _myServer->sendSpecialPacket(_node);
qDebug() << "sending special packet...";
trueBytesSent += _myServer->sendSpecialPacket(_sequenceNumber, _node);
nodeData->resetOctreePacket(_sequenceNumber); // because _sequenceNumber has changed
truePacketsSent++;
packetsSentThisInterval++;
}

View file

@ -55,6 +55,8 @@ private:
int _nodeMissingCount;
bool _isShuttingDown;
OCTREE_PACKET_SEQUENCE _sequenceNumber;
};
#endif // hifi_OctreeSendThread_h

View file

@ -72,7 +72,7 @@ public:
// subclass may implement these method
virtual void beforeRun() { };
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node) { return false; }
virtual int sendSpecialPacket(const SharedNodePointer& node) { return 0; }
virtual int sendSpecialPacket(OCTREE_PACKET_SEQUENCE& sequenceNumber, const SharedNodePointer& node) { return 0; }
static void attachQueryNodeToNode(Node* newNode);

View file

@ -86,7 +86,7 @@ bool ParticleServer::hasSpecialPacketToSend(const SharedNodePointer& node) {
return shouldSendDeletedParticles;
}
int ParticleServer::sendSpecialPacket(const SharedNodePointer& node) {
int ParticleServer::sendSpecialPacket(OCTREE_PACKET_SEQUENCE& sequenceNumber, const SharedNodePointer& node) {
unsigned char outputBuffer[MAX_PACKET_SIZE];
size_t packetLength = 0;
@ -100,12 +100,13 @@ int ParticleServer::sendSpecialPacket(const SharedNodePointer& node) {
// TODO: is it possible to send too many of these packets? what if you deleted 1,000,000 particles?
while (hasMoreToSend) {
hasMoreToSend = tree->encodeParticlesDeletedSince(deletedParticlesSentAt,
hasMoreToSend = tree->encodeParticlesDeletedSince(sequenceNumber, deletedParticlesSentAt,
outputBuffer, MAX_PACKET_SIZE, packetLength);
//qDebug() << "sending PacketType_PARTICLE_ERASE packetLength:" << packetLength;
NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node));
sequenceNumber++;
}
nodeData->setLastDeletedParticlesSentAt(deletePacketSentAt);

View file

@ -37,7 +37,7 @@ public:
// subclass may implement these method
virtual void beforeRun();
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node);
virtual int sendSpecialPacket(const SharedNodePointer& node);
virtual int sendSpecialPacket(OCTREE_PACKET_SEQUENCE& sequenceNumber, const SharedNodePointer& node);
virtual void particleCreated(const Particle& newParticle, const SharedNodePointer& senderNode);

View file

@ -40,7 +40,10 @@ bool VoxelServer::hasSpecialPacketToSend(const SharedNodePointer& node) {
return shouldSendEnvironments;
}
int VoxelServer::sendSpecialPacket(const SharedNodePointer& node) {
int VoxelServer::sendSpecialPacket(OCTREE_PACKET_SEQUENCE& sequence, const SharedNodePointer& node) {
// TODO: add flags, seq, timestamp to packet
int numBytesPacketHeader = populatePacketHeader(reinterpret_cast<char*>(_tempOutputBuffer), PacketTypeEnvironmentData);
int envPacketLength = numBytesPacketHeader;
int environmentsToSend = getSendMinimalEnvironment() ? 1 : getEnvironmentDataCount();

View file

@ -46,7 +46,7 @@ public:
// subclass may implement these method
virtual void beforeRun();
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node);
virtual int sendSpecialPacket(const SharedNodePointer& node);
virtual int sendSpecialPacket(OCTREE_PACKET_SEQUENCE& sequence, const SharedNodePointer& node);
private:
bool _sendEnvironments;

View file

@ -81,7 +81,7 @@ void VoxelPacketProcessor::processPacket(const SharedNodePointer& sendingNode, c
if (Menu::getInstance()->isOptionChecked(MenuOption::Voxels)) {
if (voxelPacketType != PacketTypeParticleErase)
//if (voxelPacketType != PacketTypeParticleErase)
app->trackIncomingVoxelPacket(mutablePacket, sendingNode, wasStatsPacket);
if (sendingNode) {

View file

@ -545,8 +545,8 @@ bool ModelTree::hasModelsDeletedSince(quint64 sinceTime) {
}
// sinceTime is an in/out parameter - it will be side effected with the last time sent out
bool ModelTree::encodeModelsDeletedSince(quint64& sinceTime, unsigned char* outputBuffer, size_t maxLength,
size_t& outputLength) {
bool ModelTree::encodeModelsDeletedSince(OCTREE_PACKET_SEQUENCE sequenceNumber, quint64& sinceTime, unsigned char* outputBuffer,
size_t maxLength, size_t& outputLength) {
bool hasMoreToSend = true;
@ -555,6 +555,26 @@ bool ModelTree::encodeModelsDeletedSince(quint64& sinceTime, unsigned char* outp
copyAt += numBytesPacketHeader;
outputLength = numBytesPacketHeader;
// pack in flags
OCTREE_PACKET_FLAGS flags = 0;
OCTREE_PACKET_FLAGS* flagsAt = (OCTREE_PACKET_FLAGS*)copyAt;
*flagsAt = flags;
copyAt += sizeof(OCTREE_PACKET_FLAGS);
outputLength += sizeof(OCTREE_PACKET_FLAGS);
// pack in sequence number
OCTREE_PACKET_SEQUENCE* sequenceAt = (OCTREE_PACKET_SEQUENCE*)copyAt;
*sequenceAt = sequenceNumber;
copyAt += sizeof(OCTREE_PACKET_SEQUENCE);
outputLength += sizeof(OCTREE_PACKET_SEQUENCE);
// pack in timestamp
OCTREE_PACKET_SENT_TIME now = usecTimestampNow();
OCTREE_PACKET_SENT_TIME* timeAt = (OCTREE_PACKET_SENT_TIME*)copyAt;
*timeAt = now;
copyAt += sizeof(OCTREE_PACKET_SENT_TIME);
outputLength += sizeof(OCTREE_PACKET_SENT_TIME);
uint16_t numberOfIds = 0; // placeholder for now
unsigned char* numberOfIDsAt = copyAt;
memcpy(copyAt, &numberOfIds, sizeof(numberOfIds));
@ -642,8 +662,13 @@ void ModelTree::processEraseMessage(const QByteArray& dataByteArray, const Share
size_t processedBytes = numBytesPacketHeader;
dataAt += numBytesPacketHeader;
dataAt += sizeof(OCTREE_PACKET_FLAGS);
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
dataAt += sizeof(OCTREE_PACKET_SENT_TIME);
uint16_t numberOfIds = 0; // placeholder for now
memcpy(&numberOfIds, dataAt, sizeof(numberOfIds));
qDebug() << "\t\t\t numberOfIds: " << numberOfIds;
dataAt += sizeof(numberOfIds);
processedBytes += sizeof(numberOfIds);

View file

@ -69,7 +69,7 @@ public:
bool hasAnyDeletedModels() const { return _recentlyDeletedModelItemIDs.size() > 0; }
bool hasModelsDeletedSince(quint64 sinceTime);
bool encodeModelsDeletedSince(quint64& sinceTime, unsigned char* packetData, size_t maxLength, size_t& outputLength);
bool encodeModelsDeletedSince(OCTREE_PACKET_SEQUENCE sequenceNumber, quint64& sinceTime, unsigned char* packetData, size_t maxLength, size_t& outputLength);
void forgetModelsDeletedBefore(quint64 sinceTime);
void processEraseMessage(const QByteArray& dataByteArray, const SharedNodePointer& sourceNode);

View file

@ -268,21 +268,22 @@ qint64 LimitedNodeList::writeUnverifiedDatagram(const QByteArray& datagram, cons
qint64 LimitedNodeList::writeDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode,
const HifiSockAddr& overridenSockAddr) {
/*
QByteArray datagram(data, size);
qDebug() << "\t writeDatagram()...";
int numBytesPacketHeader = numBytesForPacketHeader(datagram);
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(datagram.data()) + numBytesPacketHeader;
PacketType type = packetTypeForPacket(datagram);
if (type != PacketType::PacketTypeParticleErase) {
qDebug() << "\t\t type: " << (unsigned char)type;
qDebug() << "\t\t UUID: " << uuidFromPacketHeader(datagram);
qDebug() << "\t\t MD5: " << hashFromPacketHeader(datagram);
qDebug() << "\t\t type: " << (unsigned char)type;
int numBytesPacketHeader = numBytesForPacketHeader(datagram);
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(datagram.data()) + numBytesPacketHeader;
if (type != PacketTypeOctreeStats) {
//qDebug() << "\t\t UUID: " << uuidFromPacketHeader(datagram);
//qDebug() << "\t\t MD5: " << hashFromPacketHeader(datagram);
unsigned char flags = (*(unsigned char*)(dataAt));
dataAt += sizeof(unsigned char);
@ -296,15 +297,12 @@ qint64 LimitedNodeList::writeDatagram(const char* data, qint64 size, const Share
dataAt += sizeof(quint64);
qDebug() << "\t\t sent at: " << QString::number(sentAt, 16) << "\n";
}
else {
qDebug() << "size: " << size;
const char* dataAt = data;
for (int i = 0; i < size; i++) {
unsigned char byte = *((unsigned char*)dataAt);
dataAt += sizeof(unsigned char);
qDebug() << "\t\t " << QString::number(byte, 16);
}
}*/
if (type == PacketTypeParticleErase || type==PacketTypeModelErase) {
uint16_t ids = *((uint16_t*)dataAt);
dataAt += sizeof(uint16_t);
qDebug() << "\t\t\t ids: " << ids;
}
return writeDatagram(QByteArray(data, size), destinationNode, overridenSockAddr);
}

View file

@ -868,7 +868,7 @@ bool OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
}
//qDebug() << "\t" << QString::number(sequence, 16) << "\t sentAt:" << QString::number(sentAt, 16) << " usecs";
qDebug() << "\t" << QString::number(sequence, 16) << "\t sentAt:" << QString::number(sentAt, 16) << " usecs";
// Guard against possible corrupted packets... with bad timestamps
const int MAX_RESONABLE_FLIGHT_TIME = 200 * USECS_PER_SECOND; // 200 seconds is more than enough time for a packet to arrive

View file

@ -511,8 +511,8 @@ bool ParticleTree::hasParticlesDeletedSince(quint64 sinceTime) {
}
// sinceTime is an in/out parameter - it will be side effected with the last time sent out
bool ParticleTree::encodeParticlesDeletedSince(quint64& sinceTime, unsigned char* outputBuffer, size_t maxLength,
size_t& outputLength) {
bool ParticleTree::encodeParticlesDeletedSince(OCTREE_PACKET_SEQUENCE sequenceNumber, quint64& sinceTime, unsigned char* outputBuffer,
size_t maxLength, size_t& outputLength) {
bool hasMoreToSend = true;
@ -521,6 +521,24 @@ bool ParticleTree::encodeParticlesDeletedSince(quint64& sinceTime, unsigned char
copyAt += numBytesPacketHeader;
outputLength = numBytesPacketHeader;
// pack in flags
OCTREE_PACKET_FLAGS flags = 0;
memcpy(copyAt, &flags, sizeof(OCTREE_PACKET_FLAGS));
copyAt += sizeof(OCTREE_PACKET_FLAGS);
outputLength += sizeof(OCTREE_PACKET_FLAGS);
// pack in sequence number
memcpy(copyAt, &sequenceNumber, sizeof(OCTREE_PACKET_SEQUENCE));
copyAt += sizeof(OCTREE_PACKET_SEQUENCE);
outputLength += sizeof(OCTREE_PACKET_SEQUENCE);
// pack in timestamp
OCTREE_PACKET_SENT_TIME now = usecTimestampNow();
memcpy(copyAt, &now, sizeof(OCTREE_PACKET_SENT_TIME));
copyAt += sizeof(OCTREE_PACKET_SENT_TIME);
outputLength += sizeof(OCTREE_PACKET_SENT_TIME);
uint16_t numberOfIds = 0; // placeholder for now
unsigned char* numberOfIDsAt = copyAt;
memcpy(copyAt, &numberOfIds, sizeof(numberOfIds));
@ -609,8 +627,13 @@ void ParticleTree::processEraseMessage(const QByteArray& dataByteArray, const Sh
size_t processedBytes = numBytesPacketHeader;
dataAt += numBytesPacketHeader;
dataAt += sizeof(OCTREE_PACKET_FLAGS);
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
dataAt += sizeof(OCTREE_PACKET_SENT_TIME);
uint16_t numberOfIds = 0; // placeholder for now
memcpy(&numberOfIds, dataAt, sizeof(numberOfIds));
qDebug() << "\t\t\t numberOfIds: " << numberOfIds;
dataAt += sizeof(numberOfIds);
processedBytes += sizeof(numberOfIds);

View file

@ -67,7 +67,7 @@ public:
bool hasAnyDeletedParticles() const { return _recentlyDeletedParticleIDs.size() > 0; }
bool hasParticlesDeletedSince(quint64 sinceTime);
bool encodeParticlesDeletedSince(quint64& sinceTime, unsigned char* packetData, size_t maxLength, size_t& outputLength);
bool encodeParticlesDeletedSince(OCTREE_PACKET_SEQUENCE sequenceNumber, quint64& sinceTime, unsigned char* packetData, size_t maxLength, size_t& outputLength);
void forgetParticlesDeletedBefore(quint64 sinceTime);
void processEraseMessage(const QByteArray& dataByteArray, const SharedNodePointer& sourceNode);