implement more of processACK in Connection

This commit is contained in:
Stephen Birarda 2015-07-28 11:47:57 -07:00
parent c32c95c707
commit c7ae4d5e59
4 changed files with 92 additions and 8 deletions

View file

@ -39,6 +39,7 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) {
ackPacket->reset(); // We need to reset it every time.
auto currentTime = high_resolution_clock::now();
static high_resolution_clock::time_point lastACKSendTime;
SeqNum nextACKNumber = nextACK();
@ -58,7 +59,8 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) {
} else if (nextACKNumber == _lastSentACK) {
// We already sent this ACK, but check if we should re-send it.
// We will re-send if it has been more than RTT + (4 * RTT variance) since the last ACK
milliseconds sinceLastACK = duration_cast<milliseconds>(currentTime - _lastACKTime);
milliseconds sinceLastACK = duration_cast<milliseconds>(currentTime - lastACKSendTime);
if (sinceLastACK.count() < (_rtt + (4 * _rttVariance))) {
return;
}
@ -80,7 +82,7 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) {
// pack in the receive speed and bandwidth
// record this as the last ACK send time
_lastACKTime = high_resolution_clock::now();
lastACKSendTime = high_resolution_clock::now();
}
// have the send queue send off our packet
@ -181,14 +183,84 @@ void Connection::processACK(std::unique_ptr<ControlPacket> controlPacket) {
SeqNum currentACKSubSequenceNumber;
controlPacket->readPrimitive(&currentACKSubSequenceNumber);
// read the ACK number
SeqNum nextACKNumber;
controlPacket->readPrimitive(&nextACKNumber);
// check if we need send an ACK2 for this ACK
auto currentTime = high_resolution_clock::now();
static high_resolution_clock::time_point lastACK2SendTime;
milliseconds sinceLastACK2 = duration_cast<milliseconds>(currentTime - lastACK2SendTime);
if (sinceLastACK2.count() > _synInterval || currentACKSubSequenceNumber == _lastSentACK2) {
// setup a static ACK2 packet we will re-use
static const int ACK2_PAYLOAD_BYTES = sizeof(SeqNum);
static auto ack2Packet = ControlPacket::create(ControlPacket::ACK2, ACK2_PAYLOAD_BYTES);
// reset the ACK2 Packet before writing the sub-sequence number to it
ack2Packet->reset();
// write the sub sequence number for this ACK2
ack2Packet->writePrimitive(currentACKSubSequenceNumber);
// update the last sent ACK2 and the last ACK2 send time
_lastSentACK2 = currentACKSubSequenceNumber;
lastACK2SendTime = high_resolution_clock::now();
}
// read the ACKed sequence number
SeqNum ack;
controlPacket->readPrimitive(&ack);
// validate that this isn't a BS ACK
if (ack > (_sendQueue->getCurrentSeqNum() + 1)) {
// in UDT they specifically break the connection here - do we want to do anything?
return;
}
// read the RTT and variance
int32_t rtt, rttVariance;
controlPacket->readPrimitive(&rtt);
controlPacket->readPrimitive(&rttVariance);
// read the desired flow window size
int flowWindowSize;
controlPacket->readPrimitive(&flowWindowSize);
if (ack <= _lastReceivedACK) {
// this is a valid ACKed sequence number - update the flow window size and the last received ACK
_flowWindowSize = flowWindowSize;
_lastReceivedACK = ack;
}
// make sure this isn't a repeated ACK
if (ack <= SeqNum(_atomicLastReceivedACK)) {
return;
}
// ACK the send queue so it knows what was received
_sendQueue->ack(ack);
// update the atomic for last received ACK, the send queue uses this to re-transmit
_atomicLastReceivedACK.store((uint32_t) _lastReceivedACK);
// remove everything up to this ACK from the sender loss list
// update the RTT
_rttVariance = (_rttVariance * 3 + abs(rtt - _rtt)) >> 2;
_rtt = (_rtt * 7 + rtt) >> 3;
// set the RTT for congestion control
if (controlPacket->getPayloadSize() > (qint64) (sizeof(SeqNum) + sizeof(SeqNum) + sizeof(rtt) + sizeof(rttVariance))) {
int32_t deliveryRate, bandwidth;
controlPacket->readPrimitive(&deliveryRate);
controlPacket->readPrimitive(&bandwidth);
// set the delivery rate and bandwidth for congestion control
}
// fire the onACK callback for congestion control
// update the total count of received ACKs
++_totalReceivedACKs;
}
void Connection::processLightACK(std::unique_ptr<ControlPacket> controlPacket) {

View file

@ -38,6 +38,8 @@ public:
SeqNum nextACK() const;
SeqNum getLastReceivedACK() const { return SeqNum(_atomicLastReceivedACK); }
void processReceivedSeqNum(SeqNum seq);
void processControl(std::unique_ptr<ControlPacket> controlPacket);
@ -47,19 +49,24 @@ private:
void processACK2(std::unique_ptr<ControlPacket> controlPacket);
void processNAK(std::unique_ptr<ControlPacket> controlPacket);
int _synInterval; // Periodical Rate Control Interval, defaults to 10ms
LossList _lossList; // List of all missing packets
SeqNum _largestReceivedSeqNum; // The largest sequence number received from the peer
SeqNum _lastSentACK; // The last sent ACK
SeqNum _lastReceivedACK; // The last ACK received
std::atomic<uint32_t> _atomicLastReceivedACK; // Atomic for thread-safe get of last ACK received
SeqNum _lastReceivedAcknowledgedACK; // The last sent ACK that has been acknowledged via an ACK2 from the peer
SeqNum _currentACKSubSequenceNumber; // The current ACK sub-sequence number (used for Acknowledgment of ACKs)
SeqNum _lastSentACK; // The last sent ACK
SeqNum _lastSentACK2; // The last sent ACK sub-sequence number in an ACK2
int _totalReceivedACKs { 0 };
int32_t _rtt; // RTT, in milliseconds
int32_t _rttVariance; // RTT variance
int _flowWindowSize; // Flow control window size
std::chrono::high_resolution_clock::time_point _lastACKTime;
std::unique_ptr<SendQueue> _sendQueue;
};

View file

@ -114,6 +114,7 @@ void SendQueue::sendNextPacket() {
if (_nextPacket) {
_nextPacket->writeSequenceNumber(++_currentSeqNum);
sendPacket(*_nextPacket);
_atomicCurrentSeqNum.store((uint32_t) _currentSeqNum);
// Insert the packet we have just sent in the sent list
QWriteLocker locker(&_sentLock);

View file

@ -42,6 +42,8 @@ public:
quint64 getLastSendTimestamp() const { return _lastSendTimestamp; }
SeqNum getCurrentSeqNum() const { return SeqNum(_atomicCurrentSeqNum); }
int getPacketSendPeriod() const { return _packetSendPeriod; }
void setPacketSendPeriod(int newPeriod) { _packetSendPeriod = newPeriod; }
@ -73,6 +75,8 @@ private:
SeqNum _currentSeqNum; // Last sequence number sent out
SeqNum _lastAck; // ACKed sequence number
std::atomic<uint32_t> _atomicCurrentSeqNum; // Atomic for last sequence number sent out
std::unique_ptr<QTimer> _sendTimer; // Send timer
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send envent in msec
std::atomic<quint64> _lastSendTimestamp { 0 }; // Record last time of packet departure