From a34e1d85a500e5a65bbb5f93d50570fb9bfb9c2e Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Thu, 17 Sep 2015 16:10:01 +0200 Subject: [PATCH 01/22] Close current packet fix --- libraries/networking/src/udt/PacketList.cpp | 2 +- tools/udt-test/src/UDTTest.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libraries/networking/src/udt/PacketList.cpp b/libraries/networking/src/udt/PacketList.cpp index 53edfb32f1..64ae8f54db 100644 --- a/libraries/networking/src/udt/PacketList.cpp +++ b/libraries/networking/src/udt/PacketList.cpp @@ -105,7 +105,7 @@ std::unique_ptr PacketList::createPacketWithExtendedHeader() { } void PacketList::closeCurrentPacket(bool shouldSendEmpty) { - if (shouldSendEmpty && !_currentPacket) { + if (shouldSendEmpty && !_currentPacket && _packets.empty()) { _currentPacket = createPacketWithExtendedHeader(); } diff --git a/tools/udt-test/src/UDTTest.cpp b/tools/udt-test/src/UDTTest.cpp index d57ff7a874..32ab1780e0 100644 --- a/tools/udt-test/src/UDTTest.cpp +++ b/tools/udt-test/src/UDTTest.cpp @@ -282,7 +282,7 @@ void UDTTest::sendPacket() { packetList->write(randomPaddedData); } - packetList->closeCurrentPacket(false); + packetList->closeCurrentPacket(); _totalQueuedBytes += packetList->getDataSize(); _totalQueuedPackets += packetList->getNumPackets(); From 3a61e6b6a3a0ad206fc61a3d181b8da7edeb8fdf Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Thu, 17 Sep 2015 16:27:31 +0200 Subject: [PATCH 02/22] Coding standart for locks --- libraries/networking/src/udt/SendQueue.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index ac6c8238e8..d814dfdcdc 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -230,13 +230,12 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { } void SendQueue::handshakeACK() { - std::unique_lock locker { _handshakeMutex }; - - _hasReceivedHandshakeACK = true; - - // unlock the mutex and notify on the handshake ACK condition - locker.unlock(); + { + std::unique_lock locker { _handshakeMutex }; + _hasReceivedHandshakeACK = true; + } + // Notify on the handshake ACK condition _handshakeACKCondition.notify_one(); } From a4d383b38499e479ad14a7c105e5fe990da922c7 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Thu, 17 Sep 2015 16:29:34 +0200 Subject: [PATCH 03/22] Check handshake separately --- libraries/networking/src/udt/SendQueue.cpp | 65 ++++++++++------------ libraries/networking/src/udt/SendQueue.h | 2 + 2 files changed, 30 insertions(+), 37 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index d814dfdcdc..45caec3aa9 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -229,6 +229,22 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { _emptyCondition.notify_one(); } +void SendQueue::sendHandshake() { + std::unique_lock handshakeLock { _handshakeMutex }; + if (!_hasReceivedHandshakeACK) { + // we haven't received a handshake ACK from the client, send another now + static const auto handshakePacket = ControlPacket::create(ControlPacket::Handshake, 0); + _socket->writeBasePacket(*handshakePacket, _destination); + + // we wait for the ACK or the re-send interval to expire + static const auto HANDSHAKE_RESEND_INTERVAL = std::chrono::milliseconds(100); + _handshakeACKCondition.wait_for(handshakeLock, HANDSHAKE_RESEND_INTERVAL); + + // Once we're here we've either received the handshake ACK or it's going to be time to re-send a handshake. + // Either way let's continue processing - no packets will be sent if no handshake ACK has been received. + } +} + void SendQueue::handshakeACK() { { std::unique_lock locker { _handshakeMutex }; @@ -286,44 +302,21 @@ void SendQueue::run() { _state = State::Running; + // Wait for handshake to be complete + while (_state == State::Running && !_hasReceivedHandshakeACK) { + sendHandshake(); + QCoreApplication::processEvents(); + } + while (_state == State::Running) { // Record how long the loop takes to execute - auto loopStartTimestamp = p_high_resolution_clock::now(); - - std::unique_lock handshakeLock { _handshakeMutex }; - - if (!_hasReceivedHandshakeACK) { - // we haven't received a handshake ACK from the client - // if it has been at least 100ms since we last sent a handshake, send another now - - static const auto HANDSHAKE_RESEND_INTERVAL_MS = std::chrono::milliseconds(100); - - // hold the time of last send in a static - static auto lastSendHandshake = p_high_resolution_clock::now() - HANDSHAKE_RESEND_INTERVAL_MS; - - if (p_high_resolution_clock::now() - lastSendHandshake >= HANDSHAKE_RESEND_INTERVAL_MS) { - - // it has been long enough since last handshake, send another - static auto handshakePacket = ControlPacket::create(ControlPacket::Handshake, 0); - _socket->writeBasePacket(*handshakePacket, _destination); - - lastSendHandshake = p_high_resolution_clock::now(); - } - - // we wait for the ACK or the re-send interval to expire - _handshakeACKCondition.wait_until(handshakeLock, p_high_resolution_clock::now() + HANDSHAKE_RESEND_INTERVAL_MS); - - // Once we're here we've either received the handshake ACK or it's going to be time to re-send a handshake. - // Either way let's continue processing - no packets will be sent if no handshake ACK has been received. - } - - handshakeLock.unlock(); + const auto loopStartTimestamp = p_high_resolution_clock::now(); bool sentAPacket = maybeResendPacket(); // if we didn't find a packet to re-send AND we think we can fit a new packet on the wire // (this is according to the current flow window size) then we send out a new packet - if (_hasReceivedHandshakeACK && !sentAPacket) { + if (!sentAPacket) { if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { sentAPacket = maybeSendNewPacket(); } @@ -337,7 +330,7 @@ void SendQueue::run() { return; } - if (_hasReceivedHandshakeACK && !sentAPacket) { + if (!sentAPacket) { // check if it is time to break this connection // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been @@ -430,13 +423,11 @@ void SendQueue::run() { } } - auto loopEndTimestamp = p_high_resolution_clock::now(); + const auto loopEndTimestamp = p_high_resolution_clock::now(); // sleep as long as we need until next packet send, if we can - auto timeToSleep = (loopStartTimestamp + std::chrono::microseconds(_packetSendPeriod)) - loopEndTimestamp; - if (timeToSleep > timeToSleep.zero()) { - std::this_thread::sleep_for(timeToSleep); - } + const auto timeToSleep = (loopStartTimestamp + std::chrono::microseconds(_packetSendPeriod)) - loopEndTimestamp; + std::this_thread::sleep_for(timeToSleep); } } diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 88b6b045b0..80e0195455 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -87,6 +87,8 @@ private: SendQueue(SendQueue& other) = delete; SendQueue(SendQueue&& other) = delete; + void sendHandshake(); + void sendPacket(const Packet& packet); void sendNewPacketAndAddToSentList(std::unique_ptr newPacket, SequenceNumber sequenceNumber); From eb8b37309d82d1cbbd5c51a9cf529d557778f981 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Thu, 17 Sep 2015 17:22:24 +0200 Subject: [PATCH 04/22] Use lock_guard when possible --- libraries/networking/src/udt/SendQueue.cpp | 39 ++++++++++------------ 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 45caec3aa9..1e7206c1ee 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -191,12 +191,10 @@ void SendQueue::nak(SequenceNumber start, SequenceNumber end) { _timeoutExpiryCount = 0; _lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch()); - std::unique_lock nakLocker(_naksLock); - - _naks.insert(start, end); - - // unlock the locked mutex before we notify - nakLocker.unlock(); + { + std::lock_guard nakLocker(_naksLock); + _naks.insert(start, end); + } // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send _emptyCondition.notify_one(); @@ -207,24 +205,23 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { _timeoutExpiryCount = 0; _lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch()); - std::unique_lock nakLocker(_naksLock); - _naks.clear(); - - SequenceNumber first, second; - while (packet.bytesLeftToRead() >= (qint64)(2 * sizeof(SequenceNumber))) { - packet.readPrimitive(&first); - packet.readPrimitive(&second); + { + std::lock_guard nakLocker(_naksLock); + _naks.clear(); - if (first == second) { - _naks.append(first); - } else { - _naks.append(first, second); + SequenceNumber first, second; + while (packet.bytesLeftToRead() >= (qint64)(2 * sizeof(SequenceNumber))) { + packet.readPrimitive(&first); + packet.readPrimitive(&second); + + if (first == second) { + _naks.append(first); + } else { + _naks.append(first, second); + } } } - // unlock the mutex before we notify - nakLocker.unlock(); - // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send _emptyCondition.notify_one(); } @@ -247,7 +244,7 @@ void SendQueue::sendHandshake() { void SendQueue::handshakeACK() { { - std::unique_lock locker { _handshakeMutex }; + std::lock_guard locker { _handshakeMutex }; _hasReceivedHandshakeACK = true; } From b0fe8535fdee0129503262c3187b68ddbe1dbf8e Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Mon, 21 Sep 2015 17:17:29 +0200 Subject: [PATCH 05/22] Release _sendQueue before cleanup --- libraries/networking/src/udt/Connection.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index e8b22f6ab8..d7b851fcf4 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -52,15 +52,14 @@ Connection::~Connection() { } void Connection::stopSendQueue() { - if (_sendQueue) { + if (auto sendQueue = _sendQueue.release()) { // grab the send queue thread so we can wait on it - QThread* sendQueueThread = _sendQueue->thread(); + QThread* sendQueueThread = sendQueue->thread(); // tell the send queue to stop and be deleted - _sendQueue->stop(); - _sendQueue->deleteLater(); - _sendQueue.release(); + sendQueue->stop(); + sendQueue->deleteLater(); // since we're stopping the send queue we should consider our handshake ACK not receieved _hasReceivedHandshakeACK = false; From 290a0e573d34e888b6fd3e942b051f45fdcffe8f Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Mon, 21 Sep 2015 18:49:15 +0200 Subject: [PATCH 06/22] House cleaning --- libraries/networking/src/udt/Connection.cpp | 9 +++++---- libraries/networking/src/udt/LossList.h | 1 + libraries/networking/src/udt/SendQueue.cpp | 15 +++++++-------- libraries/networking/src/udt/SendQueue.h | 2 +- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index d7b851fcf4..e8b22f6ab8 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -52,14 +52,15 @@ Connection::~Connection() { } void Connection::stopSendQueue() { - if (auto sendQueue = _sendQueue.release()) { + if (_sendQueue) { // grab the send queue thread so we can wait on it - QThread* sendQueueThread = sendQueue->thread(); + QThread* sendQueueThread = _sendQueue->thread(); // tell the send queue to stop and be deleted - sendQueue->stop(); - sendQueue->deleteLater(); + _sendQueue->stop(); + _sendQueue->deleteLater(); + _sendQueue.release(); // since we're stopping the send queue we should consider our handshake ACK not receieved _hasReceivedHandshakeACK = false; diff --git a/libraries/networking/src/udt/LossList.h b/libraries/networking/src/udt/LossList.h index f0f2b92988..3921364872 100644 --- a/libraries/networking/src/udt/LossList.h +++ b/libraries/networking/src/udt/LossList.h @@ -37,6 +37,7 @@ public: void remove(SequenceNumber start, SequenceNumber end); int getLength() const { return _length; } + bool isEmpty() const { return _length == 0; } SequenceNumber getFirstSequenceNumber() const; SequenceNumber popFirstSequenceNumber(); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 1e7206c1ee..de056b6850 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -50,10 +50,10 @@ private: }; std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destination) { - auto queue = std::unique_ptr(new SendQueue(socket, destination)); - Q_ASSERT_X(socket, "SendQueue::create", "Must be called with a valid Socket*"); + auto queue = std::unique_ptr(new SendQueue(socket, destination)); + // Setup queue private thread QThread* thread = new QThread; thread->setObjectName("Networking: SendQueue " + destination.objectName()); // Name thread for easier debug @@ -66,8 +66,6 @@ std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destin // Move queue to private thread and start it queue->moveToThread(thread); - thread->start(); - return std::move(queue); } @@ -75,7 +73,6 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) : _socket(socket), _destination(dest) { - } void SendQueue::queuePacket(std::unique_ptr packet) { @@ -178,7 +175,7 @@ void SendQueue::ack(SequenceNumber ack) { { // remove any sequence numbers equal to or lower than this ACK in the loss list std::lock_guard nakLocker(_naksLock); - if (_naks.getLength() > 0 && _naks.getFirstSequenceNumber() <= ack) { + if (!_naks.isEmpty() && _naks.getFirstSequenceNumber() <= ack) { _naks.remove(_naks.getFirstSequenceNumber(), ack); } } @@ -302,7 +299,9 @@ void SendQueue::run() { // Wait for handshake to be complete while (_state == State::Running && !_hasReceivedHandshakeACK) { sendHandshake(); - QCoreApplication::processEvents(); + + // Keep processing events + QCoreApplication::sendPostedEvents(this); } while (_state == State::Running) { @@ -486,7 +485,7 @@ bool SendQueue::maybeResendPacket() { std::unique_lock naksLocker(_naksLock); - if (_naks.getLength() > 0) { + if (!_naks.isEmpty()) { // pull the sequence number we need to re-send SequenceNumber resendNumber = _naks.popFirstSequenceNumber(); naksLocker.unlock(); diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 80e0195455..39d18a544d 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -111,7 +111,7 @@ private: MessageNumber _currentMessageNumber { 0 }; SequenceNumber _currentSequenceNumber; // Last sequence number sent out - std::atomic _atomicCurrentSequenceNumber { 0 };// Atomic for last sequence number sent out + std::atomic _atomicCurrentSequenceNumber { 0 }; // Atomic for last sequence number sent out std::atomic _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC std::atomic _state { State::NotStarted }; From 709dab6beb6e5fe24efe4741f47d5ae85eefd1c7 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Tue, 22 Sep 2015 15:11:17 +0200 Subject: [PATCH 07/22] Release send queue before cleanup --- libraries/networking/src/udt/Connection.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index e8b22f6ab8..d7b851fcf4 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -52,15 +52,14 @@ Connection::~Connection() { } void Connection::stopSendQueue() { - if (_sendQueue) { + if (auto sendQueue = _sendQueue.release()) { // grab the send queue thread so we can wait on it - QThread* sendQueueThread = _sendQueue->thread(); + QThread* sendQueueThread = sendQueue->thread(); // tell the send queue to stop and be deleted - _sendQueue->stop(); - _sendQueue->deleteLater(); - _sendQueue.release(); + sendQueue->stop(); + sendQueue->deleteLater(); // since we're stopping the send queue we should consider our handshake ACK not receieved _hasReceivedHandshakeACK = false; From 732ad410804414ef5514bf9218a21c3c6cdee4d5 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Tue, 22 Sep 2015 15:12:36 +0200 Subject: [PATCH 08/22] Introduce PacketQueue --- libraries/networking/src/udt/PacketList.h | 1 + libraries/networking/src/udt/PacketQueue.cpp | 71 ++++ libraries/networking/src/udt/PacketQueue.h | 53 +++ libraries/networking/src/udt/SendQueue.cpp | 341 ++++++++----------- libraries/networking/src/udt/SendQueue.h | 9 +- 5 files changed, 266 insertions(+), 209 deletions(-) create mode 100644 libraries/networking/src/udt/PacketQueue.cpp create mode 100644 libraries/networking/src/udt/PacketQueue.h diff --git a/libraries/networking/src/udt/PacketList.h b/libraries/networking/src/udt/PacketList.h index 7978e77ad7..c873e53711 100644 --- a/libraries/networking/src/udt/PacketList.h +++ b/libraries/networking/src/udt/PacketList.h @@ -70,6 +70,7 @@ protected: private: friend class ::LimitedNodeList; + friend class PacketQueue; friend class SendQueue; friend class Socket; diff --git a/libraries/networking/src/udt/PacketQueue.cpp b/libraries/networking/src/udt/PacketQueue.cpp new file mode 100644 index 0000000000..f5c1b4a9a0 --- /dev/null +++ b/libraries/networking/src/udt/PacketQueue.cpp @@ -0,0 +1,71 @@ +// +// PacketQueue.cpp +// libraries/networking/src/udt +// +// Created by Clement on 9/16/15. +// Copyright 2015 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#include "PacketQueue.h" + +#include "Packet.h" +#include "PacketList.h" + +using namespace udt; + + +MessageNumber PacketQueue::getNextMessageNumber() { + static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS; + _currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER; + return _currentMessageNumber; +} + +bool PacketQueue::isEmpty() const { + LockGuard locker(_packetsLock); + return _packets.empty(); +} + +PacketQueue::PacketPointer PacketQueue::takeFront() { + LockGuard locker(_packetsLock); + if (!_packets.empty()) { + auto packet = std::move(_packets.front()); + _packets.pop_front(); + return std::move(packet); + } + + return PacketPointer(); +} + +void PacketQueue::queuePacket(PacketPointer packet) { + LockGuard locker(_packetsLock); + _packets.push_back(std::move(packet)); +} + +void PacketQueue::queuePacketList(PacketListPointer packetList) { + Q_ASSERT(packetList->_packets.size() > 0); + + auto messageNumber = getNextMessageNumber(); + auto markPacket = [&messageNumber](const PacketPointer& packet, Packet::PacketPosition position) { + packet->setPacketPosition(position); + packet->writeMessageNumber(messageNumber); + }; + + if (packetList->_packets.size() == 1) { + markPacket(packetList->_packets.front(), Packet::PacketPosition::ONLY); + } else { + const auto second = ++packetList->_packets.begin(); + const auto last = --packetList->_packets.end(); + std::for_each(second, last, [&](const PacketPointer& packet) { + markPacket(packet, Packet::PacketPosition::MIDDLE); + }); + + markPacket(packetList->_packets.front(), Packet::PacketPosition::FIRST); + markPacket(packetList->_packets.back(), Packet::PacketPosition::LAST); + } + + LockGuard locker(_packetsLock); + _packets.splice(_packets.end(), packetList->_packets); +} \ No newline at end of file diff --git a/libraries/networking/src/udt/PacketQueue.h b/libraries/networking/src/udt/PacketQueue.h new file mode 100644 index 0000000000..b305e9b560 --- /dev/null +++ b/libraries/networking/src/udt/PacketQueue.h @@ -0,0 +1,53 @@ +// +// PacketQueue.h +// libraries/networking/src/udt +// +// Created by Clement on 9/16/15. +// Copyright 2015 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#ifndef hifi_PacketQueue_h +#define hifi_PacketQueue_h + +#include +#include +#include + +namespace udt { + +class Packet; +class PacketList; + +using MessageNumber = uint32_t; + +class PacketQueue { + using Mutex = std::recursive_mutex; + using LockGuard = std::lock_guard; + using PacketPointer = std::unique_ptr; + using PacketListPointer = std::unique_ptr; + using PacketList = std::list; + +public: + void queuePacket(PacketPointer packet); + void queuePacketList(PacketListPointer packetList); + + bool isEmpty() const; + PacketPointer takeFront(); + + MessageNumber getNextMessageNumber(); + Mutex& getLock() { return _packetsLock; } + +private: + MessageNumber _currentMessageNumber { 0 }; + + mutable Mutex _packetsLock; // Protects the packets to be sent list. + PacketList _packets; // List of packets to be sent +}; + +} + + +#endif // hifi_PacketQueue_h \ No newline at end of file diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index de056b6850..44fa51bca8 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -30,7 +30,7 @@ using namespace udt; class DoubleLock { public: - DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { } + DoubleLock(std::recursive_mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { } DoubleLock(const DoubleLock&) = delete; DoubleLock& operator=(const DoubleLock&) = delete; @@ -45,7 +45,7 @@ public: void unlock() { _mutex1.unlock(); _mutex2.unlock(); } private: - std::mutex& _mutex1; + std::recursive_mutex& _mutex1; std::mutex& _mutex2; }; @@ -76,17 +76,10 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) : } void SendQueue::queuePacket(std::unique_ptr packet) { - { - std::unique_lock locker(_packetsLock); - - _packets.push_back(std::move(packet)); - - // unlock the mutex before we notify - locker.unlock(); - - // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets - _emptyCondition.notify_one(); - } + _packets.queuePacket(std::move(packet)); + + // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets + _emptyCondition.notify_one(); if (!this->thread()->isRunning() && _state == State::NotStarted) { this->thread()->start(); @@ -94,46 +87,10 @@ void SendQueue::queuePacket(std::unique_ptr packet) { } void SendQueue::queuePacketList(std::unique_ptr packetList) { - Q_ASSERT(packetList->_packets.size() > 0); - - { - auto messageNumber = getNextMessageNumber(); - - if (packetList->_packets.size() == 1) { - auto& packet = packetList->_packets.front(); - - packet->setPacketPosition(Packet::PacketPosition::ONLY); - packet->writeMessageNumber(messageNumber); - } else { - bool haveMarkedFirstPacket = false; - auto end = packetList->_packets.end(); - auto lastElement = --packetList->_packets.end(); - for (auto it = packetList->_packets.begin(); it != end; ++it) { - auto& packet = *it; - - if (!haveMarkedFirstPacket) { - packet->setPacketPosition(Packet::PacketPosition::FIRST); - haveMarkedFirstPacket = true; - } else if (it == lastElement) { - packet->setPacketPosition(Packet::PacketPosition::LAST); - } else { - packet->setPacketPosition(Packet::PacketPosition::MIDDLE); - } - - packet->writeMessageNumber(messageNumber); - } - } - - std::unique_lock locker(_packetsLock); - - _packets.splice(_packets.end(), packetList->_packets); - - // unlock the mutex so we can notify - locker.unlock(); - - // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets - _emptyCondition.notify_one(); - } + _packets.queuePacketList(std::move(packetList)); + + // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets + _emptyCondition.notify_one(); if (!this->thread()->isRunning() && _state == State::NotStarted) { this->thread()->start(); @@ -144,10 +101,8 @@ void SendQueue::stop() { _state = State::Stopped; - // in case we're waiting to send another handshake, release the condition_variable now so we cleanup sooner + // Notify all conditions in case we're waiting somewhere _handshakeACKCondition.notify_one(); - - // in case the empty condition is waiting for packets/loss release it now so that the queue is cleaned up _emptyCondition.notify_one(); } @@ -254,12 +209,6 @@ SequenceNumber SendQueue::getNextSequenceNumber() { return _currentSequenceNumber; } -uint32_t SendQueue::getNextMessageNumber() { - static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS; - _currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER; - return _currentMessageNumber; -} - void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr newPacket, SequenceNumber sequenceNumber) { // write the sequence number and send the packet newPacket->writeSequenceNumber(sequenceNumber); @@ -313,167 +262,67 @@ void SendQueue::run() { // if we didn't find a packet to re-send AND we think we can fit a new packet on the wire // (this is according to the current flow window size) then we send out a new packet if (!sentAPacket) { - if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { - sentAPacket = maybeSendNewPacket(); - } + sentAPacket = maybeSendNewPacket(); } // since we're a while loop, give the thread a chance to process events - QCoreApplication::sendPostedEvents(this, 0); + QCoreApplication::sendPostedEvents(this); // we just processed events so check now if we were just told to stop - if (_state != State::Running) { - return; + // If the send queue has been innactive, skip the sleep for + // Either _isRunning will have been set to false and we'll break + // Or something happened and we'll keep going + if (_state != State::Running || isInactive(sentAPacket)) { + continue; } - if (!sentAPacket) { - // check if it is time to break this connection - - // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been - // at least 5 seconds - - static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16; - static const int MIN_SECONDS_BEFORE_INACTIVE_MS = 5 * 1000; - - auto sinceEpochNow = QDateTime::currentMSecsSinceEpoch(); - - if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE - && (sinceEpochNow - _lastReceiverResponse) > MIN_SECONDS_BEFORE_INACTIVE_MS) { - // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, - // then signal the queue is inactive and return so it can be cleaned up - -#ifdef UDT_CONNECTION_DEBUG - qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts" - << "and 10s before receiving any ACK/NAK and is now inactive. Stopping."; -#endif - - deactivate(); - - return; - } else { - // During our processing above we didn't send any packets - - // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. - // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock - DoubleLock doubleLock(_packetsLock, _naksLock); - - if (doubleLock.try_lock()) { - // The packets queue and loss list mutexes are now both locked - check if they're still both empty - - if (_packets.empty() && _naks.getLength() == 0) { - if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) { - // we've sent the client as much data as we have (and they've ACKed it) - // either wait for new data to send or 5 seconds before cleaning up the queue - static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5); - - // use our condition_variable_any to wait - auto cvStatus = _emptyCondition.wait_for(doubleLock, EMPTY_QUEUES_INACTIVE_TIMEOUT); - - // we have the double lock again - Make sure to unlock it - doubleLock.unlock(); - - if (cvStatus == std::cv_status::timeout) { -#ifdef UDT_CONNECTION_DEBUG - qCDebug(networking) << "SendQueue to" << _destination << "has been empty for" - << EMPTY_QUEUES_INACTIVE_TIMEOUT.count() - << "seconds and receiver has ACKed all packets." - << "The queue is now inactive and will be stopped."; -#endif - - deactivate(); - - return; - } - } else { - // We think the client is still waiting for data (based on the sequence number gap) - // Let's wait either for a response from the client or until the estimated timeout - auto waitDuration = std::chrono::microseconds(_estimatedTimeout); - - // use our condition_variable_any to wait - auto cvStatus = _emptyCondition.wait_for(doubleLock, waitDuration); - - if (cvStatus == std::cv_status::timeout) { - // increase the number of timeouts - ++_timeoutExpiryCount; - - if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { - // after a timeout if we still have sent packets that the client hasn't ACKed we - // add them to the loss list - - // Note that thanks to the DoubleLock we have the _naksLock right now - _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); - } - } - - // we have the double lock again - Make sure to unlock it - doubleLock.unlock(); - - // skip to the next iteration - continue; - } - } else { - // we got the try_lock but failed the other conditionals so we need to unlock - doubleLock.unlock(); - } - } - } - } - - const auto loopEndTimestamp = p_high_resolution_clock::now(); - // sleep as long as we need until next packet send, if we can + const auto loopEndTimestamp = p_high_resolution_clock::now(); const auto timeToSleep = (loopStartTimestamp + std::chrono::microseconds(_packetSendPeriod)) - loopEndTimestamp; std::this_thread::sleep_for(timeToSleep); } } bool SendQueue::maybeSendNewPacket() { - // we didn't re-send a packet, so time to send a new one - std::unique_lock locker(_packetsLock); - - if (_packets.size() > 0) { - SequenceNumber nextNumber = getNextSequenceNumber(); + if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { + // we didn't re-send a packet, so time to send a new one - // grab the first packet we will send - std::unique_ptr firstPacket; - firstPacket.swap(_packets.front()); - _packets.pop_front(); - std::unique_ptr secondPacket; - bool shouldSendPairTail = false; - - if (((uint32_t) nextNumber & 0xF) == 0) { - // the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets - // pull off a second packet if we can before we unlock - shouldSendPairTail = true; + if (!_packets.isEmpty()) { + SequenceNumber nextNumber = getNextSequenceNumber(); - if (_packets.size() > 0) { - secondPacket.swap(_packets.front()); - _packets.pop_front(); + // grab the first packet we will send + std::unique_ptr firstPacket = _packets.takeFront(); + + std::unique_ptr secondPacket; + bool shouldSendPairTail = false; + + if (((uint32_t) nextNumber & 0xF) == 0) { + // the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets + // pull off a second packet if we can before we unlock + shouldSendPairTail = true; + + secondPacket = _packets.takeFront(); } + + // definitely send the first packet + sendNewPacketAndAddToSentList(move(firstPacket), nextNumber); + + // do we have a second in a pair to send as well? + if (secondPacket) { + sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber()); + } else if (shouldSendPairTail) { + // we didn't get a second packet to send in the probe pair + // send a control packet of type ProbePairTail so the receiver can still do + // proper bandwidth estimation + static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail); + _socket->writeBasePacket(*pairTailPacket, _destination); + } + + // We sent our packet(s), return here + return true; } - - // unlock the packets, we're done pulling - locker.unlock(); - - // definitely send the first packet - sendNewPacketAndAddToSentList(move(firstPacket), nextNumber); - - // do we have a second in a pair to send as well? - if (secondPacket) { - sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber()); - } else if (shouldSendPairTail) { - // we didn't get a second packet to send in the probe pair - // send a control packet of type ProbePairTail so the receiver can still do - // proper bandwidth estimation - static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail); - _socket->writeBasePacket(*pairTailPacket, _destination); - } - - // We sent our packet(s), return here - return true; } - // No packets were sent return false; } @@ -524,6 +373,92 @@ bool SendQueue::maybeResendPacket() { return false; } +bool SendQueue::isInactive(bool sentAPacket) { + if (!sentAPacket) { + // check if it is time to break this connection + + // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been + // at least 5 seconds + static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16; + static const int MIN_SECONDS_BEFORE_INACTIVE_MS = 5 * 1000; + if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE && + (QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse) > MIN_SECONDS_BEFORE_INACTIVE_MS) { + // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, + // then signal the queue is inactive and return so it can be cleaned up + +#ifdef UDT_CONNECTION_DEBUG + qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts" + << "and 5s before receiving any ACK/NAK and is now inactive. Stopping."; +#endif + + deactivate(); + + return true; + } + + // During our processing above we didn't send any packets + + // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. + // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock + DoubleLock doubleLock(_packets.getLock(), _naksLock); + std::unique_lock locker(doubleLock, std::try_to_lock); + + if (locker.owns_lock() && _packets.isEmpty() && _naks.isEmpty()) { + // The packets queue and loss list mutexes are now both locked and they're both empty + + if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) { + // we've sent the client as much data as we have (and they've ACKed it) + // either wait for new data to send or 5 seconds before cleaning up the queue + static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5); + + // use our condition_variable_any to wait + auto cvStatus = _emptyCondition.wait_for(locker, EMPTY_QUEUES_INACTIVE_TIMEOUT); + + // we have the lock again - Make sure to unlock it + locker.unlock(); + + if (cvStatus == std::cv_status::timeout) { +#ifdef UDT_CONNECTION_DEBUG + qCDebug(networking) << "SendQueue to" << _destination << "has been empty for" + << EMPTY_QUEUES_INACTIVE_TIMEOUT.count() + << "seconds and receiver has ACKed all packets." + << "The queue is now inactive and will be stopped."; +#endif + + // Deactivate queue + deactivate(); + return true; + } + } else { + // We think the client is still waiting for data (based on the sequence number gap) + // Let's wait either for a response from the client or until the estimated timeout + auto waitDuration = std::chrono::microseconds(_estimatedTimeout); + + // use our condition_variable_any to wait + auto cvStatus = _emptyCondition.wait_for(locker, waitDuration); + + if (cvStatus == std::cv_status::timeout) { + // increase the number of timeouts + ++_timeoutExpiryCount; + + if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { + // after a timeout if we still have sent packets that the client hasn't ACKed we + // add them to the loss list + + // Note that thanks to the DoubleLock we have the _naksLock right now + _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); + } + } + + // skip to the next iteration + return true; + } + } + } + + return false; +} + void SendQueue::deactivate() { // this queue is inactive - emit that signal and stop the while emit queueInactive(); diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 39d18a544d..96de12a971 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -28,6 +28,7 @@ #include "../HifiSockAddr.h" #include "Constants.h" +#include "PacketQueue.h" #include "SequenceNumber.h" #include "LossList.h" @@ -38,8 +39,6 @@ class ControlPacket; class Packet; class PacketList; class Socket; - -using MessageNumber = uint32_t; class SendQueue : public QObject { Q_OBJECT @@ -95,21 +94,19 @@ private: bool maybeSendNewPacket(); // Figures out what packet to send next bool maybeResendPacket(); // Determines whether to resend a packet and which one + bool isInactive(bool sentAPacket); void deactivate(); // makes the queue inactive and cleans it up // Increments current sequence number and return it SequenceNumber getNextSequenceNumber(); - MessageNumber getNextMessageNumber(); - mutable std::mutex _packetsLock; // Protects the packets to be sent list. - std::list> _packets; // List of packets to be sent + PacketQueue _packets; Socket* _socket { nullptr }; // Socket to send packet on HifiSockAddr _destination; // Destination addr std::atomic _lastACKSequenceNumber { 0 }; // Last ACKed sequence number - MessageNumber _currentMessageNumber { 0 }; SequenceNumber _currentSequenceNumber; // Last sequence number sent out std::atomic _atomicCurrentSequenceNumber { 0 }; // Atomic for last sequence number sent out From 91c779ddd8055b4c5d7762a9c0e82ed31af383c7 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Tue, 22 Sep 2015 15:37:57 +0200 Subject: [PATCH 09/22] Templatized DoubleLock --- libraries/networking/src/udt/PacketQueue.cpp | 2 +- libraries/networking/src/udt/PacketQueue.h | 4 ++-- libraries/networking/src/udt/SendQueue.cpp | 12 ++++++++---- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/libraries/networking/src/udt/PacketQueue.cpp b/libraries/networking/src/udt/PacketQueue.cpp index f5c1b4a9a0..13002745bd 100644 --- a/libraries/networking/src/udt/PacketQueue.cpp +++ b/libraries/networking/src/udt/PacketQueue.cpp @@ -17,7 +17,7 @@ using namespace udt; -MessageNumber PacketQueue::getNextMessageNumber() { +PacketQueue::MessageNumber PacketQueue::getNextMessageNumber() { static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS; _currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER; return _currentMessageNumber; diff --git a/libraries/networking/src/udt/PacketQueue.h b/libraries/networking/src/udt/PacketQueue.h index b305e9b560..4bd0feeb25 100644 --- a/libraries/networking/src/udt/PacketQueue.h +++ b/libraries/networking/src/udt/PacketQueue.h @@ -28,7 +28,7 @@ class PacketQueue { using LockGuard = std::lock_guard; using PacketPointer = std::unique_ptr; using PacketListPointer = std::unique_ptr; - using PacketList = std::list; + using PacketContainer = std::list; public: void queuePacket(PacketPointer packet); @@ -44,7 +44,7 @@ private: MessageNumber _currentMessageNumber { 0 }; mutable Mutex _packetsLock; // Protects the packets to be sent list. - PacketList _packets; // List of packets to be sent + PacketContainer _packets; // List of packets to be sent }; } diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 44fa51bca8..aa37c8f23b 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -28,9 +28,12 @@ using namespace udt; +template class DoubleLock { public: - DoubleLock(std::recursive_mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { } + using Lock = std::unique_lock>; + + DoubleLock(Mutex1& mutex1, Mutex2& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { } DoubleLock(const DoubleLock&) = delete; DoubleLock& operator=(const DoubleLock&) = delete; @@ -45,8 +48,8 @@ public: void unlock() { _mutex1.unlock(); _mutex2.unlock(); } private: - std::recursive_mutex& _mutex1; - std::mutex& _mutex2; + Mutex1& _mutex1; + Mutex2& _mutex2; }; std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destination) { @@ -400,8 +403,9 @@ bool SendQueue::isInactive(bool sentAPacket) { // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock + using DoubleLock = DoubleLock; DoubleLock doubleLock(_packets.getLock(), _naksLock); - std::unique_lock locker(doubleLock, std::try_to_lock); + DoubleLock::Lock locker(doubleLock, std::try_to_lock); if (locker.owns_lock() && _packets.isEmpty() && _naks.isEmpty()) { // The packets queue and loss list mutexes are now both locked and they're both empty From f1a9aba70448d8915c90b913e732ea31597ac17c Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Tue, 22 Sep 2015 18:06:47 +0200 Subject: [PATCH 10/22] Magic number --- libraries/networking/src/udt/Packet.cpp | 7 ++++--- libraries/networking/src/udt/Packet.h | 4 ++-- libraries/networking/src/udt/PacketQueue.cpp | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/libraries/networking/src/udt/Packet.cpp b/libraries/networking/src/udt/Packet.cpp index 56c65e0657..0582ef6487 100644 --- a/libraries/networking/src/udt/Packet.cpp +++ b/libraries/networking/src/udt/Packet.cpp @@ -124,7 +124,8 @@ static const uint32_t RELIABILITY_BIT_MASK = uint32_t(1) << (SEQUENCE_NUMBER_BIT static const uint32_t MESSAGE_BIT_MASK = uint32_t(1) << (SEQUENCE_NUMBER_BITS - 3); static const uint32_t BIT_FIELD_MASK = CONTROL_BIT_MASK | RELIABILITY_BIT_MASK | MESSAGE_BIT_MASK; -static const uint32_t PACKET_POSITION_MASK = uint32_t(0x03) << 30; +static const uint8_t PACKET_POSITION_OFFSET = 30; +static const uint32_t PACKET_POSITION_MASK = uint32_t(0x03) << PACKET_POSITION_OFFSET; static const uint32_t MESSAGE_NUMBER_MASK = ~PACKET_POSITION_MASK; void Packet::readHeader() const { @@ -139,7 +140,7 @@ void Packet::readHeader() const { if (_isPartOfMessage) { MessageNumberAndBitField* messageNumberAndBitField = seqNumBitField + 1; _messageNumber = *messageNumberAndBitField & MESSAGE_NUMBER_MASK; - _packetPosition = static_cast(*messageNumberAndBitField >> 30); + _packetPosition = static_cast(*messageNumberAndBitField >> PACKET_POSITION_OFFSET); } } @@ -164,6 +165,6 @@ void Packet::writeHeader() const { MessageNumberAndBitField* messageNumberAndBitField = seqNumBitField + 1; *messageNumberAndBitField = _messageNumber; - *messageNumberAndBitField |= _packetPosition << 30; + *messageNumberAndBitField |= _packetPosition << PACKET_POSITION_OFFSET; } } diff --git a/libraries/networking/src/udt/Packet.h b/libraries/networking/src/udt/Packet.h index 565fc24616..6bf7c569aa 100644 --- a/libraries/networking/src/udt/Packet.h +++ b/libraries/networking/src/udt/Packet.h @@ -28,7 +28,7 @@ public: // NOTE: The SequenceNumber is only actually 29 bits to leave room for a bit field using SequenceNumberAndBitField = uint32_t; - // NOTE: The MessageNumber is only actually 29 bits to leave room for a bit field + // NOTE: The MessageNumber is only actually 30 bits to leave room for a bit field using MessageNumber = uint32_t; using MessageNumberAndBitField = uint32_t; @@ -83,7 +83,7 @@ private: // Simple holders to prevent multiple reading and bitwise ops mutable bool _isReliable { false }; mutable bool _isPartOfMessage { false }; - mutable SequenceNumber _sequenceNumber; + mutable SequenceNumber _sequenceNumber { 0 }; mutable PacketPosition _packetPosition { PacketPosition::ONLY }; mutable MessageNumber _messageNumber { 0 }; }; diff --git a/libraries/networking/src/udt/PacketQueue.cpp b/libraries/networking/src/udt/PacketQueue.cpp index 13002745bd..f5c1b4a9a0 100644 --- a/libraries/networking/src/udt/PacketQueue.cpp +++ b/libraries/networking/src/udt/PacketQueue.cpp @@ -17,7 +17,7 @@ using namespace udt; -PacketQueue::MessageNumber PacketQueue::getNextMessageNumber() { +MessageNumber PacketQueue::getNextMessageNumber() { static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS; _currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER; return _currentMessageNumber; From 8cecb95bf265d3c1454c175244d327919e37246d Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Tue, 22 Sep 2015 18:49:56 +0200 Subject: [PATCH 11/22] Use array for stats --- libraries/networking/src/udt/ConnectionStats.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libraries/networking/src/udt/ConnectionStats.h b/libraries/networking/src/udt/ConnectionStats.h index e04a836ca8..8163ed5250 100644 --- a/libraries/networking/src/udt/ConnectionStats.h +++ b/libraries/networking/src/udt/ConnectionStats.h @@ -13,7 +13,7 @@ #define hifi_ConnectionStats_h #include -#include +#include namespace udt { @@ -42,7 +42,7 @@ public: }; // construct a vector for the events of the size of our Enum - default value is zero - std::vector events = std::vector((int) Event::NumEvents, 0); + std::array events {{ 0 }}; // packet counts and sizes int sentPackets { 0 }; From c3fc6f4f79c5e1c128f9cff6e1f6da848ae6253e Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Wed, 23 Sep 2015 17:32:12 +0200 Subject: [PATCH 12/22] Add message part to Packet --- libraries/networking/src/udt/Packet.cpp | 12 ++++++++-- libraries/networking/src/udt/Packet.h | 17 ++++++++------- libraries/networking/src/udt/PacketList.cpp | 18 +++++++++++++++ libraries/networking/src/udt/PacketList.h | 23 +++++++++++--------- libraries/networking/src/udt/PacketQueue.cpp | 21 +----------------- 5 files changed, 51 insertions(+), 40 deletions(-) diff --git a/libraries/networking/src/udt/Packet.cpp b/libraries/networking/src/udt/Packet.cpp index 0582ef6487..ebcf4baafd 100644 --- a/libraries/networking/src/udt/Packet.cpp +++ b/libraries/networking/src/udt/Packet.cpp @@ -15,7 +15,7 @@ using namespace udt; int Packet::localHeaderSize(bool isPartOfMessage) { return sizeof(Packet::SequenceNumberAndBitField) + - (isPartOfMessage ? sizeof(Packet::MessageNumberAndBitField) : 0); + (isPartOfMessage ? sizeof(Packet::MessageNumberAndBitField) + sizeof(MessagePart) : 0); } int Packet::totalHeaderSize(bool isPartOfMessage) { @@ -109,9 +109,11 @@ Packet& Packet::operator=(Packet&& other) { return *this; } -void Packet::writeMessageNumber(MessageNumber messageNumber) { +void Packet::writeMessageNumber(MessageNumber messageNumber, PacketPosition position, MessagePart messagePart) { _isPartOfMessage = true; _messageNumber = messageNumber; + _packetPosition = position; + _messagePart = messagePart; writeHeader(); } @@ -141,6 +143,9 @@ void Packet::readHeader() const { MessageNumberAndBitField* messageNumberAndBitField = seqNumBitField + 1; _messageNumber = *messageNumberAndBitField & MESSAGE_NUMBER_MASK; _packetPosition = static_cast(*messageNumberAndBitField >> PACKET_POSITION_OFFSET); + + MessagePart* messagePart = messageNumberAndBitField + 1; + _messagePart = *messagePart; } } @@ -166,5 +171,8 @@ void Packet::writeHeader() const { MessageNumberAndBitField* messageNumberAndBitField = seqNumBitField + 1; *messageNumberAndBitField = _messageNumber; *messageNumberAndBitField |= _packetPosition << PACKET_POSITION_OFFSET; + + MessagePart* messagePart = messageNumberAndBitField + 1; + *messagePart = _messagePart; } } diff --git a/libraries/networking/src/udt/Packet.h b/libraries/networking/src/udt/Packet.h index 6bf7c569aa..71fb22eb98 100644 --- a/libraries/networking/src/udt/Packet.h +++ b/libraries/networking/src/udt/Packet.h @@ -31,6 +31,7 @@ public: // NOTE: The MessageNumber is only actually 30 bits to leave room for a bit field using MessageNumber = uint32_t; using MessageNumberAndBitField = uint32_t; + using MessagePart = uint32_t; // Use same size as MessageNumberAndBitField so we can use the enum with bitwise operations enum PacketPosition : MessageNumberAndBitField { @@ -55,14 +56,13 @@ public: bool isPartOfMessage() const { return _isPartOfMessage; } bool isReliable() const { return _isReliable; } - SequenceNumber getSequenceNumber() const { return _sequenceNumber; } - - MessageNumber getMessageNumber() const { return _messageNumber; } - - void setPacketPosition(PacketPosition position) { _packetPosition = position; } - PacketPosition getPacketPosition() const { return _packetPosition; } - void writeMessageNumber(MessageNumber messageNumber); + SequenceNumber getSequenceNumber() const { return _sequenceNumber; } + MessageNumber getMessageNumber() const { return _messageNumber; } + PacketPosition getPacketPosition() const { return _packetPosition; } + MessagePart getMessagePart() const { return _messagePart; } + + void writeMessageNumber(MessageNumber messageNumber, PacketPosition position, MessagePart messagePart); void writeSequenceNumber(SequenceNumber sequenceNumber) const; protected: @@ -84,8 +84,9 @@ private: mutable bool _isReliable { false }; mutable bool _isPartOfMessage { false }; mutable SequenceNumber _sequenceNumber { 0 }; - mutable PacketPosition _packetPosition { PacketPosition::ONLY }; mutable MessageNumber _messageNumber { 0 }; + mutable PacketPosition _packetPosition { PacketPosition::ONLY }; + mutable MessagePart _messagePart { 0 }; }; } // namespace udt diff --git a/libraries/networking/src/udt/PacketList.cpp b/libraries/networking/src/udt/PacketList.cpp index 64ae8f54db..8f6a65abc9 100644 --- a/libraries/networking/src/udt/PacketList.cpp +++ b/libraries/networking/src/udt/PacketList.cpp @@ -132,6 +132,24 @@ QByteArray PacketList::getMessage() { return data; } +void PacketList::preparePackets(MessageNumber messageNumber) { + Q_ASSERT(_packets.size() > 0); + + if (_packets.size() == 1) { + _packets.front()->writeMessageNumber(messageNumber, Packet::PacketPosition::ONLY, 0); + } else { + const auto second = ++_packets.begin(); + const auto last = --_packets.end(); + Packet::MessagePart messagePart = 0; + std::for_each(second, last, [&](const PacketPointer& packet) { + packet->writeMessageNumber(messageNumber, Packet::PacketPosition::MIDDLE, ++messagePart); + }); + + _packets.front()->writeMessageNumber(messageNumber, Packet::PacketPosition::FIRST, 0); + _packets.back()->writeMessageNumber(messageNumber, Packet::PacketPosition::LAST, ++messagePart); + } +} + qint64 PacketList::writeData(const char* data, qint64 maxSize) { auto sizeRemaining = maxSize; diff --git a/libraries/networking/src/udt/PacketList.h b/libraries/networking/src/udt/PacketList.h index c873e53711..5337094d1f 100644 --- a/libraries/networking/src/udt/PacketList.h +++ b/libraries/networking/src/udt/PacketList.h @@ -28,28 +28,29 @@ class Packet; class PacketList : public QIODevice { Q_OBJECT public: + using MessageNumber = uint32_t; + using PacketPointer = std::unique_ptr; + static std::unique_ptr create(PacketType packetType, QByteArray extendedHeader = QByteArray(), bool isReliable = false, bool isOrdered = false); static std::unique_ptr fromReceivedPackets(std::list>&& packets); + PacketType getType() const { return _packetType; } bool isReliable() const { return _isReliable; } bool isOrdered() const { return _isOrdered; } + int getNumPackets() const { return _packets.size() + (_currentPacket ? 1 : 0); } + size_t getDataSize() const; + size_t getMessageSize() const; + QByteArray getMessage(); + + QByteArray getExtendedHeader() const { return _extendedHeader; } + void startSegment(); void endSegment(); - PacketType getType() const { return _packetType; } - int getNumPackets() const { return _packets.size() + (_currentPacket ? 1 : 0); } - - QByteArray getExtendedHeader() const { return _extendedHeader; } - - size_t getDataSize() const; - size_t getMessageSize() const; - void closeCurrentPacket(bool shouldSendEmpty = false); - QByteArray getMessage(); - // QIODevice virtual functions virtual bool isSequential() const { return false; } virtual qint64 size() const { return getDataSize(); } @@ -60,6 +61,8 @@ public: protected: PacketList(PacketType packetType, QByteArray extendedHeader = QByteArray(), bool isReliable = false, bool isOrdered = false); PacketList(PacketList&& other); + + void preparePackets(MessageNumber messageNumber); virtual qint64 writeData(const char* data, qint64 maxSize); // Not implemented, added an assert so that it doesn't get used by accident diff --git a/libraries/networking/src/udt/PacketQueue.cpp b/libraries/networking/src/udt/PacketQueue.cpp index f5c1b4a9a0..48d5982eb3 100644 --- a/libraries/networking/src/udt/PacketQueue.cpp +++ b/libraries/networking/src/udt/PacketQueue.cpp @@ -45,26 +45,7 @@ void PacketQueue::queuePacket(PacketPointer packet) { } void PacketQueue::queuePacketList(PacketListPointer packetList) { - Q_ASSERT(packetList->_packets.size() > 0); - - auto messageNumber = getNextMessageNumber(); - auto markPacket = [&messageNumber](const PacketPointer& packet, Packet::PacketPosition position) { - packet->setPacketPosition(position); - packet->writeMessageNumber(messageNumber); - }; - - if (packetList->_packets.size() == 1) { - markPacket(packetList->_packets.front(), Packet::PacketPosition::ONLY); - } else { - const auto second = ++packetList->_packets.begin(); - const auto last = --packetList->_packets.end(); - std::for_each(second, last, [&](const PacketPointer& packet) { - markPacket(packet, Packet::PacketPosition::MIDDLE); - }); - - markPacket(packetList->_packets.front(), Packet::PacketPosition::FIRST); - markPacket(packetList->_packets.back(), Packet::PacketPosition::LAST); - } + packetList->preparePackets(getNextMessageNumber()); LockGuard locker(_packetsLock); _packets.splice(_packets.end(), packetList->_packets); From 7007d9f22380727ca67cbf7f4b2aa6cafe95d6e9 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Wed, 23 Sep 2015 17:32:52 +0200 Subject: [PATCH 13/22] Use message part to check recieved packet list --- libraries/networking/src/udt/Connection.cpp | 39 +++++++-------------- libraries/networking/src/udt/Connection.h | 9 ++--- 2 files changed, 15 insertions(+), 33 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index d7b851fcf4..c8264a6737 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -857,37 +857,22 @@ void PendingReceivedMessage::enqueuePacket(std::unique_ptr packet) { "PendingReceivedMessage::enqueuePacket", "called with a packet that is not part of a message"); - if (_isComplete) { - qCDebug(networking) << "UNEXPECTED: Received packet for a message that is already complete"; - return; - } - - auto sequenceNumber = packet->getSequenceNumber(); - - if (packet->getPacketPosition() == Packet::PacketPosition::FIRST) { - _hasFirstSequenceNumber = true; - _firstSequenceNumber = sequenceNumber; - } else if (packet->getPacketPosition() == Packet::PacketPosition::LAST) { - _hasLastSequenceNumber = true; - _lastSequenceNumber = sequenceNumber; - } else if (packet->getPacketPosition() == Packet::PacketPosition::ONLY) { - _hasFirstSequenceNumber = true; - _hasLastSequenceNumber = true; - _firstSequenceNumber = sequenceNumber; - _lastSequenceNumber = sequenceNumber; + if (packet->getPacketPosition() == Packet::PacketPosition::LAST || + packet->getPacketPosition() == Packet::PacketPosition::ONLY) { + _hasLastPacket = true; + _numPackets = packet->getMessagePart() + 1; } // Insert into the packets list in sorted order. Because we generally expect to receive packets in order, begin // searching from the end of the list. - auto it = find_if(_packets.rbegin(), _packets.rend(), - [&](const std::unique_ptr& packet) { return sequenceNumber > packet->getSequenceNumber(); }); + auto messagePart = packet->getMessagePart(); + auto it = std::find_if(_packets.rbegin(), _packets.rend(), + [&](const std::unique_ptr& value) { return messagePart >= value->getMessagePart(); }); - _packets.insert(it.base(), std::move(packet)); - - if (_hasFirstSequenceNumber && _hasLastSequenceNumber) { - auto numPackets = udt::seqlen(_firstSequenceNumber, _lastSequenceNumber); - if (uint64_t(numPackets) == _packets.size()) { - _isComplete = true; - } + if (it != _packets.rend() && ((*it)->getMessagePart() == messagePart)) { + qCDebug(networking) << "PendingReceivedMessage::enqueuePacket: This is a duplicate packet"; + return; } + + _packets.insert(it.base(), std::move(packet)); } diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 13756c12f9..7f9978c326 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -37,16 +37,13 @@ class Socket; class PendingReceivedMessage { public: void enqueuePacket(std::unique_ptr packet); - bool isComplete() const { return _isComplete; } + bool isComplete() const { return _hasLastPacket && _numPackets == _packets.size(); } std::list> _packets; private: - bool _isComplete { false }; - bool _hasFirstSequenceNumber { false }; - bool _hasLastSequenceNumber { false }; - SequenceNumber _firstSequenceNumber; - SequenceNumber _lastSequenceNumber; + bool _hasLastPacket { false }; + unsigned int _numPackets { 0 }; }; class Connection : public QObject { From 3db99f50e5980b2d6e811396749f5a3916352e46 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Wed, 23 Sep 2015 18:54:13 +0200 Subject: [PATCH 14/22] Change takeFront name --- libraries/networking/src/udt/PacketQueue.cpp | 2 +- libraries/networking/src/udt/PacketQueue.h | 2 +- libraries/networking/src/udt/SendQueue.cpp | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libraries/networking/src/udt/PacketQueue.cpp b/libraries/networking/src/udt/PacketQueue.cpp index 48d5982eb3..088908200d 100644 --- a/libraries/networking/src/udt/PacketQueue.cpp +++ b/libraries/networking/src/udt/PacketQueue.cpp @@ -28,7 +28,7 @@ bool PacketQueue::isEmpty() const { return _packets.empty(); } -PacketQueue::PacketPointer PacketQueue::takeFront() { +PacketQueue::PacketPointer PacketQueue::takePacket() { LockGuard locker(_packetsLock); if (!_packets.empty()) { auto packet = std::move(_packets.front()); diff --git a/libraries/networking/src/udt/PacketQueue.h b/libraries/networking/src/udt/PacketQueue.h index 4bd0feeb25..a687ccf0bc 100644 --- a/libraries/networking/src/udt/PacketQueue.h +++ b/libraries/networking/src/udt/PacketQueue.h @@ -35,7 +35,7 @@ public: void queuePacketList(PacketListPointer packetList); bool isEmpty() const; - PacketPointer takeFront(); + PacketPointer takePacket(); MessageNumber getNextMessageNumber(); Mutex& getLock() { return _packetsLock; } diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index aa37c8f23b..2354bfa377 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -295,7 +295,7 @@ bool SendQueue::maybeSendNewPacket() { SequenceNumber nextNumber = getNextSequenceNumber(); // grab the first packet we will send - std::unique_ptr firstPacket = _packets.takeFront(); + std::unique_ptr firstPacket = _packets.takePacket(); std::unique_ptr secondPacket; bool shouldSendPairTail = false; @@ -305,7 +305,7 @@ bool SendQueue::maybeSendNewPacket() { // pull off a second packet if we can before we unlock shouldSendPairTail = true; - secondPacket = _packets.takeFront(); + secondPacket = _packets.takePacket(); } // definitely send the first packet From 48ff912dd2cba806444d4b5135c45d8c22585bbf Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Thu, 24 Sep 2015 13:08:04 +0200 Subject: [PATCH 15/22] Use one channel per packet list --- libraries/networking/src/udt/PacketQueue.cpp | 42 +++++++++++++++----- libraries/networking/src/udt/PacketQueue.h | 16 +++++--- libraries/networking/src/udt/SendQueue.cpp | 1 + 3 files changed, 43 insertions(+), 16 deletions(-) diff --git a/libraries/networking/src/udt/PacketQueue.cpp b/libraries/networking/src/udt/PacketQueue.cpp index 088908200d..d0110852cd 100644 --- a/libraries/networking/src/udt/PacketQueue.cpp +++ b/libraries/networking/src/udt/PacketQueue.cpp @@ -11,12 +11,10 @@ #include "PacketQueue.h" -#include "Packet.h" #include "PacketList.h" using namespace udt; - MessageNumber PacketQueue::getNextMessageNumber() { static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS; _currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER; @@ -25,28 +23,50 @@ MessageNumber PacketQueue::getNextMessageNumber() { bool PacketQueue::isEmpty() const { LockGuard locker(_packetsLock); - return _packets.empty(); + // Only the main channel and it is empty + return (_channels.size() == 1) && _channels.front().empty(); } PacketQueue::PacketPointer PacketQueue::takePacket() { LockGuard locker(_packetsLock); - if (!_packets.empty()) { - auto packet = std::move(_packets.front()); - _packets.pop_front(); - return std::move(packet); + if (isEmpty()) { + return PacketPointer(); } - return PacketPointer(); + // Find next non empty channel + if (_channels[nextIndex()].empty()) { + nextIndex(); + } + auto& channel = _channels[_currentIndex]; + Q_ASSERT(!channel.empty()); + + // Take front packet + auto packet = std::move(channel.front()); + channel.pop_front(); + + // Remove now empty channel (Don't remove the main channel) + if (channel.empty() && _currentIndex != 0) { + channel.swap(_channels.back()); + _channels.pop_back(); + --_currentIndex; + } + + return std::move(packet); +} + +unsigned int PacketQueue::nextIndex() { + _currentIndex = ++_currentIndex % _channels.size(); + return _currentIndex; } void PacketQueue::queuePacket(PacketPointer packet) { LockGuard locker(_packetsLock); - _packets.push_back(std::move(packet)); + _channels.front().push_back(std::move(packet)); } void PacketQueue::queuePacketList(PacketListPointer packetList) { packetList->preparePackets(getNextMessageNumber()); LockGuard locker(_packetsLock); - _packets.splice(_packets.end(), packetList->_packets); -} \ No newline at end of file + _channels.push_back(std::move(packetList->_packets)); +} diff --git a/libraries/networking/src/udt/PacketQueue.h b/libraries/networking/src/udt/PacketQueue.h index a687ccf0bc..f3d925af30 100644 --- a/libraries/networking/src/udt/PacketQueue.h +++ b/libraries/networking/src/udt/PacketQueue.h @@ -13,12 +13,14 @@ #define hifi_PacketQueue_h #include +#include #include #include +#include "Packet.h" + namespace udt { -class Packet; class PacketList; using MessageNumber = uint32_t; @@ -28,7 +30,8 @@ class PacketQueue { using LockGuard = std::lock_guard; using PacketPointer = std::unique_ptr; using PacketListPointer = std::unique_ptr; - using PacketContainer = std::list; + using Channel = std::list; + using Channels = std::vector; public: void queuePacket(PacketPointer packet); @@ -37,14 +40,17 @@ public: bool isEmpty() const; PacketPointer takePacket(); - MessageNumber getNextMessageNumber(); Mutex& getLock() { return _packetsLock; } private: + MessageNumber getNextMessageNumber(); + unsigned int nextIndex(); + MessageNumber _currentMessageNumber { 0 }; - mutable Mutex _packetsLock; // Protects the packets to be sent list. - PacketContainer _packets; // List of packets to be sent + mutable Mutex _packetsLock; // Protects the packets to be sent. + Channels _channels { 1 }; // One channel per packet list + unsigned int _currentIndex { 0 }; }; } diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 2354bfa377..6ba947553a 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -296,6 +296,7 @@ bool SendQueue::maybeSendNewPacket() { // grab the first packet we will send std::unique_ptr firstPacket = _packets.takePacket(); + Q_ASSERT(firstPacket); std::unique_ptr secondPacket; bool shouldSendPairTail = false; From 1f07ba46d02965f89da2be1c9f9a88baf9f53887 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Thu, 24 Sep 2015 15:00:55 +0200 Subject: [PATCH 16/22] Fix in-class initialization for windows build --- libraries/networking/src/udt/ConnectionStats.h | 14 ++++++++++---- libraries/networking/src/udt/PacketQueue.h | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/libraries/networking/src/udt/ConnectionStats.h b/libraries/networking/src/udt/ConnectionStats.h index 8163ed5250..84cd6b2486 100644 --- a/libraries/networking/src/udt/ConnectionStats.h +++ b/libraries/networking/src/udt/ConnectionStats.h @@ -20,9 +20,6 @@ namespace udt { class ConnectionStats { public: struct Stats { - std::chrono::microseconds startTime; - std::chrono::microseconds endTime; - enum Event { SentACK, ReceivedACK, @@ -41,8 +38,14 @@ public: NumEvents }; + using microseconds = std::chrono::microseconds; + using Events = std::array; + + microseconds startTime; + microseconds endTime; + // construct a vector for the events of the size of our Enum - default value is zero - std::array events {{ 0 }}; + Events events; // packet counts and sizes int sentPackets { 0 }; @@ -66,6 +69,9 @@ public: int rtt { 0 }; int congestionWindowSize { 0 }; int packetSendPeriod { 0 }; + + // TODO: Remove once Win build supports brace initialization: `Events events {{ 0 }};` + Stats() { events.fill(0); } }; ConnectionStats(); diff --git a/libraries/networking/src/udt/PacketQueue.h b/libraries/networking/src/udt/PacketQueue.h index f3d925af30..69784fd8db 100644 --- a/libraries/networking/src/udt/PacketQueue.h +++ b/libraries/networking/src/udt/PacketQueue.h @@ -49,7 +49,7 @@ private: MessageNumber _currentMessageNumber { 0 }; mutable Mutex _packetsLock; // Protects the packets to be sent. - Channels _channels { 1 }; // One channel per packet list + Channels _channels = Channels(1); // One channel per packet list + Main channel unsigned int _currentIndex { 0 }; }; From 1e56d0c99bdf927c92a3c71266f88415d5328d9c Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Thu, 24 Sep 2015 18:15:10 +0200 Subject: [PATCH 17/22] Add parenthesis to quiet gcc warning --- libraries/networking/src/udt/PacketQueue.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/networking/src/udt/PacketQueue.cpp b/libraries/networking/src/udt/PacketQueue.cpp index d0110852cd..1c104e1427 100644 --- a/libraries/networking/src/udt/PacketQueue.cpp +++ b/libraries/networking/src/udt/PacketQueue.cpp @@ -55,7 +55,7 @@ PacketQueue::PacketPointer PacketQueue::takePacket() { } unsigned int PacketQueue::nextIndex() { - _currentIndex = ++_currentIndex % _channels.size(); + _currentIndex = (++_currentIndex) % _channels.size(); return _currentIndex; } From f5f349e13aa1b914dccc90399204c1e502174d44 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Thu, 24 Sep 2015 18:24:34 +0200 Subject: [PATCH 18/22] Bumped packet version numbers --- libraries/networking/src/udt/PacketHeaders.cpp | 7 ++----- libraries/networking/src/udt/PacketHeaders.h | 1 + 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/libraries/networking/src/udt/PacketHeaders.cpp b/libraries/networking/src/udt/PacketHeaders.cpp index ca75a86158..1aa9e90f99 100644 --- a/libraries/networking/src/udt/PacketHeaders.cpp +++ b/libraries/networking/src/udt/PacketHeaders.cpp @@ -38,12 +38,9 @@ PacketVersion versionForPacketType(PacketType packetType) { case PacketType::EntityAdd: case PacketType::EntityEdit: case PacketType::EntityData: - return VERSION_ENTITIES_PARTICLE_ELLIPSOID_EMITTER; - case PacketType::AvatarData: - case PacketType::BulkAvatarData: - return 15; + return VERSION_ENTITIES_PROTOCOL_CHANNELS; default: - return 14; + return 16; } } diff --git a/libraries/networking/src/udt/PacketHeaders.h b/libraries/networking/src/udt/PacketHeaders.h index 9f3c5950a2..febb997047 100644 --- a/libraries/networking/src/udt/PacketHeaders.h +++ b/libraries/networking/src/udt/PacketHeaders.h @@ -141,5 +141,6 @@ const PacketVersion VERSION_ENTITIES_PARTICLE_RADIUS_PROPERTIES = 41; const PacketVersion VERSION_ENTITIES_PARTICLE_COLOR_PROPERTIES = 42; const PacketVersion VERSION_ENTITIES_PROTOCOL_HEADER_SWAP = 43; const PacketVersion VERSION_ENTITIES_PARTICLE_ELLIPSOID_EMITTER = 44; +const PacketVersion VERSION_ENTITIES_PROTOCOL_CHANNELS = 45; #endif // hifi_PacketHeaders_h From 81b88cb096c30ae6c82fe26a4e55badee1f81475 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Tue, 6 Oct 2015 16:12:04 -0700 Subject: [PATCH 19/22] Renamed MessagePart > MessagePartNumber --- libraries/networking/src/udt/Connection.cpp | 8 ++++---- libraries/networking/src/udt/Packet.cpp | 14 +++++++------- libraries/networking/src/udt/Packet.h | 8 ++++---- libraries/networking/src/udt/PacketList.cpp | 6 +++--- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index c8264a6737..96d73676f0 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -860,16 +860,16 @@ void PendingReceivedMessage::enqueuePacket(std::unique_ptr packet) { if (packet->getPacketPosition() == Packet::PacketPosition::LAST || packet->getPacketPosition() == Packet::PacketPosition::ONLY) { _hasLastPacket = true; - _numPackets = packet->getMessagePart() + 1; + _numPackets = packet->getMessagePartNumber() + 1; } // Insert into the packets list in sorted order. Because we generally expect to receive packets in order, begin // searching from the end of the list. - auto messagePart = packet->getMessagePart(); + auto messagePartNumber = packet->getMessagePartNumber(); auto it = std::find_if(_packets.rbegin(), _packets.rend(), - [&](const std::unique_ptr& value) { return messagePart >= value->getMessagePart(); }); + [&](const std::unique_ptr& value) { return messagePartNumber >= value->getMessagePartNumber(); }); - if (it != _packets.rend() && ((*it)->getMessagePart() == messagePart)) { + if (it != _packets.rend() && ((*it)->getMessagePartNumber() == messagePartNumber)) { qCDebug(networking) << "PendingReceivedMessage::enqueuePacket: This is a duplicate packet"; return; } diff --git a/libraries/networking/src/udt/Packet.cpp b/libraries/networking/src/udt/Packet.cpp index ebcf4baafd..d06ff9707e 100644 --- a/libraries/networking/src/udt/Packet.cpp +++ b/libraries/networking/src/udt/Packet.cpp @@ -15,7 +15,7 @@ using namespace udt; int Packet::localHeaderSize(bool isPartOfMessage) { return sizeof(Packet::SequenceNumberAndBitField) + - (isPartOfMessage ? sizeof(Packet::MessageNumberAndBitField) + sizeof(MessagePart) : 0); + (isPartOfMessage ? sizeof(Packet::MessageNumberAndBitField) + sizeof(MessagePartNumber) : 0); } int Packet::totalHeaderSize(bool isPartOfMessage) { @@ -109,11 +109,11 @@ Packet& Packet::operator=(Packet&& other) { return *this; } -void Packet::writeMessageNumber(MessageNumber messageNumber, PacketPosition position, MessagePart messagePart) { +void Packet::writeMessageNumber(MessageNumber messageNumber, PacketPosition position, MessagePartNumber messagePartNumber) { _isPartOfMessage = true; _messageNumber = messageNumber; _packetPosition = position; - _messagePart = messagePart; + _messagePartNumber = messagePartNumber; writeHeader(); } @@ -144,8 +144,8 @@ void Packet::readHeader() const { _messageNumber = *messageNumberAndBitField & MESSAGE_NUMBER_MASK; _packetPosition = static_cast(*messageNumberAndBitField >> PACKET_POSITION_OFFSET); - MessagePart* messagePart = messageNumberAndBitField + 1; - _messagePart = *messagePart; + MessagePartNumber* messagePartNumber = messageNumberAndBitField + 1; + _messagePartNumber = *messagePartNumber; } } @@ -172,7 +172,7 @@ void Packet::writeHeader() const { *messageNumberAndBitField = _messageNumber; *messageNumberAndBitField |= _packetPosition << PACKET_POSITION_OFFSET; - MessagePart* messagePart = messageNumberAndBitField + 1; - *messagePart = _messagePart; + MessagePartNumber* messagePartNumber = messageNumberAndBitField + 1; + *messagePartNumber = _messagePartNumber; } } diff --git a/libraries/networking/src/udt/Packet.h b/libraries/networking/src/udt/Packet.h index 71fb22eb98..24b9144672 100644 --- a/libraries/networking/src/udt/Packet.h +++ b/libraries/networking/src/udt/Packet.h @@ -31,7 +31,7 @@ public: // NOTE: The MessageNumber is only actually 30 bits to leave room for a bit field using MessageNumber = uint32_t; using MessageNumberAndBitField = uint32_t; - using MessagePart = uint32_t; + using MessagePartNumber = uint32_t; // Use same size as MessageNumberAndBitField so we can use the enum with bitwise operations enum PacketPosition : MessageNumberAndBitField { @@ -60,9 +60,9 @@ public: SequenceNumber getSequenceNumber() const { return _sequenceNumber; } MessageNumber getMessageNumber() const { return _messageNumber; } PacketPosition getPacketPosition() const { return _packetPosition; } - MessagePart getMessagePart() const { return _messagePart; } + MessagePartNumber getMessagePartNumber() const { return _messagePartNumber; } - void writeMessageNumber(MessageNumber messageNumber, PacketPosition position, MessagePart messagePart); + void writeMessageNumber(MessageNumber messageNumber, PacketPosition position, MessagePartNumber messagePartNumber); void writeSequenceNumber(SequenceNumber sequenceNumber) const; protected: @@ -86,7 +86,7 @@ private: mutable SequenceNumber _sequenceNumber { 0 }; mutable MessageNumber _messageNumber { 0 }; mutable PacketPosition _packetPosition { PacketPosition::ONLY }; - mutable MessagePart _messagePart { 0 }; + mutable MessagePartNumber _messagePartNumber { 0 }; }; } // namespace udt diff --git a/libraries/networking/src/udt/PacketList.cpp b/libraries/networking/src/udt/PacketList.cpp index 8f6a65abc9..ffe2b3eeba 100644 --- a/libraries/networking/src/udt/PacketList.cpp +++ b/libraries/networking/src/udt/PacketList.cpp @@ -140,13 +140,13 @@ void PacketList::preparePackets(MessageNumber messageNumber) { } else { const auto second = ++_packets.begin(); const auto last = --_packets.end(); - Packet::MessagePart messagePart = 0; + Packet::MessagePartNumber messagePartNumber = 0; std::for_each(second, last, [&](const PacketPointer& packet) { - packet->writeMessageNumber(messageNumber, Packet::PacketPosition::MIDDLE, ++messagePart); + packet->writeMessageNumber(messageNumber, Packet::PacketPosition::MIDDLE, ++messagePartNumber); }); _packets.front()->writeMessageNumber(messageNumber, Packet::PacketPosition::FIRST, 0); - _packets.back()->writeMessageNumber(messageNumber, Packet::PacketPosition::LAST, ++messagePart); + _packets.back()->writeMessageNumber(messageNumber, Packet::PacketPosition::LAST, ++messagePartNumber); } } From 5ee966261d44d974054adb37b59e5e28cdfe8171 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Tue, 6 Oct 2015 16:12:50 -0700 Subject: [PATCH 20/22] Start thread right away --- libraries/networking/src/udt/SendQueue.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 6ba947553a..959dd02a52 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -69,7 +69,9 @@ std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destin // Move queue to private thread and start it queue->moveToThread(thread); - return std::move(queue); + thread->start(); + + return queue; } SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) : @@ -191,9 +193,6 @@ void SendQueue::sendHandshake() { // we wait for the ACK or the re-send interval to expire static const auto HANDSHAKE_RESEND_INTERVAL = std::chrono::milliseconds(100); _handshakeACKCondition.wait_for(handshakeLock, HANDSHAKE_RESEND_INTERVAL); - - // Once we're here we've either received the handshake ACK or it's going to be time to re-send a handshake. - // Either way let's continue processing - no packets will be sent if no handshake ACK has been received. } } @@ -254,6 +253,9 @@ void SendQueue::run() { // Keep processing events QCoreApplication::sendPostedEvents(this); + + // Once we're here we've either received the handshake ACK or it's going to be time to re-send a handshake. + // Either way let's continue processing - no packets will be sent if no handshake ACK has been received. } while (_state == State::Running) { From ca575bee89f52701cac271649595d4c0a6dcb9a5 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Tue, 6 Oct 2015 16:15:51 -0700 Subject: [PATCH 21/22] Remove unecessary move --- libraries/networking/src/LimitedNodeList.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index c13a82f821..9d1e7abcbb 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -514,7 +514,7 @@ std::unique_ptr LimitedNodeList::constructPingPacket(PingType_t pingTy pingPacket->writePrimitive(pingType); pingPacket->writePrimitive(usecTimestampNow()); - return std::move(pingPacket); + return pingPacket; } std::unique_ptr LimitedNodeList::constructPingReplyPacket(NLPacket& pingPacket) { @@ -529,7 +529,7 @@ std::unique_ptr LimitedNodeList::constructPingReplyPacket(NLPacket& pi replyPacket->writePrimitive(timeFromOriginalPing); replyPacket->writePrimitive(usecTimestampNow()); - return std::move(replyPacket); + return replyPacket; } std::unique_ptr LimitedNodeList::constructICEPingPacket(PingType_t pingType, const QUuid& iceID) { @@ -539,7 +539,7 @@ std::unique_ptr LimitedNodeList::constructICEPingPacket(PingType_t pin icePingPacket->write(iceID.toRfc4122()); icePingPacket->writePrimitive(pingType); - return std::move(icePingPacket); + return icePingPacket; } std::unique_ptr LimitedNodeList::constructICEPingReplyPacket(NLPacket& pingPacket, const QUuid& iceID) { From 3a8deff53e51c86b53df0432ac03e20b797e4412 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Tue, 6 Oct 2015 16:38:16 -0700 Subject: [PATCH 22/22] Return if innactive --- libraries/networking/src/udt/SendQueue.cpp | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 959dd02a52..d7e3bb2b65 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -278,7 +278,7 @@ void SendQueue::run() { // Either _isRunning will have been set to false and we'll break // Or something happened and we'll keep going if (_state != State::Running || isInactive(sentAPacket)) { - continue; + return; } // sleep as long as we need until next packet send, if we can @@ -398,7 +398,6 @@ bool SendQueue::isInactive(bool sentAPacket) { #endif deactivate(); - return true; } @@ -456,9 +455,6 @@ bool SendQueue::isInactive(bool sentAPacket) { _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); } } - - // skip to the next iteration - return true; } } }