mirror of
https://github.com/overte-org/overte.git
synced 2025-04-25 19:15:56 +02:00
Introduce PacketQueue
This commit is contained in:
parent
709dab6beb
commit
732ad41080
5 changed files with 266 additions and 209 deletions
|
@ -70,6 +70,7 @@ protected:
|
||||||
|
|
||||||
private:
|
private:
|
||||||
friend class ::LimitedNodeList;
|
friend class ::LimitedNodeList;
|
||||||
|
friend class PacketQueue;
|
||||||
friend class SendQueue;
|
friend class SendQueue;
|
||||||
friend class Socket;
|
friend class Socket;
|
||||||
|
|
||||||
|
|
71
libraries/networking/src/udt/PacketQueue.cpp
Normal file
71
libraries/networking/src/udt/PacketQueue.cpp
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
//
|
||||||
|
// PacketQueue.cpp
|
||||||
|
// libraries/networking/src/udt
|
||||||
|
//
|
||||||
|
// Created by Clement on 9/16/15.
|
||||||
|
// Copyright 2015 High Fidelity, Inc.
|
||||||
|
//
|
||||||
|
// Distributed under the Apache License, Version 2.0.
|
||||||
|
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
|
||||||
|
//
|
||||||
|
|
||||||
|
#include "PacketQueue.h"
|
||||||
|
|
||||||
|
#include "Packet.h"
|
||||||
|
#include "PacketList.h"
|
||||||
|
|
||||||
|
using namespace udt;
|
||||||
|
|
||||||
|
|
||||||
|
MessageNumber PacketQueue::getNextMessageNumber() {
|
||||||
|
static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS;
|
||||||
|
_currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER;
|
||||||
|
return _currentMessageNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool PacketQueue::isEmpty() const {
|
||||||
|
LockGuard locker(_packetsLock);
|
||||||
|
return _packets.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
PacketQueue::PacketPointer PacketQueue::takeFront() {
|
||||||
|
LockGuard locker(_packetsLock);
|
||||||
|
if (!_packets.empty()) {
|
||||||
|
auto packet = std::move(_packets.front());
|
||||||
|
_packets.pop_front();
|
||||||
|
return std::move(packet);
|
||||||
|
}
|
||||||
|
|
||||||
|
return PacketPointer();
|
||||||
|
}
|
||||||
|
|
||||||
|
void PacketQueue::queuePacket(PacketPointer packet) {
|
||||||
|
LockGuard locker(_packetsLock);
|
||||||
|
_packets.push_back(std::move(packet));
|
||||||
|
}
|
||||||
|
|
||||||
|
void PacketQueue::queuePacketList(PacketListPointer packetList) {
|
||||||
|
Q_ASSERT(packetList->_packets.size() > 0);
|
||||||
|
|
||||||
|
auto messageNumber = getNextMessageNumber();
|
||||||
|
auto markPacket = [&messageNumber](const PacketPointer& packet, Packet::PacketPosition position) {
|
||||||
|
packet->setPacketPosition(position);
|
||||||
|
packet->writeMessageNumber(messageNumber);
|
||||||
|
};
|
||||||
|
|
||||||
|
if (packetList->_packets.size() == 1) {
|
||||||
|
markPacket(packetList->_packets.front(), Packet::PacketPosition::ONLY);
|
||||||
|
} else {
|
||||||
|
const auto second = ++packetList->_packets.begin();
|
||||||
|
const auto last = --packetList->_packets.end();
|
||||||
|
std::for_each(second, last, [&](const PacketPointer& packet) {
|
||||||
|
markPacket(packet, Packet::PacketPosition::MIDDLE);
|
||||||
|
});
|
||||||
|
|
||||||
|
markPacket(packetList->_packets.front(), Packet::PacketPosition::FIRST);
|
||||||
|
markPacket(packetList->_packets.back(), Packet::PacketPosition::LAST);
|
||||||
|
}
|
||||||
|
|
||||||
|
LockGuard locker(_packetsLock);
|
||||||
|
_packets.splice(_packets.end(), packetList->_packets);
|
||||||
|
}
|
53
libraries/networking/src/udt/PacketQueue.h
Normal file
53
libraries/networking/src/udt/PacketQueue.h
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
//
|
||||||
|
// PacketQueue.h
|
||||||
|
// libraries/networking/src/udt
|
||||||
|
//
|
||||||
|
// Created by Clement on 9/16/15.
|
||||||
|
// Copyright 2015 High Fidelity, Inc.
|
||||||
|
//
|
||||||
|
// Distributed under the Apache License, Version 2.0.
|
||||||
|
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef hifi_PacketQueue_h
|
||||||
|
#define hifi_PacketQueue_h
|
||||||
|
|
||||||
|
#include <list>
|
||||||
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
|
namespace udt {
|
||||||
|
|
||||||
|
class Packet;
|
||||||
|
class PacketList;
|
||||||
|
|
||||||
|
using MessageNumber = uint32_t;
|
||||||
|
|
||||||
|
class PacketQueue {
|
||||||
|
using Mutex = std::recursive_mutex;
|
||||||
|
using LockGuard = std::lock_guard<Mutex>;
|
||||||
|
using PacketPointer = std::unique_ptr<Packet>;
|
||||||
|
using PacketListPointer = std::unique_ptr<PacketList>;
|
||||||
|
using PacketList = std::list<PacketPointer>;
|
||||||
|
|
||||||
|
public:
|
||||||
|
void queuePacket(PacketPointer packet);
|
||||||
|
void queuePacketList(PacketListPointer packetList);
|
||||||
|
|
||||||
|
bool isEmpty() const;
|
||||||
|
PacketPointer takeFront();
|
||||||
|
|
||||||
|
MessageNumber getNextMessageNumber();
|
||||||
|
Mutex& getLock() { return _packetsLock; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
MessageNumber _currentMessageNumber { 0 };
|
||||||
|
|
||||||
|
mutable Mutex _packetsLock; // Protects the packets to be sent list.
|
||||||
|
PacketList _packets; // List of packets to be sent
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#endif // hifi_PacketQueue_h
|
|
@ -30,7 +30,7 @@ using namespace udt;
|
||||||
|
|
||||||
class DoubleLock {
|
class DoubleLock {
|
||||||
public:
|
public:
|
||||||
DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { }
|
DoubleLock(std::recursive_mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { }
|
||||||
|
|
||||||
DoubleLock(const DoubleLock&) = delete;
|
DoubleLock(const DoubleLock&) = delete;
|
||||||
DoubleLock& operator=(const DoubleLock&) = delete;
|
DoubleLock& operator=(const DoubleLock&) = delete;
|
||||||
|
@ -45,7 +45,7 @@ public:
|
||||||
void unlock() { _mutex1.unlock(); _mutex2.unlock(); }
|
void unlock() { _mutex1.unlock(); _mutex2.unlock(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::mutex& _mutex1;
|
std::recursive_mutex& _mutex1;
|
||||||
std::mutex& _mutex2;
|
std::mutex& _mutex2;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -76,17 +76,10 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) :
|
||||||
}
|
}
|
||||||
|
|
||||||
void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
|
void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
|
||||||
{
|
_packets.queuePacket(std::move(packet));
|
||||||
std::unique_lock<std::mutex> locker(_packetsLock);
|
|
||||||
|
|
||||||
_packets.push_back(std::move(packet));
|
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
|
||||||
|
_emptyCondition.notify_one();
|
||||||
// unlock the mutex before we notify
|
|
||||||
locker.unlock();
|
|
||||||
|
|
||||||
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
|
|
||||||
_emptyCondition.notify_one();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!this->thread()->isRunning() && _state == State::NotStarted) {
|
if (!this->thread()->isRunning() && _state == State::NotStarted) {
|
||||||
this->thread()->start();
|
this->thread()->start();
|
||||||
|
@ -94,46 +87,10 @@ void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void SendQueue::queuePacketList(std::unique_ptr<PacketList> packetList) {
|
void SendQueue::queuePacketList(std::unique_ptr<PacketList> packetList) {
|
||||||
Q_ASSERT(packetList->_packets.size() > 0);
|
_packets.queuePacketList(std::move(packetList));
|
||||||
|
|
||||||
{
|
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
|
||||||
auto messageNumber = getNextMessageNumber();
|
_emptyCondition.notify_one();
|
||||||
|
|
||||||
if (packetList->_packets.size() == 1) {
|
|
||||||
auto& packet = packetList->_packets.front();
|
|
||||||
|
|
||||||
packet->setPacketPosition(Packet::PacketPosition::ONLY);
|
|
||||||
packet->writeMessageNumber(messageNumber);
|
|
||||||
} else {
|
|
||||||
bool haveMarkedFirstPacket = false;
|
|
||||||
auto end = packetList->_packets.end();
|
|
||||||
auto lastElement = --packetList->_packets.end();
|
|
||||||
for (auto it = packetList->_packets.begin(); it != end; ++it) {
|
|
||||||
auto& packet = *it;
|
|
||||||
|
|
||||||
if (!haveMarkedFirstPacket) {
|
|
||||||
packet->setPacketPosition(Packet::PacketPosition::FIRST);
|
|
||||||
haveMarkedFirstPacket = true;
|
|
||||||
} else if (it == lastElement) {
|
|
||||||
packet->setPacketPosition(Packet::PacketPosition::LAST);
|
|
||||||
} else {
|
|
||||||
packet->setPacketPosition(Packet::PacketPosition::MIDDLE);
|
|
||||||
}
|
|
||||||
|
|
||||||
packet->writeMessageNumber(messageNumber);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
std::unique_lock<std::mutex> locker(_packetsLock);
|
|
||||||
|
|
||||||
_packets.splice(_packets.end(), packetList->_packets);
|
|
||||||
|
|
||||||
// unlock the mutex so we can notify
|
|
||||||
locker.unlock();
|
|
||||||
|
|
||||||
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
|
|
||||||
_emptyCondition.notify_one();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!this->thread()->isRunning() && _state == State::NotStarted) {
|
if (!this->thread()->isRunning() && _state == State::NotStarted) {
|
||||||
this->thread()->start();
|
this->thread()->start();
|
||||||
|
@ -144,10 +101,8 @@ void SendQueue::stop() {
|
||||||
|
|
||||||
_state = State::Stopped;
|
_state = State::Stopped;
|
||||||
|
|
||||||
// in case we're waiting to send another handshake, release the condition_variable now so we cleanup sooner
|
// Notify all conditions in case we're waiting somewhere
|
||||||
_handshakeACKCondition.notify_one();
|
_handshakeACKCondition.notify_one();
|
||||||
|
|
||||||
// in case the empty condition is waiting for packets/loss release it now so that the queue is cleaned up
|
|
||||||
_emptyCondition.notify_one();
|
_emptyCondition.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -254,12 +209,6 @@ SequenceNumber SendQueue::getNextSequenceNumber() {
|
||||||
return _currentSequenceNumber;
|
return _currentSequenceNumber;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t SendQueue::getNextMessageNumber() {
|
|
||||||
static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS;
|
|
||||||
_currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER;
|
|
||||||
return _currentMessageNumber;
|
|
||||||
}
|
|
||||||
|
|
||||||
void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber) {
|
void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber) {
|
||||||
// write the sequence number and send the packet
|
// write the sequence number and send the packet
|
||||||
newPacket->writeSequenceNumber(sequenceNumber);
|
newPacket->writeSequenceNumber(sequenceNumber);
|
||||||
|
@ -313,167 +262,67 @@ void SendQueue::run() {
|
||||||
// if we didn't find a packet to re-send AND we think we can fit a new packet on the wire
|
// if we didn't find a packet to re-send AND we think we can fit a new packet on the wire
|
||||||
// (this is according to the current flow window size) then we send out a new packet
|
// (this is according to the current flow window size) then we send out a new packet
|
||||||
if (!sentAPacket) {
|
if (!sentAPacket) {
|
||||||
if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) {
|
sentAPacket = maybeSendNewPacket();
|
||||||
sentAPacket = maybeSendNewPacket();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// since we're a while loop, give the thread a chance to process events
|
// since we're a while loop, give the thread a chance to process events
|
||||||
QCoreApplication::sendPostedEvents(this, 0);
|
QCoreApplication::sendPostedEvents(this);
|
||||||
|
|
||||||
// we just processed events so check now if we were just told to stop
|
// we just processed events so check now if we were just told to stop
|
||||||
if (_state != State::Running) {
|
// If the send queue has been innactive, skip the sleep for
|
||||||
return;
|
// Either _isRunning will have been set to false and we'll break
|
||||||
|
// Or something happened and we'll keep going
|
||||||
|
if (_state != State::Running || isInactive(sentAPacket)) {
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!sentAPacket) {
|
|
||||||
// check if it is time to break this connection
|
|
||||||
|
|
||||||
// that will be the case if we have had 16 timeouts since hearing back from the client, and it has been
|
|
||||||
// at least 5 seconds
|
|
||||||
|
|
||||||
static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16;
|
|
||||||
static const int MIN_SECONDS_BEFORE_INACTIVE_MS = 5 * 1000;
|
|
||||||
|
|
||||||
auto sinceEpochNow = QDateTime::currentMSecsSinceEpoch();
|
|
||||||
|
|
||||||
if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE
|
|
||||||
&& (sinceEpochNow - _lastReceiverResponse) > MIN_SECONDS_BEFORE_INACTIVE_MS) {
|
|
||||||
// If the flow window has been full for over CONSIDER_INACTIVE_AFTER,
|
|
||||||
// then signal the queue is inactive and return so it can be cleaned up
|
|
||||||
|
|
||||||
#ifdef UDT_CONNECTION_DEBUG
|
|
||||||
qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts"
|
|
||||||
<< "and 10s before receiving any ACK/NAK and is now inactive. Stopping.";
|
|
||||||
#endif
|
|
||||||
|
|
||||||
deactivate();
|
|
||||||
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
// During our processing above we didn't send any packets
|
|
||||||
|
|
||||||
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
|
|
||||||
// To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock
|
|
||||||
DoubleLock doubleLock(_packetsLock, _naksLock);
|
|
||||||
|
|
||||||
if (doubleLock.try_lock()) {
|
|
||||||
// The packets queue and loss list mutexes are now both locked - check if they're still both empty
|
|
||||||
|
|
||||||
if (_packets.empty() && _naks.getLength() == 0) {
|
|
||||||
if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) {
|
|
||||||
// we've sent the client as much data as we have (and they've ACKed it)
|
|
||||||
// either wait for new data to send or 5 seconds before cleaning up the queue
|
|
||||||
static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5);
|
|
||||||
|
|
||||||
// use our condition_variable_any to wait
|
|
||||||
auto cvStatus = _emptyCondition.wait_for(doubleLock, EMPTY_QUEUES_INACTIVE_TIMEOUT);
|
|
||||||
|
|
||||||
// we have the double lock again - Make sure to unlock it
|
|
||||||
doubleLock.unlock();
|
|
||||||
|
|
||||||
if (cvStatus == std::cv_status::timeout) {
|
|
||||||
#ifdef UDT_CONNECTION_DEBUG
|
|
||||||
qCDebug(networking) << "SendQueue to" << _destination << "has been empty for"
|
|
||||||
<< EMPTY_QUEUES_INACTIVE_TIMEOUT.count()
|
|
||||||
<< "seconds and receiver has ACKed all packets."
|
|
||||||
<< "The queue is now inactive and will be stopped.";
|
|
||||||
#endif
|
|
||||||
|
|
||||||
deactivate();
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// We think the client is still waiting for data (based on the sequence number gap)
|
|
||||||
// Let's wait either for a response from the client or until the estimated timeout
|
|
||||||
auto waitDuration = std::chrono::microseconds(_estimatedTimeout);
|
|
||||||
|
|
||||||
// use our condition_variable_any to wait
|
|
||||||
auto cvStatus = _emptyCondition.wait_for(doubleLock, waitDuration);
|
|
||||||
|
|
||||||
if (cvStatus == std::cv_status::timeout) {
|
|
||||||
// increase the number of timeouts
|
|
||||||
++_timeoutExpiryCount;
|
|
||||||
|
|
||||||
if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) {
|
|
||||||
// after a timeout if we still have sent packets that the client hasn't ACKed we
|
|
||||||
// add them to the loss list
|
|
||||||
|
|
||||||
// Note that thanks to the DoubleLock we have the _naksLock right now
|
|
||||||
_naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// we have the double lock again - Make sure to unlock it
|
|
||||||
doubleLock.unlock();
|
|
||||||
|
|
||||||
// skip to the next iteration
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// we got the try_lock but failed the other conditionals so we need to unlock
|
|
||||||
doubleLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const auto loopEndTimestamp = p_high_resolution_clock::now();
|
|
||||||
|
|
||||||
// sleep as long as we need until next packet send, if we can
|
// sleep as long as we need until next packet send, if we can
|
||||||
|
const auto loopEndTimestamp = p_high_resolution_clock::now();
|
||||||
const auto timeToSleep = (loopStartTimestamp + std::chrono::microseconds(_packetSendPeriod)) - loopEndTimestamp;
|
const auto timeToSleep = (loopStartTimestamp + std::chrono::microseconds(_packetSendPeriod)) - loopEndTimestamp;
|
||||||
std::this_thread::sleep_for(timeToSleep);
|
std::this_thread::sleep_for(timeToSleep);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool SendQueue::maybeSendNewPacket() {
|
bool SendQueue::maybeSendNewPacket() {
|
||||||
// we didn't re-send a packet, so time to send a new one
|
if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) {
|
||||||
std::unique_lock<std::mutex> locker(_packetsLock);
|
// we didn't re-send a packet, so time to send a new one
|
||||||
|
|
||||||
if (_packets.size() > 0) {
|
|
||||||
SequenceNumber nextNumber = getNextSequenceNumber();
|
|
||||||
|
|
||||||
// grab the first packet we will send
|
if (!_packets.isEmpty()) {
|
||||||
std::unique_ptr<Packet> firstPacket;
|
SequenceNumber nextNumber = getNextSequenceNumber();
|
||||||
firstPacket.swap(_packets.front());
|
|
||||||
_packets.pop_front();
|
|
||||||
|
|
||||||
std::unique_ptr<Packet> secondPacket;
|
// grab the first packet we will send
|
||||||
bool shouldSendPairTail = false;
|
std::unique_ptr<Packet> firstPacket = _packets.takeFront();
|
||||||
|
|
||||||
if (((uint32_t) nextNumber & 0xF) == 0) {
|
std::unique_ptr<Packet> secondPacket;
|
||||||
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
|
bool shouldSendPairTail = false;
|
||||||
// pull off a second packet if we can before we unlock
|
|
||||||
shouldSendPairTail = true;
|
|
||||||
|
|
||||||
if (_packets.size() > 0) {
|
if (((uint32_t) nextNumber & 0xF) == 0) {
|
||||||
secondPacket.swap(_packets.front());
|
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
|
||||||
_packets.pop_front();
|
// pull off a second packet if we can before we unlock
|
||||||
|
shouldSendPairTail = true;
|
||||||
|
|
||||||
|
secondPacket = _packets.takeFront();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// definitely send the first packet
|
||||||
|
sendNewPacketAndAddToSentList(move(firstPacket), nextNumber);
|
||||||
|
|
||||||
|
// do we have a second in a pair to send as well?
|
||||||
|
if (secondPacket) {
|
||||||
|
sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber());
|
||||||
|
} else if (shouldSendPairTail) {
|
||||||
|
// we didn't get a second packet to send in the probe pair
|
||||||
|
// send a control packet of type ProbePairTail so the receiver can still do
|
||||||
|
// proper bandwidth estimation
|
||||||
|
static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail);
|
||||||
|
_socket->writeBasePacket(*pairTailPacket, _destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We sent our packet(s), return here
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// unlock the packets, we're done pulling
|
|
||||||
locker.unlock();
|
|
||||||
|
|
||||||
// definitely send the first packet
|
|
||||||
sendNewPacketAndAddToSentList(move(firstPacket), nextNumber);
|
|
||||||
|
|
||||||
// do we have a second in a pair to send as well?
|
|
||||||
if (secondPacket) {
|
|
||||||
sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber());
|
|
||||||
} else if (shouldSendPairTail) {
|
|
||||||
// we didn't get a second packet to send in the probe pair
|
|
||||||
// send a control packet of type ProbePairTail so the receiver can still do
|
|
||||||
// proper bandwidth estimation
|
|
||||||
static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail);
|
|
||||||
_socket->writeBasePacket(*pairTailPacket, _destination);
|
|
||||||
}
|
|
||||||
|
|
||||||
// We sent our packet(s), return here
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// No packets were sent
|
// No packets were sent
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -524,6 +373,92 @@ bool SendQueue::maybeResendPacket() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool SendQueue::isInactive(bool sentAPacket) {
|
||||||
|
if (!sentAPacket) {
|
||||||
|
// check if it is time to break this connection
|
||||||
|
|
||||||
|
// that will be the case if we have had 16 timeouts since hearing back from the client, and it has been
|
||||||
|
// at least 5 seconds
|
||||||
|
static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16;
|
||||||
|
static const int MIN_SECONDS_BEFORE_INACTIVE_MS = 5 * 1000;
|
||||||
|
if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE &&
|
||||||
|
(QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse) > MIN_SECONDS_BEFORE_INACTIVE_MS) {
|
||||||
|
// If the flow window has been full for over CONSIDER_INACTIVE_AFTER,
|
||||||
|
// then signal the queue is inactive and return so it can be cleaned up
|
||||||
|
|
||||||
|
#ifdef UDT_CONNECTION_DEBUG
|
||||||
|
qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts"
|
||||||
|
<< "and 5s before receiving any ACK/NAK and is now inactive. Stopping.";
|
||||||
|
#endif
|
||||||
|
|
||||||
|
deactivate();
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// During our processing above we didn't send any packets
|
||||||
|
|
||||||
|
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
|
||||||
|
// To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock
|
||||||
|
DoubleLock doubleLock(_packets.getLock(), _naksLock);
|
||||||
|
std::unique_lock<DoubleLock> locker(doubleLock, std::try_to_lock);
|
||||||
|
|
||||||
|
if (locker.owns_lock() && _packets.isEmpty() && _naks.isEmpty()) {
|
||||||
|
// The packets queue and loss list mutexes are now both locked and they're both empty
|
||||||
|
|
||||||
|
if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) {
|
||||||
|
// we've sent the client as much data as we have (and they've ACKed it)
|
||||||
|
// either wait for new data to send or 5 seconds before cleaning up the queue
|
||||||
|
static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5);
|
||||||
|
|
||||||
|
// use our condition_variable_any to wait
|
||||||
|
auto cvStatus = _emptyCondition.wait_for(locker, EMPTY_QUEUES_INACTIVE_TIMEOUT);
|
||||||
|
|
||||||
|
// we have the lock again - Make sure to unlock it
|
||||||
|
locker.unlock();
|
||||||
|
|
||||||
|
if (cvStatus == std::cv_status::timeout) {
|
||||||
|
#ifdef UDT_CONNECTION_DEBUG
|
||||||
|
qCDebug(networking) << "SendQueue to" << _destination << "has been empty for"
|
||||||
|
<< EMPTY_QUEUES_INACTIVE_TIMEOUT.count()
|
||||||
|
<< "seconds and receiver has ACKed all packets."
|
||||||
|
<< "The queue is now inactive and will be stopped.";
|
||||||
|
#endif
|
||||||
|
|
||||||
|
// Deactivate queue
|
||||||
|
deactivate();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// We think the client is still waiting for data (based on the sequence number gap)
|
||||||
|
// Let's wait either for a response from the client or until the estimated timeout
|
||||||
|
auto waitDuration = std::chrono::microseconds(_estimatedTimeout);
|
||||||
|
|
||||||
|
// use our condition_variable_any to wait
|
||||||
|
auto cvStatus = _emptyCondition.wait_for(locker, waitDuration);
|
||||||
|
|
||||||
|
if (cvStatus == std::cv_status::timeout) {
|
||||||
|
// increase the number of timeouts
|
||||||
|
++_timeoutExpiryCount;
|
||||||
|
|
||||||
|
if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) {
|
||||||
|
// after a timeout if we still have sent packets that the client hasn't ACKed we
|
||||||
|
// add them to the loss list
|
||||||
|
|
||||||
|
// Note that thanks to the DoubleLock we have the _naksLock right now
|
||||||
|
_naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// skip to the next iteration
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
void SendQueue::deactivate() {
|
void SendQueue::deactivate() {
|
||||||
// this queue is inactive - emit that signal and stop the while
|
// this queue is inactive - emit that signal and stop the while
|
||||||
emit queueInactive();
|
emit queueInactive();
|
||||||
|
|
|
@ -28,6 +28,7 @@
|
||||||
#include "../HifiSockAddr.h"
|
#include "../HifiSockAddr.h"
|
||||||
|
|
||||||
#include "Constants.h"
|
#include "Constants.h"
|
||||||
|
#include "PacketQueue.h"
|
||||||
#include "SequenceNumber.h"
|
#include "SequenceNumber.h"
|
||||||
#include "LossList.h"
|
#include "LossList.h"
|
||||||
|
|
||||||
|
@ -39,8 +40,6 @@ class Packet;
|
||||||
class PacketList;
|
class PacketList;
|
||||||
class Socket;
|
class Socket;
|
||||||
|
|
||||||
using MessageNumber = uint32_t;
|
|
||||||
|
|
||||||
class SendQueue : public QObject {
|
class SendQueue : public QObject {
|
||||||
Q_OBJECT
|
Q_OBJECT
|
||||||
|
|
||||||
|
@ -95,21 +94,19 @@ private:
|
||||||
bool maybeSendNewPacket(); // Figures out what packet to send next
|
bool maybeSendNewPacket(); // Figures out what packet to send next
|
||||||
bool maybeResendPacket(); // Determines whether to resend a packet and which one
|
bool maybeResendPacket(); // Determines whether to resend a packet and which one
|
||||||
|
|
||||||
|
bool isInactive(bool sentAPacket);
|
||||||
void deactivate(); // makes the queue inactive and cleans it up
|
void deactivate(); // makes the queue inactive and cleans it up
|
||||||
|
|
||||||
// Increments current sequence number and return it
|
// Increments current sequence number and return it
|
||||||
SequenceNumber getNextSequenceNumber();
|
SequenceNumber getNextSequenceNumber();
|
||||||
MessageNumber getNextMessageNumber();
|
|
||||||
|
|
||||||
mutable std::mutex _packetsLock; // Protects the packets to be sent list.
|
PacketQueue _packets;
|
||||||
std::list<std::unique_ptr<Packet>> _packets; // List of packets to be sent
|
|
||||||
|
|
||||||
Socket* _socket { nullptr }; // Socket to send packet on
|
Socket* _socket { nullptr }; // Socket to send packet on
|
||||||
HifiSockAddr _destination; // Destination addr
|
HifiSockAddr _destination; // Destination addr
|
||||||
|
|
||||||
std::atomic<uint32_t> _lastACKSequenceNumber { 0 }; // Last ACKed sequence number
|
std::atomic<uint32_t> _lastACKSequenceNumber { 0 }; // Last ACKed sequence number
|
||||||
|
|
||||||
MessageNumber _currentMessageNumber { 0 };
|
|
||||||
SequenceNumber _currentSequenceNumber; // Last sequence number sent out
|
SequenceNumber _currentSequenceNumber; // Last sequence number sent out
|
||||||
std::atomic<uint32_t> _atomicCurrentSequenceNumber { 0 }; // Atomic for last sequence number sent out
|
std::atomic<uint32_t> _atomicCurrentSequenceNumber { 0 }; // Atomic for last sequence number sent out
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue