repairs to cleanup of send queue

This commit is contained in:
Stephen Birarda 2015-08-28 14:19:20 -07:00
parent 9575b47e4e
commit b7d0aa062a
4 changed files with 29 additions and 13 deletions

View file

@ -48,6 +48,10 @@ Connection::Connection(Socket* parentSocket, HifiSockAddr destination, std::uniq
}
Connection::~Connection() {
stopSendQueue();
}
void Connection::stopSendQueue() {
if (_sendQueue) {
// grab the send queue thread so we can wait on it
QThread* sendQueueThread = _sendQueue->thread();
@ -73,6 +77,8 @@ SendQueue& Connection::getSendQueue() {
// Lasily create send queue
_sendQueue = SendQueue::create(_parentSocket, _destination);
qDebug() << "Created SendQueue for connection to" << _destination;
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent);
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets);
QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission);
@ -88,7 +94,9 @@ SendQueue& Connection::getSendQueue() {
}
void Connection::queueInactive() {
emit connectionInactive(_destination);
// tell our current send queue to go down and reset our ptr to it to null
stopSendQueue();
qDebug() << "Connection to" << _destination << "has stopped its SendQueue.";
}
void Connection::sendReliablePacket(std::unique_ptr<Packet> packet) {

View file

@ -106,6 +106,8 @@ private:
void updateCongestionControlAndSendQueue(std::function<void()> congestionCallback);
void stopSendQueue();
int _synInterval; // Periodical Rate Control Interval, in microseconds
int _nakInterval { -1 }; // NAK timeout interval, in microseconds, set on loss

View file

@ -161,7 +161,8 @@ void SendQueue::ack(SequenceNumber ack) {
return;
}
{ // remove any ACKed packets from the map of sent packets
{
// remove any ACKed packets from the map of sent packets
QWriteLocker locker(&_sentLock);
for (auto seq = SequenceNumber { (uint32_t) _lastACKSequenceNumber }; seq <= ack; ++seq) {
_sentPackets.erase(seq);
@ -335,9 +336,7 @@ void SendQueue::run() {
qDebug() << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts and is"
<< "considered inactive. It is now being stopped.";
emit queueInactive();
_isRunning = false;
deactivate();
return;
} else {
@ -364,14 +363,11 @@ void SendQueue::run() {
if (cvStatus == std::cv_status::timeout) {
qDebug() << "SendQueue to" << _destination << "has been empty for"
<< EMPTY_QUEUES_INACTIVE_TIMEOUT.count()
<< "seconds and receiver has ACKed all packets."
<< "The queue is considered inactive and will be stopped.";
<< EMPTY_QUEUES_INACTIVE_TIMEOUT.count()
<< "seconds and receiver has ACKed all packets."
<< "The queue is considered inactive and will be stopped.";
// this queue is inactive - emit that signal and stop the while
emit queueInactive();
_isRunning = false;
deactivate();
return;
}
@ -464,11 +460,12 @@ bool SendQueue::maybeSendNewPacket() {
}
bool SendQueue::maybeResendPacket() {
std::unique_lock<std::mutex> naksLocker(_naksLock);
// the following while makes sure that we find a packet to re-send, if there is one
while (true) {
std::unique_lock<std::mutex> naksLocker(_naksLock);
if (_naks.getLength() > 0) {
// pull the sequence number we need to re-send
SequenceNumber resendNumber = _naks.popFirstSequenceNumber();
@ -507,3 +504,10 @@ bool SendQueue::maybeResendPacket() {
// No packet was resent
return false;
}
void SendQueue::deactivate() {
// this queue is inactive - emit that signal and stop the while
emit queueInactive();
_isRunning = false;
}

View file

@ -87,6 +87,8 @@ private:
bool maybeSendNewPacket(); // Figures out what packet to send next
bool maybeResendPacket(); // Determines whether to resend a packet and which one
void deactivate(); // makes the queue inactive and cleans it up
// Increments current sequence number and return it
SequenceNumber getNextSequenceNumber();
MessageNumber getNextMessageNumber();