More cleanup

This commit is contained in:
Clement 2018-07-17 14:01:14 -07:00
parent 0bfe2671dd
commit 9a1c4afbd8
10 changed files with 25 additions and 383 deletions

View file

@ -32,11 +32,9 @@ class CongestionControl {
friend class Connection;
public:
CongestionControl() {};
CongestionControl(int synInterval) : _synInterval(synInterval) {}
virtual ~CongestionControl() {}
int synInterval() const { return _synInterval; }
CongestionControl() = default;
virtual ~CongestionControl() = default;
void setMaxBandwidth(int maxBandwidth);
virtual void init() {}
@ -47,31 +45,25 @@ public:
virtual void onTimeout() {}
virtual void onPacketSent(int wireSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint) {}
virtual int estimatedTimeout() const = 0;
protected:
void setMSS(int mss) { _mss = mss; }
void setMaxCongestionWindowSize(int window) { _maxCongestionWindowSize = window; }
virtual void setInitialSendSequenceNumber(SequenceNumber seqNum) = 0;
void setSendCurrentSequenceNumber(SequenceNumber seqNum) { _sendCurrSeqNum = seqNum; }
void setReceiveRate(int rate) { _receiveRate = rate; }
void setRTT(int rtt) { _rtt = rtt; }
void setPacketSendPeriod(double newSendPeriod); // call this internally to ensure send period doesn't go past max bandwidth
double _packetSendPeriod { 1.0 }; // Packet sending period, in microseconds
int _congestionWindowSize { 16 }; // Congestion window size, in packets
std::atomic<int> _maxBandwidth { -1 }; // Maximum desired bandwidth, bits per second
int _maxCongestionWindowSize { 0 }; // maximum cwnd size, in packets
int _mss { 0 }; // Maximum Packet Size, including all packet headers
SequenceNumber _sendCurrSeqNum; // current maximum seq num sent out
int _receiveRate { 0 }; // packet arrive rate at receiver side, packets per second
int _rtt { 0 }; // current estimated RTT, microsecond
private:
CongestionControl(const CongestionControl& other) = delete;
CongestionControl& operator=(const CongestionControl& other) = delete;
int _synInterval { DEFAULT_SYN_INTERVAL };
};
@ -79,8 +71,6 @@ class CongestionControlVirtualFactory {
public:
virtual ~CongestionControlVirtualFactory() {}
static int synInterval() { return DEFAULT_SYN_INTERVAL; }
virtual std::unique_ptr<CongestionControl> create() = 0;
};

View file

@ -39,19 +39,9 @@ Connection::Connection(Socket* parentSocket, HifiSockAddr destination, std::uniq
Q_ASSERT_X(_congestionControl, "Connection::Connection", "Must be called with a valid CongestionControl object");
_congestionControl->init();
// setup default SYN, RTT and RTT Variance based on the SYN interval in CongestionControl object
_synInterval = _congestionControl->synInterval();
resetRTT();
// set the initial RTT and flow window size on congestion control object
_congestionControl->setRTT(_rtt);
_congestionControl->setMaxCongestionWindowSize(_flowWindowSize);
// Setup packets
static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(_lastSentACK)
+ sizeof(_rtt) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t);
static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(SequenceNumber);
static const int HANDSHAKE_ACK_PAYLOAD_BYTES = sizeof(SequenceNumber);
_ackPacket = ControlPacket::create(ControlPacket::ACK, ACK_PACKET_PAYLOAD_BYTES);
@ -95,11 +85,6 @@ void Connection::stopSendQueue() {
}
}
void Connection::resetRTT() {
_rtt = _synInterval * 10;
_rttVariance = _rtt / 2;
}
void Connection::setMaxBandwidth(int maxBandwidth) {
_congestionControl->setMaxBandwidth(maxBandwidth);
}
@ -133,9 +118,8 @@ SendQueue& Connection::getSendQueue() {
// set defaults on the send queue from our congestion control object and estimatedTimeout()
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
_sendQueue->setSyncInterval(_synInterval);
_sendQueue->setEstimatedTimeout(estimatedTimeout());
_sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
_sendQueue->setEstimatedTimeout(_congestionControl->estimatedTimeout());
_sendQueue->setFlowWindowSize(_congestionControl->_congestionWindowSize);
// give the randomized sequence number to the congestion control object
_congestionControl->setInitialSendSequenceNumber(_sendQueue->getCurrentSequenceNumber());
@ -199,23 +183,6 @@ void Connection::queueReceivedMessagePacket(std::unique_ptr<Packet> packet) {
}
void Connection::sync() {
if (_isReceivingData) {
// check if we should expire the receive portion of this connection
// this occurs if it has been 16 timeouts since the last data received and at least 5 seconds
static const int NUM_TIMEOUTS_BEFORE_EXPIRY = 16;
static const int MIN_SECONDS_BEFORE_EXPIRY = 5;
auto now = p_high_resolution_clock::now();
auto sincePacketReceive = now - _lastReceiveTime;
if (duration_cast<microseconds>(sincePacketReceive).count() >= NUM_TIMEOUTS_BEFORE_EXPIRY * estimatedTimeout()
&& duration_cast<seconds>(sincePacketReceive).count() >= MIN_SECONDS_BEFORE_EXPIRY ) {
// the receive side of this connection is expired
_isReceivingData = false;
}
}
}
void Connection::recordSentPackets(int wireSize, int payloadSize,
@ -231,44 +198,17 @@ void Connection::recordRetransmission(int wireSize, SequenceNumber seqNum, p_hig
_congestionControl->onPacketSent(wireSize, seqNum, timePoint);
}
void Connection::sendACK(bool wasCausedBySyncTimeout) {
static p_high_resolution_clock::time_point lastACKSendTime;
void Connection::sendACK() {
SequenceNumber nextACKNumber = nextACK();
Q_ASSERT_X(nextACKNumber >= _lastSentACK, "Connection::sendACK", "Sending lower ACK, something is wrong");
// we have received new packets since the last sent ACK
// or our congestion control dictates that we always send ACKs
// update the last sent ACK
_lastSentACK = nextACKNumber;
_ackPacket->reset(); // We need to reset it every time.
// pack in the ACK number
_ackPacket->writePrimitive(nextACKNumber);
// pack in the RTT and variance
_ackPacket->writePrimitive(_rtt);
// pack the available buffer size, in packets
// in our implementation we have no hard limit on receive buffer size, send the default value
_ackPacket->writePrimitive((int32_t) udt::MAX_PACKETS_IN_FLIGHT);
if (wasCausedBySyncTimeout) {
// grab the up to date packet receive speed
int32_t packetReceiveSpeed = _receiveWindow.getPacketReceiveSpeed();
// update those values in our connection stats
_stats.recordReceiveRate(packetReceiveSpeed);
// pack in the receive speed
_ackPacket->writePrimitive(packetReceiveSpeed);
}
// record this as the last ACK send time
lastACKSendTime = p_high_resolution_clock::now();
// have the socket send off our packet
_parentSocket->writeBasePacket(*_ackPacket, _destination);
@ -304,12 +244,8 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in
return false;
}
_isReceivingData = true;
// mark our last receive time as now (to push the potential expiry farther)
_lastReceiveTime = p_high_resolution_clock::now();
_receiveWindow.onPacketArrival();
// If this is not the next sequence number, report loss
if (sequenceNumber > _lastReceivedSequenceNumber + 1) {
@ -331,7 +267,7 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in
}
// using a congestion control that ACKs every packet (like TCP Vegas)
sendACK(true);
sendACK();
if (wasDuplicate) {
_stats.record(ConnectionStats::Stats::Duplicate);
@ -397,22 +333,9 @@ void Connection::processACK(ControlPacketPointer controlPacket) {
return;
}
// read the RTT
int32_t rtt;
controlPacket->readPrimitive(&rtt);
if (ack < _lastReceivedACK) {
if (ack <= _lastReceivedACK) {
// this is an out of order ACK, bail
return;
}
// this is a valid ACKed sequence number - update the flow window size and the last received ACK
int32_t packedFlowWindow;
controlPacket->readPrimitive(&packedFlowWindow);
_flowWindowSize = packedFlowWindow;
if (ack == _lastReceivedACK) {
// or
// processing an already received ACK, bail
return;
}
@ -421,35 +344,7 @@ void Connection::processACK(ControlPacketPointer controlPacket) {
// ACK the send queue so it knows what was received
getSendQueue().ack(ack);
// update the RTT
updateRTT(rtt);
// write this RTT to stats
_stats.recordRTT(rtt);
// set the RTT for congestion control
_congestionControl->setRTT(_rtt);
if (controlPacket->bytesLeftToRead() > 0) {
int32_t receiveRate;
Q_ASSERT_X(controlPacket->bytesLeftToRead() == sizeof(receiveRate),
"Connection::processACK", "sync interval ACK packet does not contain expected data");
controlPacket->readPrimitive(&receiveRate);
// set the delivery rate for congestion control
// these are calculated using an EWMA
static const int EMWA_ALPHA_NUMERATOR = 8;
// record these samples in connection stats
_stats.recordSendRate(receiveRate);
_deliveryRate = (_deliveryRate * (EMWA_ALPHA_NUMERATOR - 1) + receiveRate) / EMWA_ALPHA_NUMERATOR;
_congestionControl->setReceiveRate(_deliveryRate);
}
// give this ACK to the congestion control and update the send queue parameters
updateCongestionControlAndSendQueue([this, ack, &controlPacket] {
@ -478,7 +373,6 @@ void Connection::processHandshake(ControlPacketPointer controlPacket) {
resetReceiveState();
_initialReceiveSequenceNumber = initialSequenceNumber;
_lastReceivedSequenceNumber = initialSequenceNumber - 1;
_lastSentACK = initialSequenceNumber - 1;
}
_handshakeACK->reset();
@ -516,22 +410,13 @@ void Connection::resetReceiveState() {
SequenceNumber defaultSequenceNumber;
_lastReceivedSequenceNumber = defaultSequenceNumber;
_lastSentACK = defaultSequenceNumber;
// clear the loss list
_lossList.clear();
// clear sync variables
_isReceivingData = false;
_connectionStart = p_high_resolution_clock::now();
// reset RTT to initial value
resetRTT();
// clear the intervals in the receive window
_receiveWindow.reset();
// clear any pending received messages
for (auto& pendingMessage : _pendingReceivedMessages) {
_parentSocket->messageFailed(this, pendingMessage.first);
@ -539,30 +424,6 @@ void Connection::resetReceiveState() {
_pendingReceivedMessages.clear();
}
void Connection::updateRTT(int rtt) {
// This updates the RTT using exponential weighted moving average
// This is the Jacobson's forumla for RTT estimation
// http://www.mathcs.emory.edu/~cheung/Courses/455/Syllabus/7-transport/Jacobson-88.pdf
// Estimated RTT = (1 - x)(estimatedRTT) + (x)(sampleRTT)
// (where x = 0.125 via Jacobson)
// Deviation = (1 - x)(deviation) + x |sampleRTT - estimatedRTT|
// (where x = 0.25 via Jacobson)
static const int RTT_ESTIMATION_ALPHA_NUMERATOR = 8;
static const int RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR = 4;
_rtt = (_rtt * (RTT_ESTIMATION_ALPHA_NUMERATOR - 1) + rtt) / RTT_ESTIMATION_ALPHA_NUMERATOR;
_rttVariance = (_rttVariance * (RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR - 1)
+ abs(rtt - _rtt)) / RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR;
}
int Connection::estimatedTimeout() const {
return _rtt + _rttVariance * 4;
}
void Connection::updateCongestionControlAndSendQueue(std::function<void ()> congestionCallback) {
// update the last sent sequence number in congestion control
_congestionControl->setSendCurrentSequenceNumber(getSendQueue().getCurrentSequenceNumber());
@ -574,8 +435,8 @@ void Connection::updateCongestionControlAndSendQueue(std::function<void ()> cong
// now that we've updated the congestion control, update the packet send period and flow window size
sendQueue.setPacketSendPeriod(_congestionControl->_packetSendPeriod);
sendQueue.setEstimatedTimeout(estimatedTimeout());
sendQueue.setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
sendQueue.setEstimatedTimeout(_congestionControl->estimatedTimeout());
sendQueue.setFlowWindowSize(_congestionControl->_congestionWindowSize);
// record connection stats
_stats.recordPacketSendPeriod(_congestionControl->_packetSendPeriod);

View file

@ -22,7 +22,6 @@
#include "ConnectionStats.h"
#include "Constants.h"
#include "LossList.h"
#include "PacketTimeWindow.h"
#include "SendQueue.h"
#include "../HifiSockAddr.h"
@ -86,35 +85,27 @@ private slots:
void queueTimeout();
private:
void sendACK(bool wasCausedBySyncTimeout = true);
void sendACK();
void processACK(ControlPacketPointer controlPacket);
void processHandshake(ControlPacketPointer controlPacket);
void processHandshakeACK(ControlPacketPointer controlPacket);
void resetReceiveState();
void resetRTT();
SendQueue& getSendQueue();
SequenceNumber nextACK() const;
void updateRTT(int rtt);
int estimatedTimeout() const;
void updateCongestionControlAndSendQueue(std::function<void()> congestionCallback);
void stopSendQueue();
int _synInterval; // Periodical Rate Control Interval, in microseconds
bool _hasReceivedHandshake { false }; // flag for receipt of handshake from server
bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client
bool _didRequestHandshake { false }; // flag for request of handshake from server
p_high_resolution_clock::time_point _connectionStart = p_high_resolution_clock::now(); // holds the time_point for creation of this connection
p_high_resolution_clock::time_point _lastReceiveTime; // holds the last time we received anything from sender
bool _isReceivingData { false }; // flag used for expiry of receipt portion of connection
SequenceNumber _initialSequenceNumber; // Randomized on Connection creation, identifies connection during re-connect requests
SequenceNumber _initialReceiveSequenceNumber; // Randomized by peer Connection on creation, identifies connection during re-connect requests
@ -125,18 +116,8 @@ private:
SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer
SequenceNumber _lastReceivedACK; // The last ACK received
SequenceNumber _lastSentACK; // The last sent ACK
int32_t _rtt; // RTT, in microseconds
int32_t _rttVariance; // RTT variance
int _flowWindowSize { udt::MAX_PACKETS_IN_FLIGHT }; // Flow control window size
int _deliveryRate { 16 }; // Exponential moving average for receiver's receive rate, in packets per second
Socket* _parentSocket { nullptr };
HifiSockAddr _destination;
PacketTimeWindow _receiveWindow { 16 }; // Window of interval between packets (16)
std::unique_ptr<CongestionControl> _congestionControl;

View file

@ -95,7 +95,7 @@ PacketVersion versionForPacketType(PacketType packetType) {
case PacketType::AvatarIdentityRequest:
return 22;
default:
return 21;
return 22;
}
}

View file

@ -1,101 +0,0 @@
//
// PacketTimeWindow.cpp
// libraries/networking/src/udt
//
// Created by Stephen Birarda on 2015-07-28.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "PacketTimeWindow.h"
#include <numeric>
#include <cmath>
#include <NumericalConstants.h>
using namespace udt;
using namespace std::chrono;
static const int DEFAULT_PACKET_INTERVAL_MICROSECONDS = 1000000; // 1s
PacketTimeWindow::PacketTimeWindow(int numPacketIntervals) :
_numPacketIntervals(numPacketIntervals),
_packetIntervals(_numPacketIntervals, DEFAULT_PACKET_INTERVAL_MICROSECONDS)
{
}
void PacketTimeWindow::reset() {
_packetIntervals.assign(_numPacketIntervals, DEFAULT_PACKET_INTERVAL_MICROSECONDS);
}
template <typename Iterator>
int median(Iterator begin, Iterator end) {
// use std::nth_element to grab the middle - for an even number of elements this is the upper middle
Iterator middle = begin + (end - begin) / 2;
std::nth_element(begin, middle, end);
if ((end - begin) % 2 != 0) {
// odd number of elements, just return the middle
return *middle;
} else {
// even number of elements, return the mean of the upper middle and the lower middle
Iterator lowerMiddle = std::max_element(begin, middle);
return (*middle + *lowerMiddle) / 2;
}
}
int32_t meanOfMedianFilteredValues(std::vector<int> intervals, int numValues, int valuesRequired = 0) {
// grab the median value of the intervals vector
int intervalsMedian = median(intervals.begin(), intervals.end());
// figure out our bounds for median filtering
static const int MEDIAN_FILTERING_BOUND_MULTIPLIER = 8;
int upperBound = intervalsMedian * MEDIAN_FILTERING_BOUND_MULTIPLIER;
int lowerBound = intervalsMedian / MEDIAN_FILTERING_BOUND_MULTIPLIER;
int sum = 0;
int count = 0;
// sum the values that are inside the median filtered bounds
for (auto& interval : intervals) {
if ((interval < upperBound) && (interval > lowerBound)) {
++count;
sum += interval;
}
}
// make sure we hit our threshold of values required
if (count >= valuesRequired) {
// return the frequency (per second) for the mean interval
static const double USECS_PER_SEC = 1000000.0;
return (int32_t) ceil(USECS_PER_SEC / (((double) sum) / ((double) count)));
} else {
return 0;
}
}
int32_t PacketTimeWindow::getPacketReceiveSpeed() const {
// return the mean value of median filtered values (per second) - or zero if there are too few filtered values
return meanOfMedianFilteredValues(_packetIntervals, _numPacketIntervals, _numPacketIntervals / 2);
}
void PacketTimeWindow::onPacketArrival() {
// take the current time
auto now = p_high_resolution_clock::now();
if (_packetIntervals.size() > 0) {
// record the interval between this packet and the last one
_packetIntervals[_currentPacketInterval++] = duration_cast<microseconds>(now - _lastPacketTime).count();
// reset the currentPacketInterval index when it wraps
_currentPacketInterval %= _numPacketIntervals;
}
// remember this as the last packet arrival time
_lastPacketTime = now;
}

View file

@ -1,44 +0,0 @@
//
// PacketTimeWindow.h
// libraries/networking/src/udt
//
// Created by Stephen Birarda on 2015-07-28.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#pragma once
#ifndef hifi_PacketTimeWindow_h
#define hifi_PacketTimeWindow_h
#include <vector>
#include <PortableHighResolutionClock.h>
namespace udt {
class PacketTimeWindow {
public:
PacketTimeWindow(int numPacketIntervals = 16);
void onPacketArrival();
int32_t getPacketReceiveSpeed() const;
void reset();
private:
int _numPacketIntervals { 0 }; // the number of packet intervals to store
int _currentPacketInterval { 0 }; // index for the current packet interval
std::vector<int> _packetIntervals; // vector of microsecond intervals between packet arrivals
p_high_resolution_clock::time_point _lastPacketTime = p_high_resolution_clock::now(); // the time_point when last packet arrived
};
}
#endif // hifi_PacketTimeWindow_h

View file

@ -33,18 +33,11 @@ using namespace udt;
Socket::Socket(QObject* parent, bool shouldChangeSocketOptions) :
QObject(parent),
_synTimer(new QTimer(this)),
_readyReadBackupTimer(new QTimer(this)),
_shouldChangeSocketOptions(shouldChangeSocketOptions)
{
connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams);
// make sure our synchronization method is called every SYN interval
connect(_synTimer, &QTimer::timeout, this, &Socket::rateControlSync);
// start our timer for the synchronization time interval
_synTimer->start(_synInterval);
// make sure we hear about errors and state changes from the underlying socket
connect(&_udpSocket, SIGNAL(error(QAbstractSocket::SocketError)),
this, SLOT(handleSocketError(QAbstractSocket::SocketError)));
@ -427,49 +420,9 @@ void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* r
}
}
void Socket::rateControlSync() {
// enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control
// the way we do this is a little funny looking - we need to avoid the case where we call sync and
// (because of our Qt direct connection to the Connection's signal that it has been deactivated)
// an iterator on _connectionsHash would be invalidated by our own call to cleanupConnection
// collect the sockets for all connections in a vector
std::vector<HifiSockAddr> sockAddrVector;
sockAddrVector.reserve(_connectionsHash.size());
for (auto& connection : _connectionsHash) {
sockAddrVector.emplace_back(connection.first);
}
// enumerate that vector of HifiSockAddr objects
for (auto& sockAddr : sockAddrVector) {
// pull out the respective connection via a quick find on the unordered_map
auto it = _connectionsHash.find(sockAddr);
if (it != _connectionsHash.end()) {
// if the connection is erased while calling sync since we are re-using the iterator that was invalidated
// we're good to go
auto& connection = _connectionsHash[sockAddr];
connection->sync();
}
}
if (_synTimer->interval() != _synInterval) {
// if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed)
// then restart it now with the right interval
_synTimer->start(_synInterval);
}
}
void Socket::setCongestionControlFactory(std::unique_ptr<CongestionControlVirtualFactory> ccFactory) {
// swap the current unique_ptr for the new factory
_ccFactory.swap(ccFactory);
// update the _synInterval to the value from the factory
_synInterval = _ccFactory->synInterval();
}

View file

@ -102,7 +102,6 @@ public slots:
private slots:
void readPendingDatagrams();
void checkForReadyReadBackup();
void rateControlSync();
void handleSocketError(QAbstractSocket::SocketError socketError);
void handleStateChanged(QAbstractSocket::SocketState socketState);
@ -133,9 +132,6 @@ private:
std::unordered_map<HifiSockAddr, BasePacketHandler> _unfilteredHandlers;
std::unordered_map<HifiSockAddr, SequenceNumber> _unreliableSequenceNumbers;
std::unordered_map<HifiSockAddr, std::unique_ptr<Connection>> _connectionsHash;
int _synInterval { 10 }; // 10ms
QTimer* _synTimer { nullptr };
QTimer* _readyReadBackupTimer { nullptr };

View file

@ -101,12 +101,11 @@ bool TCPVegasCC::onACK(SequenceNumber ack, p_high_resolution_clock::time_point r
auto it = _sentPacketTimes.find(ack + 1);
if (it != _sentPacketTimes.end()) {
auto estimatedTimeout = _ewmaRTT + _rttVariance * 4;
auto now = p_high_resolution_clock::now();
auto sinceSend = duration_cast<microseconds>(now - it->second).count();
if (sinceSend >= estimatedTimeout) {
if (sinceSend >= estimatedTimeout()) {
// break out of slow start, we've decided this is loss
_slowStart = false;
@ -213,6 +212,11 @@ void TCPVegasCC::performCongestionAvoidance(udt::SequenceNumber ack) {
_numACKs = 0;
}
int TCPVegasCC::estimatedTimeout() const {
return _ewmaRTT == -1 ? DEFAULT_SYN_INTERVAL : _ewmaRTT + _rttVariance * 4;
}
bool TCPVegasCC::isCongestionWindowLimited() {
if (_slowStart) {
return true;

View file

@ -30,6 +30,8 @@ public:
virtual void onTimeout() override {};
virtual void onPacketSent(int wireSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint) override;
virtual int estimatedTimeout() const override;
protected:
virtual void performCongestionAvoidance(SequenceNumber ack);