more packet creation fixes for entitites

This commit is contained in:
Stephen Birarda 2015-07-08 18:13:06 -07:00
parent ac40790841
commit 5dc09692b4
10 changed files with 77 additions and 81 deletions

View file

@ -21,7 +21,7 @@ const char* MODEL_SERVER_NAME = "Entity";
const char* MODEL_SERVER_LOGGING_TARGET_NAME = "entity-server"; const char* MODEL_SERVER_LOGGING_TARGET_NAME = "entity-server";
const char* LOCAL_MODELS_PERSIST_FILE = "resources/models.svo"; const char* LOCAL_MODELS_PERSIST_FILE = "resources/models.svo";
EntityServer::EntityServer(const QByteArray& packet) EntityServer::EntityServer(const QByteArray& packet)
: OctreeServer(packet), _entitySimulation(NULL) { : OctreeServer(packet), _entitySimulation(NULL) {
// nothing special to do here... // nothing special to do here...
} }
@ -64,7 +64,7 @@ void EntityServer::entityCreated(const EntityItem& newEntity, const SharedNodePo
// EntityServer will use the "special packets" to send list of recently deleted entities // EntityServer will use the "special packets" to send list of recently deleted entities
bool EntityServer::hasSpecialPacketToSend(const SharedNodePointer& node) { bool EntityServer::hasSpecialPacketsToSend(const SharedNodePointer& node) {
bool shouldSendDeletedEntities = false; bool shouldSendDeletedEntities = false;
// check to see if any new entities have been added since we last sent to this node... // check to see if any new entities have been added since we last sent to this node...
@ -79,9 +79,8 @@ bool EntityServer::hasSpecialPacketToSend(const SharedNodePointer& node) {
return shouldSendDeletedEntities; return shouldSendDeletedEntities;
} }
int EntityServer::sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent) { int EntityServer::sendSpecialPackets(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent) {
unsigned char outputBuffer[MAX_PACKET_SIZE]; int totalBytes = 0;
size_t packetLength = 0;
EntityNodeData* nodeData = static_cast<EntityNodeData*>(node->getLinkedData()); EntityNodeData* nodeData = static_cast<EntityNodeData*>(node->getLinkedData());
if (nodeData) { if (nodeData) {
@ -91,23 +90,25 @@ int EntityServer::sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNo
EntityTree* tree = static_cast<EntityTree*>(_tree); EntityTree* tree = static_cast<EntityTree*>(_tree);
bool hasMoreToSend = true; bool hasMoreToSend = true;
// TODO: is it possible to send too many of these packets? what if you deleted 1,000,000 entities?
packetsSent = 0; packetsSent = 0;
while (hasMoreToSend) {
hasMoreToSend = tree->encodeEntitiesDeletedSince(queryNode->getSequenceNumber(), deletedEntitiesSentAt,
outputBuffer, MAX_PACKET_SIZE, packetLength);
DependencyManager::get<NodeList>()->writeDatagram((char*) outputBuffer, packetLength, while (hasMoreToSend) {
SharedNodePointer(node)); auto specialPacket = tree->encodeEntitiesDeletedSince(queryNode->getSequenceNumber(), deletedEntitiesSentAt,
queryNode->packetSent(outputBuffer, packetLength); hasMoreToSend);
queryNode->packetSent(specialPacket);
totalBytes += specialPacket->getSizeWithHeader();
packetsSent++; packetsSent++;
DependencyManager::get<NodeList>()->sendPacket(std::move(specialPacket), SharedNodePointer(node));
} }
nodeData->setLastDeletedEntitiesSentAt(deletePacketSentAt); nodeData->setLastDeletedEntitiesSentAt(deletePacketSentAt);
} }
// TODO: caller is expecting a packetLength, what if we send more than one packet?? // TODO: caller is expecting a packetLength, what if we send more than one packet??
return packetLength; return totalBytes;
} }
void EntityServer::pruneDeletedEntities() { void EntityServer::pruneDeletedEntities() {
@ -115,7 +116,7 @@ void EntityServer::pruneDeletedEntities() {
if (tree->hasAnyDeletedEntities()) { if (tree->hasAnyDeletedEntities()) {
quint64 earliestLastDeletedEntitiesSent = usecTimestampNow() + 1; // in the future quint64 earliestLastDeletedEntitiesSent = usecTimestampNow() + 1; // in the future
DependencyManager::get<NodeList>()->eachNode([&earliestLastDeletedEntitiesSent](const SharedNodePointer& node) { DependencyManager::get<NodeList>()->eachNode([&earliestLastDeletedEntitiesSent](const SharedNodePointer& node) {
if (node->getLinkedData()) { if (node->getLinkedData()) {
EntityNodeData* nodeData = static_cast<EntityNodeData*>(node->getLinkedData()); EntityNodeData* nodeData = static_cast<EntityNodeData*>(node->getLinkedData());
@ -125,12 +126,12 @@ void EntityServer::pruneDeletedEntities() {
} }
} }
}); });
tree->forgetEntitiesDeletedBefore(earliestLastDeletedEntitiesSent); tree->forgetEntitiesDeletedBefore(earliestLastDeletedEntitiesSent);
} }
} }
void EntityServer::readAdditionalConfiguration(const QJsonObject& settingsSectionObject) { void EntityServer::readAdditionalConfiguration(const QJsonObject& settingsSectionObject) {
bool wantEditLogging = false; bool wantEditLogging = false;
readOptionBool(QString("wantEditLogging"), settingsSectionObject, wantEditLogging); readOptionBool(QString("wantEditLogging"), settingsSectionObject, wantEditLogging);
qDebug("wantEditLogging=%s", debug::valueOf(wantEditLogging)); qDebug("wantEditLogging=%s", debug::valueOf(wantEditLogging));

View file

@ -37,8 +37,8 @@ public:
// subclass may implement these method // subclass may implement these method
virtual void beforeRun(); virtual void beforeRun();
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node); virtual bool hasSpecialPacketsToSend(const SharedNodePointer& node);
virtual int sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent); virtual int sendSpecialPackets(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent);
virtual void entityCreated(const EntityItem& newEntity, const SharedNodePointer& senderNode); virtual void entityCreated(const EntityItem& newEntity, const SharedNodePointer& senderNode);
virtual void readAdditionalConfiguration(const QJsonObject& settingsSectionObject); virtual void readAdditionalConfiguration(const QJsonObject& settingsSectionObject);

View file

@ -356,7 +356,7 @@ void OctreeQueryNode::dumpOutOfView() {
} }
} }
void OctreeQueryNode::packetSent(const NLPacket& packet) { void OctreeQueryNode::packetSent(const std::unique_ptr<NLPacket>& packet) {
_sentPacketHistory.packetSent(_sequenceNumber, packet); _sentPacketHistory.packetSent(_sequenceNumber, packet);
_sequenceNumber++; _sequenceNumber++;
} }

View file

@ -105,7 +105,7 @@ public:
bool isShuttingDown() const { return _isShuttingDown; } bool isShuttingDown() const { return _isShuttingDown; }
void octreePacketSent() { packetSent(_octreePacket); } void octreePacketSent() { packetSent(_octreePacket); }
void packetSent(const NLPacket& packet); void packetSent(const std::unique_ptr<NLPacket>& packet);
OCTREE_PACKET_SEQUENCE getSequenceNumber() const { return _sequenceNumber; } OCTREE_PACKET_SEQUENCE getSequenceNumber() const { return _sequenceNumber; }

View file

@ -45,13 +45,13 @@ public:
Octree* getOctree() { return _tree; } Octree* getOctree() { return _tree; }
JurisdictionMap* getJurisdiction() { return _jurisdiction; } JurisdictionMap* getJurisdiction() { return _jurisdiction; }
int getPacketsPerClientPerInterval() const { return std::min(_packetsPerClientPerInterval, int getPacketsPerClientPerInterval() const { return std::min(_packetsPerClientPerInterval,
std::max(1, getPacketsTotalPerInterval() / std::max(1, getCurrentClientCount()))); } std::max(1, getPacketsTotalPerInterval() / std::max(1, getCurrentClientCount()))); }
int getPacketsPerClientPerSecond() const { return getPacketsPerClientPerInterval() * INTERVALS_PER_SECOND; } int getPacketsPerClientPerSecond() const { return getPacketsPerClientPerInterval() * INTERVALS_PER_SECOND; }
int getPacketsTotalPerInterval() const { return _packetsTotalPerInterval; } int getPacketsTotalPerInterval() const { return _packetsTotalPerInterval; }
int getPacketsTotalPerSecond() const { return getPacketsTotalPerInterval() * INTERVALS_PER_SECOND; } int getPacketsTotalPerSecond() const { return getPacketsTotalPerInterval() * INTERVALS_PER_SECOND; }
static int getCurrentClientCount() { return _clientCount; } static int getCurrentClientCount() { return _clientCount; }
static void clientConnected() { _clientCount++; } static void clientConnected() { _clientCount++; }
static void clientDisconnected() { _clientCount--; } static void clientDisconnected() { _clientCount--; }
@ -72,8 +72,8 @@ public:
// subclass may implement these method // subclass may implement these method
virtual void beforeRun() { } virtual void beforeRun() { }
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node) { return false; } virtual bool hasSpecialPacketsToSend(const SharedNodePointer& node) { return false; }
virtual int sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent) { return 0; } virtual int sendSpecialPackets(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent) { return 0; }
static float SKIP_TIME; // use this for trackXXXTime() calls for non-times static float SKIP_TIME; // use this for trackXXXTime() calls for non-times
@ -100,7 +100,7 @@ public:
static void trackProcessWaitTime(float time); static void trackProcessWaitTime(float time);
static float getAverageProcessWaitTime() { return _averageProcessWaitTime.getAverage(); } static float getAverageProcessWaitTime() { return _averageProcessWaitTime.getAverage(); }
// these methods allow us to track which threads got to various states // these methods allow us to track which threads got to various states
static void didProcess(OctreeSendThread* thread); static void didProcess(OctreeSendThread* thread);
static void didPacketDistributor(OctreeSendThread* thread); static void didPacketDistributor(OctreeSendThread* thread);
@ -117,14 +117,14 @@ public:
virtual void aboutToFinish(); virtual void aboutToFinish();
void forceNodeShutdown(SharedNodePointer node); void forceNodeShutdown(SharedNodePointer node);
public slots: public slots:
/// runs the octree server assignment /// runs the octree server assignment
void run(); void run();
void nodeAdded(SharedNodePointer node); void nodeAdded(SharedNodePointer node);
void nodeKilled(SharedNodePointer node); void nodeKilled(SharedNodePointer node);
void sendStatsPacket(); void sendStatsPacket();
void readPendingDatagrams() { }; // this will not be called since our datagram processing thread will handle void readPendingDatagrams() { }; // this will not be called since our datagram processing thread will handle
void readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr); void readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr);
@ -170,7 +170,7 @@ protected:
JurisdictionSender* _jurisdictionSender; JurisdictionSender* _jurisdictionSender;
OctreeInboundPacketProcessor* _octreeInboundPacketProcessor; OctreeInboundPacketProcessor* _octreeInboundPacketProcessor;
OctreePersistThread* _persistThread; OctreePersistThread* _persistThread;
int _persistInterval; int _persistInterval;
bool _wantBackup; bool _wantBackup;
QString _backupExtensionFormat; QString _backupExtensionFormat;
@ -182,7 +182,7 @@ protected:
time_t _started; time_t _started;
quint64 _startedUSecs; quint64 _startedUSecs;
QString _safeServerName; QString _safeServerName;
static int _clientCount; static int _clientCount;
static SimpleMovingAverage _averageLoopTime; static SimpleMovingAverage _averageLoopTime;

View file

@ -863,7 +863,7 @@ bool EntityItemProperties::encodeEntityEditPacket(PacketType::Value command, Ent
if (success) { if (success) {
packetData->endSubTree(); packetData->endSubTree();
const unsigned char* finalizedData = packetData->getFinalizedData(); const char* finalizedData = reinterpret_cast<const char*>(packetData->getFinalizedData());
int finalizedSize = packetData->getFinalizedSize(); int finalizedSize = packetData->getFinalizedSize();
if (finalizedSize <= buffer.size()) { if (finalizedSize <= buffer.size()) {
@ -1084,7 +1084,7 @@ bool EntityItemProperties::encodeEraseEntityMessage(const EntityItemID& entityIt
int outputLength = 0; int outputLength = 0;
if (buffer.size() < sizeof(numberOfIds) + NUM_BYTES_RFC4122_UUID) { if (buffer.size() < (int) (sizeof(numberOfIds) + NUM_BYTES_RFC4122_UUID)) {
qCDebug(entities) << "ERROR - encodeEraseEntityMessage() called with buffer that is too small!"; qCDebug(entities) << "ERROR - encodeEraseEntityMessage() called with buffer that is too small!";
return false; return false;
} }

View file

@ -760,87 +760,65 @@ bool EntityTree::hasEntitiesDeletedSince(quint64 sinceTime) {
} }
// sinceTime is an in/out parameter - it will be side effected with the last time sent out // sinceTime is an in/out parameter - it will be side effected with the last time sent out
bool EntityTree::encodeEntitiesDeletedSince(OCTREE_PACKET_SEQUENCE sequenceNumber, quint64& sinceTime, unsigned char* outputBuffer, std::unique_ptr<NLPacket> EntityTree::encodeEntitiesDeletedSince(OCTREE_PACKET_SEQUENCE sequenceNumber, quint64& sinceTime,
size_t maxLength, size_t& outputLength) { bool& hasMore) {
bool hasMoreToSend = true;
unsigned char* copyAt = outputBuffer; auto deletesPacket = NLPacket::create(PacketType::EntityErase);
size_t numBytesPacketHeader = DependencyManager::get<NodeList>()->populatePacketHeader(reinterpret_cast<char*>(outputBuffer),
PacketType::EntityErase);
copyAt += numBytesPacketHeader;
outputLength = numBytesPacketHeader;
// pack in flags // pack in flags
OCTREE_PACKET_FLAGS flags = 0; OCTREE_PACKET_FLAGS flags = 0;
OCTREE_PACKET_FLAGS* flagsAt = (OCTREE_PACKET_FLAGS*)copyAt; deletesPacket->writePrimitive(flags);
*flagsAt = flags;
copyAt += sizeof(OCTREE_PACKET_FLAGS);
outputLength += sizeof(OCTREE_PACKET_FLAGS);
// pack in sequence number // pack in sequence number
OCTREE_PACKET_SEQUENCE* sequenceAt = (OCTREE_PACKET_SEQUENCE*)copyAt; deletesPacket->writePrimitive(sequenceNumber);
*sequenceAt = sequenceNumber;
copyAt += sizeof(OCTREE_PACKET_SEQUENCE);
outputLength += sizeof(OCTREE_PACKET_SEQUENCE);
// pack in timestamp // pack in timestamp
OCTREE_PACKET_SENT_TIME now = usecTimestampNow(); OCTREE_PACKET_SENT_TIME now = usecTimestampNow();
OCTREE_PACKET_SENT_TIME* timeAt = (OCTREE_PACKET_SENT_TIME*)copyAt; deletesPacket->writePrimitive(now);
*timeAt = now;
copyAt += sizeof(OCTREE_PACKET_SENT_TIME);
outputLength += sizeof(OCTREE_PACKET_SENT_TIME);
uint16_t numberOfIds = 0; // placeholder for now
unsigned char* numberOfIDsAt = copyAt;
memcpy(copyAt, &numberOfIds, sizeof(numberOfIds));
copyAt += sizeof(numberOfIds);
outputLength += sizeof(numberOfIds);
// we keep a multi map of entity IDs to timestamps, we only want to include the entity IDs that have been // we keep a multi map of entity IDs to timestamps, we only want to include the entity IDs that have been
// deleted since we last sent to this node // deleted since we last sent to this node
_recentlyDeletedEntitiesLock.lockForRead(); _recentlyDeletedEntitiesLock.lockForRead();
QMultiMap<quint64, QUuid>::const_iterator iterator = _recentlyDeletedEntityItemIDs.constBegin(); bool hasFilledPacket = false;
while (iterator != _recentlyDeletedEntityItemIDs.constEnd()) {
QList<QUuid> values = _recentlyDeletedEntityItemIDs.values(iterator.key()); auto it = _recentlyDeletedEntityItemIDs.constBegin();
while (it != _recentlyDeletedEntityItemIDs.constEnd()) {
QList<QUuid> values = _recentlyDeletedEntityItemIDs.values(it.key());
for (int valueItem = 0; valueItem < values.size(); ++valueItem) { for (int valueItem = 0; valueItem < values.size(); ++valueItem) {
// if the timestamp is more recent then out last sent time, include it // if the timestamp is more recent then out last sent time, include it
if (iterator.key() > sinceTime) { if (it.key() > sinceTime) {
QUuid entityID = values.at(valueItem); QUuid entityID = values.at(valueItem);
QByteArray encodedEntityID = entityID.toRfc4122(); deletesPacket->write(entityID.toRfc4122());
memcpy(copyAt, encodedEntityID.constData(), NUM_BYTES_RFC4122_UUID);
copyAt += NUM_BYTES_RFC4122_UUID;
outputLength += NUM_BYTES_RFC4122_UUID;
numberOfIds++;
// check to make sure we have room for one more id... // check to make sure we have room for one more ID
if (outputLength + NUM_BYTES_RFC4122_UUID > maxLength) { if (NUM_BYTES_RFC4122_UUID > deletesPacket->bytesAvailable()) {
hasFilledPacket = true;
break; break;
} }
} }
} }
// check to make sure we have room for one more id... // check to see if we're about to return
if (outputLength + NUM_BYTES_RFC4122_UUID > maxLength) { if (hasFilledPacket) {
// let our caller know how far we got // let our caller know how far we got
sinceTime = iterator.key(); sinceTime = it.key();
break; break;
} }
++iterator;
++it;
} }
// if we got to the end, then we're done sending // if we got to the end, then we're done sending
if (iterator == _recentlyDeletedEntityItemIDs.constEnd()) { if (it == _recentlyDeletedEntityItemIDs.constEnd()) {
hasMoreToSend = false; hasMore = false;
} }
_recentlyDeletedEntitiesLock.unlock(); _recentlyDeletedEntitiesLock.unlock();
// replace the correct count for ids included return std::move(deletesPacket);
memcpy(numberOfIDsAt, &numberOfIds, sizeof(numberOfIds));
return hasMoreToSend;
} }

View file

@ -129,8 +129,8 @@ public:
bool hasAnyDeletedEntities() const { return _recentlyDeletedEntityItemIDs.size() > 0; } bool hasAnyDeletedEntities() const { return _recentlyDeletedEntityItemIDs.size() > 0; }
bool hasEntitiesDeletedSince(quint64 sinceTime); bool hasEntitiesDeletedSince(quint64 sinceTime);
bool encodeEntitiesDeletedSince(OCTREE_PACKET_SEQUENCE sequenceNumber, quint64& sinceTime, std::unique_ptr<NLPacket> encodeEntitiesDeletedSince(OCTREE_PACKET_SEQUENCE sequenceNumber, quint64& sinceTime,
unsigned char* packetData, size_t maxLength, size_t& outputLength); bool& hasMore);
void forgetEntitiesDeletedBefore(quint64 sinceTime); void forgetEntitiesDeletedBefore(quint64 sinceTime);
int processEraseMessage(const QByteArray& dataByteArray, const SharedNodePointer& sourceNode); int processEraseMessage(const QByteArray& dataByteArray, const SharedNodePointer& sourceNode);

View file

@ -32,6 +32,15 @@ template <class T> std::unique_ptr<NLPacket> PacketList<T>::createPacketWithExte
} }
} }
template<typename T> qint64 Packet::readPrimitive(T* data) {
return QIODevice::read(reinterpret_cast<char*>(data), sizeof(T));
}
template<typename T> qint64 Packet::writePrimitive(const T& data) {
static_assert(!std::is_pointer<T>::value, "T must not be a pointer");
return QIODevice::write(reinterpret_cast<const char*>(&data), sizeof(T));
}
template <class T> qint64 PacketList<T>::writeData(const char* data, qint64 maxSize) { template <class T> qint64 PacketList<T>::writeData(const char* data, qint64 maxSize) {
if (!_currentPacket) { if (!_currentPacket) {
// we don't have a current packet, time to set one up // we don't have a current packet, time to set one up

View file

@ -29,13 +29,21 @@ public:
int getNumPackets() const { return _packets.size() + (_currentPacket ? 1 : 0); } int getNumPackets() const { return _packets.size() + (_currentPacket ? 1 : 0); }
void getCurrentPacketCapacity() const { return _currentPacket->bytesAvailable(); }
void closeCurrentPacket(); void closeCurrentPacket();
void setExtendedHeader(const QByteArray& extendedHeader) { _extendedHeader = extendedHeader; } void setExtendedHeader(const QByteArray& extendedHeader) { _extendedHeader = extendedHeader; }
template<typename U> qint64 readPrimitive(U* data);
template<typename U> qint64 writePrimitive(const U& data);
protected: protected:
qint64 writeData(const char* data, qint64 maxSize); qint64 writeData(const char* data, qint64 maxSize);
qint64 readData(char* data, qint64 maxSize) { return 0; } qint64 readData(char* data, qint64 maxSize) { return 0; }
private: private:
PacketList(const PacketList& other) = delete;
PacketList& operator=(const PacketList& other) = delete;
std::unique_ptr<NLPacket> createPacketWithExtendedHeader(); std::unique_ptr<NLPacket> createPacketWithExtendedHeader();
PacketType::Value _packetType; PacketType::Value _packetType;