added sendNackPackets() to OctreeInboundPacketProcessor

added rollover handling in _missingSequenceNumbers pruning; added
EditNack packet types; added getMyEditNackType() to OctreeServer
subclasses; added code to randomly skip edit packet sequence numbers for
testing in OctreeEditPacketSender
This commit is contained in:
wangyix 2014-06-16 09:57:05 -07:00
parent b210b07b81
commit 2b20720f51
11 changed files with 133 additions and 19 deletions

View file

@ -33,6 +33,7 @@ public:
virtual const char* getMyServerName() const { return MODEL_SERVER_NAME; }
virtual const char* getMyLoggingServerTargetName() const { return MODEL_SERVER_LOGGING_TARGET_NAME; }
virtual const char* getMyDefaultPersistFilename() const { return LOCAL_MODELS_PERSIST_FILE; }
virtual PacketType getMyEditNackType() const { return PacketTypeModelEditNack; }
// subclass may implement these method
virtual void beforeRun();

View file

@ -39,7 +39,6 @@ void OctreeInboundPacketProcessor::resetStats() {
_singleSenderStats.clear();
}
void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) {
bool debugProcessPacket = _myServer->wantsVerboseDebug();
@ -150,6 +149,76 @@ void OctreeInboundPacketProcessor::trackInboundPacket(const QUuid& nodeUUID, uns
}
}
int OctreeInboundPacketProcessor::sendNackPackets() {
printf("\t\t sendNackPackets()\n");
int packetsSent = 0;
// if there are packets from _node that are waiting to be processed,
// don't send a NACK since the missing packets may be among those waiting packets.
NodeToSenderStatsMap::const_iterator begin = _singleSenderStats.begin(), end = _singleSenderStats.end();
for (NodeToSenderStatsMap::const_iterator i = begin; i != end; i++) {
QUuid nodeUUID = i.key();
SingleSenderStats nodeStats = i.value();
if (hasPacketsToProcessFrom(nodeUUID)) {
continue;
}
const SharedNodePointer& destinationNode = NodeList::getInstance()->getNodeHash().value(nodeUUID);
const QSet<unsigned short int>& missingSequenceNumbers = nodeStats.getMissingSequenceNumbers();
// check if there are any sequence numbers that need to be nacked
int numSequenceNumbersAvailable = missingSequenceNumbers.size();
// construct nack packet(s) for this node
QSet<unsigned short int>::const_iterator missingSequenceNumberIterator = missingSequenceNumbers.begin();
char packet[MAX_PACKET_SIZE];
while (numSequenceNumbersAvailable > 0) {
char* dataAt = packet;
int bytesRemaining = MAX_PACKET_SIZE;
// pack header
int numBytesPacketHeader = populatePacketHeader(packet, _myServer->getMyEditNackType());
dataAt += numBytesPacketHeader;
bytesRemaining -= numBytesPacketHeader;
// calculate and pack the number of sequence numbers to nack
int numSequenceNumbersRoomFor = (bytesRemaining - sizeof(uint16_t)) / sizeof(unsigned short int);
uint16_t numSequenceNumbers = std::min(numSequenceNumbersAvailable, numSequenceNumbersRoomFor);
uint16_t* numSequenceNumbersAt = (uint16_t*)dataAt;
*numSequenceNumbersAt = numSequenceNumbers;
dataAt += sizeof(uint16_t);
// pack sequence numbers to nack
printf("\t\t sending NACK with %d seq numbers:\n\t\t", numSequenceNumbers);
for (uint16_t i = 0; i < numSequenceNumbers; i++) {
unsigned short int* sequenceNumberAt = (unsigned short int*)dataAt;
*sequenceNumberAt = *missingSequenceNumberIterator;
dataAt += sizeof(unsigned short int);
printf("%d, ", *missingSequenceNumberIterator);
missingSequenceNumberIterator++;
}
numSequenceNumbersAvailable -= numSequenceNumbers;
// send it
qint64 bytesWritten = NodeList::getInstance()->writeDatagram(packet, dataAt - packet, destinationNode);
printf("\t\t wrote %lld bytes\n\n", bytesWritten);
packetsSent++;
}
}
return packetsSent;
}
SingleSenderStats::SingleSenderStats()
: _totalTransitTime(0),
_totalProcessTime(0),
@ -165,17 +234,19 @@ SingleSenderStats::SingleSenderStats()
void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime,
int editsInPacket, quint64 processTime, quint64 lockWaitTime) {
const int MAX_REASONABLE_SEQUENCE_GAP = 1000;
printf("\t\t tracked seq %d\n", incomingSequence);
const int UINT16_RANGE = 65536;
const int MAX_REASONABLE_SEQUENCE_GAP = 1000; // this must be less than UINT16_RANGE / 2 for rollover handling to work
const int MAX_MISSING_SEQUENCE_SIZE = 100;
unsigned short int expectedSequence = _totalPackets == 0 ? incomingSequence : _incomingLastSequence + 1;
if (incomingSequence == expectedSequence) { // on time
if (incomingSequence == expectedSequence) { // on time
_incomingLastSequence = incomingSequence;
}
else { // out of order
const int UINT16_RANGE = 65536;
else { // out of order
int incoming = (int)incomingSequence;
int expected = (int)expectedSequence;
@ -202,6 +273,8 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence,
if (incoming > expected) { // early
printf("\t\t\t packet is early! %d packets were skipped\n", incoming - expected);
// add all sequence numbers that were skipped to the missing sequence numbers list
for (int missingSequence = expected; missingSequence < incoming; missingSequence++) {
_missingSequenceNumbers.insert(missingSequence < 0 ? missingSequence + UINT16_RANGE : missingSequence);
@ -210,6 +283,8 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence,
} else { // late
printf("\t\t\t packet is late!\n");
// remove this from missing sequence number if it's in there
_missingSequenceNumbers.remove(incomingSequence);
@ -217,11 +292,27 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence,
}
}
// prune missing sequence list if it gets too big
// prune missing sequence list if it gets too big; sequence numbers that are older than MAX_REASONABLE_SEQUENCE_GAP
// will be removed.
if (_missingSequenceNumbers.size() > MAX_MISSING_SEQUENCE_SIZE) {
foreach(unsigned short int missingSequence, _missingSequenceNumbers) {
if (missingSequence <= std::max(0, _incomingLastSequence - MAX_REASONABLE_SEQUENCE_GAP)) {
_missingSequenceNumbers.remove(missingSequence);
// the acceptable range of older sequence numbers may contain a rollover point; this must be handled.
// some sequence number in this list may be larger than _incomingLastSequence, indicating that they were received
// before the most recent rollover.
int cutoff = (int)_incomingLastSequence - MAX_REASONABLE_SEQUENCE_GAP;
if (cutoff >= 0) {
foreach(unsigned short int missingSequence, _missingSequenceNumbers) {
unsigned short int nonRolloverCutoff = (unsigned short int)cutoff;
if (missingSequence > _incomingLastSequence || missingSequence <= nonRolloverCutoff) {
_missingSequenceNumbers.remove(missingSequence);
}
}
} else {
unsigned short int rolloverCutoff = (unsigned short int)(cutoff + UINT16_RANGE);
foreach(unsigned short int missingSequence, _missingSequenceNumbers) {
if (missingSequence > _incomingLastSequence && missingSequence <= rolloverCutoff) {
_missingSequenceNumbers.remove(missingSequence);
}
}
}
}

View file

@ -32,6 +32,7 @@ public:
{ return _totalElementsInPacket == 0 ? 0 : _totalProcessTime / _totalElementsInPacket; }
quint64 getAverageLockWaitTimePerElement() const
{ return _totalElementsInPacket == 0 ? 0 : _totalLockWaitTime / _totalElementsInPacket; }
const QSet<unsigned short int>& getMissingSequenceNumbers() const { return _missingSequenceNumbers; }
void trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime,
int editsInPacket, quint64 processTime, quint64 lockWaitTime);
@ -47,8 +48,9 @@ public:
QSet<unsigned short int> _missingSequenceNumbers;
};
typedef std::map<QUuid, SingleSenderStats> NodeToSenderStatsMap;
typedef std::map<QUuid, SingleSenderStats>::iterator NodeToSenderStatsMapIterator;
typedef QHash<QUuid, SingleSenderStats> NodeToSenderStatsMap;
typedef QHash<QUuid, SingleSenderStats>::iterator NodeToSenderStatsMapIterator;
typedef QHash<QUuid, SingleSenderStats>::const_iterator NodeToSenderStatsMapConstIterator;
/// Handles processing of incoming network packets for the voxel-server. As with other ReceivedPacketProcessor classes
@ -75,6 +77,9 @@ public:
protected:
virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet);
public slots:
int sendNackPackets();
private:
void trackInboundPacket(const QUuid& nodeUUID, unsigned short int sequence, quint64 transitTime,
int voxelsInPacket, quint64 processTime, quint64 lockWaitTime);
@ -87,7 +92,7 @@ private:
quint64 _totalLockWaitTime;
quint64 _totalElementsInPacket;
quint64 _totalPackets;
NodeToSenderStatsMap _singleSenderStats;
};
#endif // hifi_OctreeInboundPacketProcessor_h

View file

@ -648,10 +648,10 @@ bool OctreeServer::handleHTTPRequest(HTTPConnection* connection, const QUrl& url
int senderNumber = 0;
NodeToSenderStatsMap& allSenderStats = _octreeInboundPacketProcessor->getSingleSenderStats();
for (NodeToSenderStatsMapIterator i = allSenderStats.begin(); i != allSenderStats.end(); i++) {
for (NodeToSenderStatsMapConstIterator i = allSenderStats.begin(); i != allSenderStats.end(); i++) {
senderNumber++;
QUuid senderID = i->first;
SingleSenderStats& senderStats = i->second;
QUuid senderID = i.key();
const SingleSenderStats& senderStats = i.value();
statsString += QString("\r\n Stats for sender %1 uuid: %2\r\n")
.arg(senderNumber).arg(senderID.toString());

View file

@ -68,6 +68,7 @@ public:
virtual const char* getMyServerName() const = 0;
virtual const char* getMyLoggingServerTargetName() const = 0;
virtual const char* getMyDefaultPersistFilename() const = 0;
virtual PacketType getMyEditNackType() const = 0;
// subclass may implement these method
virtual void beforeRun() { };

View file

@ -33,6 +33,7 @@ public:
virtual const char* getMyServerName() const { return PARTICLE_SERVER_NAME; }
virtual const char* getMyLoggingServerTargetName() const { return PARTICLE_SERVER_LOGGING_TARGET_NAME; }
virtual const char* getMyDefaultPersistFilename() const { return LOCAL_PARTICLES_PERSIST_FILE; }
virtual PacketType getMyEditNackType() const { return PacketTypeParticleEditNack; }
// subclass may implement these method
virtual void beforeRun();

View file

@ -42,6 +42,7 @@ public:
virtual const char* getMyServerName() const { return VOXEL_SERVER_NAME; }
virtual const char* getMyLoggingServerTargetName() const { return VOXEL_SERVER_LOGGING_TARGET_NAME; }
virtual const char* getMyDefaultPersistFilename() const { return LOCAL_VOXELS_PERSIST_FILE; }
virtual PacketType getMyEditNackType() const { return PacketTypeVoxelEditNack; }
// subclass may implement these method
virtual void beforeRun();

View file

@ -66,7 +66,10 @@ enum PacketType {
PacketTypeModelAddOrEdit,
PacketTypeModelErase,
PacketTypeModelAddResponse,
PacketTypeOctreeDataNack
PacketTypeOctreeDataNack, // 45
PacketTypeVoxelEditNack,
PacketTypeParticleEditNack,
PacketTypeModelEditNack,
};
typedef char PacketVersion;

View file

@ -35,7 +35,11 @@ public:
/// Are there received packets waiting to be processed from a certain node
bool hasPacketsToProcessFrom(const SharedNodePointer& sendingNode) const {
return _nodePacketCounts[sendingNode->getUUID()] > 0;
return hasPacketsToProcessFrom(sendingNode->getUUID());
}
bool hasPacketsToProcessFrom(const QUuid& nodeUUID) const {
return _nodePacketCounts[nodeUUID] > 0;
}
/// How many received packets waiting are to be processed

View file

@ -308,7 +308,14 @@ void OctreeEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, Pa
unsigned short int* sequenceAt = (unsigned short int*)&packetBuffer._currentBuffer[packetBuffer._currentSize];
*sequenceAt = _sequenceNumber;
packetBuffer._currentSize += sizeof(unsigned short int); // nudge past sequence
if (randFloat() < 0.6f)
_sequenceNumber++;
else
{
int x = randIntInRange(2, 4);
printf("\t\t seq number jumped from %d to %d\n", _sequenceNumber, _sequenceNumber + x);
_sequenceNumber += x;
}
// pack in timestamp
quint64 now = usecTimestampNow();

View file

@ -57,7 +57,7 @@ protected:
bool isStillRunning() const { return !_stopThread; }
private:
protected:
QMutex _mutex;
bool _stopThread;