started work on resending nacked packets

This commit is contained in:
wangyix 2014-06-05 18:00:58 -07:00
parent 124bd88b01
commit 1a7e3a859c
11 changed files with 196 additions and 10 deletions

View file

@ -106,7 +106,7 @@ int ModelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodeP
//qDebug() << "sending PacketType_MODEL_ERASE packetLength:" << packetLength;
NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node));
queryNode->incrementSequenceNumber();
queryNode->packetSent(outputBuffer, packetLength);
}
nodeData->setLastDeletedModelsSentAt(deletePacketSentAt);

View file

@ -41,7 +41,8 @@ OctreeQueryNode::OctreeQueryNode() :
_sequenceNumber(0),
_lastRootTimestamp(0),
_myPacketType(PacketTypeUnknown),
_isShuttingDown(false)
_isShuttingDown(false),
_sentPacketHistory(1000)
{
}
@ -362,3 +363,35 @@ void OctreeQueryNode::dumpOutOfView() {
}
}
}
void OctreeQueryNode::packetSent() {
packetSent(_octreePacket, getPacketLength());
}
void OctreeQueryNode::packetSent(unsigned char* packet, int packetLength) {
packetSent(QByteArray((char*)packet, packetLength));
}
void OctreeQueryNode::packetSent(const QByteArray& packet) {
_sentPacketHistory.packetSent(_sequenceNumber, packet);
_sequenceNumber++;
}
void OctreeQueryNode::addSequenceNumbersToResend(const QList<OCTREE_PACKET_SEQUENCE>& sequenceNumbers) {
_sequenceNumbersToResend.append(sequenceNumbers);
}
bool OctreeQueryNode::hasNextPacketToResend() const {
return !_sequenceNumbersToResend.isEmpty();
}
const QByteArray* OctreeQueryNode::getNextPacketToResend() {
if (!_sequenceNumbersToResend.isEmpty()) {
const QByteArray* nextPacket = _sentPacketHistory.getPacket(_sequenceNumbersToResend.first());
_sequenceNumbersToResend.pop_front();
return nextPacket; // could be null
}
return NULL;
}

View file

@ -23,6 +23,9 @@
#include <OctreeQuery.h>
#include <OctreeSceneStats.h>
#include <ThreadedAssignment.h> // for SharedAssignmentPointer
#include "SentPacketHistory.h"
#include <qqueue.h> // i added dis
class OctreeSendThread;
@ -100,10 +103,16 @@ public:
void forceNodeShutdown();
bool isShuttingDown() const { return _isShuttingDown; }
void incrementSequenceNumber() { _sequenceNumber++; }
void packetSent();
void packetSent(unsigned char* packet, int packetLength);
void packetSent(const QByteArray& packet);
OCTREE_PACKET_SEQUENCE getSequenceNumber() const { return _sequenceNumber; }
void addSequenceNumbersToResend(const QList<OCTREE_PACKET_SEQUENCE>& sequenceNumbers);
bool hasNextPacketToResend() const;
const QByteArray* getNextPacketToResend();
private slots:
void sendThreadFinished();
@ -146,6 +155,9 @@ private:
PacketType _myPacketType;
bool _isShuttingDown;
SentPacketHistory _sentPacketHistory;
QQueue<OCTREE_PACKET_SEQUENCE> _sequenceNumbersToResend;
};
#endif // hifi_OctreeQueryNode_h

View file

@ -274,13 +274,47 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
trueBytesSent += nodeData->getPacketLength();
truePacketsSent++;
packetsSent++;
nodeData->incrementSequenceNumber();
nodeData->packetSent();
nodeData->resetOctreePacket();
}
return packetsSent;
}
int OctreeSendThread::resendNackedPackets(OctreeQueryNode* nodeData) {
int packetsSent = 0;
const QByteArray* packet;
while (nodeData->hasNextPacketToResend()) {
packet = nodeData->getNextPacketToResend();
if (packet) {
NodeList::getInstance()->writeDatagram(*packet, _node);
packetsSent++;
// ??????
_totalBytes += packet->size();
_totalPackets++;
_totalWastedBytes += MAX_PACKET_SIZE - packet->size(); // ???
}
}
}
/// Version of voxel distributor that sends the deepest LOD level at once
int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrustumChanged) {

View file

@ -55,6 +55,9 @@ private:
int _nodeMissingCount;
bool _isShuttingDown;
int resendNackedPackets(OctreeQueryNode* nodeData);
};
#endif // hifi_OctreeSendThread_h

View file

@ -832,14 +832,14 @@ void OctreeServer::readPendingDatagrams() {
PacketType packetType = packetTypeForPacket(receivedPacket);
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
if (packetType == getMyQueryMessageType()) {
// If we got a query packet, then we're talking to an agent, and we
// need to make sure we have it in our nodeList.
if (matchingNode) {
nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket);
OctreeQueryNode* nodeData = (OctreeQueryNode*) matchingNode->getLinkedData();
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
// NOTE: this is an important aspect of the proper ref counting. The send threads/node data need to
// know that the OctreeServer/Assignment will not get deleted on it while it's still active. The
// solution is to get the shared pointer for the current assignment. We need to make sure this is the
@ -848,6 +848,28 @@ void OctreeServer::readPendingDatagrams() {
nodeData->initializeOctreeSendThread(sharedAssignment, matchingNode);
}
}
} else if (packetType == PacketTypeOctreeDataNack) {
// parse packet for sequence numbers that need to be resent
int numBytesPacketHeader = numBytesForPacketHeader(receivedPacket);
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(receivedPacket.data()) + numBytesPacketHeader;
uint16_t numSequenceNumbers = (*(uint16_t*)dataAt);
dataAt += sizeof(uint16_t);
// read sequence numbers
QList<OCTREE_PACKET_SEQUENCE> sequenceNumbers;
for (int i = 0; i < numSequenceNumbers; i++) {
sequenceNumbers.append(*(OCTREE_PACKET_SEQUENCE*)dataAt);
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
}
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData(); // move this or something
nodeData->addSequenceNumbersToResend(sequenceNumbers);
} else if (packetType == PacketTypeJurisdictionRequest) {
_jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket);
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {

View file

@ -0,0 +1,45 @@
//
// SentPacketHistory.cpp
// assignement-client/src/octree
//
// Created by Yixin Wang on 6/5/2014
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "SentPacketHistory.h"
SentPacketHistory::SentPacketHistory(int size)
: _sentPackets(size),
_newestPacketAt(0),
_numExistingPackets(0),
_newestSequenceNumber(0)
{
}
void SentPacketHistory::packetSent(OCTREE_PACKET_SEQUENCE sequenceNumber, const QByteArray& packet) {
_newestSequenceNumber = sequenceNumber;
// increment _newestPacketAt cyclically, insert new packet there
_newestPacketAt = (_newestPacketAt == _sentPackets.size() - 1) ? 0 : _newestPacketAt + 1;
_sentPackets[_newestPacketAt] = packet;
if (_numExistingPackets < _sentPackets.size()) {
_numExistingPackets++;
}
}
const QByteArray* SentPacketHistory::getPacket(OCTREE_PACKET_SEQUENCE sequenceNumber) const {
OCTREE_PACKET_SEQUENCE seqDiff = _newestSequenceNumber - sequenceNumber;
if (!(seqDiff >= 0 && seqDiff < _numExistingPackets)) {
return NULL;
}
int packetAt = _newestPacketAt - seqDiff;
if (packetAt < 0) { packetAt += _sentPackets.size(); }
return &_sentPackets.at(packetAt);
}

View file

@ -0,0 +1,36 @@
//
// SentPacketHistory.h
// assignement-client/src/octree
//
// Created by Yixin Wang on 6/5/2014
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_SentPacketHistory_h
#define hifi_SentPacketHistory_h
#include <qbytearray.h>
#include <qvector.h>
#include "OctreePacketData.h"
class SentPacketHistory {
public:
SentPacketHistory(int size);
void packetSent(OCTREE_PACKET_SEQUENCE sequenceNumber, const QByteArray& packet);
const QByteArray* getPacket(OCTREE_PACKET_SEQUENCE sequenceNumber) const;
private:
QVector<QByteArray> _sentPackets; // circular buffer
int _newestPacketAt;
int _numExistingPackets;
OCTREE_PACKET_SEQUENCE _newestSequenceNumber;
};
#endif

View file

@ -106,7 +106,7 @@ int ParticleServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNo
//qDebug() << "sending PacketType_PARTICLE_ERASE packetLength:" << packetLength;
NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node));
queryNode->incrementSequenceNumber();
queryNode->packetSent(outputBuffer, packetLength);
}
nodeData->setLastDeletedParticlesSentAt(deletePacketSentAt);

View file

@ -75,7 +75,7 @@ int VoxelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodeP
}
NodeList::getInstance()->writeDatagram((char*) _tempOutputBuffer, envPacketLength, SharedNodePointer(node));
queryNode->incrementSequenceNumber();
queryNode->packetSent(_tempOutputBuffer, envPacketLength);
return envPacketLength;
}

View file

@ -66,6 +66,7 @@ enum PacketType {
PacketTypeModelAddOrEdit,
PacketTypeModelErase,
PacketTypeModelAddResponse,
PacketTypeOctreeDataNack
};
typedef char PacketVersion;