add a wait condition for empty queue and loss list

This commit is contained in:
Stephen Birarda 2015-08-24 16:50:54 -07:00
parent 9275b954f4
commit e84595af49
2 changed files with 76 additions and 12 deletions

View file

@ -56,8 +56,11 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) :
void SendQueue::queuePacket(std::unique_ptr<Packet> packet) { void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
{ {
QWriteLocker locker(&_packetsLock); std::lock_guard<std::mutex> locker(_packetsLock);
_packets.push_back(std::move(packet)); _packets.push_back(std::move(packet));
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
_emptyCondition.notify_one();
} }
if (!this->thread()->isRunning()) { if (!this->thread()->isRunning()) {
this->thread()->start(); this->thread()->start();
@ -95,9 +98,12 @@ void SendQueue::queuePacketList(std::unique_ptr<PacketList> packetList) {
} }
} }
QWriteLocker locker(&_packetsLock); std::lock_guard<std::mutex> locker(_packetsLock);
_packets.splice(_packets.end(), packetList->_packets); _packets.splice(_packets.end(), packetList->_packets);
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
_emptyCondition.notify_one();
} }
if (!this->thread()->isRunning()) { if (!this->thread()->isRunning()) {
@ -126,7 +132,7 @@ void SendQueue::ack(SequenceNumber ack) {
} }
{ // remove any sequence numbers equal to or lower than this ACK in the loss list { // remove any sequence numbers equal to or lower than this ACK in the loss list
QWriteLocker nakLocker(&_naksLock); std::lock_guard<std::mutex> nakLocker(_naksLock);
if (_naks.getLength() > 0 && _naks.getFirstSequenceNumber() <= ack) { if (_naks.getLength() > 0 && _naks.getFirstSequenceNumber() <= ack) {
_naks.remove(_naks.getFirstSequenceNumber(), ack); _naks.remove(_naks.getFirstSequenceNumber(), ack);
@ -137,12 +143,15 @@ void SendQueue::ack(SequenceNumber ack) {
} }
void SendQueue::nak(SequenceNumber start, SequenceNumber end) { void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
QWriteLocker locker(&_naksLock); std::lock_guard<std::mutex> nakLocker(_naksLock);
_naks.insert(start, end); _naks.insert(start, end);
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send
_emptyCondition.notify_one();
} }
void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
QWriteLocker locker(&_naksLock); std::lock_guard<std::mutex> nakLocker(_naksLock);
_naks.clear(); _naks.clear();
SequenceNumber first, second; SequenceNumber first, second;
@ -156,6 +165,9 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
_naks.append(first, second); _naks.append(first, second);
} }
} }
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send
_emptyCondition.notify_one();
} }
SequenceNumber SendQueue::getNextSequenceNumber() { SequenceNumber SendQueue::getNextSequenceNumber() {
@ -195,16 +207,20 @@ void SendQueue::run() {
// Record timing // Record timing
_lastSendTimestamp = high_resolution_clock::now(); _lastSendTimestamp = high_resolution_clock::now();
bool naksEmpty = true; // used at the end of processing to see if we should wait for NAKs
bool resentPacket = false; bool resentPacket = false;
// the following while makes sure that we find a packet to re-send, if there is one // the following while makes sure that we find a packet to re-send, if there is one
while (!resentPacket) { while (!resentPacket) {
QWriteLocker naksLocker(&_naksLock); std::unique_lock<std::mutex> nakLocker(_naksLock);
if (_naks.getLength() > 0) { if (_naks.getLength() > 0) {
naksEmpty = _naks.getLength() > 1;
// pull the sequence number we need to re-send // pull the sequence number we need to re-send
SequenceNumber resendNumber = _naks.popFirstSequenceNumber(); SequenceNumber resendNumber = _naks.popFirstSequenceNumber();
naksLocker.unlock(); nakLocker.unlock();
// pull the packet to re-send from the sent packets list // pull the packet to re-send from the sent packets list
QReadLocker sentLocker(&_sentLock); QReadLocker sentLocker(&_sentLock);
@ -233,21 +249,27 @@ void SendQueue::run() {
// we'll fire the loop again to see if there is another to re-send // we'll fire the loop again to see if there is another to re-send
continue; continue;
} }
} else {
naksEmpty = true;
} }
// break from the while, we didn't resend a packet // break from the while, we didn't resend a packet
break; break;
} }
bool packetsEmpty = false; // used after processing to check if we should wait for packets
bool sentPacket = false;
// if we didn't find a packet to re-send AND we think we can fit a new packet on the wire // if we didn't find a packet to re-send AND we think we can fit a new packet on the wire
// (this is according to the current flow window size) then we send out a new packet // (this is according to the current flow window size) then we send out a new packet
if (!resentPacket if (!resentPacket
&& seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { && seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) {
// we didn't re-send a packet, so time to send a new one // we didn't re-send a packet, so time to send a new one
QWriteLocker locker(&_packetsLock); std::unique_lock<std::mutex> locker(_packetsLock);
if (_packets.size() > 0) { if (_packets.size() > 0) {
SequenceNumber nextNumber = getNextSequenceNumber(); SequenceNumber nextNumber = getNextSequenceNumber();
// grab the first packet we will send // grab the first packet we will send
@ -266,9 +288,13 @@ void SendQueue::run() {
} }
} }
packetsEmpty = _packets.size() == 0;
// unlock the packets, we're done pulling // unlock the packets, we're done pulling
locker.unlock(); locker.unlock();
sentPacket = true;
// definitely send the first packet // definitely send the first packet
sendNewPacketAndAddToSentList(move(firstPacket), nextNumber); sendNewPacketAndAddToSentList(move(firstPacket), nextNumber);
@ -279,6 +305,7 @@ void SendQueue::run() {
} }
} else { } else {
packetsEmpty = true;
locker.unlock(); locker.unlock();
} }
} }
@ -291,6 +318,24 @@ void SendQueue::run() {
break; break;
} }
if (packetsEmpty && naksEmpty) {
// During our processing above the loss list and packet list were both empty.
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
// To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock
DoubleLock doubleLock(_packetsLock, _naksLock);
// The packets queue and loss list mutexes are now both locked - check if they're still both empty
if (_packets.empty() && _naks.getLength() == 0) {
// both are empty - let's use a condition_variable_any to wait
_emptyCondition.wait(doubleLock);
// we have the double lock again - it'll be unlock once it goes out of scope
// skip to the next iteration
continue;
}
}
// sleep as long as we need until next packet send, if we can // sleep as long as we need until next packet send, if we can
auto now = high_resolution_clock::now(); auto now = high_resolution_clock::now();
auto microsecondDuration = duration_cast<microseconds>((_lastSendTimestamp + microseconds(_packetSendPeriod)) - now); auto microsecondDuration = duration_cast<microseconds>((_lastSendTimestamp + microseconds(_packetSendPeriod)) - now);

View file

@ -13,7 +13,9 @@
#define hifi_SendQueue_h #define hifi_SendQueue_h
#include <chrono> #include <chrono>
#include <condition_variable>
#include <list> #include <list>
#include <mutex>
#include <unordered_map> #include <unordered_map>
#include <QtCore/QObject> #include <QtCore/QObject>
@ -40,11 +42,26 @@ class SendQueue : public QObject {
Q_OBJECT Q_OBJECT
public: public:
class DoubleLock {
public:
DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { lock(); }
~DoubleLock() { unlock(); }
DoubleLock(const DoubleLock&) = delete;
DoubleLock& operator=(const DoubleLock&) = delete;
void lock() { std::lock(_mutex1, _mutex2); }
void unlock() { _mutex1.unlock(); _mutex2.unlock(); }
private:
std::mutex& _mutex1;
std::mutex& _mutex2;
};
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination); static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination);
void queuePacket(std::unique_ptr<Packet> packet); void queuePacket(std::unique_ptr<Packet> packet);
void queuePacketList(std::unique_ptr<PacketList> packetList); void queuePacketList(std::unique_ptr<PacketList> packetList);
int getQueueSize() const { QReadLocker locker(&_packetsLock); return _packets.size(); }
SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); } SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); }
@ -79,13 +96,13 @@ private:
SequenceNumber getNextSequenceNumber(); SequenceNumber getNextSequenceNumber();
MessageNumber getNextMessageNumber(); MessageNumber getNextMessageNumber();
mutable QReadWriteLock _packetsLock; // Protects the packets to be sent list. mutable std::mutex _packetsLock; // Protects the packets to be sent list.
std::list<std::unique_ptr<Packet>> _packets; // List of packets to be sent std::list<std::unique_ptr<Packet>> _packets; // List of packets to be sent
Socket* _socket { nullptr }; // Socket to send packet on Socket* _socket { nullptr }; // Socket to send packet on
HifiSockAddr _destination; // Destination addr HifiSockAddr _destination; // Destination addr
std::atomic<uint32_t> _lastACKSequenceNumber; // Last ACKed sequence number std::atomic<uint32_t> _lastACKSequenceNumber { 0 }; // Last ACKed sequence number
MessageNumber _currentMessageNumber { 0 }; MessageNumber _currentMessageNumber { 0 };
SequenceNumber _currentSequenceNumber; // Last sequence number sent out SequenceNumber _currentSequenceNumber; // Last sequence number sent out
@ -97,11 +114,13 @@ private:
std::atomic<int> _flowWindowSize; // Flow control window size (number of packets that can be on wire) - set from CC std::atomic<int> _flowWindowSize; // Flow control window size (number of packets that can be on wire) - set from CC
mutable QReadWriteLock _naksLock; // Protects the naks list. mutable std::mutex _naksLock; // Protects the naks list.
LossList _naks; // Sequence numbers of packets to resend LossList _naks; // Sequence numbers of packets to resend
mutable QReadWriteLock _sentLock; // Protects the sent packet list mutable QReadWriteLock _sentLock; // Protects the sent packet list
std::unordered_map<SequenceNumber, std::unique_ptr<Packet>> _sentPackets; // Packets waiting for ACK. std::unordered_map<SequenceNumber, std::unique_ptr<Packet>> _sentPackets; // Packets waiting for ACK.
std::condition_variable_any _emptyCondition;
}; };
} }