cleanup edit message memory management in octree packet sending

This commit is contained in:
Stephen Birarda 2015-07-08 14:12:04 -07:00
parent d9e10db011
commit fa71c781f4
8 changed files with 223 additions and 227 deletions

View file

@ -18,11 +18,10 @@
#include "EntityItem.h"
void EntityEditPacketSender::adjustEditPacketForClockSkew(PacketType::Value type,
unsigned char* editBuffer, size_t length, int clockSkew) {
void EntityEditPacketSender::adjustEditPacketForClockSkew(PacketType::Value type, QByteArray& buffer, int clockSkew) {
if (type == PacketType::EntityAdd || type == PacketType::EntityEdit) {
EntityItem::adjustEditPacketForClockSkew(editBuffer, length, clockSkew);
EntityItem::adjustEditPacketForClockSkew(buffer, clockSkew);
}
}
@ -32,17 +31,15 @@ void EntityEditPacketSender::queueEditEntityMessage(PacketType::Value type, Enti
return; // bail early
}
// use MAX_PACKET_SIZE since it's static and guaranteed to be larger than _maxPacketSize
unsigned char bufferOut[MAX_PACKET_SIZE];
int sizeOut = 0;
QByteArray bufferOut(NLPacket::maxPayloadSize(type), 0);
if (EntityItemProperties::encodeEntityEditPacket(type, modelID, properties, &bufferOut[0], _maxPacketSize, sizeOut)) {
if (EntityItemProperties::encodeEntityEditPacket(type, modelID, properties, bufferOut)) {
#ifdef WANT_DEBUG
qCDebug(entities) << "calling queueOctreeEditMessage()...";
qCDebug(entities) << " id:" << modelID;
qCDebug(entities) << " properties:" << properties;
#endif
queueOctreeEditMessage(type, bufferOut, sizeOut);
queueOctreeEditMessage(type, bufferOut);
}
}
@ -50,10 +47,10 @@ void EntityEditPacketSender::queueEraseEntityMessage(const EntityItemID& entityI
if (!_shouldSend) {
return; // bail early
}
// use MAX_PACKET_SIZE since it's static and guaranteed to be larger than _maxPacketSize
unsigned char bufferOut[MAX_PACKET_SIZE];
size_t sizeOut = 0;
if (EntityItemProperties::encodeEraseEntityMessage(entityItemID, &bufferOut[0], _maxPacketSize, sizeOut)) {
queueOctreeEditMessage(PacketType::EntityErase, bufferOut, sizeOut);
QByteArray bufferOut(NLPacket::maxPayloadSize(PacketType::EntityErase), 0);
if (EntityItemProperties::encodeEraseEntityMessage(entityItemID, bufferOut)) {
queueOctreeEditMessage(PacketType::EntityErase, bufferOut);
}
}

View file

@ -30,6 +30,6 @@ public:
// My server type is the model server
virtual char getMyNodeType() const { return NodeType::EntityServer; }
virtual void adjustEditPacketForClockSkew(PacketType::Value type, unsigned char* editBuffer, size_t length, int clockSkew);
virtual void adjustEditPacketForClockSkew(PacketType::Value type, QByteArray& buffer, int clockSkew);
};
#endif // hifi_EntityEditPacketSender_h

View file

@ -688,8 +688,8 @@ void EntityItem::debugDump() const {
}
// adjust any internal timestamps to fix clock skew for this server
void EntityItem::adjustEditPacketForClockSkew(unsigned char* editPacketBuffer, size_t length, int clockSkew) {
unsigned char* dataAt = editPacketBuffer;
void EntityItem::adjustEditPacketForClockSkew(QByteArray& buffer, int clockSkew) {
unsigned char* dataAt = reinterpret_cast<unsigned char*>(buffer.data());
int octets = numberOfThreeBitSectionsInCode(dataAt);
int lengthOfOctcode = bytesRequiredForCodeLength(octets);
dataAt += lengthOfOctcode;

View file

@ -169,7 +169,7 @@ public:
static int expectedBytes();
static void adjustEditPacketForClockSkew(unsigned char* codeColorBuffer, size_t length, int clockSkew);
static void adjustEditPacketForClockSkew(QByteArray& buffer, int clockSkew);
// perform update
virtual void update(const quint64& now) { _lastUpdated = now; }

View file

@ -615,13 +615,12 @@ void EntityItemPropertiesFromScriptValueHonorReadOnly(const QScriptValue &object
// TODO: Implement support for script and visible properties.
//
bool EntityItemProperties::encodeEntityEditPacket(PacketType::Value command, EntityItemID id, const EntityItemProperties& properties,
unsigned char* bufferOut, int sizeIn, int& sizeOut) {
OctreePacketData ourDataPacket(false, sizeIn); // create a packetData object to add out packet details too.
QByteArray& buffer) {
OctreePacketData ourDataPacket(false, buffer.size()); // create a packetData object to add out packet details too.
OctreePacketData* packetData = &ourDataPacket; // we want a pointer to this so we can use our APPEND_ENTITY_PROPERTY macro
bool success = true; // assume the best
OctreeElement::AppendState appendState = OctreeElement::COMPLETED; // assume the best
sizeOut = 0;
// TODO: We need to review how jurisdictions should be handled for entities. (The old Models and Particles code
// didn't do anything special for jurisdictions, so we're keeping that same behavior here.)
@ -863,20 +862,21 @@ bool EntityItemProperties::encodeEntityEditPacket(PacketType::Value command, Ent
if (success) {
packetData->endSubTree();
const unsigned char* finalizedData = packetData->getFinalizedData();
int finalizedSize = packetData->getFinalizedSize();
if (finalizedSize <= sizeIn) {
memcpy(bufferOut, finalizedData, finalizedSize);
sizeOut = finalizedSize;
int finalizedSize = packetData->getFinalizedSize();
if (finalizedSize <= buffer.size()) {
buffer.replace(0, finalizedSize, finalizedData, finalizedSize);
buffer.resize(finalizedSize);
} else {
qCDebug(entities) << "ERROR - encoded edit message doesn't fit in output buffer.";
sizeOut = 0;
success = false;
}
} else {
packetData->discardSubTree();
sizeOut = 0;
}
return success;
}
@ -1077,28 +1077,28 @@ bool EntityItemProperties::decodeEntityEditPacket(const unsigned char* data, int
// NOTE: This version will only encode the portion of the edit message immediately following the
// header it does not include the send times and sequence number because that is handled by the
// edit packet sender...
bool EntityItemProperties::encodeEraseEntityMessage(const EntityItemID& entityItemID,
unsigned char* outputBuffer, size_t maxLength, size_t& outputLength) {
bool EntityItemProperties::encodeEraseEntityMessage(const EntityItemID& entityItemID, QByteArray& buffer) {
unsigned char* copyAt = outputBuffer;
char* copyAt = buffer.data();
uint16_t numberOfIds = 1; // only one entity ID in this message
if (maxLength < sizeof(numberOfIds) + NUM_BYTES_RFC4122_UUID) {
int outputLength = 0;
if (buffer.size() < sizeof(numberOfIds) + NUM_BYTES_RFC4122_UUID) {
qCDebug(entities) << "ERROR - encodeEraseEntityMessage() called with buffer that is too small!";
outputLength = 0;
return false;
}
memcpy(copyAt, &numberOfIds, sizeof(numberOfIds));
copyAt += sizeof(numberOfIds);
outputLength = sizeof(numberOfIds);
QUuid entityID = entityItemID;
QByteArray encodedEntityID = entityID.toRfc4122();
memcpy(copyAt, encodedEntityID.constData(), NUM_BYTES_RFC4122_UUID);
memcpy(copyAt, entityItemID.toRfc4122().constData(), NUM_BYTES_RFC4122_UUID);
copyAt += NUM_BYTES_RFC4122_UUID;
outputLength += NUM_BYTES_RFC4122_UUID;
buffer.resize(outputLength);
return true;
}

View file

@ -175,11 +175,9 @@ public:
void setLocalRenderAlpha(float value) { _localRenderAlpha = value; _localRenderAlphaChanged = true; }
static bool encodeEntityEditPacket(PacketType::Value command, EntityItemID id, const EntityItemProperties& properties,
unsigned char* bufferOut, int sizeIn, int& sizeOut);
static bool encodeEraseEntityMessage(const EntityItemID& entityItemID,
unsigned char* outputBuffer, size_t maxLength, size_t& outputLength);
QByteArray& buffer);
static bool encodeEraseEntityMessage(const EntityItemID& entityItemID, QByteArray& buffer);
static bool decodeEntityEditPacket(const unsigned char* data, int bytesToRead, int& processedBytes,
EntityItemID& entityID, EntityItemProperties& properties);

View file

@ -128,8 +128,8 @@ void OctreeEditPacketSender::processPreServerExistsPackets() {
// Then "process" all the packable messages...
while (!_preServerEdits.empty()) {
EditMessageTuple editMessage = std::move(_preServerEdits.front());
queueOctreeEditMessage(std::get<0>(editMessage), std::get<1>(editMessage), std::get<2>(editMessage));
EditMessagePair& editMessage = _preServerEdits.front();
queueOctreeEditMessage(editMessage.first, editMessage.second);
_preServerEdits.pop_front();
}
@ -194,8 +194,8 @@ void OctreeEditPacketSender::queuePacketToNodes(std::unique_ptr<NLPacket> packet
}
// NOTE: editPacketBuffer - is JUST the octcode/color and does not contain the packet header!
void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsigned char* editPacketBuffer, size_t length) {
// NOTE: editMessage - is JUST the octcode/color and does not contain the packet header
void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, QByteArray& editMessage) {
if (!_shouldSend) {
return; // bail early
@ -205,9 +205,10 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
// jurisdictions for processing
if (!serversExist()) {
if (_maxPendingMessages > 0) {
EditMessageTuple messageTuple { type, editPacketBuffer, length };
EditMessagePair messagePair { type, QByteArray(editMessage) };
_pendingPacketsLock.lock();
_preServerEdits.push_back(messageTuple);
_preServerEdits.push_back(messagePair);
// if we've saved MORE than out max, then clear out the oldest packet...
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerEdits.size();
@ -239,7 +240,8 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
_serverJurisdictions->lockForRead();
if ((*_serverJurisdictions).find(nodeUUID) != (*_serverJurisdictions).end()) {
const JurisdictionMap& map = (*_serverJurisdictions)[nodeUUID];
isMyJurisdiction = (map.isMyJurisdiction(editPacketBuffer, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN);
isMyJurisdiction = (map.isMyJurisdiction(reinterpret_cast<const unsigned char*>(editMessage.data()),
CHECK_NODE_ONLY) == JurisdictionMap::WITHIN);
} else {
isMyJurisdiction = false;
}
@ -253,7 +255,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
} else {
// If we're switching type, then we send the last one and start over
if ((type != bufferedPacket->readType() && bufferedPacket->getSizeUsed() > 0) ||
(length >= bufferedPacket->bytesAvailable())) {
(editMessage.size() >= bufferedPacket->bytesAvailable())) {
// create the new packet and swap it with the packet in _pendingEditPackets
auto packetToRelease = initializePacket(type, node->getClockSkewUsec());
@ -269,10 +271,10 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
// We call this virtual function that allows our specific type of EditPacketSender to
// fixup the buffer for any clock skew
if (node->getClockSkewUsec() != 0) {
adjustEditPacketForClockSkew(type, editPacketBuffer, length, node->getClockSkewUsec());
adjustEditPacketForClockSkew(type, editMessage, node->getClockSkewUsec());
}
bufferedPacket->write(reinterpret_cast<char*>(editPacketBuffer), length);
bufferedPacket->write(editMessage);
}
}
});

View file

@ -29,7 +29,7 @@ public:
/// Queues a single edit message. Will potentially send a pending multi-command packet. Determines which server
/// node or nodes the packet should be sent to. Can be called even before servers are known, in which case up to
/// MaxPendingMessages will be buffered and processed when servers are known.
void queueOctreeEditMessage(PacketType::Value type, unsigned char* editPacketBuffer, size_t length);
void queueOctreeEditMessage(PacketType::Value type, QByteArray& editMessage);
/// Releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message
/// packets onto the send queue. If running in threaded mode, the caller does not need to do any further processing to
@ -74,8 +74,7 @@ public:
// you must override these...
virtual char getMyNodeType() const = 0;
virtual void adjustEditPacketForClockSkew(PacketType::Value type,
unsigned char* editPacketBuffer, size_t length, int clockSkew) { }
virtual void adjustEditPacketForClockSkew(PacketType::Value type, QByteArray& buffer, int clockSkew) { }
void processNackPacket(const QByteArray& packet);
@ -83,7 +82,7 @@ public slots:
void nodeKilled(SharedNodePointer node);
protected:
using EditMessageTuple = std::tuple<PacketType::Value, unsigned char*, int>;
using EditMessagePair = std::pair<PacketType::Value, QByteArray>;
bool _shouldSend;
void queuePacketToNode(const QUuid& nodeID, std::unique_ptr<NLPacket> packet);
@ -102,7 +101,7 @@ protected:
bool _releaseQueuedMessagesPending;
QMutex _pendingPacketsLock;
QMutex _packetsQueueLock; // don't let different threads release the queue while another thread is writing to it
std::list<EditMessageTuple> _preServerEdits; // these will get packed into other larger packets
std::list<EditMessagePair> _preServerEdits; // these will get packed into other larger packets
std::list<std::unique_ptr<NLPacket>> _preServerSingleMessagePackets; // these will go out as is
NodeToJurisdictionMap* _serverJurisdictions;