make pending packets thread safe

This commit is contained in:
ZappoMan 2014-02-10 12:48:45 -08:00
parent f8a24a2e99
commit 4bb7bb2b77
2 changed files with 13 additions and 14 deletions

View file

@ -39,6 +39,7 @@ OctreeEditPacketSender::OctreeEditPacketSender() :
}
OctreeEditPacketSender::~OctreeEditPacketSender() {
_pendingPacketsLock.lock();
while (!_preServerSingleMessagePackets.empty()) {
EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
delete packet;
@ -49,6 +50,7 @@ OctreeEditPacketSender::~OctreeEditPacketSender() {
delete packet;
_preServerPackets.erase(_preServerPackets.begin());
}
_pendingPacketsLock.unlock();
//printf("OctreeEditPacketSender::~OctreeEditPacketSender() [%p] destroyed... \n", this);
}
@ -115,6 +117,7 @@ void OctreeEditPacketSender::processPreServerExistsPackets() {
assert(serversExist()); // we should only be here if we have jurisdictions
// First send out all the single message packets...
_pendingPacketsLock.lock();
while (!_preServerSingleMessagePackets.empty()) {
EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
queuePacketToNodes(&packet->_currentBuffer[0], packet->_currentSize);
@ -129,6 +132,7 @@ void OctreeEditPacketSender::processPreServerExistsPackets() {
delete packet;
_preServerPackets.erase(_preServerPackets.begin());
}
_pendingPacketsLock.unlock();
// if while waiting for the jurisdictions the caller called releaseQueuedMessages()
// then we want to honor that request now.
@ -140,8 +144,10 @@ void OctreeEditPacketSender::processPreServerExistsPackets() {
void OctreeEditPacketSender::queuePendingPacketToNodes(PacketType type, unsigned char* buffer, ssize_t length) {
// If we're asked to save messages while waiting for voxel servers to arrive, then do so...
if (_maxPendingMessages > 0) {
EditPacketBuffer* packet = new EditPacketBuffer(type, buffer, length);
_pendingPacketsLock.lock();
_preServerSingleMessagePackets.push_back(packet);
// if we've saved MORE than our max, then clear out the oldest packet...
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size();
@ -150,6 +156,7 @@ void OctreeEditPacketSender::queuePendingPacketToNodes(PacketType type, unsigned
delete packet;
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
}
_pendingPacketsLock.unlock();
}
}
@ -197,6 +204,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType type, unsigned ch
if (!serversExist()) {
if (_maxPendingMessages > 0) {
EditPacketBuffer* packet = new EditPacketBuffer(type, codeColorBuffer, length);
_pendingPacketsLock.lock();
_preServerPackets.push_back(packet);
// if we've saved MORE than out max, then clear out the oldest packet...
@ -206,12 +214,11 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType type, unsigned ch
delete packet;
_preServerPackets.erase(_preServerPackets.begin());
}
}
_pendingPacketsLock.unlock();
}
return; // bail early
}
//qDebug() << "queueOctreeEditMessage() line:" << __LINE__;
// We want to filter out edit messages for servers based on the server's Jurisdiction
// But we can't really do that with a packed message, since each edit message could be destined
// for a different server... So we need to actually manage multiple queued packets... one
@ -229,18 +236,14 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType type, unsigned ch
if ((*_serverJurisdictions).find(nodeUUID) != (*_serverJurisdictions).end()) {
const JurisdictionMap& map = (*_serverJurisdictions)[nodeUUID];
isMyJurisdiction = (map.isMyJurisdiction(codeColorBuffer, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN);
//qDebug() << "queueOctreeEditMessage() line:" << __LINE__ << " isMyJurisdiction=" << isMyJurisdiction;
} else {
isMyJurisdiction = false;
//qDebug() << "queueOctreeEditMessage() line:" << __LINE__;
}
}
if (isMyJurisdiction) {
EditPacketBuffer& packetBuffer = _pendingEditPackets[nodeUUID];
packetBuffer._nodeUUID = nodeUUID;
//qDebug() << "queueOctreeEditMessage() line:" << __LINE__;
// If we're switching type, then we send the last one and start over
if ((type != packetBuffer._currentType && packetBuffer._currentSize > 0) ||
(packetBuffer._currentSize + length >= _maxPacketSize)) {
@ -259,11 +262,8 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType type, unsigned ch
// fixup the buffer for any clock skew
if (node->getClockSkewUsec() != 0) {
adjustEditPacketForClockSkew(codeColorBuffer, length, node->getClockSkewUsec());
//qDebug() << "queueOctreeEditMessage() line:" << __LINE__;
}
//qDebug() << "queueOctreeEditMessage() line:" << __LINE__;
memcpy(&packetBuffer._currentBuffer[packetBuffer._currentSize], codeColorBuffer, length);
packetBuffer._currentSize += length;
}
@ -280,14 +280,12 @@ void OctreeEditPacketSender::releaseQueuedMessages() {
} else {
for (std::map<QUuid, EditPacketBuffer>::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) {
releaseQueuedPacket(i->second);
//qDebug() << "releaseQueuedMessages() line:" << __LINE__;
}
}
}
void OctreeEditPacketSender::releaseQueuedPacket(EditPacketBuffer& packetBuffer) {
if (packetBuffer._currentSize > 0 && packetBuffer._currentType != PacketTypeUnknown) {
//qDebug() << "OctreeEditPacketSender::releaseQueuedPacket() line:" << __LINE__;
queuePacketToNode(packetBuffer._nodeUUID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize);
}
packetBuffer._currentSize = 0;

View file

@ -105,8 +105,9 @@ protected:
// These are packets that are waiting to be processed because we don't yet know if there are servers
int _maxPendingMessages;
bool _releaseQueuedMessagesPending;
std::vector<EditPacketBuffer*> _preServerPackets; // these will get packed into other larger packets
std::vector<EditPacketBuffer*> _preServerSingleMessagePackets; // these will go out as is
QMutex _pendingPacketsLock;
QVector<EditPacketBuffer*> _preServerPackets; // these will get packed into other larger packets
QVector<EditPacketBuffer*> _preServerSingleMessagePackets; // these will go out as is
NodeToJurisdictionMap* _serverJurisdictions;