From cabdee8391a6aa89ae4cd12dcc3058ca211ecfac Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Tue, 25 Aug 2015 17:12:03 -0700 Subject: [PATCH 1/7] send a handshake before first packet send --- libraries/networking/src/udt/Connection.cpp | 19 ++++++++++++++ libraries/networking/src/udt/Connection.h | 3 +++ .../networking/src/udt/ControlPacket.cpp | 2 +- libraries/networking/src/udt/ControlPacket.h | 4 ++- libraries/networking/src/udt/LossList.cpp | 3 ++- libraries/networking/src/udt/SendQueue.cpp | 25 +++++++++++++++++++ libraries/networking/src/udt/SendQueue.h | 3 +++ 7 files changed, 56 insertions(+), 3 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 9f8b1eb3ee..d5b84ce070 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -400,6 +400,12 @@ void Connection::processControl(std::unique_ptr controlPacket) { case ControlPacket::TimeoutNAK: processTimeoutNAK(move(controlPacket)); break; + case ControlPacket::Handshake: + processHandshake(move(controlPacket)); + break; + case ControlPacket::HandshakeACK: + processHandshakeACK(move(controlPacket)); + break; } } @@ -589,6 +595,19 @@ void Connection::processNAK(std::unique_ptr controlPacket) { _stats.record(ConnectionStats::Stats::ReceivedNAK); } +void Connection::processHandshake(std::unique_ptr controlPacket) { + // immediately respond with a handshake ACK + static auto handshakeACK = ControlPacket::create(ControlPacket::HandshakeACK, 0); + _parentSocket->writeBasePacket(*handshakeACK, _destination); + + _hasReceivedHandshake = true; +} + +void Connection::processHandshakeACK(std::unique_ptr controlPacket) { + // hand off this handshake ACK to the send queue so it knows it can start sending + getSendQueue().handshakeACK(); +} + void Connection::processTimeoutNAK(std::unique_ptr controlPacket) { // Override SendQueue's LossList with the timeout NAK list getSendQueue().overrideNAKListFromPacket(*controlPacket); diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 307c90eda5..8b52b1cf5e 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -90,6 +90,8 @@ private: void processACK2(std::unique_ptr controlPacket); void processNAK(std::unique_ptr controlPacket); void processTimeoutNAK(std::unique_ptr controlPacket); + void processHandshake(std::unique_ptr controlPacket); + void processHandshakeACK(std::unique_ptr controlPacket); SendQueue& getSendQueue(); SequenceNumber nextACK() const; @@ -106,6 +108,7 @@ private: std::chrono::high_resolution_clock::time_point _lastNAKTime; bool _hasReceivedFirstPacket { false }; + bool _hasReceivedHandshake { false }; // flag for receipt of handshake from server LossList _lossList; // List of all missing packets SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer diff --git a/libraries/networking/src/udt/ControlPacket.cpp b/libraries/networking/src/udt/ControlPacket.cpp index b8407d42a4..8c799f97a6 100644 --- a/libraries/networking/src/udt/ControlPacket.cpp +++ b/libraries/networking/src/udt/ControlPacket.cpp @@ -100,7 +100,7 @@ void ControlPacket::readType() { Q_ASSERT_X(bitAndType & CONTROL_BIT_MASK, "ControlPacket::readHeader()", "This should be a control packet"); uint16_t packetType = (bitAndType & ~CONTROL_BIT_MASK) >> (8 * sizeof(Type)); - Q_ASSERT_X(packetType <= ControlPacket::Type::TimeoutNAK, "ControlPacket::readType()", "Received a control packet with wrong type"); + Q_ASSERT_X(packetType <= ControlPacket::Type::HandshakeACK, "ControlPacket::readType()", "Received a control packet with wrong type"); // read the type _type = (Type) packetType; diff --git a/libraries/networking/src/udt/ControlPacket.h b/libraries/networking/src/udt/ControlPacket.h index 83fab5bb12..bcc559f4f6 100644 --- a/libraries/networking/src/udt/ControlPacket.h +++ b/libraries/networking/src/udt/ControlPacket.h @@ -30,7 +30,9 @@ public: ACK, ACK2, NAK, - TimeoutNAK + TimeoutNAK, + Handshake, + HandshakeACK }; static std::unique_ptr create(Type type, qint64 size = -1); diff --git a/libraries/networking/src/udt/LossList.cpp b/libraries/networking/src/udt/LossList.cpp index c02d12c34a..846347142e 100644 --- a/libraries/networking/src/udt/LossList.cpp +++ b/libraries/networking/src/udt/LossList.cpp @@ -169,7 +169,8 @@ SequenceNumber LossList::popFirstSequenceNumber() { void LossList::write(ControlPacket& packet, int maxPairs) { int writtenPairs = 0; - for(const auto& pair : _lossList) { + + for (const auto& pair : _lossList) { packet.writePrimitive(pair.first); packet.writePrimitive(pair.second); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index cfb969a186..27f25442ea 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -195,6 +195,31 @@ void SendQueue::run() { // Record timing _lastSendTimestamp = high_resolution_clock::now(); + if (!_hasReceivedHandshakeACK) { + // we haven't received a handshake ACK from the client + // if it has been at least 100ms since we last sent a handshake, send another now + + // hold the time of last send in a static + static auto lastSendHandshake = high_resolution_clock::time_point(); + + static const int HANDSHAKE_RESEND_INTERVAL_MS = 100; + + // calculation the duration since the last handshake send + auto sinceLastHandshake = duration_cast(high_resolution_clock::now() - lastSendHandshake); + + if (sinceLastHandshake.count() >= HANDSHAKE_RESEND_INTERVAL_MS) { + + // it has been long enough since last handshake, send another + static auto handshakePacket = ControlPacket::create(ControlPacket::Handshake, 0); + _socket->writeBasePacket(*handshakePacket, _destination); + + lastSendHandshake = high_resolution_clock::now(); + } + + // skip over this iteration since we haven't completed the handshake + continue; + } + bool resentPacket = false; // the following while makes sure that we find a packet to re-send, if there is one diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 901a9f7a87..08d0f697ae 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -59,6 +59,7 @@ public slots: void ack(SequenceNumber ack); void nak(SequenceNumber start, SequenceNumber end); void overrideNAKListFromPacket(ControlPacket& packet); + void handshakeACK() { _hasReceivedHandshakeACK = true; } signals: void packetSent(int dataSize, int payloadSize); @@ -85,6 +86,8 @@ private: Socket* _socket { nullptr }; // Socket to send packet on HifiSockAddr _destination; // Destination addr + std::atomic _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client + std::atomic _lastACKSequenceNumber; // Last ACKed sequence number MessageNumber _currentMessageNumber { 0 }; From 83d76084f1f4c3e97fc65c0367c0dd6598db5d56 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Tue, 25 Aug 2015 17:14:41 -0700 Subject: [PATCH 2/7] don't process packets in Connection without handshake --- libraries/networking/src/udt/Connection.cpp | 11 +++++++++++ libraries/networking/src/udt/Connection.h | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index d5b84ce070..044eb0780f 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -313,6 +313,11 @@ SequenceNumber Connection::nextACK() const { bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize) { + if (!_hasReceivedHandshake) { + // refuse to process any packets until we've received the handshake + return false; + } + _hasReceivedFirstPacket = true; // check if this is a packet pair we should estimate bandwidth from, or just a regular packet @@ -382,6 +387,12 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in } void Connection::processControl(std::unique_ptr controlPacket) { + + if (!_hasReceivedHandshake && controlPacket->getType() != ControlPacket::Handshake) { + // we refuse to process any packets until the handshake is received + return; + } + // Simple dispatch to control packets processing methods based on their type switch (controlPacket->getType()) { case ControlPacket::ACK: diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 8b52b1cf5e..e1933a0f02 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -63,7 +63,7 @@ public: void sync(); // rate control method, fired by Socket for all connections on SYN interval - // return indicates if this packet was a duplicate + // return indicates if this packet should be processed bool processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize); void processControl(std::unique_ptr controlPacket); From 82f5e2e04ff53630ea0e9c68b82a8527b72c4e90 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Tue, 25 Aug 2015 17:26:25 -0700 Subject: [PATCH 3/7] skip processing of control packets before handshake --- libraries/networking/src/udt/Connection.cpp | 35 ++++++++++++++------- libraries/networking/src/udt/Connection.h | 1 + libraries/networking/src/udt/SendQueue.cpp | 7 +++-- 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 044eb0780f..6f7ac8643d 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -388,28 +388,35 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in void Connection::processControl(std::unique_ptr controlPacket) { - if (!_hasReceivedHandshake && controlPacket->getType() != ControlPacket::Handshake) { - // we refuse to process any packets until the handshake is received - return; - } + // Simple dispatch to control packets processing methods based on their type. + + // Processing of control packets (other than Handshake / Handshake ACK) + // is not performed if the handshake has not been completed. - // Simple dispatch to control packets processing methods based on their type switch (controlPacket->getType()) { case ControlPacket::ACK: - if (controlPacket->getPayloadSize() == sizeof(SequenceNumber)) { - processLightACK(move(controlPacket)); - } else { - processACK(move(controlPacket)); + if (_hasReceivedHandshakeACK) { + if (controlPacket->getPayloadSize() == sizeof(SequenceNumber)) { + processLightACK(move(controlPacket)); + } else { + processACK(move(controlPacket)); + } } break; case ControlPacket::ACK2: - processACK2(move(controlPacket)); + if (_hasReceivedHandshake) { + processACK2(move(controlPacket)); + } break; case ControlPacket::NAK: - processNAK(move(controlPacket)); + if (_hasReceivedHandshakeACK) { + processNAK(move(controlPacket)); + } break; case ControlPacket::TimeoutNAK: - processTimeoutNAK(move(controlPacket)); + if (_hasReceivedHandshakeACK) { + processTimeoutNAK(move(controlPacket)); + } break; case ControlPacket::Handshake: processHandshake(move(controlPacket)); @@ -611,12 +618,16 @@ void Connection::processHandshake(std::unique_ptr controlPacket) static auto handshakeACK = ControlPacket::create(ControlPacket::HandshakeACK, 0); _parentSocket->writeBasePacket(*handshakeACK, _destination); + // indicate that handshake has been received _hasReceivedHandshake = true; } void Connection::processHandshakeACK(std::unique_ptr controlPacket) { // hand off this handshake ACK to the send queue so it knows it can start sending getSendQueue().handshakeACK(); + + // indicate that handshake ACK was received + _hasReceivedHandshakeACK = true; } void Connection::processTimeoutNAK(std::unique_ptr controlPacket) { diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index e1933a0f02..b37801a90f 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -109,6 +109,7 @@ private: bool _hasReceivedFirstPacket { false }; bool _hasReceivedHandshake { false }; // flag for receipt of handshake from server + bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client LossList _lossList; // List of all missing packets SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 27f25442ea..29036b026c 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -216,8 +216,8 @@ void SendQueue::run() { lastSendHandshake = high_resolution_clock::now(); } - // skip over this iteration since we haven't completed the handshake - continue; + // we allow processing in this while to continue so that processEvents is called + // but we skip over sending of packets until _hasReceivedHandshakeACK is true } bool resentPacket = false; @@ -266,7 +266,8 @@ void SendQueue::run() { // if we didn't find a packet to re-send AND we think we can fit a new packet on the wire // (this is according to the current flow window size) then we send out a new packet - if (!resentPacket + if (_hasReceivedHandshakeACK + && !resentPacket && seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { // we didn't re-send a packet, so time to send a new one From 0ce8e05f197e30264552c6969c98e588bce06f7b Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Tue, 25 Aug 2015 18:01:37 -0700 Subject: [PATCH 4/7] reset the receiver's state on handshake --- libraries/networking/src/udt/Connection.cpp | 62 ++++++++++++++++--- libraries/networking/src/udt/Connection.h | 5 +- .../networking/src/udt/PacketTimeWindow.cpp | 5 ++ .../networking/src/udt/PacketTimeWindow.h | 2 + 4 files changed, 63 insertions(+), 11 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 6f7ac8643d..e0943a63f8 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -40,8 +40,7 @@ Connection::Connection(Socket* parentSocket, HifiSockAddr destination, std::uniq // setup default SYN, RTT and RTT Variance based on the SYN interval in CongestionControl object _synInterval = _congestionControl->synInterval(); - _rtt = _synInterval * 10; - _rttVariance = _rtt / 2; + resetRTT(); // set the initial RTT and flow window size on congestion control object _congestionControl->setRTT(_rtt); @@ -64,6 +63,11 @@ Connection::~Connection() { } } +void Connection::resetRTT() { + _rtt = _synInterval * 10; + _rttVariance = _rtt / 2; +} + SendQueue& Connection::getSendQueue() { if (!_sendQueue) { // Lasily create send queue @@ -120,13 +124,15 @@ void Connection::sync() { // we send out a periodic ACK every rate control interval sendACK(); - // check if we need to re-transmit a loss list - // we do this if it has been longer than the current nakInterval since we last sent - auto now = high_resolution_clock::now(); - - if (duration_cast(now - _lastNAKTime).count() >= _nakInterval) { - // Send a timeout NAK packet - sendTimeoutNAK(); + if (_lossList.getLength() > 0) { + // check if we need to re-transmit a loss list + // we do this if it has been longer than the current nakInterval since we last sent + auto now = high_resolution_clock::now(); + + if (duration_cast(now - _lastNAKTime).count() >= _nakInterval) { + // Send a timeout NAK packet + sendTimeoutNAK(); + } } } } @@ -349,7 +355,7 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in _nakInterval += (int) (_lossList.getLength() * (USECS_PER_SECOND / receivedPacketsPerSecond)); } - // the NAK interval is at least the _minNAKInterval but might be the estimated timeout + // the NAK interval is at least the _minNAKInterval but might be the value calculated above, if that is larger _nakInterval = std::max(_nakInterval, _minNAKInterval); } @@ -614,6 +620,9 @@ void Connection::processNAK(std::unique_ptr controlPacket) { } void Connection::processHandshake(std::unique_ptr controlPacket) { + // server sent us a handshake - we need to assume this means state should be reset + resetReceiveState(); + // immediately respond with a handshake ACK static auto handshakeACK = ControlPacket::create(ControlPacket::HandshakeACK, 0); _parentSocket->writeBasePacket(*handshakeACK, _destination); @@ -640,6 +649,39 @@ void Connection::processTimeoutNAK(std::unique_ptr controlPacket) _stats.record(ConnectionStats::Stats::ReceivedTimeoutNAK); } +void Connection::resetReceiveState() { + // TODO: this should also reset any queued messages we might be processing + + // reset all SequenceNumber member variables back to default + SequenceNumber defaultSequenceNumber; + + _lastReceivedSequenceNumber = defaultSequenceNumber; + + _lastReceivedAcknowledgedACK = defaultSequenceNumber; + _currentACKSubSequenceNumber = defaultSequenceNumber; + + _lastSentACK = defaultSequenceNumber; + + // clear the loss list and _lastNAKTime + _lossList.clear(); + _lastNAKTime = high_resolution_clock::time_point(); + + // the _nakInterval need not be reset, that will happen on loss + + // clear sync variables + _hasReceivedFirstPacket = false; + + _acksDuringSYN = 1; + _lightACKsDuringSYN = 1; + _packetsSinceACK = 0; + + // reset RTT to initial value + resetRTT(); + + // clear the intervals in the receive window + _receiveWindow.reset(); +} + void Connection::updateRTT(int rtt) { // This updates the RTT using exponential weighted moving average // This is the Jacobson's forumla for RTT estimation diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index b37801a90f..0babfb76e1 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -93,6 +93,9 @@ private: void processHandshake(std::unique_ptr controlPacket); void processHandshakeACK(std::unique_ptr controlPacket); + void resetReceiveState(); + void resetRTT(); + SendQueue& getSendQueue(); SequenceNumber nextACK() const; void updateRTT(int rtt); @@ -103,7 +106,7 @@ private: int _synInterval; // Periodical Rate Control Interval, in microseconds - int _nakInterval; // NAK timeout interval, in microseconds + int _nakInterval { -1 }; // NAK timeout interval, in microseconds, set on loss int _minNAKInterval { 100000 }; // NAK timeout interval lower bound, default of 100ms std::chrono::high_resolution_clock::time_point _lastNAKTime; diff --git a/libraries/networking/src/udt/PacketTimeWindow.cpp b/libraries/networking/src/udt/PacketTimeWindow.cpp index 5c389f6d26..00eb43c7e6 100644 --- a/libraries/networking/src/udt/PacketTimeWindow.cpp +++ b/libraries/networking/src/udt/PacketTimeWindow.cpp @@ -31,6 +31,11 @@ PacketTimeWindow::PacketTimeWindow(int numPacketIntervals, int numProbeIntervals } +void PacketTimeWindow::reset() { + _packetIntervals.assign(_numPacketIntervals, DEFAULT_PACKET_INTERVAL_MICROSECONDS); + _probeIntervals.assign(_numProbeIntervals, DEFAULT_PROBE_INTERVAL_MICROSECONDS); +} + template int median(Iterator begin, Iterator end) { // use std::nth_element to grab the middle - for an even number of elements this is the upper middle diff --git a/libraries/networking/src/udt/PacketTimeWindow.h b/libraries/networking/src/udt/PacketTimeWindow.h index 25e3df8a43..a8a4e0c8b5 100644 --- a/libraries/networking/src/udt/PacketTimeWindow.h +++ b/libraries/networking/src/udt/PacketTimeWindow.h @@ -29,6 +29,8 @@ public: int32_t getPacketReceiveSpeed() const; int32_t getEstimatedBandwidth() const; + + void reset(); private: int _numPacketIntervals { 0 }; // the number of packet intervals to store int _numProbeIntervals { 0 }; // the number of probe intervals to store From 54f2dc54f6a912e9159716fd6a804e2897392769 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Tue, 25 Aug 2015 18:23:50 -0700 Subject: [PATCH 5/7] add a condition_variable wait_until to not lock for handshake --- libraries/networking/src/udt/SendQueue.cpp | 18 ++++++++++++++++-- libraries/networking/src/udt/SendQueue.h | 10 +++++++--- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 29036b026c..5ee2955ebd 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -158,6 +158,12 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { } } +void SendQueue::handshakeACK() { + std::unique_lock locker(_handshakeMutex); + _hasReceivedHandshakeACK = true; + _handshakeACKCondition.notify_one(); +} + SequenceNumber SendQueue::getNextSequenceNumber() { _atomicCurrentSequenceNumber = (SequenceNumber::Type)++_currentSequenceNumber; return _currentSequenceNumber; @@ -195,6 +201,8 @@ void SendQueue::run() { // Record timing _lastSendTimestamp = high_resolution_clock::now(); + std::unique_lock handshakeLock { _handshakeMutex }; + if (!_hasReceivedHandshakeACK) { // we haven't received a handshake ACK from the client // if it has been at least 100ms since we last sent a handshake, send another now @@ -216,10 +224,16 @@ void SendQueue::run() { lastSendHandshake = high_resolution_clock::now(); } - // we allow processing in this while to continue so that processEvents is called - // but we skip over sending of packets until _hasReceivedHandshakeACK is true + // we wait for the ACK or the re-send interval to expire + _handshakeACKCondition.wait_until(handshakeLock, + high_resolution_clock::now() + milliseconds(HANDSHAKE_RESEND_INTERVAL_MS)); + + // Once we're here we've either received the handshake ACK or it's going to be time to re-send a handshake. + // Either way let's continue processing - no packets will be sent if no handshake ACK has been received. } + handshakeLock.unlock(); + bool resentPacket = false; // the following while makes sure that we find a packet to re-send, if there is one diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 08d0f697ae..5590e1faa7 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -13,7 +13,9 @@ #define hifi_SendQueue_h #include +#include #include +#include #include #include @@ -59,7 +61,7 @@ public slots: void ack(SequenceNumber ack); void nak(SequenceNumber start, SequenceNumber end); void overrideNAKListFromPacket(ControlPacket& packet); - void handshakeACK() { _hasReceivedHandshakeACK = true; } + void handshakeACK(); signals: void packetSent(int dataSize, int payloadSize); @@ -86,8 +88,6 @@ private: Socket* _socket { nullptr }; // Socket to send packet on HifiSockAddr _destination; // Destination addr - std::atomic _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client - std::atomic _lastACKSequenceNumber; // Last ACKed sequence number MessageNumber _currentMessageNumber { 0 }; @@ -105,6 +105,10 @@ private: mutable QReadWriteLock _sentLock; // Protects the sent packet list std::unordered_map> _sentPackets; // Packets waiting for ACK. + + std::mutex _handshakeMutex; // Protects the handshake ACK condition_variable + bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client + std::condition_variable _handshakeACKCondition; }; } From d5e77ba907cbaceb3596216b13911617eff76ef8 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Tue, 25 Aug 2015 18:26:30 -0700 Subject: [PATCH 6/7] clear the pending received messages on handshake receive --- libraries/networking/src/udt/Connection.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index e0943a63f8..f1b69e0039 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -650,8 +650,6 @@ void Connection::processTimeoutNAK(std::unique_ptr controlPacket) } void Connection::resetReceiveState() { - // TODO: this should also reset any queued messages we might be processing - // reset all SequenceNumber member variables back to default SequenceNumber defaultSequenceNumber; @@ -680,6 +678,9 @@ void Connection::resetReceiveState() { // clear the intervals in the receive window _receiveWindow.reset(); + + // clear any pending received messages + _pendingReceivedMessages.clear(); } void Connection::updateRTT(int rtt) { From 1da9eeab1f5b22c3609fae802f6a03f80f1db248 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 27 Aug 2015 14:27:29 -0700 Subject: [PATCH 7/7] handle double handshake in Connection --- libraries/networking/src/udt/Connection.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index f1b69e0039..61cf7d6e74 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -620,8 +620,12 @@ void Connection::processNAK(std::unique_ptr controlPacket) { } void Connection::processHandshake(std::unique_ptr controlPacket) { - // server sent us a handshake - we need to assume this means state should be reset - resetReceiveState(); + + if (!_hasReceivedHandshake || _hasReceivedFirstPacket) { + // server sent us a handshake - we need to assume this means state should be reset + // as long as we haven't received a handshake yet or we have and we've received some data + resetReceiveState(); + } // immediately respond with a handshake ACK static auto handshakeACK = ControlPacket::create(ControlPacket::HandshakeACK, 0);