Keep track of message numbers

This commit is contained in:
Atlante45 2017-12-13 19:25:24 -08:00
parent 44190f16c0
commit ad547f483d
6 changed files with 27 additions and 18 deletions

View file

@ -90,6 +90,9 @@ void Connection::stopSendQueue() {
// tell the send queue to stop and be deleted
sendQueue->stop();
_lastMessageNumber = sendQueue->getCurrentMessageNumber();
sendQueue->deleteLater();
// wait on the send queue thread so we know the send queue is gone
@ -116,11 +119,11 @@ SendQueue& Connection::getSendQueue() {
if (!_hasReceivedHandshakeACK) {
// First time creating a send queue for this connection
_sendQueue = SendQueue::create(_parentSocket, _destination, _initialSequenceNumber - 1, _hasReceivedHandshakeACK);
_sendQueue = SendQueue::create(_parentSocket, _destination, _initialSequenceNumber - 1, _lastMessageNumber, _hasReceivedHandshakeACK);
_lastReceivedACK = _sendQueue->getCurrentSequenceNumber();
} else {
// Connection already has a handshake from a previous send queue
_sendQueue = SendQueue::create(_parentSocket, _destination, _lastReceivedACK, _hasReceivedHandshakeACK);
_sendQueue = SendQueue::create(_parentSocket, _destination, _lastReceivedACK, _lastMessageNumber, _hasReceivedHandshakeACK);
}
#ifdef UDT_CONNECTION_DEBUG
@ -417,7 +420,6 @@ void Connection::sendHandshakeRequest() {
}
bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize) {
if (!_hasReceivedHandshake) {
// Refuse to process any packets until we've received the handshake
// Send handshake request to re-request a handshake
@ -509,7 +511,7 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in
} else {
_stats.recordReceivedPackets(payloadSize, packetSize);
}
return !wasDuplicate;
}

View file

@ -137,6 +137,8 @@ private:
SequenceNumber _initialSequenceNumber; // Randomized on Connection creation, identifies connection during re-connect requests
SequenceNumber _initialReceiveSequenceNumber; // Randomized by peer Connection on creation, identifies connection during re-connect requests
MessageNumber _lastMessageNumber;
LossList _lossList; // List of all missing packets
SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer
SequenceNumber _lastReceivedACK; // The last ACK received

View file

@ -15,7 +15,7 @@
using namespace udt;
PacketQueue::PacketQueue() {
PacketQueue::PacketQueue(MessageNumber messageNumber) : _currentMessageNumber(messageNumber) {
_channels.emplace_back(new std::list<PacketPointer>());
}

View file

@ -34,7 +34,7 @@ class PacketQueue {
using Channels = std::vector<Channel>;
public:
PacketQueue();
PacketQueue(MessageNumber messageNumber = 0);
void queuePacket(PacketPointer packet);
void queuePacketList(PacketListPointer packetList);
@ -42,6 +42,8 @@ public:
PacketPointer takePacket();
Mutex& getLock() { return _packetsLock; }
MessageNumber getCurrentMessageNumber() const { return _currentMessageNumber; }
private:
MessageNumber getNextMessageNumber();

View file

@ -61,10 +61,12 @@ private:
Mutex2& _mutex2;
};
std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK) {
std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber,
MessageNumber currentMessageNumber, bool hasReceivedHandshakeACK) {
Q_ASSERT_X(socket, "SendQueue::create", "Must be called with a valid Socket*");
auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, destination, currentSequenceNumber, hasReceivedHandshakeACK));
auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, destination, currentSequenceNumber,
currentMessageNumber, hasReceivedHandshakeACK));
// Setup queue private thread
QThread* thread = new QThread;
@ -83,19 +85,16 @@ std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destin
return queue;
}
SendQueue::SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK) :
SendQueue::SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber,
MessageNumber currentMessageNumber, bool hasReceivedHandshakeACK) :
_socket(socket),
_destination(dest)
_destination(dest),
_packets(currentMessageNumber)
{
// set our member variables from current sequence number
_currentSequenceNumber = currentSequenceNumber;
_atomicCurrentSequenceNumber = uint32_t(_currentSequenceNumber);
if (hasReceivedHandshakeACK) {
_lastACKSequenceNumber = uint32_t(_currentSequenceNumber);
} else {
_lastACKSequenceNumber = uint32_t(_currentSequenceNumber - 1);
}
_lastACKSequenceNumber = uint32_t(_currentSequenceNumber);
_hasReceivedHandshakeACK = hasReceivedHandshakeACK;

View file

@ -50,7 +50,9 @@ public:
Stopped
};
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK);
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination,
SequenceNumber currentSequenceNumber, MessageNumber currentMessageNumber,
bool hasReceivedHandshakeACK);
virtual ~SendQueue();
@ -58,6 +60,7 @@ public:
void queuePacketList(std::unique_ptr<PacketList> packetList);
SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); }
MessageNumber getCurrentMessageNumber() const { return _packets.getCurrentMessageNumber(); }
void setFlowWindowSize(int flowWindowSize) { _flowWindowSize = flowWindowSize; }
@ -91,7 +94,8 @@ private slots:
void run();
private:
SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber, bool hasReceivedHandshakeACK);
SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber,
MessageNumber currentMessageNumber, bool hasReceivedHandshakeACK);
SendQueue(SendQueue& other) = delete;
SendQueue(SendQueue&& other) = delete;