Merge branch 'packet_recovery_pull'

This commit is contained in:
wangyix 2014-06-09 12:04:12 -07:00
commit f13ae84da6
13 changed files with 186 additions and 55 deletions

View file

@ -310,21 +310,14 @@ void AudioMixer::addBufferToMixForListeningNodeWithBuffer(PositionalAudioRingBuf
// stereo buffer - do attenuation but no sample delay for spatialization
for (int s = 0; s < NETWORK_BUFFER_LENGTH_SAMPLES_STEREO; s += 4) {
// use MMX to clamp four additions at a time
__m64 bufferSamples = _mm_set_pi16(_clientSamples[s], _clientSamples[s + 1],
_clientSamples[s + 2], _clientSamples[s + 3]);
__m64 addSamples = _mm_set_pi16(nextOutputStart[s] * attenuationCoefficient,
nextOutputStart[s + 1] * attenuationCoefficient,
nextOutputStart[s + 2] * attenuationCoefficient,
nextOutputStart[s + 3] * attenuationCoefficient);
__m64 mmxResult = _mm_adds_pi16(bufferSamples, addSamples);
int16_t* shortResults = reinterpret_cast<int16_t*>(&mmxResult);
_clientSamples[s] = shortResults[3];
_clientSamples[s + 1] = shortResults[2];
_clientSamples[s + 2] = shortResults[1];
_clientSamples[s + 3] = shortResults[0];
_clientSamples[s] = glm::clamp(_clientSamples[s] + (int) (nextOutputStart[s] * attenuationCoefficient),
MIN_SAMPLE_VALUE, MAX_SAMPLE_VALUE);
_clientSamples[s + 1] = glm::clamp(_clientSamples[s + 1] + (int) (nextOutputStart[s + 1] * attenuationCoefficient),
MIN_SAMPLE_VALUE, MAX_SAMPLE_VALUE);
_clientSamples[s + 2] = glm::clamp(_clientSamples[s + 2] + (int) (nextOutputStart[s + 2] * attenuationCoefficient),
MIN_SAMPLE_VALUE, MAX_SAMPLE_VALUE);
_clientSamples[s + 3] = glm::clamp(_clientSamples[s + 3] + (int) (nextOutputStart[s + 3] * attenuationCoefficient),
MIN_SAMPLE_VALUE, MAX_SAMPLE_VALUE);
}
}
}

View file

@ -377,21 +377,14 @@ void OctreeQueryNode::packetSent(const QByteArray& packet) {
_sequenceNumber++;
}
void OctreeQueryNode::addSequenceNumbersToResend(const QList<OCTREE_PACKET_SEQUENCE>& sequenceNumbers) {
_nackedSequenceNumbers.append(sequenceNumbers);
}
bool OctreeQueryNode::hasNextNackedPacket() const {
return !_nackedSequenceNumbers.isEmpty();
}
const QByteArray* OctreeQueryNode::getNextNackedPacket() {
if (!_nackedSequenceNumbers.isEmpty()) {
const QByteArray* nextPacket = _sentPacketHistory.getPacket(_nackedSequenceNumbers.first());
_nackedSequenceNumbers.pop_front();
return nextPacket; // could be null
// could return null if packet is not in the history
return _sentPacketHistory.getPacket(_nackedSequenceNumbers.takeFirst());
}
return NULL;
}
}

View file

@ -109,7 +109,9 @@ public:
OCTREE_PACKET_SEQUENCE getSequenceNumber() const { return _sequenceNumber; }
void addSequenceNumbersToResend(const QList<OCTREE_PACKET_SEQUENCE>& sequenceNumbers);
void addNackedSequenceNumber(OCTREE_PACKET_SEQUENCE sequenceNumber) {
_nackedSequenceNumbers.append(sequenceNumber);
}
bool hasNextNackedPacket() const;
const QByteArray* getNextNackedPacket();

View file

@ -181,7 +181,7 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
// actually send it
OctreeServer::didCallWriteDatagram(this);
NodeList::getInstance()->writeDatagram((char*) statsMessage, statsMessageLength, _node);
NodeList::getInstance()->writeDatagram2(nodeData->getSequenceNumber(), (char*) statsMessage, statsMessageLength, _node);
packetSent = true;
} else {
// not enough room in the packet, send two packets
@ -215,8 +215,7 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
packetsSent++;
OctreeServer::didCallWriteDatagram(this);
NodeList::getInstance()->writeDatagram((char*) nodeData->getPacket(), nodeData->getPacketLength(), _node);
NodeList::getInstance()->writeDatagram2(nodeData->getSequenceNumber(), (char*)nodeData->getPacket(), nodeData->getPacketLength(), _node);
packetSent = true;
thisWastedBytes = MAX_PACKET_SIZE - nodeData->getPacketLength();
@ -245,7 +244,7 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
if (nodeData->isPacketWaiting() && !nodeData->isShuttingDown()) {
// just send the voxel packet
OctreeServer::didCallWriteDatagram(this);
NodeList::getInstance()->writeDatagram((char*) nodeData->getPacket(), nodeData->getPacketLength(), _node);
NodeList::getInstance()->writeDatagram2(nodeData->getSequenceNumber(), (char*)nodeData->getPacket(), nodeData->getPacketLength(), _node);
packetSent = true;
int thisWastedBytes = MAX_PACKET_SIZE - nodeData->getPacketLength();
@ -288,25 +287,23 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
int OctreeSendThread::resendNackedPackets(OctreeQueryNode* nodeData) {
const int maxPacketsSent = 10;
const int MAX_PACKETS_RESEND = 10;
int packetsSent = 0;
const QByteArray* packet;
while (nodeData->hasNextNackedPacket() && packetsSent < maxPacketsSent) {
while (nodeData->hasNextNackedPacket() && packetsSent < MAX_PACKETS_RESEND) {
packet = nodeData->getNextNackedPacket();
// packet will be NULL if it's not in nodeData's packet history
if (packet) {
NodeList::getInstance()->writeDatagram(*packet, _node);
packetsSent++;
// ??????
_totalBytes += packet->size();
_totalPackets++;
_totalWastedBytes += MAX_PACKET_SIZE - packet->size(); // ???
}
}
if (packetsSent > 0)
printf("\t\t re-sent %d packets!\n", packetsSent);
return packetsSent;
}

View file

@ -832,10 +832,9 @@ void OctreeServer::readPendingDatagrams() {
PacketType packetType = packetTypeForPacket(receivedPacket);
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
if (packetType == getMyQueryMessageType()) {
// If we got a query packet, then we're talking to an agent, and we
// need to make sure we have it in our nodeList.
if (matchingNode) {
if (matchingNode) {
nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket);
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
@ -852,25 +851,29 @@ void OctreeServer::readPendingDatagrams() {
} else if (packetType == PacketTypeOctreeDataNack) {
// parse packet for sequence numbers that need to be resent
int numBytesPacketHeader = numBytesForPacketHeader(receivedPacket);
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(receivedPacket.data()) + numBytesPacketHeader;
uint16_t numSequenceNumbers = (*(uint16_t*)dataAt);
dataAt += sizeof(uint16_t);
if (matchingNode) {
// read sequence numbers
QList<OCTREE_PACKET_SEQUENCE> sequenceNumbers;
for (int i = 0; i < numSequenceNumbers; i++) {
OCTREE_PACKET_SEQUENCE sequenceNumber = (*(OCTREE_PACKET_SEQUENCE*)dataAt);
sequenceNumbers.append(sequenceNumber);
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
printf("\t\t\t nacked packet: seq = %d\n", sequenceNumber);
int numBytesPacketHeader = numBytesForPacketHeader(receivedPacket);
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(receivedPacket.data()) + numBytesPacketHeader;
uint16_t numSequenceNumbers = (*(uint16_t*)dataAt);
dataAt += sizeof(uint16_t);
printf("\t received nack packet containing %d seq nums\n", numSequenceNumbers);
// read sequence numbers
for (int i = 0; i < numSequenceNumbers; i++) {
OCTREE_PACKET_SEQUENCE sequenceNumber = (*(OCTREE_PACKET_SEQUENCE*)dataAt);
nodeData->addNackedSequenceNumber(sequenceNumber);
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
printf("\t seq = %d\n", sequenceNumber);
}
}
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData(); // move this or something
nodeData->addSequenceNumbersToResend(sequenceNumbers);
} else if (packetType == PacketTypeJurisdictionRequest) {

View file

@ -167,7 +167,8 @@ Application::Application(int& argc, char** argv, QElapsedTimer &startup_time) :
_applicationOverlay(),
_runningScriptsWidget(new RunningScriptsWidget(_window)),
_runningScriptsWidgetWasVisible(false),
_trayIcon(new QSystemTrayIcon(_window))
_trayIcon(new QSystemTrayIcon(_window)),
_lastNackTime(usecTimestampNow())
{
// read the ApplicationInfo.ini file for Name/Version/Domain information
QSettings applicationInfo(Application::resourcesPath() + "info/ApplicationInfo.ini", QSettings::IniFormat);
@ -2093,7 +2094,103 @@ void Application::updateMyAvatar(float deltaTime) {
_lastQueriedViewFrustum = _viewFrustum;
}
}
// sent a nack packet containing missing sequence numbers of received packets
{
quint64 now = usecTimestampNow();
quint64 sinceLastNack = now - _lastNackTime;
const quint64 TOO_LONG_SINCE_LAST_NACK = 250 * MSECS_PER_SECOND;
if (sinceLastNack > TOO_LONG_SINCE_LAST_NACK) {
_lastNackTime = now;
sendNack();
}
}
}
/*/ Attempt to identify the sender from it's address.
if (sendingNode) {
QUuid nodeUUID = sendingNode->getUUID();
// now that we know the node ID, let's add these stats to the stats for that node...
_octreeSceneStatsLock.lockForWrite();
if (_octreeServerSceneStats.find(nodeUUID) != _octreeServerSceneStats.end()) {
OctreeSceneStats& stats = _octreeServerSceneStats[nodeUUID];
stats.trackIncomingOctreePacket(packet, wasStatsPacket, sendingNode->getClockSkewUsec());
}
_octreeSceneStatsLock.unlock();
}
*/
void Application::sendNack() {
char packet[MAX_PACKET_SIZE];
NodeList* nodeList = NodeList::getInstance();
// iterates thru all nodes in NodeList
foreach(const SharedNodePointer& node, NodeList::getInstance()->getNodeHash()) {
if (node->getActiveSocket() &&
( node->getType() == NodeType::VoxelServer
|| node->getType() == NodeType::ParticleServer
|| node->getType() == NodeType::ModelServer)
) {
QUuid nodeUUID = node->getUUID();
_octreeSceneStatsLock.lockForWrite();
// retreive octree scene stats of this node
if (_octreeServerSceneStats.find(nodeUUID) == _octreeServerSceneStats.end()) {
_octreeSceneStatsLock.unlock();
continue;
}
OctreeSceneStats& stats = _octreeServerSceneStats[nodeUUID];
// check if there are any sequence numbers that need to be nacked
int numSequenceNumbersAvailable = stats.getNumSequenceNumbersToNack();
if (numSequenceNumbersAvailable == 0) {
_octreeSceneStatsLock.unlock();
continue;
}
char* dataAt = packet;
int bytesRemaining = MAX_PACKET_SIZE;
// pack header
int numBytesPacketHeader = populatePacketHeader(packet, PacketTypeOctreeDataNack);
dataAt += numBytesPacketHeader;
bytesRemaining -= numBytesPacketHeader;
int numSequenceNumbersRoomFor = (bytesRemaining - sizeof(uint16_t)) / sizeof(OCTREE_PACKET_SEQUENCE);
// calculate and pack the number of sequence numbers
uint16_t numSequenceNumbers = min(numSequenceNumbersAvailable, numSequenceNumbersRoomFor);
uint16_t* numSequenceNumbersAt = (uint16_t*)dataAt;
*numSequenceNumbersAt = numSequenceNumbers;
dataAt += sizeof(uint16_t);
// pack sequence numbers
//printf("\n\t sending nack...\n");
//printf("\t\t packed %d seq #s:", numSequenceNumbers);
for (int i = 0; i < numSequenceNumbers; i++) {
OCTREE_PACKET_SEQUENCE* sequenceNumberAt = (OCTREE_PACKET_SEQUENCE*)dataAt;
*sequenceNumberAt = stats.getNextSequenceNumberToNack();
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
//printf(" %d,", *sequenceNumberAt);
}
//printf("\n");
_octreeSceneStatsLock.unlock();
nodeList->writeUnverifiedDatagram(packet, dataAt - packet, node);
}
}
}
void Application::queryOctree(NodeType_t serverType, PacketType packetType, NodeToJurisdictionMap& jurisdictions) {

View file

@ -411,6 +411,10 @@ private:
static void attachNewHeadToNode(Node *newNode);
static void* networkReceive(void* args); // network receive thread
void sendNack();
MainWindow* _window;
GLCanvas* _glWidget; // our GLCanvas has a couple extra features
@ -580,6 +584,8 @@ private:
bool _runningScriptsWidgetWasVisible;
QSystemTrayIcon* _trayIcon;
quint64 _lastNackTime;
};
#endif // hifi_Application_h

View file

@ -271,6 +271,23 @@ qint64 LimitedNodeList::writeDatagram(const char* data, qint64 size, const Share
return writeDatagram(QByteArray(data, size), destinationNode, overridenSockAddr);
}
qint64 LimitedNodeList::writeDatagram2(int seq, const char* data, qint64 size, const SharedNodePointer& destinationNode,
const HifiSockAddr& overridenSockAddr) {
qint64 ret = -1;
if (randFloat() < 0.8f) {
ret = writeDatagram(QByteArray(data, size), destinationNode, overridenSockAddr);
}
else {
printf("dropped packet seq = %d --------------------------\n", seq);
}
return ret;
}
qint64 LimitedNodeList::writeUnverifiedDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode,
const HifiSockAddr& overridenSockAddr) {
return writeUnverifiedDatagram(QByteArray(data, size), destinationNode, overridenSockAddr);

View file

@ -72,6 +72,9 @@ public:
qint64 writeDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode,
const HifiSockAddr& overridenSockAddr = HifiSockAddr());
qint64 writeDatagram2(int seq, const char* data, qint64 size, const SharedNodePointer& destinationNode,
const HifiSockAddr& overridenSockAddr = HifiSockAddr());
qint64 writeUnverifiedDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode,
const HifiSockAddr& overridenSockAddr = HifiSockAddr());

View file

@ -108,7 +108,7 @@ int populatePacketHeader(char* packet, PacketType type, const QUuid& connectionU
position += NUM_BYTES_RFC4122_UUID;
if (!NON_VERIFIED_PACKETS.contains(type)) {
// pack 16 bytes of zeros where the md5 hash will be placed one data is packed
// pack 16 bytes of zeros where the md5 hash will be placed once data is packed
memset(position, 0, NUM_BYTES_MD5_HASH);
position += NUM_BYTES_MD5_HASH;
}

View file

@ -75,7 +75,8 @@ const QSet<PacketType> NON_VERIFIED_PACKETS = QSet<PacketType>()
<< PacketTypeDomainServerRequireDTLS << PacketTypeDomainConnectRequest
<< PacketTypeDomainList << PacketTypeDomainListRequest << PacketTypeDomainOAuthRequest
<< PacketTypeCreateAssignment << PacketTypeRequestAssignment << PacketTypeStunResponse
<< PacketTypeNodeJsonStats << PacketTypeVoxelQuery << PacketTypeParticleQuery << PacketTypeModelQuery;
<< PacketTypeNodeJsonStats << PacketTypeVoxelQuery << PacketTypeParticleQuery << PacketTypeModelQuery
<< PacketTypeOctreeDataNack;
const int NUM_BYTES_MD5_HASH = 16;
const int NUM_STATIC_HEADER_BYTES = sizeof(PacketVersion) + NUM_BYTES_RFC4122_UUID;

View file

@ -46,6 +46,7 @@ OctreeSceneStats::OctreeSceneStats() :
_incomingReallyLate(0),
_incomingPossibleDuplicate(0),
_missingSequenceNumbers(),
_sequenceNumbersToNack(),
_incomingFlightTimeAverage(samples),
_jurisdictionRoot(NULL)
{
@ -158,6 +159,7 @@ void OctreeSceneStats::copyFromOther(const OctreeSceneStats& other) {
_incomingPossibleDuplicate = other._incomingPossibleDuplicate;
_missingSequenceNumbers = other._missingSequenceNumbers;
_sequenceNumbersToNack = other._sequenceNumbersToNack;
}
@ -926,6 +928,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
qDebug() << "found it in _missingSequenceNumbers";
}
_missingSequenceNumbers.remove(sequence);
_sequenceNumbersToNack.remove(sequence);
_incomingLikelyLost--;
_incomingRecovered++;
} else {
@ -955,6 +958,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
_incomingLikelyLost += missing;
for(unsigned int missingSequence = expected; missingSequence < sequence; missingSequence++) {
_missingSequenceNumbers << missingSequence;
_sequenceNumbersToNack << missingSequence;
}
}
}
@ -982,9 +986,20 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
qDebug() << "pruning really old missing sequence:" << missingItem;
}
_missingSequenceNumbers.remove(missingItem);
_sequenceNumbersToNack.remove(missingItem);
}
}
}
}
int OctreeSceneStats::getNumSequenceNumbersToNack() const {
return _sequenceNumbersToNack.size();
}
uint16_t OctreeSceneStats::getNextSequenceNumberToNack() {
QSet<uint16_t>::Iterator it = _sequenceNumbersToNack.begin();
uint16_t sequenceNumber = *it;
_sequenceNumbersToNack.remove(sequenceNumber);
return sequenceNumber;
}

View file

@ -172,6 +172,9 @@ public:
quint32 getIncomingPossibleDuplicate() const { return _incomingPossibleDuplicate; }
float getIncomingFlightTimeAverage() { return _incomingFlightTimeAverage.getAverage(); }
int getNumSequenceNumbersToNack() const;
uint16_t getNextSequenceNumberToNack();
private:
void copyFromOther(const OctreeSceneStats& other);
@ -273,6 +276,7 @@ private:
quint32 _incomingReallyLate; /// out of order and later than MAX_MISSING_SEQUENCE_OLD_AGE late
quint32 _incomingPossibleDuplicate; /// out of order possibly a duplicate
QSet<uint16_t> _missingSequenceNumbers;
QSet<uint16_t> _sequenceNumbersToNack;
SimpleMovingAverage _incomingFlightTimeAverage;
// features related items