diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 507e11f206..9b58dc41a5 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -11,6 +11,8 @@ #include "Connection.h" +#include + #include #include "../HifiSockAddr.h" @@ -28,6 +30,19 @@ Connection::Connection(Socket* parentSocket, HifiSockAddr destination) : { } +Connection::~Connection() { + if (_sendQueue) { + // tell our send queue to stop and wait until its send thread is done + QThread* sendQueueThread = _sendQueue->thread(); + + _sendQueue->stop(); + _sendQueue->deleteLater(); + + sendQueueThread->quit(); + sendQueueThread->wait(); + } +} + void Connection::sendReliablePacket(unique_ptr packet) { Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably."); diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 76725cae42..200f8acda8 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -33,6 +33,7 @@ public: using SentACKMap = std::unordered_map; Connection(Socket* parentSocket, HifiSockAddr destination); + ~Connection(); void sendReliablePacket(std::unique_ptr packet); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 7824e01d17..c5542af978 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -13,6 +13,7 @@ #include +#include #include #include @@ -21,6 +22,7 @@ #include "Socket.h" using namespace udt; +using namespace std::chrono; std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr dest) { auto queue = std::unique_ptr(new SendQueue(socket, dest)); @@ -42,17 +44,7 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) : _socket(socket), _destination(dest) { - _sendTimer.reset(new QTimer(this)); - _sendTimer->setSingleShot(true); - QObject::connect(_sendTimer.get(), &QTimer::timeout, this, &SendQueue::sendNextPacket); - _packetSendPeriod = DEFAULT_SEND_PERIOD; - _lastSendTimestamp = 0; -} - -SendQueue::~SendQueue() { - assert(thread() == QThread::currentThread()); - _sendTimer->stop(); } void SendQueue::queuePacket(std::unique_ptr packet) { @@ -60,24 +52,24 @@ void SendQueue::queuePacket(std::unique_ptr packet) { QWriteLocker locker(&_packetsLock); _packets.push_back(std::move(packet)); } - if (!_running) { - start(); + if (!_isRunning) { + run(); } } -void SendQueue::start() { +void SendQueue::run() { // We need to make sure this is called on the right thread if (thread() != QThread::currentThread()) { - QMetaObject::invokeMethod(this, "start", Qt::QueuedConnection); + QMetaObject::invokeMethod(this, "run", Qt::QueuedConnection); } - _running = true; + _isRunning = true; - // This will send a packet and fire the send timer - sendNextPacket(); + // This will loop and sleep to send packets + loop(); } void SendQueue::stop() { - _running = false; + _isRunning = false; } void SendQueue::sendPacket(const BasePacket& packet) { @@ -128,62 +120,69 @@ SequenceNumber SendQueue::getNextSequenceNumber() { return _currentSequenceNumber; } -void SendQueue::sendNextPacket() { - if (!_running) { - return; - } - - // Record timing - auto sendTime = msecTimestampNow(); // msec - _lastSendTimestamp = sendTime; - - if (_nextPacket) { - // Write packet's sequence number and send it off - _nextPacket->setSequenceNumber(getNextSequenceNumber()); - sendPacket(*_nextPacket); +void SendQueue::loop() { + while (_isRunning) { + // Record timing + _lastSendTimestamp = high_resolution_clock::now(); - // Insert the packet we have just sent in the sent list - QWriteLocker locker(&_sentLock); - _sentPackets[_nextPacket->getSequenceNumber()].swap(_nextPacket); - Q_ASSERT_X(!_nextPacket, - "SendQueue::sendNextPacket()", "Overriden packet in sent list"); - } - - bool hasResend = false; - SequenceNumber seqNum; - { - // Check nak list for packet to resend - QWriteLocker locker(&_naksLock); - if (!_naks.empty()) { - hasResend = true; - seqNum = _naks.front(); - _naks.pop_front(); + if (_nextPacket) { + // Write packet's sequence number and send it off + _nextPacket->setSequenceNumber(getNextSequenceNumber()); + sendPacket(*_nextPacket); + + // Insert the packet we have just sent in the sent list + QWriteLocker locker(&_sentLock); + _sentPackets[_nextPacket->getSequenceNumber()].swap(_nextPacket); + Q_ASSERT_X(!_nextPacket, + "SendQueue::sendNextPacket()", "Overriden packet in sent list"); + } + + bool hasResend = false; + SequenceNumber seqNum; + { + // Check nak list for packet to resend + QWriteLocker locker(&_naksLock); + if (!_naks.empty()) { + hasResend = true; + seqNum = _naks.front(); + _naks.pop_front(); + } + } + + // Find packet in sent list using SequenceNumber + if (hasResend) { + QWriteLocker locker(&_sentLock); + auto it = _sentPackets.find(seqNum); + Q_ASSERT_X(it != _sentPackets.end(), + "SendQueue::sendNextPacket()", "Couldn't find NAKed packet to resend"); + + if (it != _sentPackets.end()) { + it->second.swap(_nextPacket); + _sentPackets.erase(it); + } + } + + // If there is no packet to resend, grab the next one in the list + if (!_nextPacket) { + QWriteLocker locker(&_packetsLock); + _nextPacket.swap(_packets.front()); + _packets.pop_front(); + } + + // since we're a while loop, give the thread a chance to process events + QCoreApplication::processEvents(); + + // we just processed events so check now if we were just told to stop + if (!_isRunning) { + break; + } + + // sleep as long as we need until next packet send, if we can + auto now = high_resolution_clock::now(); + auto microsecondDuration = (_lastSendTimestamp + microseconds(_packetSendPeriod)) - now; + + if (microsecondDuration.count() > 0) { + usleep(microsecondDuration.count()); } } - - // Find packet in sent list using SequenceNumber - if (hasResend) { - QWriteLocker locker(&_sentLock); - auto it = _sentPackets.find(seqNum); - Q_ASSERT_X(it != _sentPackets.end(), - "SendQueue::sendNextPacket()", "Couldn't find NAKed packet to resend"); - - if (it != _sentPackets.end()) { - it->second.swap(_nextPacket); - _sentPackets.erase(it); - } - } - - // If there is no packet to resend, grab the next one in the list - if (!_nextPacket) { - QWriteLocker locker(&_packetsLock); - _nextPacket.swap(_packets.front()); - _packets.pop_front(); - } - - // check if we need to fire off a packet pair - we do this - - // How long before next packet send - auto timeToSleep = (sendTime + _packetSendPeriod) - msecTimestampNow(); // msec - _sendTimer->start(std::max((quint64)0, timeToSleep)); } diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 1c30d1580a..f532bf7fe2 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -12,6 +12,7 @@ #ifndef hifi_SendQueue_h #define hifi_SendQueue_h +#include #include #include @@ -33,15 +34,13 @@ class SendQueue : public QObject { Q_OBJECT public: - static const int DEFAULT_SEND_PERIOD = 16; // msec + static const int DEFAULT_SEND_PERIOD = 16 * 1000; // 16ms, in microseconds static std::unique_ptr create(Socket* socket, HifiSockAddr dest); void queuePacket(std::unique_ptr packet); int getQueueSize() const { QReadLocker locker(&_packetsLock); return _packets.size(); } - quint64 getLastSendTimestamp() const { return _lastSendTimestamp; } - SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); } int getPacketSendPeriod() const { return _packetSendPeriod; } @@ -51,14 +50,14 @@ public: void sendPacket(const BasePacket& packet); public slots: - void start(); + void run(); void stop(); void ack(SequenceNumber ack); void nak(std::list naks); private slots: - void sendNextPacket(); + void loop(); private: friend struct std::default_delete; @@ -66,7 +65,6 @@ private: SendQueue(Socket* socket, HifiSockAddr dest); SendQueue(SendQueue& other) = delete; SendQueue(SendQueue&& other) = delete; - ~SendQueue(); // Increments current sequence number and return it SequenceNumber getNextSequenceNumber(); @@ -82,10 +80,9 @@ private: SequenceNumber _currentSequenceNumber; // Last sequence number sent out std::atomic _atomicCurrentSequenceNumber; // Atomic for last sequence number sent out - std::unique_ptr _sendTimer; // Send timer - std::atomic _packetSendPeriod { 0 }; // Interval between two packet send envent in msec - std::atomic _lastSendTimestamp { 0 }; // Record last time of packet departure - std::atomic _running { false }; + std::atomic _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds + std::chrono::high_resolution_clock::time_point _lastSendTimestamp; // Record last time of packet departure + std::atomic _isRunning { false }; mutable QReadWriteLock _naksLock; // Protects the naks list. std::list _naks; // Sequence numbers of packets to resend