From c7ae4d5e59a2d5746647162f04ea8e96b0a6ab4e Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Tue, 28 Jul 2015 11:47:57 -0700 Subject: [PATCH] implement more of processACK in Connection --- libraries/networking/src/udt/Connection.cpp | 82 +++++++++++++++++++-- libraries/networking/src/udt/Connection.h | 13 +++- libraries/networking/src/udt/SendQueue.cpp | 1 + libraries/networking/src/udt/SendQueue.h | 4 + 4 files changed, 92 insertions(+), 8 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 184597004e..abdfcbcdb8 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -39,6 +39,7 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) { ackPacket->reset(); // We need to reset it every time. auto currentTime = high_resolution_clock::now(); + static high_resolution_clock::time_point lastACKSendTime; SeqNum nextACKNumber = nextACK(); @@ -58,7 +59,8 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) { } else if (nextACKNumber == _lastSentACK) { // We already sent this ACK, but check if we should re-send it. // We will re-send if it has been more than RTT + (4 * RTT variance) since the last ACK - milliseconds sinceLastACK = duration_cast(currentTime - _lastACKTime); + milliseconds sinceLastACK = duration_cast(currentTime - lastACKSendTime); + if (sinceLastACK.count() < (_rtt + (4 * _rttVariance))) { return; } @@ -80,7 +82,7 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) { // pack in the receive speed and bandwidth // record this as the last ACK send time - _lastACKTime = high_resolution_clock::now(); + lastACKSendTime = high_resolution_clock::now(); } // have the send queue send off our packet @@ -181,14 +183,84 @@ void Connection::processACK(std::unique_ptr controlPacket) { SeqNum currentACKSubSequenceNumber; controlPacket->readPrimitive(¤tACKSubSequenceNumber); - // read the ACK number - SeqNum nextACKNumber; - controlPacket->readPrimitive(&nextACKNumber); + // check if we need send an ACK2 for this ACK + auto currentTime = high_resolution_clock::now(); + static high_resolution_clock::time_point lastACK2SendTime; + + milliseconds sinceLastACK2 = duration_cast(currentTime - lastACK2SendTime); + + if (sinceLastACK2.count() > _synInterval || currentACKSubSequenceNumber == _lastSentACK2) { + // setup a static ACK2 packet we will re-use + static const int ACK2_PAYLOAD_BYTES = sizeof(SeqNum); + static auto ack2Packet = ControlPacket::create(ControlPacket::ACK2, ACK2_PAYLOAD_BYTES); + + // reset the ACK2 Packet before writing the sub-sequence number to it + ack2Packet->reset(); + + // write the sub sequence number for this ACK2 + ack2Packet->writePrimitive(currentACKSubSequenceNumber); + + // update the last sent ACK2 and the last ACK2 send time + _lastSentACK2 = currentACKSubSequenceNumber; + lastACK2SendTime = high_resolution_clock::now(); + } + + // read the ACKed sequence number + SeqNum ack; + controlPacket->readPrimitive(&ack); + + // validate that this isn't a BS ACK + if (ack > (_sendQueue->getCurrentSeqNum() + 1)) { + // in UDT they specifically break the connection here - do we want to do anything? + return; + } // read the RTT and variance int32_t rtt, rttVariance; controlPacket->readPrimitive(&rtt); controlPacket->readPrimitive(&rttVariance); + + // read the desired flow window size + int flowWindowSize; + controlPacket->readPrimitive(&flowWindowSize); + + if (ack <= _lastReceivedACK) { + // this is a valid ACKed sequence number - update the flow window size and the last received ACK + _flowWindowSize = flowWindowSize; + _lastReceivedACK = ack; + } + + // make sure this isn't a repeated ACK + if (ack <= SeqNum(_atomicLastReceivedACK)) { + return; + } + + // ACK the send queue so it knows what was received + _sendQueue->ack(ack); + + // update the atomic for last received ACK, the send queue uses this to re-transmit + _atomicLastReceivedACK.store((uint32_t) _lastReceivedACK); + + // remove everything up to this ACK from the sender loss list + + // update the RTT + _rttVariance = (_rttVariance * 3 + abs(rtt - _rtt)) >> 2; + _rtt = (_rtt * 7 + rtt) >> 3; + + // set the RTT for congestion control + + if (controlPacket->getPayloadSize() > (qint64) (sizeof(SeqNum) + sizeof(SeqNum) + sizeof(rtt) + sizeof(rttVariance))) { + int32_t deliveryRate, bandwidth; + controlPacket->readPrimitive(&deliveryRate); + controlPacket->readPrimitive(&bandwidth); + + // set the delivery rate and bandwidth for congestion control + } + + // fire the onACK callback for congestion control + + // update the total count of received ACKs + ++_totalReceivedACKs; } void Connection::processLightACK(std::unique_ptr controlPacket) { diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index f1d2210b2f..de60b88b37 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -38,6 +38,8 @@ public: SeqNum nextACK() const; + SeqNum getLastReceivedACK() const { return SeqNum(_atomicLastReceivedACK); } + void processReceivedSeqNum(SeqNum seq); void processControl(std::unique_ptr controlPacket); @@ -47,19 +49,24 @@ private: void processACK2(std::unique_ptr controlPacket); void processNAK(std::unique_ptr controlPacket); + int _synInterval; // Periodical Rate Control Interval, defaults to 10ms + LossList _lossList; // List of all missing packets SeqNum _largestReceivedSeqNum; // The largest sequence number received from the peer - SeqNum _lastSentACK; // The last sent ACK SeqNum _lastReceivedACK; // The last ACK received + std::atomic _atomicLastReceivedACK; // Atomic for thread-safe get of last ACK received SeqNum _lastReceivedAcknowledgedACK; // The last sent ACK that has been acknowledged via an ACK2 from the peer SeqNum _currentACKSubSequenceNumber; // The current ACK sub-sequence number (used for Acknowledgment of ACKs) + SeqNum _lastSentACK; // The last sent ACK + SeqNum _lastSentACK2; // The last sent ACK sub-sequence number in an ACK2 + + int _totalReceivedACKs { 0 }; + int32_t _rtt; // RTT, in milliseconds int32_t _rttVariance; // RTT variance int _flowWindowSize; // Flow control window size - std::chrono::high_resolution_clock::time_point _lastACKTime; - std::unique_ptr _sendQueue; }; diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 65c1807d19..93b90379c7 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -114,6 +114,7 @@ void SendQueue::sendNextPacket() { if (_nextPacket) { _nextPacket->writeSequenceNumber(++_currentSeqNum); sendPacket(*_nextPacket); + _atomicCurrentSeqNum.store((uint32_t) _currentSeqNum); // Insert the packet we have just sent in the sent list QWriteLocker locker(&_sentLock); diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 9c14c19ede..729ed9e305 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -42,6 +42,8 @@ public: quint64 getLastSendTimestamp() const { return _lastSendTimestamp; } + SeqNum getCurrentSeqNum() const { return SeqNum(_atomicCurrentSeqNum); } + int getPacketSendPeriod() const { return _packetSendPeriod; } void setPacketSendPeriod(int newPeriod) { _packetSendPeriod = newPeriod; } @@ -73,6 +75,8 @@ private: SeqNum _currentSeqNum; // Last sequence number sent out SeqNum _lastAck; // ACKed sequence number + std::atomic _atomicCurrentSeqNum; // Atomic for last sequence number sent out + std::unique_ptr _sendTimer; // Send timer std::atomic _packetSendPeriod { 0 }; // Interval between two packet send envent in msec std::atomic _lastSendTimestamp { 0 }; // Record last time of packet departure