Merge pull request #1012 from ZappoMan/voxeleditsender_improvements

fix sendVoxelEditMessage() to honor jurisdictions, cleanup some threadin...
This commit is contained in:
Stephen Birarda 2013-10-03 10:37:09 -07:00
commit a36ea50154
3 changed files with 34 additions and 29 deletions

View file

@ -82,17 +82,10 @@ bool PacketSender::process() {
} }
} }
bool keepGoing = _packets.size() > 0; int packetsLeft = _packets.size();
bool keepGoing = packetsLeft > 0;
while (keepGoing) { while (keepGoing) {
// in threaded mode, we go till we're empty
if (isThreaded()) {
keepGoing = _packets.size() > 0;
} else {
// in non-threaded mode, we send as many packets as we need per expected call to process()
keepGoing = (packetsThisCall < packetsPerCall) && (_packets.size() > 0);
}
NetworkPacket& packet = _packets.front(); NetworkPacket& packet = _packets.front();
// send the packet through the NodeList... // send the packet through the NodeList...
@ -109,8 +102,14 @@ bool PacketSender::process() {
_packets.erase(_packets.begin()); _packets.erase(_packets.begin());
unlock(); unlock();
// dynamically sleep until we need to fire off the next set of voxels we only sleep in threaded mode packetsLeft = _packets.size();
// in threaded mode, we go till we're empty
if (isThreaded()) { if (isThreaded()) {
keepGoing = packetsLeft > 0;
// dynamically sleep until we need to fire off the next set of voxels we only sleep in threaded mode
if (keepGoing) {
uint64_t elapsed = now - _lastSendTime; uint64_t elapsed = now - _lastSendTime;
int usecToSleep = std::max(SEND_INTERVAL_USECS, SEND_INTERVAL_USECS - elapsed); int usecToSleep = std::max(SEND_INTERVAL_USECS, SEND_INTERVAL_USECS - elapsed);
@ -119,6 +118,12 @@ bool PacketSender::process() {
usleep(usecToSleep); usleep(usecToSleep);
} }
} }
} else {
// in non-threaded mode, we send as many packets as we need per expected call to process()
keepGoing = (packetsThisCall < packetsPerCall) && (packetsLeft > 0);
}
_lastSendTime = now; _lastSendTime = now;
} }

View file

@ -60,11 +60,9 @@ void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail&
// This encodes the voxel edit message into a buffer... // This encodes the voxel edit message into a buffer...
if (createVoxelEditMessage(type, 0, 1, &detail, bufferOut, sizeOut)){ if (createVoxelEditMessage(type, 0, 1, &detail, bufferOut, sizeOut)){
// If we don't have voxel jurisdictions, then we will simply queue up these packets and wait till we have // If we don't have voxel jurisdictions, then we will simply queue up these packets and wait till we have
// jurisdictions for processing // jurisdictions for processing
if (!voxelServersExist()) { if (!voxelServersExist()) {
// If we're asked to save messages while waiting for voxel servers to arrive, then do so... // If we're asked to save messages while waiting for voxel servers to arrive, then do so...
if (_maxPendingMessages > 0) { if (_maxPendingMessages > 0) {
EditPacketBuffer* packet = new EditPacketBuffer(type, bufferOut, sizeOut); EditPacketBuffer* packet = new EditPacketBuffer(type, bufferOut, sizeOut);
@ -79,7 +77,7 @@ void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail&
} }
return; // bail early return; // bail early
} else { } else {
queuePacketToNode(UNKNOWN_NODE_ID, bufferOut, sizeOut); // sends to all servers... not ideal! queuePacketToNodes(bufferOut, sizeOut);
} }
// either way, clean up the created buffer // either way, clean up the created buffer
@ -100,14 +98,14 @@ bool VoxelEditPacketSender::voxelServersExist() const {
// This method is called when the edit packet layer has determined that it has a fully formed packet destined for // This method is called when the edit packet layer has determined that it has a fully formed packet destined for
// a known nodeID. However, we also want to handle the case where the // a known nodeID. However, we also want to handle the case where the
void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) { void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* buffer, ssize_t length) {
NodeList* nodeList = NodeList::getInstance(); NodeList* nodeList = NodeList::getInstance();
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
// only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER // only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER
if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER && if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER &&
((node->getNodeID() == nodeID) || (nodeID == (uint16_t)UNKNOWN_NODE_ID)) ) { ((node->getNodeID() == nodeID) || (nodeID == (uint16_t)UNKNOWN_NODE_ID)) ) {
sockaddr* nodeAddress = node->getActiveSocket(); sockaddr* nodeAddress = node->getActiveSocket();
queuePacketForSending(*nodeAddress, bufferOut, sizeOut); queuePacketForSending(*nodeAddress, buffer, length);
} }
} }
} }
@ -133,7 +131,7 @@ void VoxelEditPacketSender::processPreServerExistsPackets() {
// First send out all the single message packets... // First send out all the single message packets...
while (!_preServerSingleMessagePackets.empty()) { while (!_preServerSingleMessagePackets.empty()) {
EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
queuePacketToNode(UNKNOWN_NODE_ID, &packet->_currentBuffer[0], packet->_currentSize); queuePacketToNodes(&packet->_currentBuffer[0], packet->_currentSize);
delete packet; delete packet;
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin()); _preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
} }
@ -154,13 +152,16 @@ void VoxelEditPacketSender::processPreServerExistsPackets() {
} }
} }
void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, ssize_t length) { void VoxelEditPacketSender::queuePacketToNodes(unsigned char* buffer, ssize_t length) {
if (!_shouldSend) { if (!_shouldSend) {
return; // bail early return; // bail early
} }
assert(voxelServersExist()); // we must have jurisdictions to be here!! assert(voxelServersExist()); // we must have jurisdictions to be here!!
int headerBytes = numBytesForPacketHeader(buffer) + sizeof(short);
unsigned char* octCode = buffer + headerBytes; // skip the packet header to get to the octcode
// We want to filter out edit messages for voxel servers based on the server's Jurisdiction // We want to filter out edit messages for voxel servers based on the server's Jurisdiction
// But we can't really do that with a packed message, since each edit message could be destined // But we can't really do that with a packed message, since each edit message could be destined
// for a different voxel server... So we need to actually manage multiple queued packets... one // for a different voxel server... So we need to actually manage multiple queued packets... one
@ -171,20 +172,19 @@ void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* codeColo
if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER) { if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER) {
uint16_t nodeID = node->getNodeID(); uint16_t nodeID = node->getNodeID();
bool isMyJurisdiction = true; bool isMyJurisdiction = true;
// we need to get the jurisdiction for this // we need to get the jurisdiction for this
// here we need to get the "pending packet" for this server // here we need to get the "pending packet" for this server
const JurisdictionMap& map = (*_voxelServerJurisdictions)[nodeID]; const JurisdictionMap& map = (*_voxelServerJurisdictions)[nodeID];
isMyJurisdiction = (map.isMyJurisdiction(codeColorBuffer, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN); isMyJurisdiction = (map.isMyJurisdiction(octCode, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN);
if (isMyJurisdiction) { if (isMyJurisdiction) {
queuePacketToNode(nodeID, codeColorBuffer, length); queuePacketToNode(nodeID, buffer, length);
} }
} }
} }
} }
// NOTE: codeColorBuffer - is JUST the octcode/color and does not contain the packet header!
void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) { void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) {
if (!_shouldSend) { if (!_shouldSend) {
return; // bail early return; // bail early

View file

@ -86,8 +86,8 @@ public:
private: private:
bool _shouldSend; bool _shouldSend;
void queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut); void queuePacketToNode(uint16_t nodeID, unsigned char* buffer, ssize_t length);
void queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, ssize_t length); void queuePacketToNodes(unsigned char* buffer, ssize_t length);
void initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type); void initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type);
void releaseQueuedPacket(EditPacketBuffer& packetBuffer); // releases specific queued packet void releaseQueuedPacket(EditPacketBuffer& packetBuffer); // releases specific queued packet
bool voxelServersExist() const; bool voxelServersExist() const;