Merge pull request #16 from birarda/handshake

add a handshake before UDT connection starts
This commit is contained in:
Stephen Birarda 2015-08-27 14:46:26 -07:00
commit 0807d1bf4d
9 changed files with 176 additions and 24 deletions

View file

@ -40,8 +40,7 @@ Connection::Connection(Socket* parentSocket, HifiSockAddr destination, std::uniq
// setup default SYN, RTT and RTT Variance based on the SYN interval in CongestionControl object
_synInterval = _congestionControl->synInterval();
_rtt = _synInterval * 10;
_rttVariance = _rtt / 2;
resetRTT();
// set the initial RTT and flow window size on congestion control object
_congestionControl->setRTT(_rtt);
@ -64,6 +63,11 @@ Connection::~Connection() {
}
}
void Connection::resetRTT() {
_rtt = _synInterval * 10;
_rttVariance = _rtt / 2;
}
SendQueue& Connection::getSendQueue() {
if (!_sendQueue) {
// Lasily create send queue
@ -125,13 +129,15 @@ void Connection::sync() {
// we send out a periodic ACK every rate control interval
sendACK();
// check if we need to re-transmit a loss list
// we do this if it has been longer than the current nakInterval since we last sent
auto now = high_resolution_clock::now();
if (duration_cast<microseconds>(now - _lastNAKTime).count() >= _nakInterval) {
// Send a timeout NAK packet
sendTimeoutNAK();
if (_lossList.getLength() > 0) {
// check if we need to re-transmit a loss list
// we do this if it has been longer than the current nakInterval since we last sent
auto now = high_resolution_clock::now();
if (duration_cast<microseconds>(now - _lastNAKTime).count() >= _nakInterval) {
// Send a timeout NAK packet
sendTimeoutNAK();
}
}
}
}
@ -318,6 +324,11 @@ SequenceNumber Connection::nextACK() const {
bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize) {
if (!_hasReceivedHandshake) {
// refuse to process any packets until we've received the handshake
return false;
}
_hasReceivedFirstPacket = true;
// check if this is a packet pair we should estimate bandwidth from, or just a regular packet
@ -349,7 +360,7 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in
_nakInterval += (int) (_lossList.getLength() * (USECS_PER_SECOND / receivedPacketsPerSecond));
}
// the NAK interval is at least the _minNAKInterval but might be the estimated timeout
// the NAK interval is at least the _minNAKInterval but might be the value calculated above, if that is larger
_nakInterval = std::max(_nakInterval, _minNAKInterval);
}
@ -387,23 +398,42 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in
}
void Connection::processControl(std::unique_ptr<ControlPacket> controlPacket) {
// Simple dispatch to control packets processing methods based on their type
// Simple dispatch to control packets processing methods based on their type.
// Processing of control packets (other than Handshake / Handshake ACK)
// is not performed if the handshake has not been completed.
switch (controlPacket->getType()) {
case ControlPacket::ACK:
if (controlPacket->getPayloadSize() == sizeof(SequenceNumber)) {
processLightACK(move(controlPacket));
} else {
processACK(move(controlPacket));
if (_hasReceivedHandshakeACK) {
if (controlPacket->getPayloadSize() == sizeof(SequenceNumber)) {
processLightACK(move(controlPacket));
} else {
processACK(move(controlPacket));
}
}
break;
case ControlPacket::ACK2:
processACK2(move(controlPacket));
if (_hasReceivedHandshake) {
processACK2(move(controlPacket));
}
break;
case ControlPacket::NAK:
processNAK(move(controlPacket));
if (_hasReceivedHandshakeACK) {
processNAK(move(controlPacket));
}
break;
case ControlPacket::TimeoutNAK:
processTimeoutNAK(move(controlPacket));
if (_hasReceivedHandshakeACK) {
processTimeoutNAK(move(controlPacket));
}
break;
case ControlPacket::Handshake:
processHandshake(move(controlPacket));
break;
case ControlPacket::HandshakeACK:
processHandshakeACK(move(controlPacket));
break;
}
}
@ -594,6 +624,30 @@ void Connection::processNAK(std::unique_ptr<ControlPacket> controlPacket) {
_stats.record(ConnectionStats::Stats::ReceivedNAK);
}
void Connection::processHandshake(std::unique_ptr<ControlPacket> controlPacket) {
if (!_hasReceivedHandshake || _hasReceivedFirstPacket) {
// server sent us a handshake - we need to assume this means state should be reset
// as long as we haven't received a handshake yet or we have and we've received some data
resetReceiveState();
}
// immediately respond with a handshake ACK
static auto handshakeACK = ControlPacket::create(ControlPacket::HandshakeACK, 0);
_parentSocket->writeBasePacket(*handshakeACK, _destination);
// indicate that handshake has been received
_hasReceivedHandshake = true;
}
void Connection::processHandshakeACK(std::unique_ptr<ControlPacket> controlPacket) {
// hand off this handshake ACK to the send queue so it knows it can start sending
getSendQueue().handshakeACK();
// indicate that handshake ACK was received
_hasReceivedHandshakeACK = true;
}
void Connection::processTimeoutNAK(std::unique_ptr<ControlPacket> controlPacket) {
// Override SendQueue's LossList with the timeout NAK list
getSendQueue().overrideNAKListFromPacket(*controlPacket);
@ -604,6 +658,40 @@ void Connection::processTimeoutNAK(std::unique_ptr<ControlPacket> controlPacket)
_stats.record(ConnectionStats::Stats::ReceivedTimeoutNAK);
}
void Connection::resetReceiveState() {
// reset all SequenceNumber member variables back to default
SequenceNumber defaultSequenceNumber;
_lastReceivedSequenceNumber = defaultSequenceNumber;
_lastReceivedAcknowledgedACK = defaultSequenceNumber;
_currentACKSubSequenceNumber = defaultSequenceNumber;
_lastSentACK = defaultSequenceNumber;
// clear the loss list and _lastNAKTime
_lossList.clear();
_lastNAKTime = high_resolution_clock::time_point();
// the _nakInterval need not be reset, that will happen on loss
// clear sync variables
_hasReceivedFirstPacket = false;
_acksDuringSYN = 1;
_lightACKsDuringSYN = 1;
_packetsSinceACK = 0;
// reset RTT to initial value
resetRTT();
// clear the intervals in the receive window
_receiveWindow.reset();
// clear any pending received messages
_pendingReceivedMessages.clear();
}
void Connection::updateRTT(int rtt) {
// This updates the RTT using exponential weighted moving average
// This is the Jacobson's forumla for RTT estimation

View file

@ -63,7 +63,7 @@ public:
void sync(); // rate control method, fired by Socket for all connections on SYN interval
// return indicates if this packet was a duplicate
// return indicates if this packet should be processed
bool processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize);
void processControl(std::unique_ptr<ControlPacket> controlPacket);
@ -92,6 +92,11 @@ private:
void processACK2(std::unique_ptr<ControlPacket> controlPacket);
void processNAK(std::unique_ptr<ControlPacket> controlPacket);
void processTimeoutNAK(std::unique_ptr<ControlPacket> controlPacket);
void processHandshake(std::unique_ptr<ControlPacket> controlPacket);
void processHandshakeACK(std::unique_ptr<ControlPacket> controlPacket);
void resetReceiveState();
void resetRTT();
SendQueue& getSendQueue();
SequenceNumber nextACK() const;
@ -103,11 +108,13 @@ private:
int _synInterval; // Periodical Rate Control Interval, in microseconds
int _nakInterval; // NAK timeout interval, in microseconds
int _nakInterval { -1 }; // NAK timeout interval, in microseconds, set on loss
int _minNAKInterval { 100000 }; // NAK timeout interval lower bound, default of 100ms
std::chrono::high_resolution_clock::time_point _lastNAKTime;
bool _hasReceivedFirstPacket { false };
bool _hasReceivedHandshake { false }; // flag for receipt of handshake from server
bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client
LossList _lossList; // List of all missing packets
SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer

View file

@ -100,7 +100,7 @@ void ControlPacket::readType() {
Q_ASSERT_X(bitAndType & CONTROL_BIT_MASK, "ControlPacket::readHeader()", "This should be a control packet");
uint16_t packetType = (bitAndType & ~CONTROL_BIT_MASK) >> (8 * sizeof(Type));
Q_ASSERT_X(packetType <= ControlPacket::Type::TimeoutNAK, "ControlPacket::readType()", "Received a control packet with wrong type");
Q_ASSERT_X(packetType <= ControlPacket::Type::HandshakeACK, "ControlPacket::readType()", "Received a control packet with wrong type");
// read the type
_type = (Type) packetType;

View file

@ -30,7 +30,9 @@ public:
ACK,
ACK2,
NAK,
TimeoutNAK
TimeoutNAK,
Handshake,
HandshakeACK
};
static std::unique_ptr<ControlPacket> create(Type type, qint64 size = -1);

View file

@ -169,7 +169,8 @@ SequenceNumber LossList::popFirstSequenceNumber() {
void LossList::write(ControlPacket& packet, int maxPairs) {
int writtenPairs = 0;
for(const auto& pair : _lossList) {
for (const auto& pair : _lossList) {
packet.writePrimitive(pair.first);
packet.writePrimitive(pair.second);

View file

@ -31,6 +31,11 @@ PacketTimeWindow::PacketTimeWindow(int numPacketIntervals, int numProbeIntervals
}
void PacketTimeWindow::reset() {
_packetIntervals.assign(_numPacketIntervals, DEFAULT_PACKET_INTERVAL_MICROSECONDS);
_probeIntervals.assign(_numProbeIntervals, DEFAULT_PROBE_INTERVAL_MICROSECONDS);
}
template <typename Iterator>
int median(Iterator begin, Iterator end) {
// use std::nth_element to grab the middle - for an even number of elements this is the upper middle

View file

@ -29,6 +29,8 @@ public:
int32_t getPacketReceiveSpeed() const;
int32_t getEstimatedBandwidth() const;
void reset();
private:
int _numPacketIntervals { 0 }; // the number of packet intervals to store
int _numProbeIntervals { 0 }; // the number of probe intervals to store

View file

@ -191,6 +191,12 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
_emptyCondition.notify_one();
}
void SendQueue::handshakeACK() {
std::unique_lock<std::mutex> locker(_handshakeMutex);
_hasReceivedHandshakeACK = true;
_handshakeACKCondition.notify_one();
}
SequenceNumber SendQueue::getNextSequenceNumber() {
_atomicCurrentSequenceNumber = (SequenceNumber::Type)++_currentSequenceNumber;
return _currentSequenceNumber;
@ -228,6 +234,41 @@ void SendQueue::run() {
// Record how long the loop takes to execute
auto loopStartTimestamp = high_resolution_clock::now();
std::unique_lock<std::mutex> handshakeLock { _handshakeMutex };
if (!_hasReceivedHandshakeACK) {
// we haven't received a handshake ACK from the client
// if it has been at least 100ms since we last sent a handshake, send another now
// hold the time of last send in a static
static auto lastSendHandshake = high_resolution_clock::time_point();
static const auto HANDSHAKE_RESEND_INTERVAL_MS = std::chrono::milliseconds(100);
// calculation the duration since the last handshake send
auto sinceLastHandshake = std::chrono::duration_cast<std::chrono::milliseconds>(high_resolution_clock::now()
- lastSendHandshake);
if (sinceLastHandshake >= HANDSHAKE_RESEND_INTERVAL_MS) {
// it has been long enough since last handshake, send another
static auto handshakePacket = ControlPacket::create(ControlPacket::Handshake, 0);
_socket->writeBasePacket(*handshakePacket, _destination);
lastSendHandshake = high_resolution_clock::now();
}
// we wait for the ACK or the re-send interval to expire
_handshakeACKCondition.wait_until(handshakeLock,
high_resolution_clock::now()
+ HANDSHAKE_RESEND_INTERVAL_MS);
// Once we're here we've either received the handshake ACK or it's going to be time to re-send a handshake.
// Either way let's continue processing - no packets will be sent if no handshake ACK has been received.
}
handshakeLock.unlock();
bool sentAPacket = maybeResendPacket();
bool flowWindowFull = false;
@ -253,7 +294,7 @@ void SendQueue::run() {
break;
}
if (!sentAPacket) {
if (_hasReceivedHandshakeACK && !sentAPacket) {
static const std::chrono::seconds CONSIDER_INACTIVE_AFTER { 5 };
if (flowWindowFull && (high_resolution_clock::now() - _flowWindowFullSince) > CONSIDER_INACTIVE_AFTER) {
@ -313,6 +354,7 @@ bool SendQueue::maybeSendNewPacket() {
if (((uint32_t) nextNumber & 0xF) == 0) {
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
// pull off a second packet if we can before we unlock
if (_packets.size() > 0) {
secondPacket.swap(_packets.front());
_packets.pop_front();

View file

@ -63,6 +63,7 @@ public slots:
void ack(SequenceNumber ack);
void nak(SequenceNumber start, SequenceNumber end);
void overrideNAKListFromPacket(ControlPacket& packet);
void handshakeACK();
signals:
void packetSent(int dataSize, int payloadSize);
@ -115,6 +116,10 @@ private:
mutable QReadWriteLock _sentLock; // Protects the sent packet list
std::unordered_map<SequenceNumber, std::unique_ptr<Packet>> _sentPackets; // Packets waiting for ACK.
std::mutex _handshakeMutex; // Protects the handshake ACK condition_variable
bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client
std::condition_variable _handshakeACKCondition;
std::condition_variable_any _emptyCondition;
};