From 1a7e3a859cafe5c344b912b221580f12981abd92 Mon Sep 17 00:00:00 2001 From: wangyix Date: Thu, 5 Jun 2014 18:00:58 -0700 Subject: [PATCH] started work on resending nacked packets --- assignment-client/src/models/ModelServer.cpp | 2 +- .../src/octree/OctreeQueryNode.cpp | 35 ++++++++++++++- .../src/octree/OctreeQueryNode.h | 16 ++++++- .../src/octree/OctreeSendThread.cpp | 36 ++++++++++++++- .../src/octree/OctreeSendThread.h | 3 ++ assignment-client/src/octree/OctreeServer.cpp | 28 ++++++++++-- .../src/octree/SentPacketHistory.cpp | 45 +++++++++++++++++++ .../src/octree/SentPacketHistory.h | 36 +++++++++++++++ .../src/particles/ParticleServer.cpp | 2 +- assignment-client/src/voxels/VoxelServer.cpp | 2 +- libraries/networking/src/PacketHeaders.h | 1 + 11 files changed, 196 insertions(+), 10 deletions(-) create mode 100644 assignment-client/src/octree/SentPacketHistory.cpp create mode 100644 assignment-client/src/octree/SentPacketHistory.h diff --git a/assignment-client/src/models/ModelServer.cpp b/assignment-client/src/models/ModelServer.cpp index ff2367ec6e..07359f001a 100644 --- a/assignment-client/src/models/ModelServer.cpp +++ b/assignment-client/src/models/ModelServer.cpp @@ -106,7 +106,7 @@ int ModelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodeP //qDebug() << "sending PacketType_MODEL_ERASE packetLength:" << packetLength; NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node)); - queryNode->incrementSequenceNumber(); + queryNode->packetSent(outputBuffer, packetLength); } nodeData->setLastDeletedModelsSentAt(deletePacketSentAt); diff --git a/assignment-client/src/octree/OctreeQueryNode.cpp b/assignment-client/src/octree/OctreeQueryNode.cpp index 6acd85bff6..e0ca6a4f1e 100644 --- a/assignment-client/src/octree/OctreeQueryNode.cpp +++ b/assignment-client/src/octree/OctreeQueryNode.cpp @@ -41,7 +41,8 @@ OctreeQueryNode::OctreeQueryNode() : _sequenceNumber(0), _lastRootTimestamp(0), _myPacketType(PacketTypeUnknown), - _isShuttingDown(false) + _isShuttingDown(false), + _sentPacketHistory(1000) { } @@ -362,3 +363,35 @@ void OctreeQueryNode::dumpOutOfView() { } } } + +void OctreeQueryNode::packetSent() { + packetSent(_octreePacket, getPacketLength()); +} + +void OctreeQueryNode::packetSent(unsigned char* packet, int packetLength) { + packetSent(QByteArray((char*)packet, packetLength)); +} + +void OctreeQueryNode::packetSent(const QByteArray& packet) { + _sentPacketHistory.packetSent(_sequenceNumber, packet); + _sequenceNumber++; +} + + +void OctreeQueryNode::addSequenceNumbersToResend(const QList& sequenceNumbers) { + _sequenceNumbersToResend.append(sequenceNumbers); +} + +bool OctreeQueryNode::hasNextPacketToResend() const { + return !_sequenceNumbersToResend.isEmpty(); +} + +const QByteArray* OctreeQueryNode::getNextPacketToResend() { + + if (!_sequenceNumbersToResend.isEmpty()) { + const QByteArray* nextPacket = _sentPacketHistory.getPacket(_sequenceNumbersToResend.first()); + _sequenceNumbersToResend.pop_front(); + return nextPacket; // could be null + } + return NULL; +} \ No newline at end of file diff --git a/assignment-client/src/octree/OctreeQueryNode.h b/assignment-client/src/octree/OctreeQueryNode.h index eb420039e6..4a2355f7cc 100644 --- a/assignment-client/src/octree/OctreeQueryNode.h +++ b/assignment-client/src/octree/OctreeQueryNode.h @@ -23,6 +23,9 @@ #include #include #include // for SharedAssignmentPointer +#include "SentPacketHistory.h" + +#include // i added dis class OctreeSendThread; @@ -100,10 +103,16 @@ public: void forceNodeShutdown(); bool isShuttingDown() const { return _isShuttingDown; } - void incrementSequenceNumber() { _sequenceNumber++; } + void packetSent(); + void packetSent(unsigned char* packet, int packetLength); + void packetSent(const QByteArray& packet); OCTREE_PACKET_SEQUENCE getSequenceNumber() const { return _sequenceNumber; } - + + void addSequenceNumbersToResend(const QList& sequenceNumbers); + bool hasNextPacketToResend() const; + const QByteArray* getNextPacketToResend(); + private slots: void sendThreadFinished(); @@ -146,6 +155,9 @@ private: PacketType _myPacketType; bool _isShuttingDown; + +SentPacketHistory _sentPacketHistory; +QQueue _sequenceNumbersToResend; }; #endif // hifi_OctreeQueryNode_h diff --git a/assignment-client/src/octree/OctreeSendThread.cpp b/assignment-client/src/octree/OctreeSendThread.cpp index 9e4dbcd347..5767f2623b 100644 --- a/assignment-client/src/octree/OctreeSendThread.cpp +++ b/assignment-client/src/octree/OctreeSendThread.cpp @@ -274,13 +274,47 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes trueBytesSent += nodeData->getPacketLength(); truePacketsSent++; packetsSent++; - nodeData->incrementSequenceNumber(); + nodeData->packetSent(); nodeData->resetOctreePacket(); } return packetsSent; } + + + + +int OctreeSendThread::resendNackedPackets(OctreeQueryNode* nodeData) { + + int packetsSent = 0; + + const QByteArray* packet; + while (nodeData->hasNextPacketToResend()) { + packet = nodeData->getNextPacketToResend(); + if (packet) { + NodeList::getInstance()->writeDatagram(*packet, _node); + packetsSent++; + + // ?????? + _totalBytes += packet->size(); + _totalPackets++; + _totalWastedBytes += MAX_PACKET_SIZE - packet->size(); // ??? + } + } + +} + + + + + + + + + + + /// Version of voxel distributor that sends the deepest LOD level at once int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrustumChanged) { diff --git a/assignment-client/src/octree/OctreeSendThread.h b/assignment-client/src/octree/OctreeSendThread.h index d8eed27802..cc0cdac3ad 100644 --- a/assignment-client/src/octree/OctreeSendThread.h +++ b/assignment-client/src/octree/OctreeSendThread.h @@ -55,6 +55,9 @@ private: int _nodeMissingCount; bool _isShuttingDown; + +int resendNackedPackets(OctreeQueryNode* nodeData); + }; #endif // hifi_OctreeSendThread_h diff --git a/assignment-client/src/octree/OctreeServer.cpp b/assignment-client/src/octree/OctreeServer.cpp index 5769c15ef1..f60f0fc2d5 100644 --- a/assignment-client/src/octree/OctreeServer.cpp +++ b/assignment-client/src/octree/OctreeServer.cpp @@ -832,14 +832,14 @@ void OctreeServer::readPendingDatagrams() { PacketType packetType = packetTypeForPacket(receivedPacket); SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket); if (packetType == getMyQueryMessageType()) { - + // If we got a query packet, then we're talking to an agent, and we // need to make sure we have it in our nodeList. if (matchingNode) { nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket); - OctreeQueryNode* nodeData = (OctreeQueryNode*) matchingNode->getLinkedData(); + OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData(); if (nodeData && !nodeData->isOctreeSendThreadInitalized()) { - + // NOTE: this is an important aspect of the proper ref counting. The send threads/node data need to // know that the OctreeServer/Assignment will not get deleted on it while it's still active. The // solution is to get the shared pointer for the current assignment. We need to make sure this is the @@ -848,6 +848,28 @@ void OctreeServer::readPendingDatagrams() { nodeData->initializeOctreeSendThread(sharedAssignment, matchingNode); } } + + } else if (packetType == PacketTypeOctreeDataNack) { + +// parse packet for sequence numbers that need to be resent +int numBytesPacketHeader = numBytesForPacketHeader(receivedPacket); +const unsigned char* dataAt = reinterpret_cast(receivedPacket.data()) + numBytesPacketHeader; + +uint16_t numSequenceNumbers = (*(uint16_t*)dataAt); +dataAt += sizeof(uint16_t); + +// read sequence numbers +QList sequenceNumbers; +for (int i = 0; i < numSequenceNumbers; i++) { + sequenceNumbers.append(*(OCTREE_PACKET_SEQUENCE*)dataAt); + dataAt += sizeof(OCTREE_PACKET_SEQUENCE); +} + +OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData(); // move this or something +nodeData->addSequenceNumbersToResend(sequenceNumbers); + + + } else if (packetType == PacketTypeJurisdictionRequest) { _jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket); } else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) { diff --git a/assignment-client/src/octree/SentPacketHistory.cpp b/assignment-client/src/octree/SentPacketHistory.cpp new file mode 100644 index 0000000000..0de163a23d --- /dev/null +++ b/assignment-client/src/octree/SentPacketHistory.cpp @@ -0,0 +1,45 @@ +// +// SentPacketHistory.cpp +// assignement-client/src/octree +// +// Created by Yixin Wang on 6/5/2014 +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#include "SentPacketHistory.h" + +SentPacketHistory::SentPacketHistory(int size) + : _sentPackets(size), + _newestPacketAt(0), + _numExistingPackets(0), + _newestSequenceNumber(0) +{ +} + +void SentPacketHistory::packetSent(OCTREE_PACKET_SEQUENCE sequenceNumber, const QByteArray& packet) { + _newestSequenceNumber = sequenceNumber; + + // increment _newestPacketAt cyclically, insert new packet there + _newestPacketAt = (_newestPacketAt == _sentPackets.size() - 1) ? 0 : _newestPacketAt + 1; + _sentPackets[_newestPacketAt] = packet; + + if (_numExistingPackets < _sentPackets.size()) { + _numExistingPackets++; + } +} + + +const QByteArray* SentPacketHistory::getPacket(OCTREE_PACKET_SEQUENCE sequenceNumber) const { + + OCTREE_PACKET_SEQUENCE seqDiff = _newestSequenceNumber - sequenceNumber; + if (!(seqDiff >= 0 && seqDiff < _numExistingPackets)) { + return NULL; + } + + int packetAt = _newestPacketAt - seqDiff; + if (packetAt < 0) { packetAt += _sentPackets.size(); } + + return &_sentPackets.at(packetAt); +} \ No newline at end of file diff --git a/assignment-client/src/octree/SentPacketHistory.h b/assignment-client/src/octree/SentPacketHistory.h new file mode 100644 index 0000000000..cbc50bc73c --- /dev/null +++ b/assignment-client/src/octree/SentPacketHistory.h @@ -0,0 +1,36 @@ +// +// SentPacketHistory.h +// assignement-client/src/octree +// +// Created by Yixin Wang on 6/5/2014 +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#ifndef hifi_SentPacketHistory_h +#define hifi_SentPacketHistory_h + +#include +#include + +#include "OctreePacketData.h" + +class SentPacketHistory { + +public: + SentPacketHistory(int size); + + void packetSent(OCTREE_PACKET_SEQUENCE sequenceNumber, const QByteArray& packet); + const QByteArray* getPacket(OCTREE_PACKET_SEQUENCE sequenceNumber) const; + +private: + + QVector _sentPackets; // circular buffer + int _newestPacketAt; + int _numExistingPackets; + + OCTREE_PACKET_SEQUENCE _newestSequenceNumber; +}; + +#endif diff --git a/assignment-client/src/particles/ParticleServer.cpp b/assignment-client/src/particles/ParticleServer.cpp index 1dd65f11f3..e7a0f75dfd 100644 --- a/assignment-client/src/particles/ParticleServer.cpp +++ b/assignment-client/src/particles/ParticleServer.cpp @@ -106,7 +106,7 @@ int ParticleServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNo //qDebug() << "sending PacketType_PARTICLE_ERASE packetLength:" << packetLength; NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node)); - queryNode->incrementSequenceNumber(); + queryNode->packetSent(outputBuffer, packetLength); } nodeData->setLastDeletedParticlesSentAt(deletePacketSentAt); diff --git a/assignment-client/src/voxels/VoxelServer.cpp b/assignment-client/src/voxels/VoxelServer.cpp index 8f4a8bab36..34b01f529a 100644 --- a/assignment-client/src/voxels/VoxelServer.cpp +++ b/assignment-client/src/voxels/VoxelServer.cpp @@ -75,7 +75,7 @@ int VoxelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodeP } NodeList::getInstance()->writeDatagram((char*) _tempOutputBuffer, envPacketLength, SharedNodePointer(node)); - queryNode->incrementSequenceNumber(); + queryNode->packetSent(_tempOutputBuffer, envPacketLength); return envPacketLength; } diff --git a/libraries/networking/src/PacketHeaders.h b/libraries/networking/src/PacketHeaders.h index 9c764f9f02..a73ffe8564 100644 --- a/libraries/networking/src/PacketHeaders.h +++ b/libraries/networking/src/PacketHeaders.h @@ -66,6 +66,7 @@ enum PacketType { PacketTypeModelAddOrEdit, PacketTypeModelErase, PacketTypeModelAddResponse, + PacketTypeOctreeDataNack }; typedef char PacketVersion;