mirror of
https://github.com/JulianGro/overte.git
synced 2025-04-25 21:15:07 +02:00
Merge pull request #7452 from birarda/udt-fixes
fix unecessary re-sends at capacity on udt, race in SendQueue timeout check
This commit is contained in:
commit
caf2595e13
6 changed files with 91 additions and 55 deletions
|
@ -201,7 +201,7 @@ void DefaultCC::onTimeout() {
|
||||||
|
|
||||||
void DefaultCC::stopSlowStart() {
|
void DefaultCC::stopSlowStart() {
|
||||||
_slowStart = false;
|
_slowStart = false;
|
||||||
|
|
||||||
if (_receiveRate > 0) {
|
if (_receiveRate > 0) {
|
||||||
// Set the sending rate to the receiving rate.
|
// Set the sending rate to the receiving rate.
|
||||||
setPacketSendPeriod(USECS_PER_SECOND / _receiveRate);
|
setPacketSendPeriod(USECS_PER_SECOND / _receiveRate);
|
||||||
|
|
|
@ -103,6 +103,7 @@ SendQueue& Connection::getSendQueue() {
|
||||||
QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission);
|
QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission);
|
||||||
QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive);
|
QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive);
|
||||||
QObject::connect(_sendQueue.get(), &SendQueue::timeout, this, &Connection::queueTimeout);
|
QObject::connect(_sendQueue.get(), &SendQueue::timeout, this, &Connection::queueTimeout);
|
||||||
|
QObject::connect(_sendQueue.get(), &SendQueue::shortCircuitLoss, this, &Connection::queueShortCircuitLoss);
|
||||||
|
|
||||||
// set defaults on the send queue from our congestion control object and estimatedTimeout()
|
// set defaults on the send queue from our congestion control object and estimatedTimeout()
|
||||||
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
|
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
|
||||||
|
@ -140,6 +141,12 @@ void Connection::queueTimeout() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Connection::queueShortCircuitLoss(quint32 sequenceNumber) {
|
||||||
|
updateCongestionControlAndSendQueue([this, sequenceNumber]{
|
||||||
|
_congestionControl->onLoss(SequenceNumber { sequenceNumber }, SequenceNumber { sequenceNumber });
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
void Connection::sendReliablePacket(std::unique_ptr<Packet> packet) {
|
void Connection::sendReliablePacket(std::unique_ptr<Packet> packet) {
|
||||||
Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");
|
Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");
|
||||||
getSendQueue().queuePacket(std::move(packet));
|
getSendQueue().queuePacket(std::move(packet));
|
||||||
|
|
|
@ -87,6 +87,7 @@ private slots:
|
||||||
void recordRetransmission();
|
void recordRetransmission();
|
||||||
void queueInactive();
|
void queueInactive();
|
||||||
void queueTimeout();
|
void queueTimeout();
|
||||||
|
void queueShortCircuitLoss(quint32 sequenceNumber);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void sendACK(bool wasCausedBySyncTimeout = true);
|
void sendACK(bool wasCausedBySyncTimeout = true);
|
||||||
|
|
|
@ -128,13 +128,13 @@ void SendQueue::stop() {
|
||||||
_emptyCondition.notify_one();
|
_emptyCondition.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
void SendQueue::sendPacket(const Packet& packet) {
|
int SendQueue::sendPacket(const Packet& packet) {
|
||||||
_socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination);
|
return _socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination);
|
||||||
}
|
}
|
||||||
|
|
||||||
void SendQueue::ack(SequenceNumber ack) {
|
void SendQueue::ack(SequenceNumber ack) {
|
||||||
// this is a response from the client, re-set our timeout expiry and our last response time
|
// this is a response from the client, re-set our timeout expiry and our last response time
|
||||||
_lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch());
|
_lastReceiverResponse = QDateTime::currentMSecsSinceEpoch();
|
||||||
|
|
||||||
if (_lastACKSequenceNumber == (uint32_t) ack) {
|
if (_lastACKSequenceNumber == (uint32_t) ack) {
|
||||||
return;
|
return;
|
||||||
|
@ -164,7 +164,7 @@ void SendQueue::ack(SequenceNumber ack) {
|
||||||
|
|
||||||
void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
|
void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
|
||||||
// this is a response from the client, re-set our timeout expiry
|
// this is a response from the client, re-set our timeout expiry
|
||||||
_lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch());
|
_lastReceiverResponse = QDateTime::currentMSecsSinceEpoch();
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> nakLocker(_naksLock);
|
std::lock_guard<std::mutex> nakLocker(_naksLock);
|
||||||
|
@ -177,8 +177,8 @@ void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
|
||||||
|
|
||||||
void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
|
void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
|
||||||
// this is a response from the client, re-set our timeout expiry
|
// this is a response from the client, re-set our timeout expiry
|
||||||
_lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch());
|
_lastReceiverResponse = QDateTime::currentMSecsSinceEpoch();
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> nakLocker(_naksLock);
|
std::lock_guard<std::mutex> nakLocker(_naksLock);
|
||||||
_naks.clear();
|
_naks.clear();
|
||||||
|
@ -232,15 +232,16 @@ SequenceNumber SendQueue::getNextSequenceNumber() {
|
||||||
return _currentSequenceNumber;
|
return _currentSequenceNumber;
|
||||||
}
|
}
|
||||||
|
|
||||||
void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber) {
|
bool SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber) {
|
||||||
// write the sequence number and send the packet
|
// write the sequence number and send the packet
|
||||||
newPacket->writeSequenceNumber(sequenceNumber);
|
newPacket->writeSequenceNumber(sequenceNumber);
|
||||||
sendPacket(*newPacket);
|
|
||||||
|
|
||||||
// Save packet/payload size before we move it
|
// Save packet/payload size before we move it
|
||||||
auto packetSize = newPacket->getDataSize();
|
auto packetSize = newPacket->getDataSize();
|
||||||
auto payloadSize = newPacket->getPayloadSize();
|
auto payloadSize = newPacket->getPayloadSize();
|
||||||
|
|
||||||
|
auto bytesWritten = sendPacket(*newPacket);
|
||||||
|
|
||||||
{
|
{
|
||||||
// Insert the packet we have just sent in the sent list
|
// Insert the packet we have just sent in the sent list
|
||||||
QWriteLocker locker(&_sentLock);
|
QWriteLocker locker(&_sentLock);
|
||||||
|
@ -249,8 +250,24 @@ void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket,
|
||||||
entry.second.swap(newPacket);
|
entry.second.swap(newPacket);
|
||||||
}
|
}
|
||||||
Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list");
|
Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list");
|
||||||
|
|
||||||
emit packetSent(packetSize, payloadSize);
|
emit packetSent(packetSize, payloadSize);
|
||||||
|
|
||||||
|
if (bytesWritten < 0) {
|
||||||
|
// this is a short-circuit loss - we failed to put this packet on the wire
|
||||||
|
// so immediately add it to the loss list
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> nakLocker(_naksLock);
|
||||||
|
_naks.append(sequenceNumber);
|
||||||
|
}
|
||||||
|
|
||||||
|
emit shortCircuitLoss(quint32(sequenceNumber));
|
||||||
|
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void SendQueue::run() {
|
void SendQueue::run() {
|
||||||
|
@ -285,12 +302,14 @@ void SendQueue::run() {
|
||||||
auto nextPacketTimestamp = p_high_resolution_clock::now();
|
auto nextPacketTimestamp = p_high_resolution_clock::now();
|
||||||
|
|
||||||
while (_state == State::Running) {
|
while (_state == State::Running) {
|
||||||
bool sentAPacket = maybeResendPacket();
|
bool attemptedToSendPacket = maybeResendPacket();
|
||||||
|
|
||||||
// if we didn't find a packet to re-send AND we think we can fit a new packet on the wire
|
// if we didn't find a packet to re-send AND we think we can fit a new packet on the wire
|
||||||
// (this is according to the current flow window size) then we send out a new packet
|
// (this is according to the current flow window size) then we send out a new packet
|
||||||
if (!sentAPacket) {
|
auto newPacketCount = 0;
|
||||||
sentAPacket = maybeSendNewPacket();
|
if (!attemptedToSendPacket) {
|
||||||
|
newPacketCount = maybeSendNewPacket();
|
||||||
|
attemptedToSendPacket = (newPacketCount > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// since we're a while loop, give the thread a chance to process events
|
// since we're a while loop, give the thread a chance to process events
|
||||||
|
@ -300,12 +319,13 @@ void SendQueue::run() {
|
||||||
// If the send queue has been innactive, skip the sleep for
|
// If the send queue has been innactive, skip the sleep for
|
||||||
// Either _isRunning will have been set to false and we'll break
|
// Either _isRunning will have been set to false and we'll break
|
||||||
// Or something happened and we'll keep going
|
// Or something happened and we'll keep going
|
||||||
if (_state != State::Running || isInactive(sentAPacket)) {
|
if (_state != State::Running || isInactive(attemptedToSendPacket)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// push the next packet timestamp forwards by the current packet send period
|
// push the next packet timestamp forwards by the current packet send period
|
||||||
nextPacketTimestamp += std::chrono::microseconds(_packetSendPeriod);
|
auto nextPacketDelta = (newPacketCount == 2 ? 2 : 1) * _packetSendPeriod;
|
||||||
|
nextPacketTimestamp += std::chrono::microseconds(nextPacketDelta);
|
||||||
|
|
||||||
// sleep as long as we need until next packet send, if we can
|
// sleep as long as we need until next packet send, if we can
|
||||||
const auto timeToSleep = duration_cast<microseconds>(nextPacketTimestamp - p_high_resolution_clock::now());
|
const auto timeToSleep = duration_cast<microseconds>(nextPacketTimestamp - p_high_resolution_clock::now());
|
||||||
|
@ -314,7 +334,7 @@ void SendQueue::run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool SendQueue::maybeSendNewPacket() {
|
int SendQueue::maybeSendNewPacket() {
|
||||||
if (!isFlowWindowFull()) {
|
if (!isFlowWindowFull()) {
|
||||||
// we didn't re-send a packet, so time to send a new one
|
// we didn't re-send a packet, so time to send a new one
|
||||||
|
|
||||||
|
@ -324,38 +344,43 @@ bool SendQueue::maybeSendNewPacket() {
|
||||||
// grab the first packet we will send
|
// grab the first packet we will send
|
||||||
std::unique_ptr<Packet> firstPacket = _packets.takePacket();
|
std::unique_ptr<Packet> firstPacket = _packets.takePacket();
|
||||||
Q_ASSERT(firstPacket);
|
Q_ASSERT(firstPacket);
|
||||||
|
|
||||||
std::unique_ptr<Packet> secondPacket;
|
|
||||||
bool shouldSendPairTail = false;
|
// attempt to send the first packet
|
||||||
|
if (sendNewPacketAndAddToSentList(move(firstPacket), nextNumber)) {
|
||||||
if (((uint32_t) nextNumber & 0xF) == 0) {
|
std::unique_ptr<Packet> secondPacket;
|
||||||
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
|
bool shouldSendPairTail = false;
|
||||||
// pull off a second packet if we can before we unlock
|
|
||||||
shouldSendPairTail = true;
|
if (((uint32_t) nextNumber & 0xF) == 0) {
|
||||||
|
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
|
||||||
secondPacket = _packets.takePacket();
|
// pull off a second packet if we can before we unlock
|
||||||
|
shouldSendPairTail = true;
|
||||||
|
|
||||||
|
secondPacket = _packets.takePacket();
|
||||||
|
}
|
||||||
|
|
||||||
|
// do we have a second in a pair to send as well?
|
||||||
|
if (secondPacket) {
|
||||||
|
sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber());
|
||||||
|
} else if (shouldSendPairTail) {
|
||||||
|
// we didn't get a second packet to send in the probe pair
|
||||||
|
// send a control packet of type ProbePairTail so the receiver can still do
|
||||||
|
// proper bandwidth estimation
|
||||||
|
static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail);
|
||||||
|
_socket->writeBasePacket(*pairTailPacket, _destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
// we attempted to send two packets, return 2
|
||||||
|
return 2;
|
||||||
|
} else {
|
||||||
|
// we attempted to send a single packet, return 1
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// definitely send the first packet
|
|
||||||
sendNewPacketAndAddToSentList(move(firstPacket), nextNumber);
|
|
||||||
|
|
||||||
// do we have a second in a pair to send as well?
|
|
||||||
if (secondPacket) {
|
|
||||||
sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber());
|
|
||||||
} else if (shouldSendPairTail) {
|
|
||||||
// we didn't get a second packet to send in the probe pair
|
|
||||||
// send a control packet of type ProbePairTail so the receiver can still do
|
|
||||||
// proper bandwidth estimation
|
|
||||||
static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail);
|
|
||||||
_socket->writeBasePacket(*pairTailPacket, _destination);
|
|
||||||
}
|
|
||||||
|
|
||||||
// We sent our packet(s), return here
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// No packets were sent
|
// No packets were sent
|
||||||
return false;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool SendQueue::maybeResendPacket() {
|
bool SendQueue::maybeResendPacket() {
|
||||||
|
@ -375,8 +400,9 @@ bool SendQueue::maybeResendPacket() {
|
||||||
|
|
||||||
// see if we can find the packet to re-send
|
// see if we can find the packet to re-send
|
||||||
auto it = _sentPackets.find(resendNumber);
|
auto it = _sentPackets.find(resendNumber);
|
||||||
|
|
||||||
if (it != _sentPackets.end()) {
|
if (it != _sentPackets.end()) {
|
||||||
|
|
||||||
auto& entry = it->second;
|
auto& entry = it->second;
|
||||||
// we found the packet - grab it
|
// we found the packet - grab it
|
||||||
auto& resendPacket = *(entry.second);
|
auto& resendPacket = *(entry.second);
|
||||||
|
@ -437,7 +463,7 @@ bool SendQueue::maybeResendPacket() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool SendQueue::isInactive(bool sentAPacket) {
|
bool SendQueue::isInactive(bool attemptedToSendPacket) {
|
||||||
// check for connection timeout first
|
// check for connection timeout first
|
||||||
|
|
||||||
// that will be the case if we have had 16 timeouts since hearing back from the client, and it has been
|
// that will be the case if we have had 16 timeouts since hearing back from the client, and it has been
|
||||||
|
@ -447,7 +473,8 @@ bool SendQueue::isInactive(bool sentAPacket) {
|
||||||
|
|
||||||
auto sinceLastResponse = (QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse);
|
auto sinceLastResponse = (QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse);
|
||||||
|
|
||||||
if (sinceLastResponse >= quint64(NUM_TIMEOUTS_BEFORE_INACTIVE * (_estimatedTimeout / USECS_PER_MSEC)) &&
|
if (sinceLastResponse > 0 &&
|
||||||
|
sinceLastResponse >= int64_t(NUM_TIMEOUTS_BEFORE_INACTIVE * (_estimatedTimeout / USECS_PER_MSEC)) &&
|
||||||
_lastReceiverResponse > 0 &&
|
_lastReceiverResponse > 0 &&
|
||||||
sinceLastResponse > MIN_MS_BEFORE_INACTIVE) {
|
sinceLastResponse > MIN_MS_BEFORE_INACTIVE) {
|
||||||
// If the flow window has been full for over CONSIDER_INACTIVE_AFTER,
|
// If the flow window has been full for over CONSIDER_INACTIVE_AFTER,
|
||||||
|
@ -462,7 +489,7 @@ bool SendQueue::isInactive(bool sentAPacket) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!sentAPacket) {
|
if (!attemptedToSendPacket) {
|
||||||
// During our processing above we didn't send any packets
|
// During our processing above we didn't send any packets
|
||||||
|
|
||||||
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
|
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
|
||||||
|
|
|
@ -79,6 +79,7 @@ signals:
|
||||||
|
|
||||||
void queueInactive();
|
void queueInactive();
|
||||||
|
|
||||||
|
void shortCircuitLoss(quint32 sequenceNumber);
|
||||||
void timeout();
|
void timeout();
|
||||||
|
|
||||||
private slots:
|
private slots:
|
||||||
|
@ -91,13 +92,13 @@ private:
|
||||||
|
|
||||||
void sendHandshake();
|
void sendHandshake();
|
||||||
|
|
||||||
void sendPacket(const Packet& packet);
|
int sendPacket(const Packet& packet);
|
||||||
void sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber);
|
bool sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber);
|
||||||
|
|
||||||
bool maybeSendNewPacket(); // Figures out what packet to send next
|
int maybeSendNewPacket(); // Figures out what packet to send next
|
||||||
bool maybeResendPacket(); // Determines whether to resend a packet and which one
|
bool maybeResendPacket(); // Determines whether to resend a packet and which one
|
||||||
|
|
||||||
bool isInactive(bool sentAPacket);
|
bool isInactive(bool attemptedToSendPacket);
|
||||||
void deactivate(); // makes the queue inactive and cleans it up
|
void deactivate(); // makes the queue inactive and cleans it up
|
||||||
|
|
||||||
bool isFlowWindowFull() const;
|
bool isFlowWindowFull() const;
|
||||||
|
@ -122,7 +123,7 @@ private:
|
||||||
|
|
||||||
std::atomic<int> _estimatedTimeout { 0 }; // Estimated timeout, set from CC
|
std::atomic<int> _estimatedTimeout { 0 }; // Estimated timeout, set from CC
|
||||||
std::atomic<int> _syncInterval { udt::DEFAULT_SYN_INTERVAL_USECS }; // Sync interval, set from CC
|
std::atomic<int> _syncInterval { udt::DEFAULT_SYN_INTERVAL_USECS }; // Sync interval, set from CC
|
||||||
std::atomic<uint64_t> _lastReceiverResponse { 0 }; // Timestamp for the last time we got new data from the receiver (ACK/NAK)
|
std::atomic<int64_t> _lastReceiverResponse { 0 }; // Timestamp for the last time we got new data from the receiver (ACK/NAK)
|
||||||
|
|
||||||
std::atomic<int> _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC
|
std::atomic<int> _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,7 @@ UDTTest::UDTTest(int& argc, char** argv) :
|
||||||
|
|
||||||
// randomize the seed for packet size randomization
|
// randomize the seed for packet size randomization
|
||||||
srand(time(NULL));
|
srand(time(NULL));
|
||||||
|
|
||||||
_socket.bind(QHostAddress::AnyIPv4, _argumentParser.value(PORT_OPTION).toUInt());
|
_socket.bind(QHostAddress::AnyIPv4, _argumentParser.value(PORT_OPTION).toUInt());
|
||||||
qDebug() << "Test socket is listening on" << _socket.localPort();
|
qDebug() << "Test socket is listening on" << _socket.localPort();
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue