Merge pull request #14621 from birarda/feat/message-cycle-cap

cap the number of considered message channels in PacketQueue
This commit is contained in:
John Conklin II 2019-01-09 17:43:36 -08:00 committed by GitHub
commit 83cb60f46f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 21 deletions

View file

@ -16,7 +16,8 @@
using namespace udt;
PacketQueue::PacketQueue(MessageNumber messageNumber) : _currentMessageNumber(messageNumber) {
_channels.emplace_back(new std::list<PacketPointer>());
_channels.emplace_front(new std::list<PacketPointer>());
_currentChannel = _channels.begin();
}
MessageNumber PacketQueue::getNextMessageNumber() {
@ -27,21 +28,28 @@ MessageNumber PacketQueue::getNextMessageNumber() {
bool PacketQueue::isEmpty() const {
LockGuard locker(_packetsLock);
// Only the main channel and it is empty
return (_channels.size() == 1) && _channels.front()->empty();
return _channels.size() == 1 && _channels.front()->empty();
}
PacketQueue::PacketPointer PacketQueue::takePacket() {
LockGuard locker(_packetsLock);
if (isEmpty()) {
return PacketPointer();
}
// Find next non empty channel
if (_channels[nextIndex()]->empty()) {
nextIndex();
// handle the case where we are looking at the first channel and it is empty
if (_currentChannel == _channels.begin() && (*_currentChannel)->empty()) {
++_currentChannel;
}
auto& channel = _channels[_currentIndex];
// at this point the current channel should always not be at the end and should also not be empty
Q_ASSERT(_currentChannel != _channels.end());
auto& channel = *_currentChannel;
Q_ASSERT(!channel->empty());
// Take front packet
@ -49,20 +57,28 @@ PacketQueue::PacketPointer PacketQueue::takePacket() {
channel->pop_front();
// Remove now empty channel (Don't remove the main channel)
if (channel->empty() && _currentIndex != 0) {
channel->swap(*_channels.back());
_channels.pop_back();
--_currentIndex;
if (channel->empty() && _currentChannel != _channels.begin()) {
// erase the current channel and slide the iterator to the next channel
_currentChannel = _channels.erase(_currentChannel);
} else {
++_currentChannel;
}
// push forward our number of channels taken from
++_channelsVisitedCount;
// check if we need to restart back at the front channel (main)
// to respect our capped number of channels considered concurrently
static const int MAX_CHANNELS_SENT_CONCURRENTLY = 16;
if (_currentChannel == _channels.end() || _channelsVisitedCount >= MAX_CHANNELS_SENT_CONCURRENTLY) {
_channelsVisitedCount = 0;
_currentChannel = _channels.begin();
}
return packet;
}
unsigned int PacketQueue::nextIndex() {
_currentIndex = (_currentIndex + 1) % _channels.size();
return _currentIndex;
}
void PacketQueue::queuePacket(PacketPointer packet) {
LockGuard locker(_packetsLock);
_channels.front()->push_back(std::move(packet));

View file

@ -30,8 +30,9 @@ class PacketQueue {
using LockGuard = std::lock_guard<Mutex>;
using PacketPointer = std::unique_ptr<Packet>;
using PacketListPointer = std::unique_ptr<PacketList>;
using Channel = std::unique_ptr<std::list<PacketPointer>>;
using Channels = std::vector<Channel>;
using RawChannel = std::list<PacketPointer>;
using Channel = std::unique_ptr<RawChannel>;
using Channels = std::list<Channel>;
public:
PacketQueue(MessageNumber messageNumber = 0);
@ -47,16 +48,17 @@ public:
private:
MessageNumber getNextMessageNumber();
unsigned int nextIndex();
MessageNumber _currentMessageNumber { 0 };
mutable Mutex _packetsLock; // Protects the packets to be sent.
Channels _channels; // One channel per packet list + Main channel
unsigned int _currentIndex { 0 };
Channels::iterator _currentChannel;
unsigned int _channelsVisitedCount { 0 };
};
}
#endif // hifi_PacketQueue_h
#endif // hifi_PacketQueue_h