From 507592b08868fe75f1ca2c7cfa4a7735e9c0c1f2 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 27 Jul 2015 11:40:05 -0700 Subject: [PATCH 01/13] add paths to CongestionControl files --- libraries/networking/src/udt/CongestionControl.cpp | 2 +- libraries/networking/src/udt/CongestionControl.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libraries/networking/src/udt/CongestionControl.cpp b/libraries/networking/src/udt/CongestionControl.cpp index ebbf86ecd8..4f32be0d06 100644 --- a/libraries/networking/src/udt/CongestionControl.cpp +++ b/libraries/networking/src/udt/CongestionControl.cpp @@ -1,6 +1,6 @@ // // CongestionControl.cpp -// +// libraries/networking/src/udt // // Created by Clement on 7/23/15. // Copyright 2015 High Fidelity, Inc. diff --git a/libraries/networking/src/udt/CongestionControl.h b/libraries/networking/src/udt/CongestionControl.h index fdf8803ec0..fe7f4a15fc 100644 --- a/libraries/networking/src/udt/CongestionControl.h +++ b/libraries/networking/src/udt/CongestionControl.h @@ -1,6 +1,6 @@ // // CongestionControl.h -// +// libraries/networking/src/udt // // Created by Clement on 7/23/15. // Copyright 2015 High Fidelity, Inc. From b0e7c208ac289d9a87d5bcbb704cdb51ea72ea81 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 27 Jul 2015 12:21:27 -0700 Subject: [PATCH 02/13] initial version of the sleeping ControlSender --- .../networking/src/udt/ControlSender.cpp | 40 +++++++++++++++++++ libraries/networking/src/udt/ControlSender.h | 39 ++++++++++++++++++ libraries/networking/src/udt/Socket.cpp | 35 ++++++++++++++++ libraries/networking/src/udt/Socket.h | 6 ++- 4 files changed, 119 insertions(+), 1 deletion(-) create mode 100644 libraries/networking/src/udt/ControlSender.cpp create mode 100644 libraries/networking/src/udt/ControlSender.h diff --git a/libraries/networking/src/udt/ControlSender.cpp b/libraries/networking/src/udt/ControlSender.cpp new file mode 100644 index 0000000000..dd9948502a --- /dev/null +++ b/libraries/networking/src/udt/ControlSender.cpp @@ -0,0 +1,40 @@ +// +// ControlSender.cpp +// libraries/networking/src/udt +// +// Created by Stephen Birarda on 2015-07-27. +// Copyright 2015 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#include +#include + +#include +#include + +#include "ControlSender.h" + +using namespace udt; + +void ControlSender::loop() { + while (!_isStopped) { + // grab the time now + auto start = std::chrono::high_resolution_clock::now(); + + // for each of the connections managed by the udt::Socket, we need to ask for the ACK value to send + + // since we're infinitely looping, give the thread a chance to process events + QCoreApplication::processEvents(); + + auto end = std::chrono::high_resolution_clock::now(); + + std::chrono::duration elapsed = end - start; + int timeToSleep = _synInterval - (int) elapsed.count(); + + // based on how much time it took us to process, let's figure out how much time we have need to sleep + std::this_thread::sleep_for(std::chrono::microseconds(timeToSleep)); + } +} diff --git a/libraries/networking/src/udt/ControlSender.h b/libraries/networking/src/udt/ControlSender.h new file mode 100644 index 0000000000..040c6a70db --- /dev/null +++ b/libraries/networking/src/udt/ControlSender.h @@ -0,0 +1,39 @@ +// +// ControlSender.h +// libraries/networking/src/udt +// +// Created by Stephen Birarda on 2015-07-27. +// Copyright 2015 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#pragma once + +#ifndef hifi_ControlSender_h +#define hifi_ControlSender_h + +#include + +namespace udt { + +// Handles the sending of periodic control packets for all active UDT reliable connections +// Currently the interval for all connections is the same, so one thread is sufficient to manage all +class ControlSender : public QObject { + Q_OBJECT +public: + ControlSender(QObject* parent = 0) : QObject(parent) {}; + +public slots: + void loop(); // infinitely loops and sleeps to manage rate of control packet sending + void stop() { _isStopped = true; } // stops the loop + +private: + int _synInterval = 10 * 1000; + bool _isStopped { false }; +}; + +} + +#endif // hifi_ControlSender_h diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 8bf7f973ae..57c4b75b12 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -11,7 +11,10 @@ #include "Socket.h" +#include + #include "../NetworkLogging.h" +#include "ControlSender.h" #include "Packet.h" using namespace udt; @@ -20,6 +23,38 @@ Socket::Socket(QObject* parent) : QObject(parent) { connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams); + + // make a QThread for the ControlSender to live on + QThread* controlThread = new QThread(this); + + // setup the ControlSender and parent it + _controlSender = new ControlSender; + + // move the ControlSender to its thread + _controlSender->moveToThread(controlThread); + + // start the ControlSender once the thread is started + connect(controlThread, &QThread::started, _controlSender, &ControlSender::loop); + + // make sure the control thread is named so we can identify it + controlThread->setObjectName("UDT Control Thread"); + + // start the control thread + controlThread->start(); +} + +Socket::~Socket() { + if (_controlSender) { + QThread* controlThread = _controlSender->thread(); + + // tell the control sender to stop and be deleted so we can wait on its thread + QMetaObject::invokeMethod(_controlSender, "stop"); + _controlSender->deleteLater(); + + // make sure the control thread goes down + controlThread->quit(); + controlThread->wait(); + } } void Socket::rebind() { diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index 184d8716fe..be6ef982af 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -24,8 +24,9 @@ #include "SeqNum.h" namespace udt { - + class BasePacket; +class ControlSender; class Packet; using PacketFilterOperator = std::function; @@ -37,6 +38,7 @@ class Socket : public QObject { Q_OBJECT public: Socket(QObject* object = 0); + ~Socket(); quint16 localPort() const { return _udpSocket.localPort(); } @@ -68,6 +70,8 @@ private: std::unordered_map _unfilteredHandlers; std::unordered_map _packetSequenceNumbers; + + ControlSender* _controlSender { nullptr }; }; } // namespace udt From 594b4f4814819de64d513799f78f06ee94821822 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 27 Jul 2015 12:25:46 -0700 Subject: [PATCH 03/13] fix the shutdown of the ControlSender --- libraries/networking/src/udt/Socket.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 57c4b75b12..9078ac0f6d 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -48,7 +48,7 @@ Socket::~Socket() { QThread* controlThread = _controlSender->thread(); // tell the control sender to stop and be deleted so we can wait on its thread - QMetaObject::invokeMethod(_controlSender, "stop"); + _controlSender->stop(); _controlSender->deleteLater(); // make sure the control thread goes down From a31053d4508c0b0491db89806af6d39fe7fe2c11 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 27 Jul 2015 13:02:11 -0700 Subject: [PATCH 04/13] small spacing change --- libraries/networking/src/udt/ControlSender.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/networking/src/udt/ControlSender.cpp b/libraries/networking/src/udt/ControlSender.cpp index dd9948502a..1b955d0254 100644 --- a/libraries/networking/src/udt/ControlSender.cpp +++ b/libraries/networking/src/udt/ControlSender.cpp @@ -25,7 +25,7 @@ void ControlSender::loop() { auto start = std::chrono::high_resolution_clock::now(); // for each of the connections managed by the udt::Socket, we need to ask for the ACK value to send - + // since we're infinitely looping, give the thread a chance to process events QCoreApplication::processEvents(); From 50a7ef7c2024ed07f7fbf879fa6acbc013eb99fd Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 27 Jul 2015 15:59:04 -0700 Subject: [PATCH 05/13] remove the ControlSender and perform sync in Socket --- .../networking/src/udt/ControlSender.cpp | 40 ----------------- libraries/networking/src/udt/ControlSender.h | 39 ---------------- libraries/networking/src/udt/Socket.cpp | 45 +++++++------------ libraries/networking/src/udt/Socket.h | 5 ++- 4 files changed, 19 insertions(+), 110 deletions(-) delete mode 100644 libraries/networking/src/udt/ControlSender.cpp delete mode 100644 libraries/networking/src/udt/ControlSender.h diff --git a/libraries/networking/src/udt/ControlSender.cpp b/libraries/networking/src/udt/ControlSender.cpp deleted file mode 100644 index 1b955d0254..0000000000 --- a/libraries/networking/src/udt/ControlSender.cpp +++ /dev/null @@ -1,40 +0,0 @@ -// -// ControlSender.cpp -// libraries/networking/src/udt -// -// Created by Stephen Birarda on 2015-07-27. -// Copyright 2015 High Fidelity, Inc. -// -// Distributed under the Apache License, Version 2.0. -// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html -// - -#include -#include - -#include -#include - -#include "ControlSender.h" - -using namespace udt; - -void ControlSender::loop() { - while (!_isStopped) { - // grab the time now - auto start = std::chrono::high_resolution_clock::now(); - - // for each of the connections managed by the udt::Socket, we need to ask for the ACK value to send - - // since we're infinitely looping, give the thread a chance to process events - QCoreApplication::processEvents(); - - auto end = std::chrono::high_resolution_clock::now(); - - std::chrono::duration elapsed = end - start; - int timeToSleep = _synInterval - (int) elapsed.count(); - - // based on how much time it took us to process, let's figure out how much time we have need to sleep - std::this_thread::sleep_for(std::chrono::microseconds(timeToSleep)); - } -} diff --git a/libraries/networking/src/udt/ControlSender.h b/libraries/networking/src/udt/ControlSender.h deleted file mode 100644 index 040c6a70db..0000000000 --- a/libraries/networking/src/udt/ControlSender.h +++ /dev/null @@ -1,39 +0,0 @@ -// -// ControlSender.h -// libraries/networking/src/udt -// -// Created by Stephen Birarda on 2015-07-27. -// Copyright 2015 High Fidelity, Inc. -// -// Distributed under the Apache License, Version 2.0. -// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html -// - -#pragma once - -#ifndef hifi_ControlSender_h -#define hifi_ControlSender_h - -#include - -namespace udt { - -// Handles the sending of periodic control packets for all active UDT reliable connections -// Currently the interval for all connections is the same, so one thread is sufficient to manage all -class ControlSender : public QObject { - Q_OBJECT -public: - ControlSender(QObject* parent = 0) : QObject(parent) {}; - -public slots: - void loop(); // infinitely loops and sleeps to manage rate of control packet sending - void stop() { _isStopped = true; } // stops the loop - -private: - int _synInterval = 10 * 1000; - bool _isStopped { false }; -}; - -} - -#endif // hifi_ControlSender_h diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 9078ac0f6d..a110a4f649 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -24,37 +24,11 @@ Socket::Socket(QObject* parent) : { connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams); - // make a QThread for the ControlSender to live on - QThread* controlThread = new QThread(this); + // make sure our synchronization method is called every SYN interval + connect(&_synTimer, &QTimer::timeout, this, &Socket::rateControlSync); - // setup the ControlSender and parent it - _controlSender = new ControlSender; - - // move the ControlSender to its thread - _controlSender->moveToThread(controlThread); - - // start the ControlSender once the thread is started - connect(controlThread, &QThread::started, _controlSender, &ControlSender::loop); - - // make sure the control thread is named so we can identify it - controlThread->setObjectName("UDT Control Thread"); - - // start the control thread - controlThread->start(); -} - -Socket::~Socket() { - if (_controlSender) { - QThread* controlThread = _controlSender->thread(); - - // tell the control sender to stop and be deleted so we can wait on its thread - _controlSender->stop(); - _controlSender->deleteLater(); - - // make sure the control thread goes down - controlThread->quit(); - controlThread->wait(); - } + // start our timer for the synchronization time interval + _synTimer->start(_synInterval); } void Socket::rebind() { @@ -148,3 +122,14 @@ void Socket::readPendingDatagrams() { } } } + +void Socket::rateControlSync() { + + + + if (_synTimer.interval() != _synInterval) { + // if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed) + // then restart it now with the right interval + _synTimer.start(_synInterval); + } +} diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index be6ef982af..35f8991634 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -18,6 +18,7 @@ #include #include +#include #include #include "../HifiSockAddr.h" @@ -61,6 +62,7 @@ public: private slots: void readPendingDatagrams(); + void rateControlSync(); private: QUdpSocket _udpSocket { this }; @@ -71,7 +73,8 @@ private: std::unordered_map _packetSequenceNumbers; - ControlSender* _controlSender { nullptr }; + int32_t _synInterval; + QTimer _synTimer; }; } // namespace udt From 10d6520098beb0c74039184c63f5b3c24f46cd04 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 27 Jul 2015 16:00:00 -0700 Subject: [PATCH 06/13] add a TODO message to Socket::rateControlSync --- libraries/networking/src/udt/Socket.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index a110a4f649..75924b791a 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -125,7 +125,7 @@ void Socket::readPendingDatagrams() { void Socket::rateControlSync() { - + // TODO: enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control if (_synTimer.interval() != _synInterval) { // if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed) From d787e086327bc1e78a31eff197580936dd5553f9 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 27 Jul 2015 16:10:52 -0700 Subject: [PATCH 07/13] fix for rate control sync in Socket --- libraries/networking/src/udt/Socket.cpp | 7 +++---- libraries/networking/src/udt/Socket.h | 3 +-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 75924b791a..c258283c61 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -14,7 +14,6 @@ #include #include "../NetworkLogging.h" -#include "ControlSender.h" #include "Packet.h" using namespace udt; @@ -28,7 +27,7 @@ Socket::Socket(QObject* parent) : connect(&_synTimer, &QTimer::timeout, this, &Socket::rateControlSync); // start our timer for the synchronization time interval - _synTimer->start(_synInterval); + _synTimer.start(_synInterval); } void Socket::rebind() { @@ -124,8 +123,8 @@ void Socket::readPendingDatagrams() { } void Socket::rateControlSync() { - - // TODO: enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control + + // TODO: enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control if (_synTimer.interval() != _synInterval) { // if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed) diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index 2f49b9caf7..4feafa6c4c 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -39,7 +39,6 @@ class Socket : public QObject { Q_OBJECT public: Socket(QObject* object = 0); - ~Socket(); quint16 localPort() const { return _udpSocket.localPort(); } @@ -73,7 +72,7 @@ private: std::unordered_map _packetSequenceNumbers; - int32_t _synInterval; + int32_t _synInterval = 10; // 10ms QTimer _synTimer; }; From 75a722f63c8eed010211d858c30fffd0e6d7280c Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 27 Jul 2015 16:12:14 -0700 Subject: [PATCH 08/13] remove a couple of unused variables --- libraries/networking/src/udt/Packet.cpp | 1 - libraries/networking/src/udt/Packet.h | 2 -- 2 files changed, 3 deletions(-) diff --git a/libraries/networking/src/udt/Packet.cpp b/libraries/networking/src/udt/Packet.cpp index e9a5636172..a70c3bfbef 100644 --- a/libraries/networking/src/udt/Packet.cpp +++ b/libraries/networking/src/udt/Packet.cpp @@ -117,7 +117,6 @@ Packet& Packet::operator=(Packet&& other) { static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 1); static const uint32_t RELIABILITY_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 2); static const uint32_t MESSAGE_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 3); -static const int BIT_FIELD_LENGTH = 3; static const uint32_t BIT_FIELD_MASK = CONTROL_BIT_MASK | RELIABILITY_BIT_MASK | MESSAGE_BIT_MASK; void Packet::readIsReliable() { diff --git a/libraries/networking/src/udt/Packet.h b/libraries/networking/src/udt/Packet.h index 905af15ded..6189be0ef1 100644 --- a/libraries/networking/src/udt/Packet.h +++ b/libraries/networking/src/udt/Packet.h @@ -33,8 +33,6 @@ public: using MessageNumber = uint32_t; using MessageNumberAndBitField = uint32_t; - static const uint32_t DEFAULT_SEQUENCE_NUMBER = 0; - static std::unique_ptr create(qint64 size = -1, bool isReliable = false, bool isPartOfMessage = false); static std::unique_ptr fromReceivedPacket(std::unique_ptr data, qint64 size, const HifiSockAddr& senderSockAddr); From 90d25156741c78d6bfcc5a63aed40c5f22a250e0 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 27 Jul 2015 16:43:49 -0700 Subject: [PATCH 09/13] add methods to send ACKs from Connection --- libraries/networking/src/udt/Connection.cpp | 46 ++++++++++++++++--- libraries/networking/src/udt/Connection.h | 9 +++- .../networking/src/udt/ControlPacket.cpp | 36 ++++++++++----- libraries/networking/src/udt/ControlPacket.h | 11 ++--- libraries/networking/src/udt/SendQueue.cpp | 4 +- libraries/networking/src/udt/SendQueue.h | 3 +- libraries/networking/src/udt/Socket.cpp | 2 +- libraries/networking/src/udt/Socket.h | 2 +- 8 files changed, 82 insertions(+), 31 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 7fd643cd5c..4f309249ff 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -28,15 +28,47 @@ void Connection::send(std::unique_ptr packet) { } } +void Connection::sendACK() { + static const int ACK_PACKET_PAYLOAD_BYTES = 8; + + // setup the ACK packet, make it static so we can re-use it + static auto ackPacket = ControlPacket::create(ControlPacket::ACK, ACK_PACKET_PAYLOAD_BYTES); + + // have the send queue send off our packet + _sendQueue->sendPacket(*ackPacket); +} + +void Connection::sendLightACK() const { + static const int LIGHT_ACK_PACKET_PAYLOAD_BYTES = 4; + + // create the light ACK packet, make it static so we can re-use it + static auto lightACKPacket = ControlPacket::create(ControlPacket::ACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES); + + SeqNum nextACKNumber = nextACK(); + + if (nextACKNumber != _lastReceivedAcknowledgedACK) { + // pack in the ACK + memcpy(lightACKPacket->getPayload(), &nextACKNumber, sizeof(nextACKNumber)); + + // have the send queue send off our packet + _sendQueue->sendPacket(*lightACKPacket); + } +} + +SeqNum Connection::nextACK() const { + // TODO: check if we have a loss list + return _largestReceivedSeqNum + 1; +} + void Connection::processReceivedSeqNum(SeqNum seq) { - if (udt::seqcmp(seq, _largestRecievedSeqNum + 1) > 0) { + if (udt::seqcmp(seq, _largestReceivedSeqNum + 1) > 0) { // TODO: Add range to loss list // TODO: Send loss report } - if (seq > _largestRecievedSeqNum) { - _largestRecievedSeqNum = seq; + if (seq > _largestReceivedSeqNum) { + _largestReceivedSeqNum = seq; } else { // TODO: Remove seq from loss list } @@ -44,13 +76,13 @@ void Connection::processReceivedSeqNum(SeqNum seq) { void Connection::processControl(std::unique_ptr controlPacket) { switch (controlPacket->getType()) { - case ControlPacket::Type::ACK: + case ControlPacket::ACK: break; - case ControlPacket::Type::ACK2: + case ControlPacket::ACK2: break; - case ControlPacket::Type::NAK: + case ControlPacket::NAK: break; - case ControlPacket::Type::PacketPair: + case ControlPacket::PacketPair: break; } } diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 3f3b39524b..8bfc2a3d31 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -31,12 +31,17 @@ public: void send(std::unique_ptr packet); + void sendACK(); + void sendLightACK() const; + + SeqNum nextACK() const; + void processReceivedSeqNum(SeqNum seq); void processControl(std::unique_ptr controlPacket); private: - - SeqNum _largestRecievedSeqNum; + SeqNum _largestReceivedSeqNum; + SeqNum _lastReceivedAcknowledgedACK; // The last sent ACK that has been acknowledged via an ACK2 from the peer std::unique_ptr _sendQueue; }; diff --git a/libraries/networking/src/udt/ControlPacket.cpp b/libraries/networking/src/udt/ControlPacket.cpp index a8688f2cdf..8d3a1561fc 100644 --- a/libraries/networking/src/udt/ControlPacket.cpp +++ b/libraries/networking/src/udt/ControlPacket.cpp @@ -13,8 +13,22 @@ using namespace udt; -std::unique_ptr ControlPacket::create(Type type, const SequenceNumberList& sequenceNumbers) { - return ControlPacket::create(type, sequenceNumbers); +std::unique_ptr ControlPacket::create(Type type, qint64 size) { + + std::unique_ptr controlPacket; + + if (size == -1) { + controlPacket = ControlPacket::create(type); + } else { + // Fail with invalid size + Q_ASSERT(size >= 0); + + controlPacket = ControlPacket::create(type, size); + } + + controlPacket->open(QIODevice::ReadWrite); + + return controlPacket; } ControlPacket::ControlPacketPair ControlPacket::createPacketPair(quint64 timestamp) { @@ -32,18 +46,18 @@ qint64 ControlPacket::totalHeadersSize() const { return BasePacket::totalHeadersSize() + localHeaderSize(); } -ControlPacket::ControlPacket(Type type, const SequenceNumberList& sequenceNumbers) : - BasePacket(localHeaderSize() + (sizeof(Packet::SequenceNumber) * sequenceNumbers.size())), +ControlPacket::ControlPacket(Type type) : + BasePacket(-1), + _type(type) +{ + adjustPayloadStartAndCapacity(); +} + +ControlPacket::ControlPacket(Type type, qint64 size) : + BasePacket(localHeaderSize() + size), _type(type) { adjustPayloadStartAndCapacity(); - - open(QIODevice::ReadWrite); - - // pack in the sequence numbers - for (auto& sequenceNumber : sequenceNumbers) { - writePrimitive(sequenceNumber); - } } ControlPacket::ControlPacket(quint64 timestamp) : diff --git a/libraries/networking/src/udt/ControlPacket.h b/libraries/networking/src/udt/ControlPacket.h index 8b9ccbf073..2eab8a2192 100644 --- a/libraries/networking/src/udt/ControlPacket.h +++ b/libraries/networking/src/udt/ControlPacket.h @@ -21,23 +21,21 @@ namespace udt { -using SequenceNumberList = std::vector; - class ControlPacket : public BasePacket { Q_OBJECT public: using TypeAndSubSequenceNumber = uint32_t; using ControlPacketPair = std::pair, std::unique_ptr>; - enum class Type : uint16_t { + enum Type : uint16_t { ACK, ACK2, NAK, PacketPair }; - std::unique_ptr create(Type type, const SequenceNumberList& sequenceNumbers); - ControlPacketPair createPacketPair(quint64 timestamp); + static std::unique_ptr create(Type type, qint64 size = -1); + static ControlPacketPair createPacketPair(quint64 timestamp); static qint64 localHeaderSize(); // Current level's header size virtual qint64 totalHeadersSize() const; // Cumulated size of all the headers @@ -45,7 +43,8 @@ public: Type getType() const { return _type; } private: - ControlPacket(Type type, const SequenceNumberList& sequenceNumbers); + ControlPacket(Type type); + ControlPacket(Type type, qint64 size); ControlPacket(quint64 timestamp); ControlPacket(ControlPacket&& other); ControlPacket(const ControlPacket& other) = delete; diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 7561a7042d..65c1807d19 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -80,8 +80,8 @@ void SendQueue::stop() { _running = false; } -void SendQueue::sendPacket(const Packet& packet) { - _socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination); +void SendQueue::sendPacket(const BasePacket& packet) { + _socket->writeUnreliablePacket(packet, _destination); } void SendQueue::ack(SeqNum ack) { diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 1a1a2fc90d..9c14c19ede 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -26,6 +26,7 @@ namespace udt { class Socket; +class BasePacket; class Packet; class SendQueue : public QObject { @@ -47,7 +48,7 @@ public: public slots: void start(); void stop(); - void sendPacket(const Packet& packet); + void sendPacket(const BasePacket& packet); void ack(SeqNum ack); void nak(std::list naks); diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index c258283c61..6923879201 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -66,7 +66,7 @@ void Socket::setBufferSizes(int numBytes) { } } -qint64 Socket::writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr) { +qint64 Socket::writeUnreliablePacket(const BasePacket& packet, const HifiSockAddr& sockAddr) { return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr); } diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index 4feafa6c4c..f74ea49f0b 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -42,7 +42,7 @@ public: quint16 localPort() const { return _udpSocket.localPort(); } - qint64 writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr); + qint64 writeUnreliablePacket(const BasePacket& packet, const HifiSockAddr& sockAddr); qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr) { return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); } From f6fb421bf297c09c234b9e3dbd198b6a81a802ce Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 27 Jul 2015 17:15:26 -0700 Subject: [PATCH 10/13] write out more packing of ACK packet in Connection --- libraries/networking/src/udt/Connection.cpp | 67 ++++++++++++++++++--- libraries/networking/src/udt/Connection.h | 12 +++- 2 files changed, 68 insertions(+), 11 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 4f309249ff..ae0ea6e065 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -17,6 +17,7 @@ #include "Socket.h" using namespace udt; +using namespace std::chrono; Connection::Connection(Socket* parentSocket, HifiSockAddr destination) { @@ -28,12 +29,57 @@ void Connection::send(std::unique_ptr packet) { } } -void Connection::sendACK() { - static const int ACK_PACKET_PAYLOAD_BYTES = 8; +void Connection::sendACK(bool wasCausedBySyncTimeout) { + static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(SeqNum); // setup the ACK packet, make it static so we can re-use it static auto ackPacket = ControlPacket::create(ControlPacket::ACK, ACK_PACKET_PAYLOAD_BYTES); - + + auto currentTime = high_resolution_clock::now(); + + SeqNum nextACKNumber = nextACK(); + + if (nextACKNumber <= _lastReceivedAcknowledgedACK) { + // we already got an ACK2 for this ACK we would be sending, don't bother + return; + } + + if (nextACKNumber >= _lastSentACK) { + // we have received new packets since the last sent ACK + + // update the last sent ACK + _lastSentACK = nextACKNumber; + + // remove the ACKed packets from the receive queue + + } else if (nextACKNumber == _lastSentACK) { + // We already sent this ACK, but check if we should re-send it. + // We will re-send if it has been more than RTT + (4 * RTT variance) since the last ACK + milliseconds sinceLastACK = duration_cast(currentTime - _lastACKTime); + if (sinceLastACK.count() < (_rtt + (4 * _rttVariance))) { + return; + } + } + + // reset the ACK packet so we can fill it up and have it figure out what size it is + ackPacket->reset(); + + // pack in the ACK + ackPacket->writePrimitive(nextACKNumber); + + // pack in the RTT and variance + ackPacket->writePrimitive(_rtt); + ackPacket->writePrimitive(_rttVariance); + + // pack the available buffer size - must be a minimum of 2 + + if (wasCausedBySyncTimeout) { + // pack in the receive speed and bandwidth + + // record this as the last ACK send time + _lastACKTime = high_resolution_clock::now(); + } + // have the send queue send off our packet _sendQueue->sendPacket(*ackPacket); } @@ -46,13 +92,16 @@ void Connection::sendLightACK() const { SeqNum nextACKNumber = nextACK(); - if (nextACKNumber != _lastReceivedAcknowledgedACK) { - // pack in the ACK - memcpy(lightACKPacket->getPayload(), &nextACKNumber, sizeof(nextACKNumber)); - - // have the send queue send off our packet - _sendQueue->sendPacket(*lightACKPacket); + if (nextACKNumber == _lastReceivedAcknowledgedACK) { + // we already got an ACK2 for this ACK we would be sending, don't bother + return; } + + // pack in the ACK + memcpy(lightACKPacket->getPayload(), &nextACKNumber, sizeof(nextACKNumber)); + + // have the send queue send off our packet + _sendQueue->sendPacket(*lightACKPacket); } SeqNum Connection::nextACK() const { diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 8bfc2a3d31..882de3b79f 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -12,6 +12,7 @@ #ifndef hifi_Connection_h #define hifi_Connection_h +#include #include #include "SendQueue.h" @@ -31,7 +32,7 @@ public: void send(std::unique_ptr packet); - void sendACK(); + void sendACK(bool wasCausedBySyncTimeout = true); void sendLightACK() const; SeqNum nextACK() const; @@ -40,8 +41,15 @@ public: void processControl(std::unique_ptr controlPacket); private: - SeqNum _largestReceivedSeqNum; + SeqNum _largestReceivedSeqNum; // The largest sequence number received from the peer + SeqNum _lastSentACK; // The last sent ACK SeqNum _lastReceivedAcknowledgedACK; // The last sent ACK that has been acknowledged via an ACK2 from the peer + + int32_t _rtt; // RTT, in milliseconds + int32_t _rttVariance; // RTT variance + + std::chrono::high_resolution_clock::time_point _lastACKTime; + std::unique_ptr _sendQueue; }; From 52411bb8bac7ecd18515782761a56194ddddeade Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 27 Jul 2015 17:23:39 -0700 Subject: [PATCH 11/13] pack ACK sub sequence number manually for control --- libraries/networking/src/udt/Connection.cpp | 5 ++++- libraries/networking/src/udt/Connection.h | 1 + libraries/networking/src/udt/ControlPacket.cpp | 2 +- libraries/networking/src/udt/ControlPacket.h | 2 +- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index ae0ea6e065..4b249bc91e 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -64,7 +64,10 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) { // reset the ACK packet so we can fill it up and have it figure out what size it is ackPacket->reset(); - // pack in the ACK + // pack in the ACK sub-sequence number + ackPacket->writePrimitive(_currentACKSubSequenceNumber++); + + // pack in the ACK number ackPacket->writePrimitive(nextACKNumber); // pack in the RTT and variance diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 882de3b79f..a9c73ce74a 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -44,6 +44,7 @@ private: SeqNum _largestReceivedSeqNum; // The largest sequence number received from the peer SeqNum _lastSentACK; // The last sent ACK SeqNum _lastReceivedAcknowledgedACK; // The last sent ACK that has been acknowledged via an ACK2 from the peer + SeqNum _currentACKSubSequenceNumber; // The current ACK sub-sequence number (used for Acknowledgment of ACKs) int32_t _rtt; // RTT, in milliseconds int32_t _rttVariance; // RTT variance diff --git a/libraries/networking/src/udt/ControlPacket.cpp b/libraries/networking/src/udt/ControlPacket.cpp index 8d3a1561fc..f62c7a6454 100644 --- a/libraries/networking/src/udt/ControlPacket.cpp +++ b/libraries/networking/src/udt/ControlPacket.cpp @@ -39,7 +39,7 @@ ControlPacket::ControlPacketPair ControlPacket::createPacketPair(quint64 timesta } qint64 ControlPacket::localHeaderSize() { - return sizeof(TypeAndSubSequenceNumber); + return sizeof(BitFieldAndControlType); } qint64 ControlPacket::totalHeadersSize() const { diff --git a/libraries/networking/src/udt/ControlPacket.h b/libraries/networking/src/udt/ControlPacket.h index 2eab8a2192..049f95cf59 100644 --- a/libraries/networking/src/udt/ControlPacket.h +++ b/libraries/networking/src/udt/ControlPacket.h @@ -24,7 +24,7 @@ namespace udt { class ControlPacket : public BasePacket { Q_OBJECT public: - using TypeAndSubSequenceNumber = uint32_t; + using BitFieldAndControlType = uint32_t; using ControlPacketPair = std::pair, std::unique_ptr>; enum Type : uint16_t { From d2c5e79ac2e0d26c1fcde50987ecc2e37f3cd5b9 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 27 Jul 2015 17:38:10 -0700 Subject: [PATCH 12/13] add writing of control bit and type to ControlPacket --- .../networking/src/udt/ControlPacket.cpp | 32 +++++++++++++++---- libraries/networking/src/udt/ControlPacket.h | 5 ++- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/libraries/networking/src/udt/ControlPacket.cpp b/libraries/networking/src/udt/ControlPacket.cpp index f62c7a6454..8b863c5a21 100644 --- a/libraries/networking/src/udt/ControlPacket.cpp +++ b/libraries/networking/src/udt/ControlPacket.cpp @@ -18,17 +18,13 @@ std::unique_ptr ControlPacket::create(Type type, qint64 size) { std::unique_ptr controlPacket; if (size == -1) { - controlPacket = ControlPacket::create(type); + return ControlPacket::create(type); } else { // Fail with invalid size Q_ASSERT(size >= 0); - controlPacket = ControlPacket::create(type, size); + return ControlPacket::create(type, size); } - - controlPacket->open(QIODevice::ReadWrite); - - return controlPacket; } ControlPacket::ControlPacketPair ControlPacket::createPacketPair(quint64 timestamp) { @@ -39,7 +35,7 @@ ControlPacket::ControlPacketPair ControlPacket::createPacketPair(quint64 timesta } qint64 ControlPacket::localHeaderSize() { - return sizeof(BitFieldAndControlType); + return sizeof(ControlBitAndType); } qint64 ControlPacket::totalHeadersSize() const { @@ -51,6 +47,10 @@ ControlPacket::ControlPacket(Type type) : _type(type) { adjustPayloadStartAndCapacity(); + + open(QIODevice::ReadWrite); + + writeControlBitAndType(); } ControlPacket::ControlPacket(Type type, qint64 size) : @@ -58,6 +58,10 @@ ControlPacket::ControlPacket(Type type, qint64 size) : _type(type) { adjustPayloadStartAndCapacity(); + + open(QIODevice::ReadWrite); + + writeControlBitAndType(); } ControlPacket::ControlPacket(quint64 timestamp) : @@ -68,6 +72,8 @@ ControlPacket::ControlPacket(quint64 timestamp) : open(QIODevice::ReadWrite); + writeControlBitAndType(); + // pack in the timestamp writePrimitive(timestamp); } @@ -85,3 +91,15 @@ ControlPacket& ControlPacket::operator=(ControlPacket&& other) { return *this; } + +static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(ControlPacket::ControlBitAndType) - 1); + +void ControlPacket::writeControlBitAndType() { + ControlBitAndType* bitAndType = reinterpret_cast(_packet.get()); + + // write the control bit by OR'ing the current value with the CONTROL_BIT_MASK + *bitAndType = (*bitAndType | CONTROL_BIT_MASK); + + // write the type by OR'ing the type with the current value & CONTROL_BIT_MASK + *bitAndType = (*bitAndType & CONTROL_BIT_MASK) | (_type << sizeof((ControlPacket::Type) - 1)); +} diff --git a/libraries/networking/src/udt/ControlPacket.h b/libraries/networking/src/udt/ControlPacket.h index 049f95cf59..6d1b7eb5d3 100644 --- a/libraries/networking/src/udt/ControlPacket.h +++ b/libraries/networking/src/udt/ControlPacket.h @@ -24,7 +24,7 @@ namespace udt { class ControlPacket : public BasePacket { Q_OBJECT public: - using BitFieldAndControlType = uint32_t; + using ControlBitAndType = uint32_t; using ControlPacketPair = std::pair, std::unique_ptr>; enum Type : uint16_t { @@ -52,6 +52,9 @@ private: ControlPacket& operator=(ControlPacket&& other); ControlPacket& operator=(const ControlPacket& other) = delete; + // Header writers + void writeControlBitAndType(); + Type _type; }; From 75765d02e4ba3d073aa74758a11d163989538514 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 27 Jul 2015 17:40:32 -0700 Subject: [PATCH 13/13] correct sizeof ACK_PACKET_PAYLOAD_BYTES in Connection --- libraries/networking/src/udt/Connection.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 4b249bc91e..3753729123 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -30,7 +30,8 @@ void Connection::send(std::unique_ptr packet) { } void Connection::sendACK(bool wasCausedBySyncTimeout) { - static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(SeqNum); + static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(_lastSentACK) + sizeof(_currentACKSubSequenceNumber) + + sizeof(_rtt) + sizeof(_rttVariance) + sizeof(int32_t) + sizeof(int32_t); // setup the ACK packet, make it static so we can re-use it static auto ackPacket = ControlPacket::create(ControlPacket::ACK, ACK_PACKET_PAYLOAD_BYTES);