diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index c44c0f4221..a599b93af5 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -40,8 +40,7 @@ Connection::Connection(Socket* parentSocket, HifiSockAddr destination, std::uniq // setup default SYN, RTT and RTT Variance based on the SYN interval in CongestionControl object _synInterval = _congestionControl->synInterval(); - _rtt = _synInterval * 10; - _rttVariance = _rtt / 2; + resetRTT(); // set the initial RTT and flow window size on congestion control object _congestionControl->setRTT(_rtt); @@ -64,6 +63,11 @@ Connection::~Connection() { } } +void Connection::resetRTT() { + _rtt = _synInterval * 10; + _rttVariance = _rtt / 2; +} + SendQueue& Connection::getSendQueue() { if (!_sendQueue) { // Lasily create send queue @@ -125,13 +129,15 @@ void Connection::sync() { // we send out a periodic ACK every rate control interval sendACK(); - // check if we need to re-transmit a loss list - // we do this if it has been longer than the current nakInterval since we last sent - auto now = high_resolution_clock::now(); - - if (duration_cast(now - _lastNAKTime).count() >= _nakInterval) { - // Send a timeout NAK packet - sendTimeoutNAK(); + if (_lossList.getLength() > 0) { + // check if we need to re-transmit a loss list + // we do this if it has been longer than the current nakInterval since we last sent + auto now = high_resolution_clock::now(); + + if (duration_cast(now - _lastNAKTime).count() >= _nakInterval) { + // Send a timeout NAK packet + sendTimeoutNAK(); + } } } } @@ -318,6 +324,11 @@ SequenceNumber Connection::nextACK() const { bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize) { + if (!_hasReceivedHandshake) { + // refuse to process any packets until we've received the handshake + return false; + } + _hasReceivedFirstPacket = true; // check if this is a packet pair we should estimate bandwidth from, or just a regular packet @@ -349,7 +360,7 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in _nakInterval += (int) (_lossList.getLength() * (USECS_PER_SECOND / receivedPacketsPerSecond)); } - // the NAK interval is at least the _minNAKInterval but might be the estimated timeout + // the NAK interval is at least the _minNAKInterval but might be the value calculated above, if that is larger _nakInterval = std::max(_nakInterval, _minNAKInterval); } @@ -387,23 +398,42 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in } void Connection::processControl(std::unique_ptr controlPacket) { - // Simple dispatch to control packets processing methods based on their type + + // Simple dispatch to control packets processing methods based on their type. + + // Processing of control packets (other than Handshake / Handshake ACK) + // is not performed if the handshake has not been completed. + switch (controlPacket->getType()) { case ControlPacket::ACK: - if (controlPacket->getPayloadSize() == sizeof(SequenceNumber)) { - processLightACK(move(controlPacket)); - } else { - processACK(move(controlPacket)); + if (_hasReceivedHandshakeACK) { + if (controlPacket->getPayloadSize() == sizeof(SequenceNumber)) { + processLightACK(move(controlPacket)); + } else { + processACK(move(controlPacket)); + } } break; case ControlPacket::ACK2: - processACK2(move(controlPacket)); + if (_hasReceivedHandshake) { + processACK2(move(controlPacket)); + } break; case ControlPacket::NAK: - processNAK(move(controlPacket)); + if (_hasReceivedHandshakeACK) { + processNAK(move(controlPacket)); + } break; case ControlPacket::TimeoutNAK: - processTimeoutNAK(move(controlPacket)); + if (_hasReceivedHandshakeACK) { + processTimeoutNAK(move(controlPacket)); + } + break; + case ControlPacket::Handshake: + processHandshake(move(controlPacket)); + break; + case ControlPacket::HandshakeACK: + processHandshakeACK(move(controlPacket)); break; } } @@ -594,6 +624,30 @@ void Connection::processNAK(std::unique_ptr controlPacket) { _stats.record(ConnectionStats::Stats::ReceivedNAK); } +void Connection::processHandshake(std::unique_ptr controlPacket) { + + if (!_hasReceivedHandshake || _hasReceivedFirstPacket) { + // server sent us a handshake - we need to assume this means state should be reset + // as long as we haven't received a handshake yet or we have and we've received some data + resetReceiveState(); + } + + // immediately respond with a handshake ACK + static auto handshakeACK = ControlPacket::create(ControlPacket::HandshakeACK, 0); + _parentSocket->writeBasePacket(*handshakeACK, _destination); + + // indicate that handshake has been received + _hasReceivedHandshake = true; +} + +void Connection::processHandshakeACK(std::unique_ptr controlPacket) { + // hand off this handshake ACK to the send queue so it knows it can start sending + getSendQueue().handshakeACK(); + + // indicate that handshake ACK was received + _hasReceivedHandshakeACK = true; +} + void Connection::processTimeoutNAK(std::unique_ptr controlPacket) { // Override SendQueue's LossList with the timeout NAK list getSendQueue().overrideNAKListFromPacket(*controlPacket); @@ -604,6 +658,40 @@ void Connection::processTimeoutNAK(std::unique_ptr controlPacket) _stats.record(ConnectionStats::Stats::ReceivedTimeoutNAK); } +void Connection::resetReceiveState() { + // reset all SequenceNumber member variables back to default + SequenceNumber defaultSequenceNumber; + + _lastReceivedSequenceNumber = defaultSequenceNumber; + + _lastReceivedAcknowledgedACK = defaultSequenceNumber; + _currentACKSubSequenceNumber = defaultSequenceNumber; + + _lastSentACK = defaultSequenceNumber; + + // clear the loss list and _lastNAKTime + _lossList.clear(); + _lastNAKTime = high_resolution_clock::time_point(); + + // the _nakInterval need not be reset, that will happen on loss + + // clear sync variables + _hasReceivedFirstPacket = false; + + _acksDuringSYN = 1; + _lightACKsDuringSYN = 1; + _packetsSinceACK = 0; + + // reset RTT to initial value + resetRTT(); + + // clear the intervals in the receive window + _receiveWindow.reset(); + + // clear any pending received messages + _pendingReceivedMessages.clear(); +} + void Connection::updateRTT(int rtt) { // This updates the RTT using exponential weighted moving average // This is the Jacobson's forumla for RTT estimation diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index ef713ea0c5..20306b5515 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -63,7 +63,7 @@ public: void sync(); // rate control method, fired by Socket for all connections on SYN interval - // return indicates if this packet was a duplicate + // return indicates if this packet should be processed bool processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize); void processControl(std::unique_ptr controlPacket); @@ -92,6 +92,11 @@ private: void processACK2(std::unique_ptr controlPacket); void processNAK(std::unique_ptr controlPacket); void processTimeoutNAK(std::unique_ptr controlPacket); + void processHandshake(std::unique_ptr controlPacket); + void processHandshakeACK(std::unique_ptr controlPacket); + + void resetReceiveState(); + void resetRTT(); SendQueue& getSendQueue(); SequenceNumber nextACK() const; @@ -103,11 +108,13 @@ private: int _synInterval; // Periodical Rate Control Interval, in microseconds - int _nakInterval; // NAK timeout interval, in microseconds + int _nakInterval { -1 }; // NAK timeout interval, in microseconds, set on loss int _minNAKInterval { 100000 }; // NAK timeout interval lower bound, default of 100ms std::chrono::high_resolution_clock::time_point _lastNAKTime; bool _hasReceivedFirstPacket { false }; + bool _hasReceivedHandshake { false }; // flag for receipt of handshake from server + bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client LossList _lossList; // List of all missing packets SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer diff --git a/libraries/networking/src/udt/ControlPacket.cpp b/libraries/networking/src/udt/ControlPacket.cpp index b8407d42a4..8c799f97a6 100644 --- a/libraries/networking/src/udt/ControlPacket.cpp +++ b/libraries/networking/src/udt/ControlPacket.cpp @@ -100,7 +100,7 @@ void ControlPacket::readType() { Q_ASSERT_X(bitAndType & CONTROL_BIT_MASK, "ControlPacket::readHeader()", "This should be a control packet"); uint16_t packetType = (bitAndType & ~CONTROL_BIT_MASK) >> (8 * sizeof(Type)); - Q_ASSERT_X(packetType <= ControlPacket::Type::TimeoutNAK, "ControlPacket::readType()", "Received a control packet with wrong type"); + Q_ASSERT_X(packetType <= ControlPacket::Type::HandshakeACK, "ControlPacket::readType()", "Received a control packet with wrong type"); // read the type _type = (Type) packetType; diff --git a/libraries/networking/src/udt/ControlPacket.h b/libraries/networking/src/udt/ControlPacket.h index 83fab5bb12..bcc559f4f6 100644 --- a/libraries/networking/src/udt/ControlPacket.h +++ b/libraries/networking/src/udt/ControlPacket.h @@ -30,7 +30,9 @@ public: ACK, ACK2, NAK, - TimeoutNAK + TimeoutNAK, + Handshake, + HandshakeACK }; static std::unique_ptr create(Type type, qint64 size = -1); diff --git a/libraries/networking/src/udt/LossList.cpp b/libraries/networking/src/udt/LossList.cpp index c02d12c34a..846347142e 100644 --- a/libraries/networking/src/udt/LossList.cpp +++ b/libraries/networking/src/udt/LossList.cpp @@ -169,7 +169,8 @@ SequenceNumber LossList::popFirstSequenceNumber() { void LossList::write(ControlPacket& packet, int maxPairs) { int writtenPairs = 0; - for(const auto& pair : _lossList) { + + for (const auto& pair : _lossList) { packet.writePrimitive(pair.first); packet.writePrimitive(pair.second); diff --git a/libraries/networking/src/udt/PacketTimeWindow.cpp b/libraries/networking/src/udt/PacketTimeWindow.cpp index 5c389f6d26..00eb43c7e6 100644 --- a/libraries/networking/src/udt/PacketTimeWindow.cpp +++ b/libraries/networking/src/udt/PacketTimeWindow.cpp @@ -31,6 +31,11 @@ PacketTimeWindow::PacketTimeWindow(int numPacketIntervals, int numProbeIntervals } +void PacketTimeWindow::reset() { + _packetIntervals.assign(_numPacketIntervals, DEFAULT_PACKET_INTERVAL_MICROSECONDS); + _probeIntervals.assign(_numProbeIntervals, DEFAULT_PROBE_INTERVAL_MICROSECONDS); +} + template int median(Iterator begin, Iterator end) { // use std::nth_element to grab the middle - for an even number of elements this is the upper middle diff --git a/libraries/networking/src/udt/PacketTimeWindow.h b/libraries/networking/src/udt/PacketTimeWindow.h index 25e3df8a43..a8a4e0c8b5 100644 --- a/libraries/networking/src/udt/PacketTimeWindow.h +++ b/libraries/networking/src/udt/PacketTimeWindow.h @@ -29,6 +29,8 @@ public: int32_t getPacketReceiveSpeed() const; int32_t getEstimatedBandwidth() const; + + void reset(); private: int _numPacketIntervals { 0 }; // the number of packet intervals to store int _numProbeIntervals { 0 }; // the number of probe intervals to store diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 9ba7b76a9c..d1baea9e26 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -191,6 +191,12 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { _emptyCondition.notify_one(); } +void SendQueue::handshakeACK() { + std::unique_lock locker(_handshakeMutex); + _hasReceivedHandshakeACK = true; + _handshakeACKCondition.notify_one(); +} + SequenceNumber SendQueue::getNextSequenceNumber() { _atomicCurrentSequenceNumber = (SequenceNumber::Type)++_currentSequenceNumber; return _currentSequenceNumber; @@ -228,6 +234,41 @@ void SendQueue::run() { // Record how long the loop takes to execute auto loopStartTimestamp = high_resolution_clock::now(); + std::unique_lock handshakeLock { _handshakeMutex }; + + if (!_hasReceivedHandshakeACK) { + // we haven't received a handshake ACK from the client + // if it has been at least 100ms since we last sent a handshake, send another now + + // hold the time of last send in a static + static auto lastSendHandshake = high_resolution_clock::time_point(); + + static const auto HANDSHAKE_RESEND_INTERVAL_MS = std::chrono::milliseconds(100); + + // calculation the duration since the last handshake send + auto sinceLastHandshake = std::chrono::duration_cast(high_resolution_clock::now() + - lastSendHandshake); + + if (sinceLastHandshake >= HANDSHAKE_RESEND_INTERVAL_MS) { + + // it has been long enough since last handshake, send another + static auto handshakePacket = ControlPacket::create(ControlPacket::Handshake, 0); + _socket->writeBasePacket(*handshakePacket, _destination); + + lastSendHandshake = high_resolution_clock::now(); + } + + // we wait for the ACK or the re-send interval to expire + _handshakeACKCondition.wait_until(handshakeLock, + high_resolution_clock::now() + + HANDSHAKE_RESEND_INTERVAL_MS); + + // Once we're here we've either received the handshake ACK or it's going to be time to re-send a handshake. + // Either way let's continue processing - no packets will be sent if no handshake ACK has been received. + } + + handshakeLock.unlock(); + bool sentAPacket = maybeResendPacket(); bool flowWindowFull = false; @@ -253,7 +294,7 @@ void SendQueue::run() { break; } - if (!sentAPacket) { + if (_hasReceivedHandshakeACK && !sentAPacket) { static const std::chrono::seconds CONSIDER_INACTIVE_AFTER { 5 }; if (flowWindowFull && (high_resolution_clock::now() - _flowWindowFullSince) > CONSIDER_INACTIVE_AFTER) { @@ -313,6 +354,7 @@ bool SendQueue::maybeSendNewPacket() { if (((uint32_t) nextNumber & 0xF) == 0) { // the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets // pull off a second packet if we can before we unlock + if (_packets.size() > 0) { secondPacket.swap(_packets.front()); _packets.pop_front(); diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index ebc6702ac5..89dc2fae8f 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -63,6 +63,7 @@ public slots: void ack(SequenceNumber ack); void nak(SequenceNumber start, SequenceNumber end); void overrideNAKListFromPacket(ControlPacket& packet); + void handshakeACK(); signals: void packetSent(int dataSize, int payloadSize); @@ -115,6 +116,10 @@ private: mutable QReadWriteLock _sentLock; // Protects the sent packet list std::unordered_map> _sentPackets; // Packets waiting for ACK. + std::mutex _handshakeMutex; // Protects the handshake ACK condition_variable + bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client + std::condition_variable _handshakeACKCondition; + std::condition_variable_any _emptyCondition; };