From b2beeae6b1a445d50de6a565afcb274516b57740 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Tue, 12 Dec 2017 17:16:07 -0800 Subject: [PATCH 1/7] Keep connection alive as long as the node is connected --- libraries/networking/src/udt/Connection.cpp | 82 ++++++++------------- libraries/networking/src/udt/Connection.h | 9 +-- libraries/networking/src/udt/SendQueue.cpp | 52 ++++++------- libraries/networking/src/udt/SendQueue.h | 8 +- libraries/networking/src/udt/Socket.cpp | 3 - 5 files changed, 57 insertions(+), 97 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index f42049f107..7a31bbeedc 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -11,6 +11,8 @@ #include "Connection.h" +#include + #include #include @@ -60,6 +62,15 @@ Connection::Connection(Socket* parentSocket, HifiSockAddr destination, std::uniq _ack2Packet = ControlPacket::create(ControlPacket::ACK2, ACK2_PAYLOAD_BYTES); _lossReport = ControlPacket::create(ControlPacket::NAK, NAK_PACKET_PAYLOAD_BYTES); _handshakeACK = ControlPacket::create(ControlPacket::HandshakeACK, HANDSHAKE_ACK_PAYLOAD_BYTES); + + + // setup psuedo-random number generation shared by all connections + static std::random_device rd; + static std::mt19937 generator(rd()); + static std::uniform_int_distribution<> distribution(0, SequenceNumber::MAX); + + // randomize the intial sequence number + _initialSequenceNumber = SequenceNumber(distribution(generator)); } Connection::~Connection() { @@ -81,9 +92,6 @@ void Connection::stopSendQueue() { sendQueue->stop(); sendQueue->deleteLater(); - // since we're stopping the send queue we should consider our handshake ACK not receieved - _hasReceivedHandshakeACK = false; - // wait on the send queue thread so we know the send queue is gone sendQueueThread->quit(); sendQueueThread->wait(); @@ -101,13 +109,22 @@ void Connection::setMaxBandwidth(int maxBandwidth) { SendQueue& Connection::getSendQueue() { if (!_sendQueue) { - // we may have a sequence number from the previous inactive queue - re-use that so that the // receiver is getting the sequence numbers it expects (given that the connection must still be active) // Lasily create send queue - _sendQueue = SendQueue::create(_parentSocket, _destination); - _lastReceivedACK = _sendQueue->getCurrentSequenceNumber(); + + if (!_hasReceivedHandshakeACK) { + // First time creating a send queue for this connection + _sendQueue = SendQueue::create(_parentSocket, _destination, _initialSequenceNumber - 1); + _lastReceivedACK = _sendQueue->getCurrentSequenceNumber(); + } else { + // Connection already has a handshake from a previous send queue + _sendQueue = SendQueue::create(_parentSocket, _destination, _lastReceivedACK); + // This connection has already gone through the handshake + // bypass it in the send queue + _sendQueue->handshakeACK(); + } #ifdef UDT_CONNECTION_DEBUG qCDebug(networking) << "Created SendQueue for connection to" << _destination; @@ -142,14 +159,6 @@ void Connection::queueInactive() { #ifdef UDT_CONNECTION_DEBUG qCDebug(networking) << "Connection to" << _destination << "has stopped its SendQueue."; #endif - - if (!_hasReceivedHandshake || !_isReceivingData) { -#ifdef UDT_CONNECTION_DEBUG - qCDebug(networking) << "Connection SendQueue to" << _destination << "stopped and no data is being received - stopping connection."; -#endif - - deactivate(); - } } void Connection::queueTimeout() { @@ -208,19 +217,6 @@ void Connection::sync() { && duration_cast(sincePacketReceive).count() >= MIN_SECONDS_BEFORE_EXPIRY ) { // the receive side of this connection is expired _isReceivingData = false; - - // if we don't have a send queue that means the whole connection has expired and we can emit our signal - // otherwise we'll wait for it to also timeout before cleaning up - if (!_sendQueue) { - -#ifdef UDT_CONNECTION_DEBUG - qCDebug(networking) << "Connection to" << _destination << "no longer receiving any data and there is currently no send queue - stopping connection."; -#endif - - deactivate(); - - return; - } } // reset the number of light ACKs or non SYN ACKs during this sync interval @@ -242,26 +238,6 @@ void Connection::sync() { sendTimeoutNAK(); } } - } else if (!_sendQueue) { - // we haven't received a packet and we're not sending - // this most likely means we were started erroneously - // check the start time for this connection and auto expire it after 5 seconds of not receiving or sending any data - static const int CONNECTION_NOT_USED_EXPIRY_SECONDS = 5; - auto secondsSinceStart = duration_cast(p_high_resolution_clock::now() - _connectionStart).count(); - - if (secondsSinceStart >= CONNECTION_NOT_USED_EXPIRY_SECONDS) { - // it's been CONNECTION_NOT_USED_EXPIRY_SECONDS and nothing has actually happened with this connection - // consider it inactive and emit our inactivity signal - -#ifdef UDT_CONNECTION_DEBUG - qCDebug(networking) << "Connection to" << _destination << "did not receive or send any data in last" - << CONNECTION_NOT_USED_EXPIRY_SECONDS << "seconds - stopping connection."; -#endif - - deactivate(); - - return; - } } } @@ -827,11 +803,13 @@ void Connection::processHandshakeACK(ControlPacketPointer controlPacket) { SequenceNumber initialSequenceNumber; controlPacket->readPrimitive(&initialSequenceNumber); - // hand off this handshake ACK to the send queue so it knows it can start sending - getSendQueue().handshakeACK(initialSequenceNumber); - - // indicate that handshake ACK was received - _hasReceivedHandshakeACK = true; + if (initialSequenceNumber == _initialSequenceNumber) { + // hand off this handshake ACK to the send queue so it knows it can start sending + getSendQueue().handshakeACK(); + + // indicate that handshake ACK was received + _hasReceivedHandshakeACK = true; + } } } diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index c134081dde..a13c29adc8 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -72,8 +72,6 @@ public: void queueReceivedMessagePacket(std::unique_ptr packet); ConnectionStats::Stats sampleStats() { return _stats.sample(); } - - bool isActive() const { return _isActive; } HifiSockAddr getDestination() const { return _destination; } @@ -83,7 +81,6 @@ public: signals: void packetSent(); - void connectionInactive(const HifiSockAddr& sockAddr); void receiverHandshakeRequestComplete(const HifiSockAddr& sockAddr); private slots: @@ -112,8 +109,6 @@ private: void resetReceiveState(); void resetRTT(); - void deactivate() { _isActive = false; emit connectionInactive(_destination); } - SendQueue& getSendQueue(); SequenceNumber nextACK() const; void updateRTT(int rtt); @@ -138,9 +133,9 @@ private: p_high_resolution_clock::time_point _lastReceiveTime; // holds the last time we received anything from sender bool _isReceivingData { false }; // flag used for expiry of receipt portion of connection - bool _isActive { true }; // flag used for inactivity of connection - SequenceNumber _initialReceiveSequenceNumber; // Randomized by peer SendQueue on creation, identifies connection during re-connect requests + SequenceNumber _initialSequenceNumber; // Randomized on Connection creation, identifies connection during re-connect requests + SequenceNumber _initialReceiveSequenceNumber; // Randomized by peer Connection on creation, identifies connection during re-connect requests LossList _lossList; // List of all missing packets SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 0c029751aa..ef249b8f75 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -12,7 +12,6 @@ #include "SendQueue.h" #include -#include #include #include @@ -62,10 +61,10 @@ private: Mutex2& _mutex2; }; -std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destination) { +std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber) { Q_ASSERT_X(socket, "SendQueue::create", "Must be called with a valid Socket*"); - auto queue = std::unique_ptr(new SendQueue(socket, destination)); + auto queue = std::unique_ptr(new SendQueue(socket, destination, currentSequenceNumber)); // Setup queue private thread QThread* thread = new QThread; @@ -84,20 +83,12 @@ std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destin return queue; } -SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) : +SendQueue::SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber) : _socket(socket), _destination(dest) { - // setup psuedo-random number generation for all instances of SendQueue - static std::random_device rd; - static std::mt19937 generator(rd()); - static std::uniform_int_distribution<> distribution(0, SequenceNumber::MAX); - - // randomize the intial sequence number - _initialSequenceNumber = SequenceNumber(distribution(generator)); - - // set our member variables from randomized initial number - _currentSequenceNumber = _initialSequenceNumber - 1; + // set our member variables from current sequence number + _currentSequenceNumber = currentSequenceNumber; _atomicCurrentSequenceNumber = uint32_t(_currentSequenceNumber); _lastACKSequenceNumber = uint32_t(_currentSequenceNumber) - 1; @@ -114,8 +105,8 @@ void SendQueue::queuePacket(std::unique_ptr packet) { // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets _emptyCondition.notify_one(); - if (!this->thread()->isRunning() && _state == State::NotStarted) { - this->thread()->start(); + if (!thread()->isRunning() && _state == State::NotStarted) { + thread()->start(); } } @@ -125,8 +116,8 @@ void SendQueue::queuePacketList(std::unique_ptr packetList) { // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets _emptyCondition.notify_one(); - if (!this->thread()->isRunning() && _state == State::NotStarted) { - this->thread()->start(); + if (!thread()->isRunning() && _state == State::NotStarted) { + thread()->start(); } } @@ -225,8 +216,11 @@ void SendQueue::sendHandshake() { std::unique_lock handshakeLock { _handshakeMutex }; if (!_hasReceivedHandshakeACK) { // we haven't received a handshake ACK from the client, send another now + // if the handshake hasn't been completed, then the initial sequence number + // should be the current sequence number + 1 + SequenceNumber initialSequenceNumber = _currentSequenceNumber + 1; auto handshakePacket = ControlPacket::create(ControlPacket::Handshake, sizeof(SequenceNumber)); - handshakePacket->writePrimitive(_initialSequenceNumber); + handshakePacket->writePrimitive(initialSequenceNumber); _socket->writeBasePacket(*handshakePacket, _destination); // we wait for the ACK or the re-send interval to expire @@ -235,18 +229,16 @@ void SendQueue::sendHandshake() { } } -void SendQueue::handshakeACK(SequenceNumber initialSequenceNumber) { - if (initialSequenceNumber == _initialSequenceNumber) { - { - std::lock_guard locker { _handshakeMutex }; - _hasReceivedHandshakeACK = true; - } - - _lastReceiverResponse = QDateTime::currentMSecsSinceEpoch(); - - // Notify on the handshake ACK condition - _handshakeACKCondition.notify_one(); +void SendQueue::handshakeACK() { + { + std::lock_guard locker { _handshakeMutex }; + _hasReceivedHandshakeACK = true; } + + _lastReceiverResponse = QDateTime::currentMSecsSinceEpoch(); + + // Notify on the handshake ACK condition + _handshakeACKCondition.notify_one(); } SequenceNumber SendQueue::getNextSequenceNumber() { diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 484afcb88e..b6cec15ffd 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -50,7 +50,7 @@ public: Stopped }; - static std::unique_ptr create(Socket* socket, HifiSockAddr destination); + static std::unique_ptr create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber); virtual ~SendQueue(); @@ -76,7 +76,7 @@ public slots: void nak(SequenceNumber start, SequenceNumber end); void fastRetransmit(SequenceNumber ack); void overrideNAKListFromPacket(ControlPacket& packet); - void handshakeACK(SequenceNumber initialSequenceNumber); + void handshakeACK(); signals: void packetSent(int wireSize, int payloadSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint); @@ -91,7 +91,7 @@ private slots: void run(); private: - SendQueue(Socket* socket, HifiSockAddr dest); + SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber); SendQueue(SendQueue& other) = delete; SendQueue(SendQueue&& other) = delete; @@ -115,8 +115,6 @@ private: Socket* _socket { nullptr }; // Socket to send packet on HifiSockAddr _destination; // Destination addr - - SequenceNumber _initialSequenceNumber; // Randomized on SendQueue creation, identifies connection during re-connect requests std::atomic _lastACKSequenceNumber { 0 }; // Last ACKed sequence number diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index a3374a0f47..55643985c8 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -257,9 +257,6 @@ Connection* Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { congestionControl->setMaxBandwidth(_maxBandwidth); auto connection = std::unique_ptr(new Connection(this, sockAddr, std::move(congestionControl))); - // we queue the connection to cleanup connection in case it asks for it during its own rate control sync - QObject::connect(connection.get(), &Connection::connectionInactive, this, &Socket::cleanupConnection); - // allow higher-level classes to find out when connections have completed a handshake QObject::connect(connection.get(), &Connection::receiverHandshakeRequestComplete, this, &Socket::clientHandshakeRequestComplete); From 702d2c34a59bdb32cdea97731759aebd9516b6ad Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Wed, 13 Dec 2017 16:55:08 -0800 Subject: [PATCH 2/7] Reset ACK in ctor --- libraries/networking/src/udt/Connection.cpp | 7 ++----- libraries/networking/src/udt/SendQueue.cpp | 15 +++++++++++---- libraries/networking/src/udt/SendQueue.h | 4 ++-- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 7a31bbeedc..5f632fa25f 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -116,14 +116,11 @@ SendQueue& Connection::getSendQueue() { if (!_hasReceivedHandshakeACK) { // First time creating a send queue for this connection - _sendQueue = SendQueue::create(_parentSocket, _destination, _initialSequenceNumber - 1); + _sendQueue = SendQueue::create(_parentSocket, _destination, _initialSequenceNumber - 1, _hasReceivedHandshakeACK); _lastReceivedACK = _sendQueue->getCurrentSequenceNumber(); } else { // Connection already has a handshake from a previous send queue - _sendQueue = SendQueue::create(_parentSocket, _destination, _lastReceivedACK); - // This connection has already gone through the handshake - // bypass it in the send queue - _sendQueue->handshakeACK(); + _sendQueue = SendQueue::create(_parentSocket, _destination, _lastReceivedACK, _hasReceivedHandshakeACK); } #ifdef UDT_CONNECTION_DEBUG diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index ef249b8f75..3dbfb35fbe 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -61,10 +61,10 @@ private: Mutex2& _mutex2; }; -std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber) { +std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK) { Q_ASSERT_X(socket, "SendQueue::create", "Must be called with a valid Socket*"); - auto queue = std::unique_ptr(new SendQueue(socket, destination, currentSequenceNumber)); + auto queue = std::unique_ptr(new SendQueue(socket, destination, currentSequenceNumber, hasReceivedHandshakeACK)); // Setup queue private thread QThread* thread = new QThread; @@ -83,14 +83,21 @@ std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destin return queue; } -SendQueue::SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber) : +SendQueue::SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK) : _socket(socket), _destination(dest) { // set our member variables from current sequence number _currentSequenceNumber = currentSequenceNumber; _atomicCurrentSequenceNumber = uint32_t(_currentSequenceNumber); - _lastACKSequenceNumber = uint32_t(_currentSequenceNumber) - 1; + + if (hasReceivedHandshakeACK) { + _lastACKSequenceNumber = uint32_t(_currentSequenceNumber); + } else { + _lastACKSequenceNumber = uint32_t(_currentSequenceNumber - 1); + } + + _hasReceivedHandshakeACK = hasReceivedHandshakeACK; // default the last receiver response to the current time _lastReceiverResponse = QDateTime::currentMSecsSinceEpoch(); diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index b6cec15ffd..29fe6b35ff 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -50,7 +50,7 @@ public: Stopped }; - static std::unique_ptr create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber); + static std::unique_ptr create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK); virtual ~SendQueue(); @@ -91,7 +91,7 @@ private slots: void run(); private: - SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber); + SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK); SendQueue(SendQueue& other) = delete; SendQueue(SendQueue&& other) = delete; From 34c1c27455a7fdec713bac78b1c9482b6d78a6fc Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Wed, 13 Dec 2017 19:25:24 -0800 Subject: [PATCH 3/7] Keep track of message numbers --- libraries/networking/src/udt/Connection.cpp | 10 ++++++---- libraries/networking/src/udt/Connection.h | 2 ++ libraries/networking/src/udt/PacketQueue.cpp | 2 +- libraries/networking/src/udt/PacketQueue.h | 4 +++- libraries/networking/src/udt/SendQueue.cpp | 19 +++++++++---------- libraries/networking/src/udt/SendQueue.h | 8 ++++++-- 6 files changed, 27 insertions(+), 18 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 5f632fa25f..fc590c3a66 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -90,6 +90,9 @@ void Connection::stopSendQueue() { // tell the send queue to stop and be deleted sendQueue->stop(); + + _lastMessageNumber = sendQueue->getCurrentMessageNumber(); + sendQueue->deleteLater(); // wait on the send queue thread so we know the send queue is gone @@ -116,11 +119,11 @@ SendQueue& Connection::getSendQueue() { if (!_hasReceivedHandshakeACK) { // First time creating a send queue for this connection - _sendQueue = SendQueue::create(_parentSocket, _destination, _initialSequenceNumber - 1, _hasReceivedHandshakeACK); + _sendQueue = SendQueue::create(_parentSocket, _destination, _initialSequenceNumber - 1, _lastMessageNumber, _hasReceivedHandshakeACK); _lastReceivedACK = _sendQueue->getCurrentSequenceNumber(); } else { // Connection already has a handshake from a previous send queue - _sendQueue = SendQueue::create(_parentSocket, _destination, _lastReceivedACK, _hasReceivedHandshakeACK); + _sendQueue = SendQueue::create(_parentSocket, _destination, _lastReceivedACK, _lastMessageNumber, _hasReceivedHandshakeACK); } #ifdef UDT_CONNECTION_DEBUG @@ -417,7 +420,6 @@ void Connection::sendHandshakeRequest() { } bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize) { - if (!_hasReceivedHandshake) { // Refuse to process any packets until we've received the handshake // Send handshake request to re-request a handshake @@ -509,7 +511,7 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in } else { _stats.recordReceivedPackets(payloadSize, packetSize); } - + return !wasDuplicate; } diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index a13c29adc8..4f979750f9 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -137,6 +137,8 @@ private: SequenceNumber _initialSequenceNumber; // Randomized on Connection creation, identifies connection during re-connect requests SequenceNumber _initialReceiveSequenceNumber; // Randomized by peer Connection on creation, identifies connection during re-connect requests + MessageNumber _lastMessageNumber; + LossList _lossList; // List of all missing packets SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer SequenceNumber _lastReceivedACK; // The last ACK received diff --git a/libraries/networking/src/udt/PacketQueue.cpp b/libraries/networking/src/udt/PacketQueue.cpp index 9560f2f187..0560855ecb 100644 --- a/libraries/networking/src/udt/PacketQueue.cpp +++ b/libraries/networking/src/udt/PacketQueue.cpp @@ -15,7 +15,7 @@ using namespace udt; -PacketQueue::PacketQueue() { +PacketQueue::PacketQueue(MessageNumber messageNumber) : _currentMessageNumber(messageNumber) { _channels.emplace_back(new std::list()); } diff --git a/libraries/networking/src/udt/PacketQueue.h b/libraries/networking/src/udt/PacketQueue.h index 2b3d3a4b5b..bc4c5e3432 100644 --- a/libraries/networking/src/udt/PacketQueue.h +++ b/libraries/networking/src/udt/PacketQueue.h @@ -34,7 +34,7 @@ class PacketQueue { using Channels = std::vector; public: - PacketQueue(); + PacketQueue(MessageNumber messageNumber = 0); void queuePacket(PacketPointer packet); void queuePacketList(PacketListPointer packetList); @@ -42,6 +42,8 @@ public: PacketPointer takePacket(); Mutex& getLock() { return _packetsLock; } + + MessageNumber getCurrentMessageNumber() const { return _currentMessageNumber; } private: MessageNumber getNextMessageNumber(); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 3dbfb35fbe..43477e4562 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -61,10 +61,12 @@ private: Mutex2& _mutex2; }; -std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK) { +std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber, + MessageNumber currentMessageNumber, bool hasReceivedHandshakeACK) { Q_ASSERT_X(socket, "SendQueue::create", "Must be called with a valid Socket*"); - auto queue = std::unique_ptr(new SendQueue(socket, destination, currentSequenceNumber, hasReceivedHandshakeACK)); + auto queue = std::unique_ptr(new SendQueue(socket, destination, currentSequenceNumber, + currentMessageNumber, hasReceivedHandshakeACK)); // Setup queue private thread QThread* thread = new QThread; @@ -83,19 +85,16 @@ std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destin return queue; } -SendQueue::SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK) : +SendQueue::SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber, + MessageNumber currentMessageNumber, bool hasReceivedHandshakeACK) : _socket(socket), - _destination(dest) + _destination(dest), + _packets(currentMessageNumber) { // set our member variables from current sequence number _currentSequenceNumber = currentSequenceNumber; _atomicCurrentSequenceNumber = uint32_t(_currentSequenceNumber); - - if (hasReceivedHandshakeACK) { - _lastACKSequenceNumber = uint32_t(_currentSequenceNumber); - } else { - _lastACKSequenceNumber = uint32_t(_currentSequenceNumber - 1); - } + _lastACKSequenceNumber = uint32_t(_currentSequenceNumber); _hasReceivedHandshakeACK = hasReceivedHandshakeACK; diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 29fe6b35ff..b33180c1e2 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -50,7 +50,9 @@ public: Stopped }; - static std::unique_ptr create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK); + static std::unique_ptr create(Socket* socket, HifiSockAddr destination, + SequenceNumber currentSequenceNumber, MessageNumber currentMessageNumber, + bool hasReceivedHandshakeACK); virtual ~SendQueue(); @@ -58,6 +60,7 @@ public: void queuePacketList(std::unique_ptr packetList); SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); } + MessageNumber getCurrentMessageNumber() const { return _packets.getCurrentMessageNumber(); } void setFlowWindowSize(int flowWindowSize) { _flowWindowSize = flowWindowSize; } @@ -91,7 +94,8 @@ private slots: void run(); private: - SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK); + SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber, + MessageNumber currentMessageNumber, bool hasReceivedHandshakeACK); SendQueue(SendQueue& other) = delete; SendQueue(SendQueue&& other) = delete; From cf28fdf188e41bed7304e5f7ac46a562a48e205b Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 14 Dec 2017 12:07:32 -0800 Subject: [PATCH 4/7] fix message erase in conn, SendQueue ctor order --- libraries/networking/src/udt/Connection.cpp | 13 +++++++++---- libraries/networking/src/udt/Connection.h | 1 - libraries/networking/src/udt/SendQueue.cpp | 4 ++-- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index fc590c3a66..2f57523f79 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -193,11 +193,16 @@ void Connection::queueReceivedMessagePacket(std::unique_ptr packet) { while (pendingMessage.hasAvailablePackets()) { auto packet = pendingMessage.removeNextPacket(); - _parentSocket->messageReceived(std::move(packet)); - } - if (pendingMessage.isComplete()) { - _pendingReceivedMessages.erase(messageNumber); + auto packetPosition = packet->getPacketPosition(); + + _parentSocket->messageReceived(std::move(packet)); + + // if this was the last or only packet, then we can remove the pending message from our hash + if (packetPosition == Packet::PacketPosition::LAST || + packetPosition == Packet::PacketPosition::ONLY) { + _pendingReceivedMessages.erase(messageNumber); + } } } diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 4f979750f9..8a96b1ded3 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -37,7 +37,6 @@ class Socket; class PendingReceivedMessage { public: void enqueuePacket(std::unique_ptr packet); - bool isComplete() const { return _hasLastPacket && _numPackets == _packets.size(); } bool hasAvailablePackets() const; std::unique_ptr removeNextPacket(); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 43477e4562..e7d1ab104a 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -87,9 +87,9 @@ std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destin SendQueue::SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber, MessageNumber currentMessageNumber, bool hasReceivedHandshakeACK) : + _packets(currentMessageNumber), _socket(socket), - _destination(dest), - _packets(currentMessageNumber) + _destination(dest) { // set our member variables from current sequence number _currentSequenceNumber = currentSequenceNumber; From 8a2c31d3421f49bfe14a45f50afe74e7a20a374c Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 14 Dec 2017 15:30:25 -0800 Subject: [PATCH 5/7] only de-activate the SendQueue when it has nothing to send --- libraries/networking/src/udt/SendQueue.cpp | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index e7d1ab104a..4a0500f642 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -538,28 +538,6 @@ bool SendQueue::maybeResendPacket() { bool SendQueue::isInactive(bool attemptedToSendPacket) { // check for connection timeout first - // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been - // at least 5 seconds - static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16; - static const int MIN_MS_BEFORE_INACTIVE = 5 * 1000; - - auto sinceLastResponse = (QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse); - - if (sinceLastResponse > 0 && - sinceLastResponse >= int64_t(NUM_TIMEOUTS_BEFORE_INACTIVE * (_estimatedTimeout / USECS_PER_MSEC)) && - sinceLastResponse > MIN_MS_BEFORE_INACTIVE) { - // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, - // then signal the queue is inactive and return so it can be cleaned up - -#ifdef UDT_CONNECTION_DEBUG - qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts" - << "and" << MIN_MS_BEFORE_INACTIVE << "milliseconds before receiving any ACK/NAK and is now inactive. Stopping."; -#endif - - deactivate(); - return true; - } - if (!attemptedToSendPacket) { // During our processing above we didn't send any packets From 75e56b5daa725310946d37101f5c0c68abbe17eb Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 14 Dec 2017 15:30:51 -0800 Subject: [PATCH 6/7] make the _lastMessageNumber in Connection default to 0 --- libraries/networking/src/udt/Connection.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 8a96b1ded3..0017eb204a 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -136,7 +136,7 @@ private: SequenceNumber _initialSequenceNumber; // Randomized on Connection creation, identifies connection during re-connect requests SequenceNumber _initialReceiveSequenceNumber; // Randomized by peer Connection on creation, identifies connection during re-connect requests - MessageNumber _lastMessageNumber; + MessageNumber _lastMessageNumber { 0 }; LossList _lossList; // List of all missing packets SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer From a5c951a6662d9028d1256f4d0ddb4dcdbbef68db Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 14 Dec 2017 15:36:56 -0800 Subject: [PATCH 7/7] remove tracking of last receiver response from SendQueue --- libraries/networking/src/udt/SendQueue.cpp | 16 +--------------- libraries/networking/src/udt/SendQueue.h | 1 - 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 4a0500f642..b62624aab9 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -97,9 +97,6 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSe _lastACKSequenceNumber = uint32_t(_currentSequenceNumber); _hasReceivedHandshakeACK = hasReceivedHandshakeACK; - - // default the last receiver response to the current time - _lastReceiverResponse = QDateTime::currentMSecsSinceEpoch(); } SendQueue::~SendQueue() { @@ -141,9 +138,6 @@ int SendQueue::sendPacket(const Packet& packet) { } void SendQueue::ack(SequenceNumber ack) { - // this is a response from the client, re-set our timeout expiry and our last response time - _lastReceiverResponse = QDateTime::currentMSecsSinceEpoch(); - if (_lastACKSequenceNumber == (uint32_t) ack) { return; } @@ -170,10 +164,7 @@ void SendQueue::ack(SequenceNumber ack) { _emptyCondition.notify_one(); } -void SendQueue::nak(SequenceNumber start, SequenceNumber end) { - // this is a response from the client, re-set our timeout expiry - _lastReceiverResponse = QDateTime::currentMSecsSinceEpoch(); - +void SendQueue::nak(SequenceNumber start, SequenceNumber end) { { std::lock_guard nakLocker(_naksLock); _naks.insert(start, end); @@ -194,9 +185,6 @@ void SendQueue::fastRetransmit(udt::SequenceNumber ack) { } void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { - // this is a response from the client, re-set our timeout expiry - _lastReceiverResponse = QDateTime::currentMSecsSinceEpoch(); - { std::lock_guard nakLocker(_naksLock); _naks.clear(); @@ -241,8 +229,6 @@ void SendQueue::handshakeACK() { _hasReceivedHandshakeACK = true; } - _lastReceiverResponse = QDateTime::currentMSecsSinceEpoch(); - // Notify on the handshake ACK condition _handshakeACKCondition.notify_one(); } diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index b33180c1e2..a11aacdb91 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -130,7 +130,6 @@ private: std::atomic _estimatedTimeout { 0 }; // Estimated timeout, set from CC std::atomic _syncInterval { udt::DEFAULT_SYN_INTERVAL_USECS }; // Sync interval, set from CC - std::atomic _lastReceiverResponse { 0 }; // Timestamp for the last time we got new data from the receiver (ACK/NAK) std::atomic _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC