Keep track of message numbers

This commit is contained in:
Atlante45 2017-12-13 19:25:24 -08:00 committed by Stephen Birarda
parent 702d2c34a5
commit 34c1c27455
6 changed files with 27 additions and 18 deletions

View file

@ -90,6 +90,9 @@ void Connection::stopSendQueue() {
// tell the send queue to stop and be deleted // tell the send queue to stop and be deleted
sendQueue->stop(); sendQueue->stop();
_lastMessageNumber = sendQueue->getCurrentMessageNumber();
sendQueue->deleteLater(); sendQueue->deleteLater();
// wait on the send queue thread so we know the send queue is gone // wait on the send queue thread so we know the send queue is gone
@ -116,11 +119,11 @@ SendQueue& Connection::getSendQueue() {
if (!_hasReceivedHandshakeACK) { if (!_hasReceivedHandshakeACK) {
// First time creating a send queue for this connection // First time creating a send queue for this connection
_sendQueue = SendQueue::create(_parentSocket, _destination, _initialSequenceNumber - 1, _hasReceivedHandshakeACK); _sendQueue = SendQueue::create(_parentSocket, _destination, _initialSequenceNumber - 1, _lastMessageNumber, _hasReceivedHandshakeACK);
_lastReceivedACK = _sendQueue->getCurrentSequenceNumber(); _lastReceivedACK = _sendQueue->getCurrentSequenceNumber();
} else { } else {
// Connection already has a handshake from a previous send queue // Connection already has a handshake from a previous send queue
_sendQueue = SendQueue::create(_parentSocket, _destination, _lastReceivedACK, _hasReceivedHandshakeACK); _sendQueue = SendQueue::create(_parentSocket, _destination, _lastReceivedACK, _lastMessageNumber, _hasReceivedHandshakeACK);
} }
#ifdef UDT_CONNECTION_DEBUG #ifdef UDT_CONNECTION_DEBUG
@ -417,7 +420,6 @@ void Connection::sendHandshakeRequest() {
} }
bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize) { bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize) {
if (!_hasReceivedHandshake) { if (!_hasReceivedHandshake) {
// Refuse to process any packets until we've received the handshake // Refuse to process any packets until we've received the handshake
// Send handshake request to re-request a handshake // Send handshake request to re-request a handshake
@ -509,7 +511,7 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in
} else { } else {
_stats.recordReceivedPackets(payloadSize, packetSize); _stats.recordReceivedPackets(payloadSize, packetSize);
} }
return !wasDuplicate; return !wasDuplicate;
} }

View file

@ -137,6 +137,8 @@ private:
SequenceNumber _initialSequenceNumber; // Randomized on Connection creation, identifies connection during re-connect requests SequenceNumber _initialSequenceNumber; // Randomized on Connection creation, identifies connection during re-connect requests
SequenceNumber _initialReceiveSequenceNumber; // Randomized by peer Connection on creation, identifies connection during re-connect requests SequenceNumber _initialReceiveSequenceNumber; // Randomized by peer Connection on creation, identifies connection during re-connect requests
MessageNumber _lastMessageNumber;
LossList _lossList; // List of all missing packets LossList _lossList; // List of all missing packets
SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer
SequenceNumber _lastReceivedACK; // The last ACK received SequenceNumber _lastReceivedACK; // The last ACK received

View file

@ -15,7 +15,7 @@
using namespace udt; using namespace udt;
PacketQueue::PacketQueue() { PacketQueue::PacketQueue(MessageNumber messageNumber) : _currentMessageNumber(messageNumber) {
_channels.emplace_back(new std::list<PacketPointer>()); _channels.emplace_back(new std::list<PacketPointer>());
} }

View file

@ -34,7 +34,7 @@ class PacketQueue {
using Channels = std::vector<Channel>; using Channels = std::vector<Channel>;
public: public:
PacketQueue(); PacketQueue(MessageNumber messageNumber = 0);
void queuePacket(PacketPointer packet); void queuePacket(PacketPointer packet);
void queuePacketList(PacketListPointer packetList); void queuePacketList(PacketListPointer packetList);
@ -42,6 +42,8 @@ public:
PacketPointer takePacket(); PacketPointer takePacket();
Mutex& getLock() { return _packetsLock; } Mutex& getLock() { return _packetsLock; }
MessageNumber getCurrentMessageNumber() const { return _currentMessageNumber; }
private: private:
MessageNumber getNextMessageNumber(); MessageNumber getNextMessageNumber();

View file

@ -61,10 +61,12 @@ private:
Mutex2& _mutex2; Mutex2& _mutex2;
}; };
std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK) { std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber,
MessageNumber currentMessageNumber, bool hasReceivedHandshakeACK) {
Q_ASSERT_X(socket, "SendQueue::create", "Must be called with a valid Socket*"); Q_ASSERT_X(socket, "SendQueue::create", "Must be called with a valid Socket*");
auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, destination, currentSequenceNumber, hasReceivedHandshakeACK)); auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, destination, currentSequenceNumber,
currentMessageNumber, hasReceivedHandshakeACK));
// Setup queue private thread // Setup queue private thread
QThread* thread = new QThread; QThread* thread = new QThread;
@ -83,19 +85,16 @@ std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destin
return queue; return queue;
} }
SendQueue::SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK) : SendQueue::SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber,
MessageNumber currentMessageNumber, bool hasReceivedHandshakeACK) :
_socket(socket), _socket(socket),
_destination(dest) _destination(dest),
_packets(currentMessageNumber)
{ {
// set our member variables from current sequence number // set our member variables from current sequence number
_currentSequenceNumber = currentSequenceNumber; _currentSequenceNumber = currentSequenceNumber;
_atomicCurrentSequenceNumber = uint32_t(_currentSequenceNumber); _atomicCurrentSequenceNumber = uint32_t(_currentSequenceNumber);
_lastACKSequenceNumber = uint32_t(_currentSequenceNumber);
if (hasReceivedHandshakeACK) {
_lastACKSequenceNumber = uint32_t(_currentSequenceNumber);
} else {
_lastACKSequenceNumber = uint32_t(_currentSequenceNumber - 1);
}
_hasReceivedHandshakeACK = hasReceivedHandshakeACK; _hasReceivedHandshakeACK = hasReceivedHandshakeACK;

View file

@ -50,7 +50,9 @@ public:
Stopped Stopped
}; };
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK); static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination,
SequenceNumber currentSequenceNumber, MessageNumber currentMessageNumber,
bool hasReceivedHandshakeACK);
virtual ~SendQueue(); virtual ~SendQueue();
@ -58,6 +60,7 @@ public:
void queuePacketList(std::unique_ptr<PacketList> packetList); void queuePacketList(std::unique_ptr<PacketList> packetList);
SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); } SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); }
MessageNumber getCurrentMessageNumber() const { return _packets.getCurrentMessageNumber(); }
void setFlowWindowSize(int flowWindowSize) { _flowWindowSize = flowWindowSize; } void setFlowWindowSize(int flowWindowSize) { _flowWindowSize = flowWindowSize; }
@ -91,7 +94,8 @@ private slots:
void run(); void run();
private: private:
SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK); SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber,
MessageNumber currentMessageNumber, bool hasReceivedHandshakeACK);
SendQueue(SendQueue& other) = delete; SendQueue(SendQueue& other) = delete;
SendQueue(SendQueue&& other) = delete; SendQueue(SendQueue&& other) = delete;