Add support for PacketList to SendQueue

This commit is contained in:
Ryan Huffman 2015-08-18 23:05:07 -07:00
parent 9c65849037
commit 5cb028cf43
2 changed files with 51 additions and 0 deletions

View file

@ -20,6 +20,7 @@
#include "ControlPacket.h"
#include "Packet.h"
#include "PacketList.h"
#include "Socket.h"
using namespace udt;
@ -63,6 +64,44 @@ void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
}
}
void SendQueue::queuePacketList(std::unique_ptr<PacketList> packetList) {
Q_ASSERT(packetList->_packets.size() > 0);
{
QWriteLocker locker(&_packetsLock);
auto messageNumber = getNextMessageNumber();
if (packetList->_packets.size() == 1) {
auto packet = packetList->takeFront<Packet>();
packet->setPacketPosition(Packet::PacketPosition::ONLY);
packet->writeMessageNumber(messageNumber);
_packets.push_back(std::move(packet));
} else {
bool haveMarkedFirstPacket = false;
while (!packetList->_packets.empty()) {
auto packet = packetList->takeFront<Packet>();
if (!haveMarkedFirstPacket) {
packet->setPacketPosition(Packet::PacketPosition::FIRST);
haveMarkedFirstPacket = true;
} else if (packetList->_packets.empty()) {
packet->setPacketPosition(Packet::PacketPosition::LAST);
} else {
packet->setPacketPosition(Packet::PacketPosition::MIDDLE);
}
packet->writeMessageNumber(messageNumber);
_packets.push_back(std::move(packet));
}
}
}
if (!this->thread()->isRunning()) {
this->thread()->start();
}
}
void SendQueue::stop() {
_isRunning = false;
}
@ -121,6 +160,12 @@ SequenceNumber SendQueue::getNextSequenceNumber() {
return _currentSequenceNumber;
}
uint32_t SendQueue::getNextMessageNumber() {
static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << 30;
_currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER;
return _currentMessageNumber;
}
void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber) {
// write the sequence number and send the packet
newPacket->writeSequenceNumber(sequenceNumber);

View file

@ -31,7 +31,10 @@ namespace udt {
class BasePacket;
class ControlPacket;
class Packet;
class PacketList;
class Socket;
using MessageNumber = uint32_t;
class SendQueue : public QObject {
Q_OBJECT
@ -40,6 +43,7 @@ public:
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination);
void queuePacket(std::unique_ptr<Packet> packet);
void queuePacketList(std::unique_ptr<PacketList> packetList);
int getQueueSize() const { QReadLocker locker(&_packetsLock); return _packets.size(); }
SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); }
@ -73,6 +77,7 @@ private:
// Increments current sequence number and return it
SequenceNumber getNextSequenceNumber();
MessageNumber getNextMessageNumber();
mutable QReadWriteLock _packetsLock; // Protects the packets to be sent list.
std::list<std::unique_ptr<Packet>> _packets; // List of packets to be sent
@ -82,6 +87,7 @@ private:
std::atomic<uint32_t> _lastACKSequenceNumber; // Last ACKed sequence number
MessageNumber _currentMessageNumber { 0 };
SequenceNumber _currentSequenceNumber; // Last sequence number sent out
std::atomic<uint32_t> _atomicCurrentSequenceNumber;// Atomic for last sequence number sent out