Make static control packets class members

This commit is contained in:
Atlante45 2016-10-03 13:46:55 -07:00
parent 769a29332c
commit 573d5898cc
2 changed files with 65 additions and 59 deletions

View file

@ -45,6 +45,20 @@ Connection::Connection(Socket* parentSocket, HifiSockAddr destination, std::uniq
// set the initial RTT and flow window size on congestion control object // set the initial RTT and flow window size on congestion control object
_congestionControl->setRTT(_rtt); _congestionControl->setRTT(_rtt);
_congestionControl->setMaxCongestionWindowSize(_flowWindowSize); _congestionControl->setMaxCongestionWindowSize(_flowWindowSize);
// Setup packets
static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(_lastSentACK) + sizeof(_currentACKSubSequenceNumber)
+ sizeof(_rtt) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t);
static const int LIGHT_ACK_PACKET_PAYLOAD_BYTES = sizeof(SequenceNumber);
static const int ACK2_PAYLOAD_BYTES = sizeof(SequenceNumber);
static const int NAK_PACKET_PAYLOAD_BYTES = 2 * sizeof(SequenceNumber);
static const int HANDSHAKE_ACK_PAYLOAD_BYTES = sizeof(SequenceNumber);
_ackPacket = ControlPacket::create(ControlPacket::ACK, ACK_PACKET_PAYLOAD_BYTES);
_lightACKPacket = ControlPacket::create(ControlPacket::LightACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES);
_ack2Packet = ControlPacket::create(ControlPacket::ACK2, ACK2_PAYLOAD_BYTES);
_lossReport = ControlPacket::create(ControlPacket::NAK, NAK_PACKET_PAYLOAD_BYTES);
_handshakeACK = ControlPacket::create(ControlPacket::HandshakeACK, HANDSHAKE_ACK_PAYLOAD_BYTES);
} }
Connection::~Connection() { Connection::~Connection() {
@ -279,25 +293,22 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) {
// update the last sent ACK // update the last sent ACK
_lastSentACK = nextACKNumber; _lastSentACK = nextACKNumber;
// setup the ACK packet, make it static so we can re-use it
static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(_lastSentACK) + sizeof(_currentACKSubSequenceNumber) _ackPacket->reset(); // We need to reset it every time.
+ sizeof(_rtt) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t);
static auto ackPacket = ControlPacket::create(ControlPacket::ACK, ACK_PACKET_PAYLOAD_BYTES);
ackPacket->reset(); // We need to reset it every time.
// pack in the ACK sub-sequence number // pack in the ACK sub-sequence number
ackPacket->writePrimitive(++_currentACKSubSequenceNumber); _ackPacket->writePrimitive(++_currentACKSubSequenceNumber);
// pack in the ACK number // pack in the ACK number
ackPacket->writePrimitive(nextACKNumber); _ackPacket->writePrimitive(nextACKNumber);
// pack in the RTT and variance // pack in the RTT and variance
ackPacket->writePrimitive(_rtt); _ackPacket->writePrimitive(_rtt);
// pack the available buffer size, in packets // pack the available buffer size, in packets
// in our implementation we have no hard limit on receive buffer size, send the default value // in our implementation we have no hard limit on receive buffer size, send the default value
ackPacket->writePrimitive((int32_t) udt::CONNECTION_RECEIVE_BUFFER_SIZE_PACKETS); _ackPacket->writePrimitive((int32_t) udt::CONNECTION_RECEIVE_BUFFER_SIZE_PACKETS);
if (wasCausedBySyncTimeout) { if (wasCausedBySyncTimeout) {
// grab the up to date packet receive speed and estimated bandwidth // grab the up to date packet receive speed and estimated bandwidth
@ -309,15 +320,15 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) {
_stats.recordEstimatedBandwidth(estimatedBandwidth); _stats.recordEstimatedBandwidth(estimatedBandwidth);
// pack in the receive speed and estimatedBandwidth // pack in the receive speed and estimatedBandwidth
ackPacket->writePrimitive(packetReceiveSpeed); _ackPacket->writePrimitive(packetReceiveSpeed);
ackPacket->writePrimitive(estimatedBandwidth); _ackPacket->writePrimitive(estimatedBandwidth);
} }
// record this as the last ACK send time // record this as the last ACK send time
lastACKSendTime = p_high_resolution_clock::now(); lastACKSendTime = p_high_resolution_clock::now();
// have the socket send off our packet // have the socket send off our packet
_parentSocket->writeBasePacket(*ackPacket, _destination); _parentSocket->writeBasePacket(*_ackPacket, _destination);
Q_ASSERT_X(_sentACKs.empty() || _sentACKs.back().first + 1 == _currentACKSubSequenceNumber, Q_ASSERT_X(_sentACKs.empty() || _sentACKs.back().first + 1 == _currentACKSubSequenceNumber,
"Connection::sendACK", "Adding an invalid ACK to _sentACKs"); "Connection::sendACK", "Adding an invalid ACK to _sentACKs");
@ -339,35 +350,27 @@ void Connection::sendLightACK() {
return; return;
} }
// create the light ACK packet, make it static so we can re-use it
static const int LIGHT_ACK_PACKET_PAYLOAD_BYTES = sizeof(SequenceNumber);
static auto lightACKPacket = ControlPacket::create(ControlPacket::LightACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES);
// reset the lightACKPacket before we go to write the ACK to it // reset the lightACKPacket before we go to write the ACK to it
lightACKPacket->reset(); _lightACKPacket->reset();
// pack in the ACK // pack in the ACK
lightACKPacket->writePrimitive(nextACKNumber); _lightACKPacket->writePrimitive(nextACKNumber);
// have the socket send off our packet immediately // have the socket send off our packet immediately
_parentSocket->writeBasePacket(*lightACKPacket, _destination); _parentSocket->writeBasePacket(*_lightACKPacket, _destination);
_stats.record(ConnectionStats::Stats::SentLightACK); _stats.record(ConnectionStats::Stats::SentLightACK);
} }
void Connection::sendACK2(SequenceNumber currentACKSubSequenceNumber) { void Connection::sendACK2(SequenceNumber currentACKSubSequenceNumber) {
// setup a static ACK2 packet we will re-use
static const int ACK2_PAYLOAD_BYTES = sizeof(SequenceNumber);
static auto ack2Packet = ControlPacket::create(ControlPacket::ACK2, ACK2_PAYLOAD_BYTES);
// reset the ACK2 Packet before writing the sub-sequence number to it // reset the ACK2 Packet before writing the sub-sequence number to it
ack2Packet->reset(); _ack2Packet->reset();
// write the sub sequence number for this ACK2 // write the sub sequence number for this ACK2
ack2Packet->writePrimitive(currentACKSubSequenceNumber); _ack2Packet->writePrimitive(currentACKSubSequenceNumber);
// send the ACK2 packet // send the ACK2 packet
_parentSocket->writeBasePacket(*ack2Packet, _destination); _parentSocket->writeBasePacket(*_ack2Packet, _destination);
// update the last sent ACK2 and the last ACK2 send time // update the last sent ACK2 and the last ACK2 send time
_lastSentACK2 = currentACKSubSequenceNumber; _lastSentACK2 = currentACKSubSequenceNumber;
@ -376,19 +379,16 @@ void Connection::sendACK2(SequenceNumber currentACKSubSequenceNumber) {
} }
void Connection::sendNAK(SequenceNumber sequenceNumberRecieved) { void Connection::sendNAK(SequenceNumber sequenceNumberRecieved) {
// create the loss report packet, make it static so we can re-use it _lossReport->reset(); // We need to reset it every time.
static const int NAK_PACKET_PAYLOAD_BYTES = 2 * sizeof(SequenceNumber);
static auto lossReport = ControlPacket::create(ControlPacket::NAK, NAK_PACKET_PAYLOAD_BYTES);
lossReport->reset(); // We need to reset it every time.
// pack in the loss report // pack in the loss report
lossReport->writePrimitive(_lastReceivedSequenceNumber + 1); _lossReport->writePrimitive(_lastReceivedSequenceNumber + 1);
if (_lastReceivedSequenceNumber + 1 != sequenceNumberRecieved - 1) { if (_lastReceivedSequenceNumber + 1 != sequenceNumberRecieved - 1) {
lossReport->writePrimitive(sequenceNumberRecieved - 1); _lossReport->writePrimitive(sequenceNumberRecieved - 1);
} }
// have the parent socket send off our packet immediately // have the parent socket send off our packet immediately
_parentSocket->writeBasePacket(*lossReport, _destination); _parentSocket->writeBasePacket(*_lossReport, _destination);
// record our last NAK time // record our last NAK time
_lastNAKTime = p_high_resolution_clock::now(); _lastNAKTime = p_high_resolution_clock::now();
@ -519,7 +519,7 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in
return !wasDuplicate; return !wasDuplicate;
} }
void Connection::processControl(std::unique_ptr<ControlPacket> controlPacket) { void Connection::processControl(ControlPacketPointer controlPacket) {
// Simple dispatch to control packets processing methods based on their type. // Simple dispatch to control packets processing methods based on their type.
@ -577,7 +577,7 @@ void Connection::processControl(std::unique_ptr<ControlPacket> controlPacket) {
} }
} }
void Connection::processACK(std::unique_ptr<ControlPacket> controlPacket) { void Connection::processACK(ControlPacketPointer controlPacket) {
// read the ACK sub-sequence number // read the ACK sub-sequence number
SequenceNumber currentACKSubSequenceNumber; SequenceNumber currentACKSubSequenceNumber;
controlPacket->readPrimitive(&currentACKSubSequenceNumber); controlPacket->readPrimitive(&currentACKSubSequenceNumber);
@ -678,7 +678,7 @@ void Connection::processACK(std::unique_ptr<ControlPacket> controlPacket) {
_stats.record(ConnectionStats::Stats::ProcessedACK); _stats.record(ConnectionStats::Stats::ProcessedACK);
} }
void Connection::processLightACK(std::unique_ptr<ControlPacket> controlPacket) { void Connection::processLightACK(ControlPacketPointer controlPacket) {
// read the ACKed sequence number // read the ACKed sequence number
SequenceNumber ack; SequenceNumber ack;
controlPacket->readPrimitive(&ack); controlPacket->readPrimitive(&ack);
@ -702,7 +702,7 @@ void Connection::processLightACK(std::unique_ptr<ControlPacket> controlPacket) {
_stats.record(ConnectionStats::Stats::ReceivedLightACK); _stats.record(ConnectionStats::Stats::ReceivedLightACK);
} }
void Connection::processACK2(std::unique_ptr<ControlPacket> controlPacket) { void Connection::processACK2(ControlPacketPointer controlPacket) {
// pull the sub sequence number from the packet // pull the sub sequence number from the packet
SequenceNumber subSequenceNumber; SequenceNumber subSequenceNumber;
controlPacket->readPrimitive(&subSequenceNumber); controlPacket->readPrimitive(&subSequenceNumber);
@ -742,7 +742,7 @@ void Connection::processACK2(std::unique_ptr<ControlPacket> controlPacket) {
_stats.record(ConnectionStats::Stats::ReceivedACK2); _stats.record(ConnectionStats::Stats::ReceivedACK2);
} }
void Connection::processNAK(std::unique_ptr<ControlPacket> controlPacket) { void Connection::processNAK(ControlPacketPointer controlPacket) {
// read the loss report // read the loss report
SequenceNumber start, end; SequenceNumber start, end;
controlPacket->readPrimitive(&start); controlPacket->readPrimitive(&start);
@ -764,7 +764,7 @@ void Connection::processNAK(std::unique_ptr<ControlPacket> controlPacket) {
_stats.record(ConnectionStats::Stats::ReceivedNAK); _stats.record(ConnectionStats::Stats::ReceivedNAK);
} }
void Connection::processHandshake(std::unique_ptr<ControlPacket> controlPacket) { void Connection::processHandshake(ControlPacketPointer controlPacket) {
SequenceNumber initialSequenceNumber; SequenceNumber initialSequenceNumber;
controlPacket->readPrimitive(&initialSequenceNumber); controlPacket->readPrimitive(&initialSequenceNumber);
@ -782,18 +782,16 @@ void Connection::processHandshake(std::unique_ptr<ControlPacket> controlPacket)
_lastReceivedSequenceNumber = initialSequenceNumber - 1; _lastReceivedSequenceNumber = initialSequenceNumber - 1;
_lastSentACK = initialSequenceNumber - 1; _lastSentACK = initialSequenceNumber - 1;
} }
// immediately respond with a handshake ACK _handshakeACK->reset();
static auto handshakeACK = ControlPacket::create(ControlPacket::HandshakeACK, sizeof(SequenceNumber)); _handshakeACK->writePrimitive(initialSequenceNumber);
handshakeACK->seek(0); _parentSocket->writeBasePacket(*_handshakeACK, _destination);
handshakeACK->writePrimitive(initialSequenceNumber);
_parentSocket->writeBasePacket(*handshakeACK, _destination);
// indicate that handshake has been received // indicate that handshake has been received
_hasReceivedHandshake = true; _hasReceivedHandshake = true;
} }
void Connection::processHandshakeACK(std::unique_ptr<ControlPacket> controlPacket) { void Connection::processHandshakeACK(ControlPacketPointer controlPacket) {
// if we've decided to clean up the send queue then this handshake ACK should be ignored, it's useless // if we've decided to clean up the send queue then this handshake ACK should be ignored, it's useless
if (_sendQueue) { if (_sendQueue) {
SequenceNumber initialSequenceNumber; SequenceNumber initialSequenceNumber;
@ -807,7 +805,7 @@ void Connection::processHandshakeACK(std::unique_ptr<ControlPacket> controlPacke
} }
} }
void Connection::processTimeoutNAK(std::unique_ptr<ControlPacket> controlPacket) { void Connection::processTimeoutNAK(ControlPacketPointer controlPacket) {
// Override SendQueue's LossList with the timeout NAK list // Override SendQueue's LossList with the timeout NAK list
getSendQueue().overrideNAKListFromPacket(*controlPacket); getSendQueue().overrideNAKListFromPacket(*controlPacket);
@ -817,7 +815,7 @@ void Connection::processTimeoutNAK(std::unique_ptr<ControlPacket> controlPacket)
_stats.record(ConnectionStats::Stats::ReceivedTimeoutNAK); _stats.record(ConnectionStats::Stats::ReceivedTimeoutNAK);
} }
void Connection::processProbeTail(std::unique_ptr<ControlPacket> controlPacket) { void Connection::processProbeTail(ControlPacketPointer controlPacket) {
if (((uint32_t) _lastReceivedSequenceNumber & 0xF) == 0) { if (((uint32_t) _lastReceivedSequenceNumber & 0xF) == 0) {
// this is the second packet in a probe set so we can estimate bandwidth // this is the second packet in a probe set so we can estimate bandwidth
// the sender sent this to us in lieu of sending new data (because they didn't have any) // the sender sent this to us in lieu of sending new data (because they didn't have any)

View file

@ -55,6 +55,7 @@ public:
using SequenceNumberTimePair = std::pair<SequenceNumber, p_high_resolution_clock::time_point>; using SequenceNumberTimePair = std::pair<SequenceNumber, p_high_resolution_clock::time_point>;
using ACKListPair = std::pair<SequenceNumber, SequenceNumberTimePair>; using ACKListPair = std::pair<SequenceNumber, SequenceNumberTimePair>;
using SentACKList = std::list<ACKListPair>; using SentACKList = std::list<ACKListPair>;
using ControlPacketPointer = std::unique_ptr<ControlPacket>;
Connection(Socket* parentSocket, HifiSockAddr destination, std::unique_ptr<CongestionControl> congestionControl); Connection(Socket* parentSocket, HifiSockAddr destination, std::unique_ptr<CongestionControl> congestionControl);
~Connection(); ~Connection();
@ -66,7 +67,7 @@ public:
// return indicates if this packet should be processed // return indicates if this packet should be processed
bool processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize); bool processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize);
void processControl(std::unique_ptr<ControlPacket> controlPacket); void processControl(ControlPacketPointer controlPacket);
void queueReceivedMessagePacket(std::unique_ptr<Packet> packet); void queueReceivedMessagePacket(std::unique_ptr<Packet> packet);
@ -96,14 +97,14 @@ private:
void sendNAK(SequenceNumber sequenceNumberRecieved); void sendNAK(SequenceNumber sequenceNumberRecieved);
void sendTimeoutNAK(); void sendTimeoutNAK();
void processACK(std::unique_ptr<ControlPacket> controlPacket); void processACK(ControlPacketPointer controlPacket);
void processLightACK(std::unique_ptr<ControlPacket> controlPacket); void processLightACK(ControlPacketPointer controlPacket);
void processACK2(std::unique_ptr<ControlPacket> controlPacket); void processACK2(ControlPacketPointer controlPacket);
void processNAK(std::unique_ptr<ControlPacket> controlPacket); void processNAK(ControlPacketPointer controlPacket);
void processTimeoutNAK(std::unique_ptr<ControlPacket> controlPacket); void processTimeoutNAK(ControlPacketPointer controlPacket);
void processHandshake(std::unique_ptr<ControlPacket> controlPacket); void processHandshake(ControlPacketPointer controlPacket);
void processHandshakeACK(std::unique_ptr<ControlPacket> controlPacket); void processHandshakeACK(ControlPacketPointer controlPacket);
void processProbeTail(std::unique_ptr<ControlPacket> controlPacket); void processProbeTail(ControlPacketPointer controlPacket);
void resetReceiveState(); void resetReceiveState();
void resetRTT(); void resetRTT();
@ -171,7 +172,14 @@ private:
std::map<MessageNumber, PendingReceivedMessage> _pendingReceivedMessages; std::map<MessageNumber, PendingReceivedMessage> _pendingReceivedMessages;
int _packetsSinceACK { 0 }; // The number of packets that have been received during the current ACK interval int _packetsSinceACK { 0 }; // The number of packets that have been received during the current ACK interval
// Re-used control packets
ControlPacketPointer _ackPacket;
ControlPacketPointer _lightACKPacket;
ControlPacketPointer _ack2Packet;
ControlPacketPointer _lossReport;
ControlPacketPointer _handshakeACK;
ConnectionStats _stats; ConnectionStats _stats;
}; };