diff --git a/libraries/networking/src/udt/PacketQueue.cpp b/libraries/networking/src/udt/PacketQueue.cpp index 088908200d..d0110852cd 100644 --- a/libraries/networking/src/udt/PacketQueue.cpp +++ b/libraries/networking/src/udt/PacketQueue.cpp @@ -11,12 +11,10 @@ #include "PacketQueue.h" -#include "Packet.h" #include "PacketList.h" using namespace udt; - MessageNumber PacketQueue::getNextMessageNumber() { static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS; _currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER; @@ -25,28 +23,50 @@ MessageNumber PacketQueue::getNextMessageNumber() { bool PacketQueue::isEmpty() const { LockGuard locker(_packetsLock); - return _packets.empty(); + // Only the main channel and it is empty + return (_channels.size() == 1) && _channels.front().empty(); } PacketQueue::PacketPointer PacketQueue::takePacket() { LockGuard locker(_packetsLock); - if (!_packets.empty()) { - auto packet = std::move(_packets.front()); - _packets.pop_front(); - return std::move(packet); + if (isEmpty()) { + return PacketPointer(); } - return PacketPointer(); + // Find next non empty channel + if (_channels[nextIndex()].empty()) { + nextIndex(); + } + auto& channel = _channels[_currentIndex]; + Q_ASSERT(!channel.empty()); + + // Take front packet + auto packet = std::move(channel.front()); + channel.pop_front(); + + // Remove now empty channel (Don't remove the main channel) + if (channel.empty() && _currentIndex != 0) { + channel.swap(_channels.back()); + _channels.pop_back(); + --_currentIndex; + } + + return std::move(packet); +} + +unsigned int PacketQueue::nextIndex() { + _currentIndex = ++_currentIndex % _channels.size(); + return _currentIndex; } void PacketQueue::queuePacket(PacketPointer packet) { LockGuard locker(_packetsLock); - _packets.push_back(std::move(packet)); + _channels.front().push_back(std::move(packet)); } void PacketQueue::queuePacketList(PacketListPointer packetList) { packetList->preparePackets(getNextMessageNumber()); LockGuard locker(_packetsLock); - _packets.splice(_packets.end(), packetList->_packets); -} \ No newline at end of file + _channels.push_back(std::move(packetList->_packets)); +} diff --git a/libraries/networking/src/udt/PacketQueue.h b/libraries/networking/src/udt/PacketQueue.h index a687ccf0bc..f3d925af30 100644 --- a/libraries/networking/src/udt/PacketQueue.h +++ b/libraries/networking/src/udt/PacketQueue.h @@ -13,12 +13,14 @@ #define hifi_PacketQueue_h #include +#include #include #include +#include "Packet.h" + namespace udt { -class Packet; class PacketList; using MessageNumber = uint32_t; @@ -28,7 +30,8 @@ class PacketQueue { using LockGuard = std::lock_guard; using PacketPointer = std::unique_ptr; using PacketListPointer = std::unique_ptr; - using PacketContainer = std::list; + using Channel = std::list; + using Channels = std::vector; public: void queuePacket(PacketPointer packet); @@ -37,14 +40,17 @@ public: bool isEmpty() const; PacketPointer takePacket(); - MessageNumber getNextMessageNumber(); Mutex& getLock() { return _packetsLock; } private: + MessageNumber getNextMessageNumber(); + unsigned int nextIndex(); + MessageNumber _currentMessageNumber { 0 }; - mutable Mutex _packetsLock; // Protects the packets to be sent list. - PacketContainer _packets; // List of packets to be sent + mutable Mutex _packetsLock; // Protects the packets to be sent. + Channels _channels { 1 }; // One channel per packet list + unsigned int _currentIndex { 0 }; }; } diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 2354bfa377..6ba947553a 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -296,6 +296,7 @@ bool SendQueue::maybeSendNewPacket() { // grab the first packet we will send std::unique_ptr firstPacket = _packets.takePacket(); + Q_ASSERT(firstPacket); std::unique_ptr secondPacket; bool shouldSendPairTail = false;