add handling for short circuit loss in SendQueue

This commit is contained in:
Stephen Birarda 2016-03-23 17:29:46 -07:00
parent dbfea6ec82
commit 57da7c2ba7
6 changed files with 73 additions and 45 deletions

View file

@ -201,7 +201,7 @@ void DefaultCC::onTimeout() {
void DefaultCC::stopSlowStart() {
_slowStart = false;
if (_receiveRate > 0) {
// Set the sending rate to the receiving rate.
setPacketSendPeriod(USECS_PER_SECOND / _receiveRate);

View file

@ -103,6 +103,7 @@ SendQueue& Connection::getSendQueue() {
QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission);
QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive);
QObject::connect(_sendQueue.get(), &SendQueue::timeout, this, &Connection::queueTimeout);
QObject::connect(_sendQueue.get(), &SendQueue::shortCircuitLoss, this, &Connection::queueShortCircuitLoss);
// set defaults on the send queue from our congestion control object and estimatedTimeout()
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
@ -140,6 +141,12 @@ void Connection::queueTimeout() {
});
}
void Connection::queueShortCircuitLoss(quint32 sequenceNumber) {
updateCongestionControlAndSendQueue([this, sequenceNumber]{
_congestionControl->onLoss(SequenceNumber { sequenceNumber }, SequenceNumber { sequenceNumber });
});
}
void Connection::sendReliablePacket(std::unique_ptr<Packet> packet) {
Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");
getSendQueue().queuePacket(std::move(packet));

View file

@ -87,6 +87,7 @@ private slots:
void recordRetransmission();
void queueInactive();
void queueTimeout();
void queueShortCircuitLoss(quint32 sequenceNumber);
private:
void sendACK(bool wasCausedBySyncTimeout = true);

View file

@ -128,8 +128,8 @@ void SendQueue::stop() {
_emptyCondition.notify_one();
}
void SendQueue::sendPacket(const Packet& packet) {
_socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination);
int SendQueue::sendPacket(const Packet& packet) {
return _socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination);
}
void SendQueue::ack(SequenceNumber ack) {
@ -178,7 +178,7 @@ void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
// this is a response from the client, re-set our timeout expiry
_lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch());
{
std::lock_guard<std::mutex> nakLocker(_naksLock);
_naks.clear();
@ -232,15 +232,16 @@ SequenceNumber SendQueue::getNextSequenceNumber() {
return _currentSequenceNumber;
}
void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber) {
bool SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber) {
// write the sequence number and send the packet
newPacket->writeSequenceNumber(sequenceNumber);
sendPacket(*newPacket);
// Save packet/payload size before we move it
auto packetSize = newPacket->getDataSize();
auto payloadSize = newPacket->getPayloadSize();
auto bytesWritten = sendPacket(*newPacket);
{
// Insert the packet we have just sent in the sent list
QWriteLocker locker(&_sentLock);
@ -249,8 +250,24 @@ void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket,
entry.second.swap(newPacket);
}
Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list");
emit packetSent(packetSize, payloadSize);
if (bytesWritten < 0) {
// this is a short-circuit loss - we failed to put this packet on the wire
// so immediately add it to the loss list
{
std::lock_guard<std::mutex> nakLocker(_naksLock);
_naks.append(sequenceNumber);
}
emit shortCircuitLoss(quint32(sequenceNumber));
return false;
} else {
return true;
}
}
void SendQueue::run() {
@ -285,12 +302,12 @@ void SendQueue::run() {
auto nextPacketTimestamp = p_high_resolution_clock::now();
while (_state == State::Running) {
bool sentAPacket = maybeResendPacket();
bool attemptedToSendPacket = maybeResendPacket();
// if we didn't find a packet to re-send AND we think we can fit a new packet on the wire
// (this is according to the current flow window size) then we send out a new packet
if (!sentAPacket) {
sentAPacket = maybeSendNewPacket();
if (!attemptedToSendPacket) {
attemptedToSendPacket = maybeSendNewPacket();
}
// since we're a while loop, give the thread a chance to process events
@ -300,7 +317,7 @@ void SendQueue::run() {
// If the send queue has been innactive, skip the sleep for
// Either _isRunning will have been set to false and we'll break
// Or something happened and we'll keep going
if (_state != State::Running || isInactive(sentAPacket)) {
if (_state != State::Running || isInactive(attemptedToSendPacket)) {
return;
}
@ -324,33 +341,34 @@ bool SendQueue::maybeSendNewPacket() {
// grab the first packet we will send
std::unique_ptr<Packet> firstPacket = _packets.takePacket();
Q_ASSERT(firstPacket);
std::unique_ptr<Packet> secondPacket;
bool shouldSendPairTail = false;
if (((uint32_t) nextNumber & 0xF) == 0) {
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
// pull off a second packet if we can before we unlock
shouldSendPairTail = true;
secondPacket = _packets.takePacket();
// attempt to send the first packet
if (sendNewPacketAndAddToSentList(move(firstPacket), nextNumber)) {
std::unique_ptr<Packet> secondPacket;
bool shouldSendPairTail = false;
if (((uint32_t) nextNumber & 0xF) == 0) {
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
// pull off a second packet if we can before we unlock
shouldSendPairTail = true;
secondPacket = _packets.takePacket();
}
// do we have a second in a pair to send as well?
if (secondPacket) {
sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber());
} else if (shouldSendPairTail) {
// we didn't get a second packet to send in the probe pair
// send a control packet of type ProbePairTail so the receiver can still do
// proper bandwidth estimation
static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail);
_socket->writeBasePacket(*pairTailPacket, _destination);
}
}
// definitely send the first packet
sendNewPacketAndAddToSentList(move(firstPacket), nextNumber);
// do we have a second in a pair to send as well?
if (secondPacket) {
sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber());
} else if (shouldSendPairTail) {
// we didn't get a second packet to send in the probe pair
// send a control packet of type ProbePairTail so the receiver can still do
// proper bandwidth estimation
static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail);
_socket->writeBasePacket(*pairTailPacket, _destination);
}
// We sent our packet(s), return here
// We attempted to send packet(s), return here
return true;
}
}
@ -375,8 +393,9 @@ bool SendQueue::maybeResendPacket() {
// see if we can find the packet to re-send
auto it = _sentPackets.find(resendNumber);
if (it != _sentPackets.end()) {
auto& entry = it->second;
// we found the packet - grab it
auto& resendPacket = *(entry.second);
@ -437,7 +456,7 @@ bool SendQueue::maybeResendPacket() {
return false;
}
bool SendQueue::isInactive(bool sentAPacket) {
bool SendQueue::isInactive(bool attemptedToSendPacket) {
// check for connection timeout first
// that will be the case if we have had 16 timeouts since hearing back from the client, and it has been
@ -462,7 +481,7 @@ bool SendQueue::isInactive(bool sentAPacket) {
return true;
}
if (!sentAPacket) {
if (!attemptedToSendPacket) {
// During our processing above we didn't send any packets
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.

View file

@ -79,6 +79,7 @@ signals:
void queueInactive();
void shortCircuitLoss(quint32 sequenceNumber);
void timeout();
private slots:
@ -91,13 +92,13 @@ private:
void sendHandshake();
void sendPacket(const Packet& packet);
void sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber);
int sendPacket(const Packet& packet);
bool sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber);
bool maybeSendNewPacket(); // Figures out what packet to send next
bool maybeResendPacket(); // Determines whether to resend a packet and which one
bool isInactive(bool sentAPacket);
bool isInactive(bool attemptedToSendPacket);
void deactivate(); // makes the queue inactive and cleans it up
bool isFlowWindowFull() const;

View file

@ -77,7 +77,7 @@ UDTTest::UDTTest(int& argc, char** argv) :
// randomize the seed for packet size randomization
srand(time(NULL));
_socket.bind(QHostAddress::AnyIPv4, _argumentParser.value(PORT_OPTION).toUInt());
qDebug() << "Test socket is listening on" << _socket.localPort();