From 3c6b7e3bd005d0a77ee6f94709b3c0f4f7485fa2 Mon Sep 17 00:00:00 2001 From: birarda Date: Tue, 18 Dec 2018 13:17:37 -0800 Subject: [PATCH] add a cap to cycled message channels in SendQueue --- libraries/networking/src/udt/PacketQueue.cpp | 46 +++++++++++++------- libraries/networking/src/udt/PacketQueue.h | 14 +++--- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/libraries/networking/src/udt/PacketQueue.cpp b/libraries/networking/src/udt/PacketQueue.cpp index 0560855ecb..b2e3000f74 100644 --- a/libraries/networking/src/udt/PacketQueue.cpp +++ b/libraries/networking/src/udt/PacketQueue.cpp @@ -16,7 +16,8 @@ using namespace udt; PacketQueue::PacketQueue(MessageNumber messageNumber) : _currentMessageNumber(messageNumber) { - _channels.emplace_back(new std::list()); + _channels.emplace_front(new std::list()); + _currentChannel = _channels.begin(); } MessageNumber PacketQueue::getNextMessageNumber() { @@ -27,21 +28,28 @@ MessageNumber PacketQueue::getNextMessageNumber() { bool PacketQueue::isEmpty() const { LockGuard locker(_packetsLock); + // Only the main channel and it is empty - return (_channels.size() == 1) && _channels.front()->empty(); + return _channels.size() == 1 && _channels.front()->empty(); } PacketQueue::PacketPointer PacketQueue::takePacket() { LockGuard locker(_packetsLock); + if (isEmpty()) { return PacketPointer(); } - // Find next non empty channel - if (_channels[nextIndex()]->empty()) { - nextIndex(); + // handle the case where we are looking at the first channel and it is empty + if (_currentChannel == _channels.begin() && (*_currentChannel)->empty()) { + ++_currentChannel; } - auto& channel = _channels[_currentIndex]; + + // at this point the current channel should always not be at the end and should also not be empty + Q_ASSERT(_currentChannel != _channels.end()); + + auto& channel = *_currentChannel; + Q_ASSERT(!channel->empty()); // Take front packet @@ -49,20 +57,28 @@ PacketQueue::PacketPointer PacketQueue::takePacket() { channel->pop_front(); // Remove now empty channel (Don't remove the main channel) - if (channel->empty() && _currentIndex != 0) { - channel->swap(*_channels.back()); - _channels.pop_back(); - --_currentIndex; + if (channel->empty() && _currentChannel != _channels.begin()) { + // erase the current channel and slide the iterator to the next channel + _currentChannel = _channels.erase(_currentChannel); + } else { + ++_currentChannel; + } + + // push forward our number of channels taken from + ++_channelsVisitedCount; + + // check if we need to restart back at the front channel (main) + // to respect our capped number of channels considered concurrently + static const int MAX_CHANNELS_SENT_CONCURRENTLY = 16; + + if (_currentChannel == _channels.end() || _channelsVisitedCount >= MAX_CHANNELS_SENT_CONCURRENTLY) { + _channelsVisitedCount = 0; + _currentChannel = _channels.begin(); } return packet; } -unsigned int PacketQueue::nextIndex() { - _currentIndex = (_currentIndex + 1) % _channels.size(); - return _currentIndex; -} - void PacketQueue::queuePacket(PacketPointer packet) { LockGuard locker(_packetsLock); _channels.front()->push_back(std::move(packet)); diff --git a/libraries/networking/src/udt/PacketQueue.h b/libraries/networking/src/udt/PacketQueue.h index bc4c5e3432..f696864e6b 100644 --- a/libraries/networking/src/udt/PacketQueue.h +++ b/libraries/networking/src/udt/PacketQueue.h @@ -30,8 +30,9 @@ class PacketQueue { using LockGuard = std::lock_guard; using PacketPointer = std::unique_ptr; using PacketListPointer = std::unique_ptr; - using Channel = std::unique_ptr>; - using Channels = std::vector; + using RawChannel = std::list; + using Channel = std::unique_ptr; + using Channels = std::list; public: PacketQueue(MessageNumber messageNumber = 0); @@ -47,16 +48,17 @@ public: private: MessageNumber getNextMessageNumber(); - unsigned int nextIndex(); - + MessageNumber _currentMessageNumber { 0 }; mutable Mutex _packetsLock; // Protects the packets to be sent. Channels _channels; // One channel per packet list + Main channel - unsigned int _currentIndex { 0 }; + + Channels::iterator _currentChannel; + unsigned int _channelsVisitedCount { 0 }; }; } -#endif // hifi_PacketQueue_h \ No newline at end of file +#endif // hifi_PacketQueue_h