support adjusting particles based on clockSkew

This commit is contained in:
ZappoMan 2013-12-19 14:49:58 -08:00
parent 72649f1ac9
commit d71bc248f4
16 changed files with 80 additions and 21 deletions

View file

@ -56,12 +56,12 @@ void VoxelPacketProcessor::processPacket(const HifiSockAddr& senderSockAddr, uns
if (Menu::getInstance()->isOptionChecked(MenuOption::Voxels)) { if (Menu::getInstance()->isOptionChecked(MenuOption::Voxels)) {
app->trackIncomingVoxelPacket(packetData, messageLength, senderSockAddr, wasStatsPacket); app->trackIncomingVoxelPacket(packetData, messageLength, senderSockAddr, wasStatsPacket);
Node* voxelServer = NodeList::getInstance()->nodeWithAddress(senderSockAddr); Node* serverNode = NodeList::getInstance()->nodeWithAddress(senderSockAddr);
if (voxelServer && *voxelServer->getActiveSocket() == senderSockAddr) { if (serverNode && serverNode->getActiveSocket() && *serverNode->getActiveSocket() == senderSockAddr) {
switch(packetData[0]) { switch(packetData[0]) {
case PACKET_TYPE_PARTICLE_DATA: { case PACKET_TYPE_PARTICLE_DATA: {
app->_particles.processDatagram(QByteArray((char*) packetData, messageLength), senderSockAddr); app->_particles.processDatagram(QByteArray((char*) packetData, messageLength), senderSockAddr, serverNode);
} break; } break;
case PACKET_TYPE_ENVIRONMENT_DATA: { case PACKET_TYPE_ENVIRONMENT_DATA: {
@ -69,7 +69,7 @@ void VoxelPacketProcessor::processPacket(const HifiSockAddr& senderSockAddr, uns
} break; } break;
default : { default : {
app->_voxels.setDataSourceUUID(voxelServer->getUUID()); app->_voxels.setDataSourceUUID(serverNode->getUUID());
app->_voxels.parseData(packetData, messageLength); app->_voxels.parseData(packetData, messageLength);
app->_voxels.setDataSourceUUID(QUuid()); app->_voxels.setDataSourceUUID(QUuid());
} break; } break;

View file

@ -1342,7 +1342,7 @@ bool Octree::readFromSVOFile(const char* fileName) {
fileOk = true; // assume the file is ok fileOk = true; // assume the file is ok
} }
if (fileOk) { if (fileOk) {
ReadBitstreamToTreeParams args(WANT_COLOR, NO_EXISTS_BITS, NULL, 0, wantImportProgress); ReadBitstreamToTreeParams args(WANT_COLOR, NO_EXISTS_BITS, NULL, 0, NULL, wantImportProgress);
readBitstreamToTree(dataAt, dataLength, args); readBitstreamToTree(dataAt, dataLength, args);
} }
delete[] entireFile; delete[] entireFile;
@ -1481,7 +1481,7 @@ void Octree::copyFromTreeIntoSubTree(Octree* sourceTree, OctreeElement* destinat
// ask destination tree to read the bitstream // ask destination tree to read the bitstream
bool wantImportProgress = true; bool wantImportProgress = true;
ReadBitstreamToTreeParams args(WANT_COLOR, NO_EXISTS_BITS, destinationNode, 0, wantImportProgress); ReadBitstreamToTreeParams args(WANT_COLOR, NO_EXISTS_BITS, destinationNode, 0, NULL, wantImportProgress);
readBitstreamToTree(packetData.getUncompressedData(), packetData.getUncompressedSize(), args); readBitstreamToTree(packetData.getUncompressedData(), packetData.getUncompressedSize(), args);
} }
} }

View file

@ -156,6 +156,7 @@ public:
bool includeExistsBits; bool includeExistsBits;
OctreeElement* destinationNode; OctreeElement* destinationNode;
QUuid sourceUUID; QUuid sourceUUID;
Node* sourceNode;
bool wantImportProgress; bool wantImportProgress;
ReadBitstreamToTreeParams( ReadBitstreamToTreeParams(
@ -163,11 +164,13 @@ public:
bool includeExistsBits = WANT_EXISTS_BITS, bool includeExistsBits = WANT_EXISTS_BITS,
OctreeElement* destinationNode = NULL, OctreeElement* destinationNode = NULL,
QUuid sourceUUID = QUuid(), QUuid sourceUUID = QUuid(),
Node* sourceNode = NULL,
bool wantImportProgress = false) : bool wantImportProgress = false) :
includeColor(includeColor), includeColor(includeColor),
includeExistsBits(includeExistsBits), includeExistsBits(includeExistsBits),
destinationNode(destinationNode), destinationNode(destinationNode),
sourceUUID(sourceUUID), sourceUUID(sourceUUID),
sourceNode(sourceNode),
wantImportProgress(wantImportProgress) wantImportProgress(wantImportProgress)
{} {}
}; };

View file

@ -81,7 +81,7 @@ bool OctreeEditPacketSender::serversExist() const {
} }
// This method is called when the edit packet layer has determined that it has a fully formed packet destined for // This method is called when the edit packet layer has determined that it has a fully formed packet destined for
// a known nodeID. However, we also want to handle the case where the // a known nodeID.
void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned char* buffer, ssize_t length) { void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned char* buffer, ssize_t length) {
NodeList* nodeList = NodeList::getInstance(); NodeList* nodeList = NodeList::getInstance();
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
@ -243,6 +243,14 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PACKET_TYPE type, unsigned c
initializePacket(packetBuffer, type); initializePacket(packetBuffer, type);
} }
// This is really the first time we know which server/node this particular edit message
// is going to, so we couldn't adjust for clock skew till now. But here's our chance.
// We call this virtual function that allows our specific type of EditPacketSender to
// fixup the buffer for any clock skew
if (node->getClockSkewUsec() != 0) {
adjustEditPacketForClockSkew(codeColorBuffer, length, node->getClockSkewUsec());
}
memcpy(&packetBuffer._currentBuffer[packetBuffer._currentSize], codeColorBuffer, length); memcpy(&packetBuffer._currentBuffer[packetBuffer._currentSize], codeColorBuffer, length);
packetBuffer._currentSize += length; packetBuffer._currentSize += length;
} }

View file

@ -86,6 +86,7 @@ public:
// you must override these... // you must override these...
virtual unsigned char getMyNodeType() const = 0; virtual unsigned char getMyNodeType() const = 0;
virtual void adjustEditPacketForClockSkew(unsigned char* codeColorBuffer, ssize_t length, int clockSkew) { };
protected: protected:
bool _shouldSend; bool _shouldSend;

View file

@ -26,7 +26,7 @@ void OctreeRenderer::init() {
OctreeRenderer::~OctreeRenderer() { OctreeRenderer::~OctreeRenderer() {
} }
void OctreeRenderer::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) { void OctreeRenderer::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr, Node* sourceNode) {
bool showTimingDetails = false; // Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings); bool showTimingDetails = false; // Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings);
bool extraDebugging = false; // Menu::getInstance()->isOptionChecked(MenuOption::ExtraDebugging) bool extraDebugging = false; // Menu::getInstance()->isOptionChecked(MenuOption::ExtraDebugging)
PerformanceWarning warn(showTimingDetails, "OctreeRenderer::processDatagram()",showTimingDetails); PerformanceWarning warn(showTimingDetails, "OctreeRenderer::processDatagram()",showTimingDetails);
@ -57,7 +57,8 @@ void OctreeRenderer::processDatagram(const QByteArray& dataByteArray, const Hifi
bool packetIsCompressed = oneAtBit(flags, PACKET_IS_COMPRESSED_BIT); bool packetIsCompressed = oneAtBit(flags, PACKET_IS_COMPRESSED_BIT);
OCTREE_PACKET_SENT_TIME arrivedAt = usecTimestampNow(); OCTREE_PACKET_SENT_TIME arrivedAt = usecTimestampNow();
int flightTime = arrivedAt - sentAt; int clockSkew = sourceNode ? sourceNode->getClockSkewUsec() : 0;
int flightTime = arrivedAt - sentAt + clockSkew;
OCTREE_PACKET_INTERNAL_SECTION_SIZE sectionLength = 0; OCTREE_PACKET_INTERNAL_SECTION_SIZE sectionLength = 0;
int dataBytes = packetLength - OCTREE_PACKET_HEADER_SIZE; int dataBytes = packetLength - OCTREE_PACKET_HEADER_SIZE;
@ -88,7 +89,7 @@ void OctreeRenderer::processDatagram(const QByteArray& dataByteArray, const Hifi
if (sectionLength) { if (sectionLength) {
// ask the VoxelTree to read the bitstream into the tree // ask the VoxelTree to read the bitstream into the tree
ReadBitstreamToTreeParams args(packetIsColored ? WANT_COLOR : NO_COLOR, WANT_EXISTS_BITS, NULL, ReadBitstreamToTreeParams args(packetIsColored ? WANT_COLOR : NO_COLOR, WANT_EXISTS_BITS, NULL,
getDataSourceUUID()); getDataSourceUUID(), sourceNode);
_tree->lockForWrite(); _tree->lockForWrite();
OctreePacketData packetData(packetIsCompressed); OctreePacketData packetData(packetIsCompressed);
packetData.loadFinalizedContent(dataAt, sectionLength); packetData.loadFinalizedContent(dataAt, sectionLength);

View file

@ -43,7 +43,7 @@ public:
virtual void renderElement(OctreeElement* element, RenderArgs* args) = 0; virtual void renderElement(OctreeElement* element, RenderArgs* args) = 0;
/// process incoming data /// process incoming data
virtual void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr); virtual void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr, Node* sourceNode);
/// initialize and GPU/rendering related resources /// initialize and GPU/rendering related resources
void init(); void init();

View file

@ -12,6 +12,7 @@
#include <RegisteredMetaTypes.h> #include <RegisteredMetaTypes.h>
#include <SharedUtil.h> // usecTimestampNow() #include <SharedUtil.h> // usecTimestampNow()
#include <Octree.h>
#include "Particle.h" #include "Particle.h"
@ -136,6 +137,8 @@ int Particle::expectedEditMessageBytes() {
int Particle::readParticleDataFromBuffer(const unsigned char* data, int bytesLeftToRead, ReadBitstreamToTreeParams& args) { int Particle::readParticleDataFromBuffer(const unsigned char* data, int bytesLeftToRead, ReadBitstreamToTreeParams& args) {
int bytesRead = 0; int bytesRead = 0;
if (bytesLeftToRead >= expectedBytes()) { if (bytesLeftToRead >= expectedBytes()) {
int clockSkew = args.sourceNode ? args.sourceNode->getClockSkewUsec() : 0;
const unsigned char* dataAt = data; const unsigned char* dataAt = data;
// id // id
@ -154,11 +157,13 @@ int Particle::readParticleDataFromBuffer(const unsigned char* data, int bytesLef
memcpy(&_lastUpdated, dataAt, sizeof(_lastUpdated)); memcpy(&_lastUpdated, dataAt, sizeof(_lastUpdated));
dataAt += sizeof(_lastUpdated); dataAt += sizeof(_lastUpdated);
bytesRead += sizeof(_lastUpdated); bytesRead += sizeof(_lastUpdated);
_lastUpdated -= clockSkew;
// _lastEdited // _lastEdited
memcpy(&_lastEdited, dataAt, sizeof(_lastEdited)); memcpy(&_lastEdited, dataAt, sizeof(_lastEdited));
dataAt += sizeof(_lastEdited); dataAt += sizeof(_lastEdited);
bytesRead += sizeof(_lastEdited); bytesRead += sizeof(_lastEdited);
_lastEdited -= clockSkew;
// radius // radius
memcpy(&_radius, dataAt, sizeof(_radius)); memcpy(&_radius, dataAt, sizeof(_radius));
@ -357,7 +362,7 @@ bool Particle::encodeParticleEditMessageDetails(PACKET_TYPE command, int count,
sizeOut += sizeof(details[i].creatorTokenID); sizeOut += sizeof(details[i].creatorTokenID);
} }
// radius // lastEdited
memcpy(copyAt, &details[i].lastEdited, sizeof(details[i].lastEdited)); memcpy(copyAt, &details[i].lastEdited, sizeof(details[i].lastEdited));
copyAt += sizeof(details[i].lastEdited); copyAt += sizeof(details[i].lastEdited);
sizeOut += sizeof(details[i].lastEdited); sizeOut += sizeof(details[i].lastEdited);
@ -420,6 +425,38 @@ bool Particle::encodeParticleEditMessageDetails(PACKET_TYPE command, int count,
return success; return success;
} }
// adjust any internal timestamps to fix clock skew for this server
void Particle::adjustEditPacketForClockSkew(unsigned char* codeColorBuffer, ssize_t length, int clockSkew) {
unsigned char* dataAt = codeColorBuffer;
int octets = numberOfThreeBitSectionsInCode(dataAt);
int lengthOfOctcode = bytesRequiredForCodeLength(octets);
dataAt += lengthOfOctcode;
// id
uint32_t id;
memcpy(&id, dataAt, sizeof(id));
dataAt += sizeof(id);
// special case for handling "new" particles
if (id == NEW_PARTICLE) {
// If this is a NEW_PARTICLE, then we assume that there's an additional uint32_t creatorToken, that
// we want to send back to the creator as an map to the actual id
dataAt += sizeof(uint32_t);
}
// lastEdited
uint64_t lastEditedInLocalTime;
memcpy(&lastEditedInLocalTime, dataAt, sizeof(lastEditedInLocalTime));
uint64_t lastEditedInServerTime = lastEditedInLocalTime + clockSkew;
memcpy(dataAt, &lastEditedInServerTime, sizeof(lastEditedInServerTime));
const bool wantDebug = false;
if (wantDebug) {
qDebug("Particle::adjustEditPacketForClockSkew()...\n");
qDebug(" lastEditedInLocalTime: %llu\n", lastEditedInLocalTime);
qDebug(" clockSkew: %d\n", clockSkew);
qDebug(" lastEditedInServerTime: %llu\n", lastEditedInServerTime);
}
}
void Particle::update() { void Particle::update() {

View file

@ -107,6 +107,8 @@ public:
static bool encodeParticleEditMessageDetails(PACKET_TYPE command, int count, const ParticleDetail* details, static bool encodeParticleEditMessageDetails(PACKET_TYPE command, int count, const ParticleDetail* details,
unsigned char* bufferOut, int sizeIn, int& sizeOut); unsigned char* bufferOut, int sizeIn, int& sizeOut);
static void adjustEditPacketForClockSkew(unsigned char* codeColorBuffer, ssize_t length, int clockSkew);
void update(); void update();
void debugDump() const; void debugDump() const;

View file

@ -37,6 +37,11 @@ void ParticleEditPacketSender::sendEditParticleMessage(PACKET_TYPE type, const P
} }
} }
void ParticleEditPacketSender::adjustEditPacketForClockSkew(unsigned char* codeColorBuffer, ssize_t length, int clockSkew) {
Particle::adjustEditPacketForClockSkew(codeColorBuffer, length, clockSkew);
}
void ParticleEditPacketSender::queueParticleEditMessages(PACKET_TYPE type, int numberOfDetails, ParticleDetail* details) { void ParticleEditPacketSender::queueParticleEditMessages(PACKET_TYPE type, int numberOfDetails, ParticleDetail* details) {
if (!_shouldSend) { if (!_shouldSend) {
return; // bail early return; // bail early

View file

@ -28,7 +28,8 @@ public:
/// which case up to MaxPendingMessages will be buffered and processed when voxel servers are known. /// which case up to MaxPendingMessages will be buffered and processed when voxel servers are known.
void queueParticleEditMessages(PACKET_TYPE type, int numberOfDetails, ParticleDetail* details); void queueParticleEditMessages(PACKET_TYPE type, int numberOfDetails, ParticleDetail* details);
// My server type is the voxel server // My server type is the particle server
virtual unsigned char getMyNodeType() const { return NODE_TYPE_PARTICLE_SERVER; } virtual unsigned char getMyNodeType() const { return NODE_TYPE_PARTICLE_SERVER; }
virtual void adjustEditPacketForClockSkew(unsigned char* codeColorBuffer, ssize_t length, int clockSkew);
}; };
#endif // __shared__ParticleEditPacketSender__ #endif // __shared__ParticleEditPacketSender__

View file

@ -47,7 +47,7 @@ bool ParticleTree::findAndUpdateOperation(OctreeElement* element, void* extraDat
return true; return true;
} }
void ParticleTree::storeParticle(const Particle& particle) { void ParticleTree::storeParticle(const Particle& particle, Node* senderNode) {
// First, look for the existing particle in the tree.. // First, look for the existing particle in the tree..
FindAndUpdateParticleArgs args = { particle, false }; FindAndUpdateParticleArgs args = { particle, false };
recurseTreeWithOperation(findAndUpdateOperation, &args); recurseTreeWithOperation(findAndUpdateOperation, &args);
@ -58,7 +58,7 @@ void ParticleTree::storeParticle(const Particle& particle) {
float size = particle.getRadius(); float size = particle.getRadius();
ParticleTreeElement* element = (ParticleTreeElement*)getOrCreateChildElementAt(position.x, position.y, position.z, size); ParticleTreeElement* element = (ParticleTreeElement*)getOrCreateChildElementAt(position.x, position.y, position.z, size);
element->storeParticle(particle); element->storeParticle(particle, senderNode);
} }
// what else do we need to do here to get reaveraging to work // what else do we need to do here to get reaveraging to work
_isDirty = true; _isDirty = true;
@ -165,7 +165,7 @@ int ParticleTree::processEditPacketData(PACKET_TYPE packetType, unsigned char* p
switch (packetType) { switch (packetType) {
case PACKET_TYPE_PARTICLE_ADD_OR_EDIT: { case PACKET_TYPE_PARTICLE_ADD_OR_EDIT: {
Particle newParticle = Particle::fromEditPacket(editData, maxLength, processedBytes); Particle newParticle = Particle::fromEditPacket(editData, maxLength, processedBytes);
storeParticle(newParticle); storeParticle(newParticle, senderNode);
if (newParticle.isNewlyCreated()) { if (newParticle.isNewlyCreated()) {
notifyNewlyCreatedParticle(newParticle, senderNode); notifyNewlyCreatedParticle(newParticle, senderNode);
} }

View file

@ -44,7 +44,7 @@ public:
virtual void update(); virtual void update();
void storeParticle(const Particle& particle); void storeParticle(const Particle& particle, Node* senderNode = NULL);
const Particle* findClosestParticle(glm::vec3 position, float targetRadius); const Particle* findClosestParticle(glm::vec3 position, float targetRadius);
const Particle* findParticleByID(uint32_t id); const Particle* findParticleByID(uint32_t id);

View file

@ -128,7 +128,7 @@ bool ParticleTreeElement::containsParticle(const Particle& particle) const {
} }
bool ParticleTreeElement::updateParticle(const Particle& particle) { bool ParticleTreeElement::updateParticle(const Particle& particle) {
const bool wantDebug = false; const bool wantDebug = true;
uint16_t numberOfParticles = _particles.size(); uint16_t numberOfParticles = _particles.size();
for (uint16_t i = 0; i < numberOfParticles; i++) { for (uint16_t i = 0; i < numberOfParticles; i++) {
if (_particles[i].getID() == particle.getID()) { if (_particles[i].getID() == particle.getID()) {
@ -230,7 +230,7 @@ bool ParticleTreeElement::collapseChildren() {
} }
void ParticleTreeElement::storeParticle(const Particle& particle) { void ParticleTreeElement::storeParticle(const Particle& particle, Node* senderNode) {
_particles.push_back(particle); _particles.push_back(particle);
markWithChangedTime(); markWithChangedTime();
} }

View file

@ -89,7 +89,7 @@ public:
protected: protected:
void storeParticle(const Particle& particle); void storeParticle(const Particle& particle, Node* senderNode = NULL);
ParticleTree* _myTree; ParticleTree* _myTree;
std::vector<Particle> _particles; std::vector<Particle> _particles;

View file

@ -32,7 +32,8 @@ Node::Node(const QUuid& uuid, char type, const HifiSockAddr& publicSocket, const
_activeSocket(NULL), _activeSocket(NULL),
_bytesReceivedMovingAverage(NULL), _bytesReceivedMovingAverage(NULL),
_linkedData(NULL), _linkedData(NULL),
_isAlive(true) _isAlive(true),
_clockSkewUsec(0)
{ {
pthread_mutex_init(&_mutex, 0); pthread_mutex_init(&_mutex, 0);
} }