fix sendVoxelEditMessage() to honor jurisdictions, cleanup some threading/nonthreading behavior in PacketSender

This commit is contained in:
ZappoMan 2013-10-03 10:25:00 -07:00
parent f937170b67
commit 84862310d1
3 changed files with 34 additions and 29 deletions

View file

@ -82,17 +82,10 @@ bool PacketSender::process() {
}
}
bool keepGoing = _packets.size() > 0;
int packetsLeft = _packets.size();
bool keepGoing = packetsLeft > 0;
while (keepGoing) {
// in threaded mode, we go till we're empty
if (isThreaded()) {
keepGoing = _packets.size() > 0;
} else {
// in non-threaded mode, we send as many packets as we need per expected call to process()
keepGoing = (packetsThisCall < packetsPerCall) && (_packets.size() > 0);
}
NetworkPacket& packet = _packets.front();
// send the packet through the NodeList...
@ -109,16 +102,28 @@ bool PacketSender::process() {
_packets.erase(_packets.begin());
unlock();
// dynamically sleep until we need to fire off the next set of voxels we only sleep in threaded mode
packetsLeft = _packets.size();
// in threaded mode, we go till we're empty
if (isThreaded()) {
uint64_t elapsed = now - _lastSendTime;
int usecToSleep = std::max(SEND_INTERVAL_USECS, SEND_INTERVAL_USECS - elapsed);
// we only sleep in non-threaded mode
if (usecToSleep > 0) {
usleep(usecToSleep);
keepGoing = packetsLeft > 0;
// dynamically sleep until we need to fire off the next set of voxels we only sleep in threaded mode
if (keepGoing) {
uint64_t elapsed = now - _lastSendTime;
int usecToSleep = std::max(SEND_INTERVAL_USECS, SEND_INTERVAL_USECS - elapsed);
// we only sleep in non-threaded mode
if (usecToSleep > 0) {
usleep(usecToSleep);
}
}
} else {
// in non-threaded mode, we send as many packets as we need per expected call to process()
keepGoing = (packetsThisCall < packetsPerCall) && (packetsLeft > 0);
}
_lastSendTime = now;
}

View file

@ -60,11 +60,9 @@ void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail&
// This encodes the voxel edit message into a buffer...
if (createVoxelEditMessage(type, 0, 1, &detail, bufferOut, sizeOut)){
// If we don't have voxel jurisdictions, then we will simply queue up these packets and wait till we have
// jurisdictions for processing
if (!voxelServersExist()) {
// If we're asked to save messages while waiting for voxel servers to arrive, then do so...
if (_maxPendingMessages > 0) {
EditPacketBuffer* packet = new EditPacketBuffer(type, bufferOut, sizeOut);
@ -79,7 +77,7 @@ void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail&
}
return; // bail early
} else {
queuePacketToNode(UNKNOWN_NODE_ID, bufferOut, sizeOut); // sends to all servers... not ideal!
queuePacketToNodes(bufferOut, sizeOut);
}
// either way, clean up the created buffer
@ -100,14 +98,14 @@ bool VoxelEditPacketSender::voxelServersExist() const {
// This method is called when the edit packet layer has determined that it has a fully formed packet destined for
// a known nodeID. However, we also want to handle the case where the
void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) {
void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* buffer, ssize_t length) {
NodeList* nodeList = NodeList::getInstance();
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
// only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER
if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER &&
((node->getNodeID() == nodeID) || (nodeID == (uint16_t)UNKNOWN_NODE_ID)) ) {
sockaddr* nodeAddress = node->getActiveSocket();
queuePacketForSending(*nodeAddress, bufferOut, sizeOut);
queuePacketForSending(*nodeAddress, buffer, length);
}
}
}
@ -133,7 +131,7 @@ void VoxelEditPacketSender::processPreServerExistsPackets() {
// First send out all the single message packets...
while (!_preServerSingleMessagePackets.empty()) {
EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
queuePacketToNode(UNKNOWN_NODE_ID, &packet->_currentBuffer[0], packet->_currentSize);
queuePacketToNodes(&packet->_currentBuffer[0], packet->_currentSize);
delete packet;
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
}
@ -154,12 +152,15 @@ void VoxelEditPacketSender::processPreServerExistsPackets() {
}
}
void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, ssize_t length) {
void VoxelEditPacketSender::queuePacketToNodes(unsigned char* buffer, ssize_t length) {
if (!_shouldSend) {
return; // bail early
}
assert(voxelServersExist()); // we must have jurisdictions to be here!!
int headerBytes = numBytesForPacketHeader(buffer) + sizeof(short);
unsigned char* octCode = buffer + headerBytes; // skip the packet header to get to the octcode
// We want to filter out edit messages for voxel servers based on the server's Jurisdiction
// But we can't really do that with a packed message, since each edit message could be destined
@ -171,20 +172,19 @@ void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* codeColo
if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER) {
uint16_t nodeID = node->getNodeID();
bool isMyJurisdiction = true;
// we need to get the jurisdiction for this
// here we need to get the "pending packet" for this server
const JurisdictionMap& map = (*_voxelServerJurisdictions)[nodeID];
isMyJurisdiction = (map.isMyJurisdiction(codeColorBuffer, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN);
isMyJurisdiction = (map.isMyJurisdiction(octCode, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN);
if (isMyJurisdiction) {
queuePacketToNode(nodeID, codeColorBuffer, length);
queuePacketToNode(nodeID, buffer, length);
}
}
}
}
// NOTE: codeColorBuffer - is JUST the octcode/color and does not contain the packet header!
void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) {
if (!_shouldSend) {
return; // bail early

View file

@ -86,8 +86,8 @@ public:
private:
bool _shouldSend;
void queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut);
void queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, ssize_t length);
void queuePacketToNode(uint16_t nodeID, unsigned char* buffer, ssize_t length);
void queuePacketToNodes(unsigned char* buffer, ssize_t length);
void initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type);
void releaseQueuedPacket(EditPacketBuffer& packetBuffer); // releases specific queued packet
bool voxelServersExist() const;