add methods to send ACKs from Connection

This commit is contained in:
Stephen Birarda 2015-07-27 16:43:49 -07:00
parent 75a722f63c
commit 90d2515674
8 changed files with 82 additions and 31 deletions

View file

@ -28,15 +28,47 @@ void Connection::send(std::unique_ptr<Packet> packet) {
}
}
void Connection::sendACK() {
static const int ACK_PACKET_PAYLOAD_BYTES = 8;
// setup the ACK packet, make it static so we can re-use it
static auto ackPacket = ControlPacket::create(ControlPacket::ACK, ACK_PACKET_PAYLOAD_BYTES);
// have the send queue send off our packet
_sendQueue->sendPacket(*ackPacket);
}
void Connection::sendLightACK() const {
static const int LIGHT_ACK_PACKET_PAYLOAD_BYTES = 4;
// create the light ACK packet, make it static so we can re-use it
static auto lightACKPacket = ControlPacket::create(ControlPacket::ACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES);
SeqNum nextACKNumber = nextACK();
if (nextACKNumber != _lastReceivedAcknowledgedACK) {
// pack in the ACK
memcpy(lightACKPacket->getPayload(), &nextACKNumber, sizeof(nextACKNumber));
// have the send queue send off our packet
_sendQueue->sendPacket(*lightACKPacket);
}
}
SeqNum Connection::nextACK() const {
// TODO: check if we have a loss list
return _largestReceivedSeqNum + 1;
}
void Connection::processReceivedSeqNum(SeqNum seq) {
if (udt::seqcmp(seq, _largestRecievedSeqNum + 1) > 0) {
if (udt::seqcmp(seq, _largestReceivedSeqNum + 1) > 0) {
// TODO: Add range to loss list
// TODO: Send loss report
}
if (seq > _largestRecievedSeqNum) {
_largestRecievedSeqNum = seq;
if (seq > _largestReceivedSeqNum) {
_largestReceivedSeqNum = seq;
} else {
// TODO: Remove seq from loss list
}
@ -44,13 +76,13 @@ void Connection::processReceivedSeqNum(SeqNum seq) {
void Connection::processControl(std::unique_ptr<ControlPacket> controlPacket) {
switch (controlPacket->getType()) {
case ControlPacket::Type::ACK:
case ControlPacket::ACK:
break;
case ControlPacket::Type::ACK2:
case ControlPacket::ACK2:
break;
case ControlPacket::Type::NAK:
case ControlPacket::NAK:
break;
case ControlPacket::Type::PacketPair:
case ControlPacket::PacketPair:
break;
}
}

View file

@ -31,12 +31,17 @@ public:
void send(std::unique_ptr<Packet> packet);
void sendACK();
void sendLightACK() const;
SeqNum nextACK() const;
void processReceivedSeqNum(SeqNum seq);
void processControl(std::unique_ptr<ControlPacket> controlPacket);
private:
SeqNum _largestRecievedSeqNum;
SeqNum _largestReceivedSeqNum;
SeqNum _lastReceivedAcknowledgedACK; // The last sent ACK that has been acknowledged via an ACK2 from the peer
std::unique_ptr<SendQueue> _sendQueue;
};

View file

@ -13,8 +13,22 @@
using namespace udt;
std::unique_ptr<ControlPacket> ControlPacket::create(Type type, const SequenceNumberList& sequenceNumbers) {
return ControlPacket::create(type, sequenceNumbers);
std::unique_ptr<ControlPacket> ControlPacket::create(Type type, qint64 size) {
std::unique_ptr<ControlPacket> controlPacket;
if (size == -1) {
controlPacket = ControlPacket::create(type);
} else {
// Fail with invalid size
Q_ASSERT(size >= 0);
controlPacket = ControlPacket::create(type, size);
}
controlPacket->open(QIODevice::ReadWrite);
return controlPacket;
}
ControlPacket::ControlPacketPair ControlPacket::createPacketPair(quint64 timestamp) {
@ -32,18 +46,18 @@ qint64 ControlPacket::totalHeadersSize() const {
return BasePacket::totalHeadersSize() + localHeaderSize();
}
ControlPacket::ControlPacket(Type type, const SequenceNumberList& sequenceNumbers) :
BasePacket(localHeaderSize() + (sizeof(Packet::SequenceNumber) * sequenceNumbers.size())),
ControlPacket::ControlPacket(Type type) :
BasePacket(-1),
_type(type)
{
adjustPayloadStartAndCapacity();
}
ControlPacket::ControlPacket(Type type, qint64 size) :
BasePacket(localHeaderSize() + size),
_type(type)
{
adjustPayloadStartAndCapacity();
open(QIODevice::ReadWrite);
// pack in the sequence numbers
for (auto& sequenceNumber : sequenceNumbers) {
writePrimitive(sequenceNumber);
}
}
ControlPacket::ControlPacket(quint64 timestamp) :

View file

@ -21,23 +21,21 @@
namespace udt {
using SequenceNumberList = std::vector<Packet::SequenceNumber>;
class ControlPacket : public BasePacket {
Q_OBJECT
public:
using TypeAndSubSequenceNumber = uint32_t;
using ControlPacketPair = std::pair<std::unique_ptr<ControlPacket>, std::unique_ptr<ControlPacket>>;
enum class Type : uint16_t {
enum Type : uint16_t {
ACK,
ACK2,
NAK,
PacketPair
};
std::unique_ptr<ControlPacket> create(Type type, const SequenceNumberList& sequenceNumbers);
ControlPacketPair createPacketPair(quint64 timestamp);
static std::unique_ptr<ControlPacket> create(Type type, qint64 size = -1);
static ControlPacketPair createPacketPair(quint64 timestamp);
static qint64 localHeaderSize(); // Current level's header size
virtual qint64 totalHeadersSize() const; // Cumulated size of all the headers
@ -45,7 +43,8 @@ public:
Type getType() const { return _type; }
private:
ControlPacket(Type type, const SequenceNumberList& sequenceNumbers);
ControlPacket(Type type);
ControlPacket(Type type, qint64 size);
ControlPacket(quint64 timestamp);
ControlPacket(ControlPacket&& other);
ControlPacket(const ControlPacket& other) = delete;

View file

@ -80,8 +80,8 @@ void SendQueue::stop() {
_running = false;
}
void SendQueue::sendPacket(const Packet& packet) {
_socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination);
void SendQueue::sendPacket(const BasePacket& packet) {
_socket->writeUnreliablePacket(packet, _destination);
}
void SendQueue::ack(SeqNum ack) {

View file

@ -26,6 +26,7 @@
namespace udt {
class Socket;
class BasePacket;
class Packet;
class SendQueue : public QObject {
@ -47,7 +48,7 @@ public:
public slots:
void start();
void stop();
void sendPacket(const Packet& packet);
void sendPacket(const BasePacket& packet);
void ack(SeqNum ack);
void nak(std::list<SeqNum> naks);

View file

@ -66,7 +66,7 @@ void Socket::setBufferSizes(int numBytes) {
}
}
qint64 Socket::writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr) {
qint64 Socket::writeUnreliablePacket(const BasePacket& packet, const HifiSockAddr& sockAddr) {
return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr);
}

View file

@ -42,7 +42,7 @@ public:
quint16 localPort() const { return _udpSocket.localPort(); }
qint64 writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr);
qint64 writeUnreliablePacket(const BasePacket& packet, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr)
{ return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); }