Merge branch 'atp' of https://github.com/birarda/hifi into protocol

This commit is contained in:
Atlante45 2015-07-28 18:13:02 -07:00
commit b10396a169
10 changed files with 279 additions and 135 deletions

View file

@ -15,10 +15,9 @@
using namespace udt;
void UdtCC::init() {
_rcInterval = _synInterval;
void DefaultCC::init() {
_lastRCTime = usecTimestampNow();
setAckTimer(_rcInterval);
setAckTimer(synInterval());
_lastAck = _sendCurrSeqNum;
_lastDecSeq = SequenceNumber{ SequenceNumber::MAX };
@ -27,7 +26,7 @@ void UdtCC::init() {
_packetSendPeriod = 1.0;
}
void UdtCC::onACK(SequenceNumber ackNum) {
void DefaultCC::onACK(SequenceNumber ackNum) {
int64_t B = 0;
double inc = 0;
// Note: 1/24/2012
@ -37,7 +36,7 @@ void UdtCC::onACK(SequenceNumber ackNum) {
const double min_inc = 0.01;
uint64_t currtime = usecTimestampNow();
if (currtime - _lastRCTime < (uint64_t)_rcInterval) {
if (currtime - _lastRCTime < (uint64_t)synInterval()) {
return;
}
@ -52,11 +51,11 @@ void UdtCC::onACK(SequenceNumber ackNum) {
if (_recvieveRate > 0) {
_packetSendPeriod = 1000000.0 / _recvieveRate;
} else {
_packetSendPeriod = (_rtt + _rcInterval) / _congestionWindowSize;
_packetSendPeriod = (_rtt + synInterval()) / _congestionWindowSize;
}
}
} else {
_congestionWindowSize = _recvieveRate / 1000000.0 * (_rtt + _rcInterval) + 16;
_congestionWindowSize = _recvieveRate / 1000000.0 * (_rtt + synInterval()) + 16;
}
// During Slow Start, no rate increase
@ -86,10 +85,10 @@ void UdtCC::onACK(SequenceNumber ackNum) {
}
}
_packetSendPeriod = (_packetSendPeriod * _rcInterval) / (_packetSendPeriod * inc + _rcInterval);
_packetSendPeriod = (_packetSendPeriod * synInterval()) / (_packetSendPeriod * inc + synInterval());
}
void UdtCC::onLoss(const std::vector<SequenceNumber>& losslist) {
void DefaultCC::onLoss(const std::vector<SequenceNumber>& losslist) {
//Slow Start stopped, if it hasn't yet
if (_slowStart) {
_slowStart = false;
@ -101,7 +100,7 @@ void UdtCC::onLoss(const std::vector<SequenceNumber>& losslist) {
// If no receiving rate is observed, we have to compute the sending
// rate according to the current window size, and decrease it
// using the method below.
_packetSendPeriod = _congestionWindowSize / (_rtt + _rcInterval);
_packetSendPeriod = _congestionWindowSize / (_rtt + synInterval());
}
_loss = true;
@ -128,13 +127,13 @@ void UdtCC::onLoss(const std::vector<SequenceNumber>& losslist) {
}
}
void UdtCC::onTimeout() {
void DefaultCC::onTimeout() {
if (_slowStart) {
_slowStart = false;
if (_recvieveRate > 0) {
_packetSendPeriod = 1000000.0 / _recvieveRate;
} else {
_packetSendPeriod = _congestionWindowSize / (_rtt + _rcInterval);
_packetSendPeriod = _congestionWindowSize / (_rtt + synInterval());
}
} else {
/*

View file

@ -17,30 +17,32 @@
#include "SequenceNumber.h"
namespace udt {
static const int32_t DEFAULT_SYN_INTERVAL = 10000; // 10 ms
class Connection;
class Packet;
class CongestionControl {
friend class Connection;
public:
static const int32_t DEFAULT_SYN_INTERVAL = 10000; // 10 ms
CongestionControl() {}
CongestionControl() {};
CongestionControl(int synInterval) : _synInterval(synInterval) {}
virtual ~CongestionControl() {}
int synInterval() const { return _synInterval; }
virtual void init() {}
virtual void close() {}
virtual void onAck(SequenceNumber ackNum) {}
virtual void onLoss(const std::vector<SequenceNumber>& lossList) {}
virtual void onPacketSent(const Packet& packet) {}
virtual void onPacketReceived(const Packet& packet) {}
protected:
void setAckTimer(int syn) { _ackPeriod = (syn > _synInterval) ? _synInterval : syn; }
void setAckInterval(int interval) { _ackInterval = interval; }
void setAckTimer(int period) { _ackPeriod = (period > _synInterval) ? _synInterval : period; }
void setAckInterval(int ackInterval) { _ackInterval = ackInterval; }
void setRto(int rto) { _userDefinedRto = true; _rto = rto; }
int32_t _synInterval = DEFAULT_SYN_INTERVAL; // UDT constant parameter, SYN
double _packetSendPeriod = 1.0; // Packet sending period, in microseconds
double _congestionWindowSize = 16.0; // Congestion window size, in packets
@ -66,6 +68,8 @@ private:
int _ackPeriod = 0; // Periodical timer to send an ACK, in milliseconds
int _ackInterval = 0; // How many packets to send one ACK, in packets
int _synInterval { DEFAULT_SYN_INTERVAL };
bool _userDefinedRto = false; // if the RTO value is defined by users
int _rto = -1; // RTO value, microseconds
};
@ -75,6 +79,8 @@ class CongestionControlVirtualFactory {
public:
virtual ~CongestionControlVirtualFactory() {}
static int synInterval() { return DEFAULT_SYN_INTERVAL; }
virtual std::unique_ptr<CongestionControl> create() = 0;
};
@ -82,13 +88,12 @@ template <class T> class CongestionControlFactory: public CongestionControlVirtu
{
public:
virtual ~CongestionControlFactory() {}
virtual std::unique_ptr<CongestionControl> create() { return std::unique_ptr<T>(new T()); }
};
class UdtCC: public CongestionControl {
class DefaultCC: public CongestionControl {
public:
UdtCC() {}
DefaultCC() {}
public:
virtual void init();
@ -97,7 +102,6 @@ public:
virtual void onTimeout();
private:
int _rcInterval = 0; // UDT Rate control interval
uint64_t _lastRCTime = 0; // last rate increase time
bool _slowStart = true; // if in slow start phase
SequenceNumber _lastAck; // last ACKed seq num
@ -112,4 +116,4 @@ private:
}
#endif // hifi_CongestionControl_h
#endif // hifi_CongestionControl_h

View file

@ -11,7 +11,12 @@
#include "Connection.h"
#include <QtCore/QThread>
#include <NumericalConstants.h>
#include "../HifiSockAddr.h"
#include "CongestionControl.h"
#include "ControlPacket.h"
#include "Packet.h"
#include "Socket.h"
@ -20,10 +25,25 @@ using namespace udt;
using namespace std;
using namespace std::chrono;
Connection::Connection(Socket* parentSocket, HifiSockAddr destination) :
Connection::Connection(Socket* parentSocket, HifiSockAddr destination, unique_ptr<CongestionControl> congestionControl) :
_parentSocket(parentSocket),
_destination(destination)
_destination(destination),
_congestionControl(move(congestionControl))
{
}
Connection::~Connection() {
if (_sendQueue) {
// tell our send queue to stop and wait until its send thread is done
QThread* sendQueueThread = _sendQueue->thread();
_sendQueue->stop();
_sendQueue->deleteLater();
sendQueueThread->quit();
sendQueueThread->wait();
}
}
void Connection::sendReliablePacket(unique_ptr<Packet> packet) {
@ -37,6 +57,31 @@ void Connection::sendReliablePacket(unique_ptr<Packet> packet) {
_sendQueue->queuePacket(move(packet));
}
void Connection::sync() {
// we send out a periodic ACK every rate control interval
sendACK();
// check if we need to re-transmit a loss list
// we do this if it has been longer than the current nakInterval since we last sent
auto now = high_resolution_clock::now();
if (duration_cast<microseconds>(now - _lastNAKTime).count() >= _nakInterval) {
// construct a NAK packet that will hold all of the lost sequence numbers
auto lossListPacket = ControlPacket::create(ControlPacket::TimeoutNAK, _lossList.getLength() * sizeof(SequenceNumber));
// TODO: pack in the lost sequence numbers
// have our SendQueue send off this control packet
_sendQueue->sendPacket(*lossListPacket);
// record this as the last NAK time
_lastNAKTime = high_resolution_clock::now();
++_totalSentTimeoutNAKs;
}
}
void Connection::sendACK(bool wasCausedBySyncTimeout) {
static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(_lastSentACK) + sizeof(_currentACKSubSequenceNumber)
+ sizeof(_rtt) + sizeof(int32_t) + sizeof(int32_t);
@ -65,10 +110,10 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) {
} else if (nextACKNumber == _lastSentACK) {
// We already sent this ACK, but check if we should re-send it.
// We will re-send if it has been more than RTT + (4 * RTT variance) since the last ACK
milliseconds sinceLastACK = duration_cast<milliseconds>(currentTime - lastACKSendTime);
// We will re-send if it has been more than the estimated timeout since the last ACK
microseconds sinceLastACK = duration_cast<microseconds>(currentTime - lastACKSendTime);
if (sinceLastACK.count() < (_rtt + (4 * _rttVariance))) {
if (sinceLastACK.count() < estimatedTimeout()) {
return;
}
}
@ -98,9 +143,11 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) {
// write this ACK to the map of sent ACKs
_sentACKs[_currentACKSubSequenceNumber] = { nextACKNumber, high_resolution_clock::now() };
++_totalSentACKs;
}
void Connection::sendLightACK() const {
void Connection::sendLightACK() {
// create the light ACK packet, make it static so we can re-use it
static const int LIGHT_ACK_PACKET_PAYLOAD_BYTES = sizeof(SequenceNumber);
static auto lightACKPacket = ControlPacket::create(ControlPacket::ACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES);
@ -120,6 +167,8 @@ void Connection::sendLightACK() const {
// have the send queue send off our packet immediately
_sendQueue->sendPacket(*lightACKPacket);
++_totalSentLightACKs;
}
SequenceNumber Connection::nextACK() const {
@ -162,6 +211,25 @@ void Connection::processReceivedSequenceNumber(SequenceNumber seq) {
// have the send queue send off our packet immediately
_sendQueue->sendPacket(*lossReport);
// record our last NAK time
_lastNAKTime = high_resolution_clock::now();
++_totalSentNAKs;
// figure out when we should send the next loss report, if we haven't heard anything back
_nakInterval = (_rtt + 4 * _rttVariance);
int receivedPacketsPerSecond = _receiveWindow.getPacketReceiveSpeed();
if (receivedPacketsPerSecond > 0) {
// the NAK interval is at least the _minNAKInterval
// but might be the time required for all lost packets to be retransmitted
_nakInterval = std::max((int) (_lossList.getLength() * (USECS_PER_SECOND / receivedPacketsPerSecond)),
_minNAKInterval);
} else {
// the NAK interval is at least the _minNAKInterval but might be the estimated timeout
_nakInterval = std::max(estimatedTimeout(), _minNAKInterval);
}
}
if (seq > _lastReceivedSequenceNumber) {
@ -171,6 +239,8 @@ void Connection::processReceivedSequenceNumber(SequenceNumber seq) {
// Otherwise, it's a resend, remove it from the loss list
_lossList.remove(seq);
}
++_totalReceivedDataPackets;
}
void Connection::processControl(unique_ptr<ControlPacket> controlPacket) {
@ -188,6 +258,9 @@ void Connection::processControl(unique_ptr<ControlPacket> controlPacket) {
case ControlPacket::NAK:
processNAK(move(controlPacket));
break;
case ControlPacket::TimeoutNAK:
processTimeoutNAK(move(controlPacket));
break;
}
}
@ -202,7 +275,7 @@ void Connection::processACK(std::unique_ptr<ControlPacket> controlPacket) {
auto currentTime = high_resolution_clock::now();
static high_resolution_clock::time_point lastACK2SendTime;
milliseconds sinceLastACK2 = duration_cast<milliseconds>(currentTime - lastACK2SendTime);
microseconds sinceLastACK2 = duration_cast<microseconds>(currentTime - lastACK2SendTime);
if (sinceLastACK2.count() > _synInterval || currentACKSubSequenceNumber == _lastSentACK2) {
// setup a static ACK2 packet we will re-use
@ -218,6 +291,8 @@ void Connection::processACK(std::unique_ptr<ControlPacket> controlPacket) {
// update the last sent ACK2 and the last ACK2 send time
_lastSentACK2 = currentACKSubSequenceNumber;
lastACK2SendTime = high_resolution_clock::now();
++_totalSentACK2s;
}
// read the ACKed sequence number
@ -261,6 +336,7 @@ void Connection::processACK(std::unique_ptr<ControlPacket> controlPacket) {
updateRTT(rtt);
// set the RTT for congestion control
_congestionControl->setRtt(_rtt);
if (controlPacket->getPayloadSize() > (qint64) (sizeof(SequenceNumber) + sizeof(SequenceNumber) + sizeof(rtt))) {
int32_t deliveryRate, bandwidth;
@ -268,9 +344,12 @@ void Connection::processACK(std::unique_ptr<ControlPacket> controlPacket) {
controlPacket->readPrimitive(&bandwidth);
// set the delivery rate and bandwidth for congestion control
_congestionControl->setRcvRate(deliveryRate);
_congestionControl->setBandwidth(bandwidth);
}
// fire the onACK callback for congestion control
_congestionControl->onAck(ack);
// update the total count of received ACKs
++_totalReceivedACKs;
@ -289,6 +368,8 @@ void Connection::processLightACK(std::unique_ptr<ControlPacket> controlPacket) {
// update the last received ACK to the this one
_lastReceivedACK = ack;
}
++_totalReceivedLightACKs;
}
void Connection::processACK2(std::unique_ptr<ControlPacket> controlPacket) {
@ -304,26 +385,40 @@ void Connection::processACK2(std::unique_ptr<ControlPacket> controlPacket) {
// calculate the RTT (time now - time ACK sent)
auto now = high_resolution_clock::now();
int rtt = duration_cast<milliseconds>(now - pair.second).count();
int rtt = duration_cast<microseconds>(now - pair.second).count();
updateRTT(rtt);
// set the RTT for congestion control
_congestionControl->setRtt(_rtt);
// update the last ACKed ACK
if (pair.first > _lastReceivedAcknowledgedACK) {
_lastReceivedAcknowledgedACK = pair.first;
}
}
++_totalReceivedACK2s;
}
void Connection::processNAK(std::unique_ptr<ControlPacket> controlPacket) {
// read the loss report
SequenceNumber start, end;
controlPacket->readPrimitive(&start);
if (controlPacket->bytesLeftToRead() >= (qint64)sizeof(SequenceNumber)) {
controlPacket->readPrimitive(&end);
}
++_totalReceivedNAKs;
}
void Connection::processTimeoutNAK(std::unique_ptr<ControlPacket> controlPacket) {
// read the NAKed sequence numbers from the packet
++_totalReceivedTimeoutNAKs;
}
void Connection::updateRTT(int rtt) {
@ -337,6 +432,11 @@ void Connection::updateRTT(int rtt) {
// Deviation = (1 - x)(deviation) + x |sampleRTT - estimatedRTT|
// (where x = 0.25 via Jacobson)
_rttVariance = (_rttVariance * 3 + abs(rtt - _rtt)) >> 2;
_rtt = (_rtt * 7 + rtt) >> 3;
static const int RTT_ESTIMATION_ALPHA_NUMERATOR = 8;
static const int RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR = 4;
_rtt = (_rtt * (1 - RTT_ESTIMATION_ALPHA_NUMERATOR) + rtt) / RTT_ESTIMATION_ALPHA_NUMERATOR;
_rttVariance = (_rttVariance * (1 - RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR)
+ abs(rtt - _rtt)) / RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR;
}

View file

@ -22,6 +22,7 @@
namespace udt {
class CongestionControl;
class ControlPacket;
class Packet;
class Socket;
@ -32,12 +33,12 @@ public:
using SequenceNumberTimePair = std::pair<SequenceNumber, std::chrono::high_resolution_clock::time_point>;
using SentACKMap = std::unordered_map<SequenceNumber, SequenceNumberTimePair>;
Connection(Socket* parentSocket, HifiSockAddr destination);
Connection(Socket* parentSocket, HifiSockAddr destination, std::unique_ptr<CongestionControl> congestionControl);
~Connection();
void sendReliablePacket(std::unique_ptr<Packet> packet);
void sendACK(bool wasCausedBySyncTimeout = true);
void sendLightACK() const;
void sync(); // rate control method, fired by Socket for all connections on SYN interval
SequenceNumber nextACK() const;
@ -47,14 +48,24 @@ public:
void processControl(std::unique_ptr<ControlPacket> controlPacket);
private:
void sendACK(bool wasCausedBySyncTimeout = true);
void sendLightACK();
void processACK(std::unique_ptr<ControlPacket> controlPacket);
void processLightACK(std::unique_ptr<ControlPacket> controlPacket);
void processACK2(std::unique_ptr<ControlPacket> controlPacket);
void processNAK(std::unique_ptr<ControlPacket> controlPacket);
void processTimeoutNAK(std::unique_ptr<ControlPacket> controlPacket);
void updateRTT(int rtt);
int _synInterval; // Periodical Rate Control Interval, defaults to 10ms
int estimatedTimeout() const { return _rtt + _rttVariance * 4; }
int _synInterval; // Periodical Rate Control Interval, in microseconds, defaults to 10ms
int _nakInterval; // NAK timeout interval, in microseconds
int _minNAKInterval { 100000 }; // NAK timeout interval lower bound, default of 100ms
std::chrono::high_resolution_clock::time_point _lastNAKTime;
LossList _lossList; // List of all missing packets
SequenceNumber _lastReceivedSequenceNumber { SequenceNumber::MAX }; // The largest sequence number received from the peer
@ -66,9 +77,7 @@ private:
SequenceNumber _lastSentACK { SequenceNumber::MAX }; // The last sent ACK
SequenceNumber _lastSentACK2; // The last sent ACK sub-sequence number in an ACK2
int _totalReceivedACKs { 0 };
int32_t _rtt; // RTT, in milliseconds
int32_t _rtt; // RTT, in microseconds
int32_t _rttVariance; // RTT variance
int _flowWindowSize; // Flow control window size
@ -80,6 +89,25 @@ private:
PacketTimeWindow _receiveWindow { 16, 64 }; // Window of interval between packets (16) and probes (64) for bandwidth and receive speed
std::unique_ptr<SendQueue> _sendQueue;
std::unique_ptr<CongestionControl> _congestionControl;
// Control Packet stat collection
int _totalReceivedACKs { 0 };
int _totalSentACKs { 0 };
int _totalSentLightACKs { 0 };
int _totalReceivedLightACKs { 0 };
int _totalReceivedACK2s { 0 };
int _totalSentACK2s { 0 };
int _totalReceivedNAKs { 0 };
int _totalSentNAKs { 0 };
int _totalReceivedTimeoutNAKs { 0 };
int _totalSentTimeoutNAKs { 0 };
// Data packet stat collection
int _totalReceivedDataPackets { 0 };
};
}

View file

@ -29,7 +29,8 @@ public:
enum Type : uint16_t {
ACK,
ACK2,
NAK
NAK,
TimeoutNAK
};
static std::unique_ptr<ControlPacket> create(Type type, qint64 size = -1);

View file

@ -18,11 +18,14 @@
using namespace udt;
using namespace std::chrono;
static const int DEFAULT_PACKET_INTERVAL_MICROSECONDS = 1000000;
static const int DEFAULT_PROBE_INTERVAL_MICROSECONDS = 1000;
PacketTimeWindow::PacketTimeWindow(int numPacketIntervals, int numProbeIntervals) :
_numPacketIntervals(numPacketIntervals),
_numProbeIntervals(numProbeIntervals),
_packetIntervals({ _numPacketIntervals }),
_probeIntervals({ _numProbeIntervals })
_packetIntervals({ _numPacketIntervals, DEFAULT_PACKET_INTERVAL_MICROSECONDS }),
_probeIntervals({ _numProbeIntervals, DEFAULT_PROBE_INTERVAL_MICROSECONDS })
{
}
@ -40,8 +43,9 @@ int32_t meanOfMedianFilteredValues(std::vector<int> intervals, int numValues, in
int count = 0;
int sum = 0;
int upperBound = median * 8;
int lowerBound = median / 8;
static const int MEDIAN_FILTERING_BOUND_MULTIPLIER = 8;
int upperBound = median * MEDIAN_FILTERING_BOUND_MULTIPLIER;
int lowerBound = median / MEDIAN_FILTERING_BOUND_MULTIPLIER;
for (auto& interval : intervals) {
if ((interval < upperBound) && interval > lowerBound) {

View file

@ -13,6 +13,7 @@
#include <algorithm>
#include <QtCore/QCoreApplication>
#include <QtCore/QThread>
#include <SharedUtil.h>
@ -21,6 +22,7 @@
#include "Socket.h"
using namespace udt;
using namespace std::chrono;
std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr dest) {
auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, dest));
@ -42,17 +44,7 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) :
_socket(socket),
_destination(dest)
{
_sendTimer.reset(new QTimer(this));
_sendTimer->setSingleShot(true);
QObject::connect(_sendTimer.get(), &QTimer::timeout, this, &SendQueue::sendNextPacket);
_packetSendPeriod = DEFAULT_SEND_PERIOD;
_lastSendTimestamp = 0;
}
SendQueue::~SendQueue() {
assert(thread() == QThread::currentThread());
_sendTimer->stop();
}
void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
@ -60,24 +52,24 @@ void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
QWriteLocker locker(&_packetsLock);
_packets.push_back(std::move(packet));
}
if (!_running) {
start();
if (!_isRunning) {
run();
}
}
void SendQueue::start() {
void SendQueue::run() {
// We need to make sure this is called on the right thread
if (thread() != QThread::currentThread()) {
QMetaObject::invokeMethod(this, "start", Qt::QueuedConnection);
QMetaObject::invokeMethod(this, "run", Qt::QueuedConnection);
}
_running = true;
_isRunning = true;
// This will send a packet and fire the send timer
sendNextPacket();
// This will loop and sleep to send packets
loop();
}
void SendQueue::stop() {
_running = false;
_isRunning = false;
}
void SendQueue::sendPacket(const BasePacket& packet) {
@ -116,61 +108,68 @@ SequenceNumber SendQueue::getNextSequenceNumber() {
return _currentSequenceNumber;
}
void SendQueue::sendNextPacket() {
if (!_running) {
return;
}
// Record timing
auto sendTime = msecTimestampNow(); // msec
_lastSendTimestamp = sendTime;
if (_nextPacket) {
// Write packet's sequence number and send it off
_nextPacket->writeSequenceNumber(getNextSequenceNumber());
sendPacket(*_nextPacket);
void SendQueue::loop() {
while (_isRunning) {
// Record timing
_lastSendTimestamp = high_resolution_clock::now();
// Insert the packet we have just sent in the sent list
QWriteLocker locker(&_sentLock);
_sentPackets[_nextPacket->getSequenceNumber()].swap(_nextPacket);
Q_ASSERT_X(!_nextPacket,
"SendQueue::sendNextPacket()", "Overriden packet in sent list");
}
bool hasResend = false;
SequenceNumber seqNum;
{
// Check nak list for packet to resend
QWriteLocker locker(&_naksLock);
if (_naks.getLength() > 0) {
hasResend = true;
seqNum = _naks.popFirstSequenceNumber();
if (_nextPacket) {
// Write packet's sequence number and send it off
_nextPacket->writeSequenceNumber(getNextSequenceNumber());
sendPacket(*_nextPacket);
// Insert the packet we have just sent in the sent list
QWriteLocker locker(&_sentLock);
_sentPackets[_nextPacket->getSequenceNumber()].swap(_nextPacket);
Q_ASSERT_X(!_nextPacket,
"SendQueue::sendNextPacket()", "Overriden packet in sent list");
}
bool hasResend = false;
SequenceNumber seqNum;
{
// Check nak list for packet to resend
QWriteLocker locker(&_naksLock);
if (_naks.getLength() > 0) {
hasResend = true;
seqNum = _naks.popFirstSequenceNumber();
}
}
// Find packet in sent list using SequenceNumber
if (hasResend) {
QWriteLocker locker(&_sentLock);
auto it = _sentPackets.find(seqNum);
Q_ASSERT_X(it != _sentPackets.end(),
"SendQueue::sendNextPacket()", "Couldn't find NAKed packet to resend");
if (it != _sentPackets.end()) {
it->second.swap(_nextPacket);
_sentPackets.erase(it);
}
}
// If there is no packet to resend, grab the next one in the list
if (!_nextPacket) {
QWriteLocker locker(&_packetsLock);
_nextPacket.swap(_packets.front());
_packets.pop_front();
}
// since we're a while loop, give the thread a chance to process events
QCoreApplication::processEvents();
// we just processed events so check now if we were just told to stop
if (!_isRunning) {
break;
}
// sleep as long as we need until next packet send, if we can
auto now = high_resolution_clock::now();
auto microsecondDuration = (_lastSendTimestamp + microseconds(_packetSendPeriod)) - now;
if (microsecondDuration.count() > 0) {
usleep(microsecondDuration.count());
}
}
// Find packet in sent list using SequenceNumber
if (hasResend) {
QWriteLocker locker(&_sentLock);
auto it = _sentPackets.find(seqNum);
Q_ASSERT_X(it != _sentPackets.end(),
"SendQueue::sendNextPacket()", "Couldn't find NAKed packet to resend");
if (it != _sentPackets.end()) {
it->second.swap(_nextPacket);
_sentPackets.erase(it);
}
}
// If there is no packet to resend, grab the next one in the list
if (!_nextPacket) {
QWriteLocker locker(&_packetsLock);
_nextPacket.swap(_packets.front());
_packets.pop_front();
}
// check if we need to fire off a packet pair - we do this
// How long before next packet send
auto timeToSleep = (sendTime + _packetSendPeriod) - msecTimestampNow(); // msec
_sendTimer->start(std::max((quint64)0, timeToSleep));
}

View file

@ -12,6 +12,7 @@
#ifndef hifi_SendQueue_h
#define hifi_SendQueue_h
#include <chrono>
#include <list>
#include <unordered_map>
@ -34,15 +35,13 @@ class SendQueue : public QObject {
Q_OBJECT
public:
static const int DEFAULT_SEND_PERIOD = 16; // msec
static const int DEFAULT_SEND_PERIOD = 16 * 1000; // 16ms, in microseconds
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr dest);
void queuePacket(std::unique_ptr<Packet> packet);
int getQueueSize() const { QReadLocker locker(&_packetsLock); return _packets.size(); }
quint64 getLastSendTimestamp() const { return _lastSendTimestamp; }
SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); }
int getPacketSendPeriod() const { return _packetSendPeriod; }
@ -52,14 +51,14 @@ public:
void sendPacket(const BasePacket& packet);
public slots:
void start();
void run();
void stop();
void ack(SequenceNumber ack);
void nak(SequenceNumber start, SequenceNumber end);
private slots:
void sendNextPacket();
void loop();
private:
friend struct std::default_delete<SendQueue>;
@ -67,7 +66,6 @@ private:
SendQueue(Socket* socket, HifiSockAddr dest);
SendQueue(SendQueue& other) = delete;
SendQueue(SendQueue&& other) = delete;
~SendQueue();
// Increments current sequence number and return it
SequenceNumber getNextSequenceNumber();
@ -83,10 +81,9 @@ private:
SequenceNumber _currentSequenceNumber; // Last sequence number sent out
std::atomic<uint32_t> _atomicCurrentSequenceNumber; // Atomic for last sequence number sent out
std::unique_ptr<QTimer> _sendTimer; // Send timer
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send envent in msec
std::atomic<quint64> _lastSendTimestamp { 0 }; // Record last time of packet departure
std::atomic<bool> _running { false };
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds
std::chrono::high_resolution_clock::time_point _lastSendTimestamp; // Record last time of packet departure
std::atomic<bool> _isRunning { false };
mutable QReadWriteLock _naksLock; // Protects the naks list.
LossList _naks; // Sequence numbers of packets to resend

View file

@ -71,7 +71,7 @@ qint64 Socket::writePacket(const Packet& packet, const HifiSockAddr& sockAddr) {
Q_ASSERT_X(!packet.isReliable(), "Socket::writePacket", "Cannot send a reliable packet unreliably");
// write the correct sequence number to the Packet here
packet.writeSequenceNumber(_currentUnreliableSequenceNumber);
packet.writeSequenceNumber(++_unreliableSequenceNumbers[sockAddr]);
return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr);
}
@ -80,7 +80,7 @@ qint64 Socket::writePacket(std::unique_ptr<Packet> packet, const HifiSockAddr& s
if (packet->isReliable()) {
auto it = _connectionsHash.find(sockAddr);
if (it == _connectionsHash.end()) {
it = _connectionsHash.insert(it, std::make_pair(sockAddr, new Connection(this, sockAddr)));
it = _connectionsHash.insert(it, std::make_pair(sockAddr, new Connection(this, sockAddr, _ccFactory->create())));
}
it->second->sendReliablePacket(std::move(packet));
return 0;
@ -146,7 +146,7 @@ void Socket::rateControlSync() {
// enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control
for (auto& connection : _connectionsHash) {
connection.second->sendACK();
connection.second->sync();
}
if (_synTimer.interval() != _synInterval) {
@ -155,3 +155,11 @@ void Socket::rateControlSync() {
_synTimer.start(_synInterval);
}
}
void Socket::setCongestionControlFactory(std::unique_ptr<CongestionControlVirtualFactory> ccFactory) {
// swap the current unique_ptr for the new factory
_ccFactory.swap(ccFactory);
// update the _synInterval to the value from the factory
_synInterval = _ccFactory->synInterval();
}

View file

@ -22,6 +22,7 @@
#include <QtNetwork/QUdpSocket>
#include "../HifiSockAddr.h"
#include "CongestionControl.h"
#include "Connection.h"
namespace udt {
@ -59,6 +60,8 @@ public:
void addUnfilteredHandler(const HifiSockAddr& senderSockAddr, BasePacketHandler handler)
{ _unfilteredHandlers[senderSockAddr] = handler; }
void setCongestionControlFactory(std::unique_ptr<CongestionControlVirtualFactory> ccFactory);
private slots:
void readPendingDatagrams();
@ -69,13 +72,14 @@ private:
PacketFilterOperator _packetFilterOperator;
PacketHandler _packetHandler;
SequenceNumber _currentUnreliableSequenceNumber;
std::unordered_map<HifiSockAddr, BasePacketHandler> _unfilteredHandlers;
std::unordered_map<HifiSockAddr, SequenceNumber> _unreliableSequenceNumbers;
std::unordered_map<HifiSockAddr, Connection*> _connectionsHash;
int32_t _synInterval = 10; // 10ms
QTimer _synTimer;
std::unique_ptr<CongestionControlVirtualFactory> _ccFactory { new CongestionControlFactory<DefaultCC>() };
};
} // namespace udt