Merge branch 'atp' of https://github.com/birarda/hifi into protocol

This commit is contained in:
Atlante45 2015-07-27 17:45:31 -07:00
commit 2ecb445ce9
12 changed files with 195 additions and 39 deletions

View file

@ -1,6 +1,6 @@
//
// CongestionControl.cpp
//
// libraries/networking/src/udt
//
// Created by Clement on 7/23/15.
// Copyright 2015 High Fidelity, Inc.

View file

@ -1,6 +1,6 @@
//
// CongestionControl.h
//
// libraries/networking/src/udt
//
// Created by Clement on 7/23/15.
// Copyright 2015 High Fidelity, Inc.

View file

@ -18,6 +18,7 @@
using namespace udt;
using namespace std;
using namespace std::chrono;
Connection::Connection(Socket* parentSocket, HifiSockAddr destination) {
@ -29,21 +30,105 @@ void Connection::send(unique_ptr<Packet> packet) {
}
}
void Connection::sendACK(bool wasCausedBySyncTimeout) {
static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(_lastSentACK) + sizeof(_currentACKSubSequenceNumber)
+ sizeof(_rtt) + sizeof(_rttVariance) + sizeof(int32_t) + sizeof(int32_t);
// setup the ACK packet, make it static so we can re-use it
static auto ackPacket = ControlPacket::create(ControlPacket::ACK, ACK_PACKET_PAYLOAD_BYTES);
auto currentTime = high_resolution_clock::now();
SeqNum nextACKNumber = nextACK();
if (nextACKNumber <= _lastReceivedAcknowledgedACK) {
// we already got an ACK2 for this ACK we would be sending, don't bother
return;
}
if (nextACKNumber >= _lastSentACK) {
// we have received new packets since the last sent ACK
// update the last sent ACK
_lastSentACK = nextACKNumber;
// remove the ACKed packets from the receive queue
} else if (nextACKNumber == _lastSentACK) {
// We already sent this ACK, but check if we should re-send it.
// We will re-send if it has been more than RTT + (4 * RTT variance) since the last ACK
milliseconds sinceLastACK = duration_cast<milliseconds>(currentTime - _lastACKTime);
if (sinceLastACK.count() < (_rtt + (4 * _rttVariance))) {
return;
}
}
// reset the ACK packet so we can fill it up and have it figure out what size it is
ackPacket->reset();
// pack in the ACK sub-sequence number
ackPacket->writePrimitive(_currentACKSubSequenceNumber++);
// pack in the ACK number
ackPacket->writePrimitive(nextACKNumber);
// pack in the RTT and variance
ackPacket->writePrimitive(_rtt);
ackPacket->writePrimitive(_rttVariance);
// pack the available buffer size - must be a minimum of 2
if (wasCausedBySyncTimeout) {
// pack in the receive speed and bandwidth
// record this as the last ACK send time
_lastACKTime = high_resolution_clock::now();
}
// have the send queue send off our packet
_sendQueue->sendPacket(*ackPacket);
}
void Connection::sendLightACK() const {
static const int LIGHT_ACK_PACKET_PAYLOAD_BYTES = 4;
// create the light ACK packet, make it static so we can re-use it
static auto lightACKPacket = ControlPacket::create(ControlPacket::ACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES);
SeqNum nextACKNumber = nextACK();
if (nextACKNumber == _lastReceivedAcknowledgedACK) {
// we already got an ACK2 for this ACK we would be sending, don't bother
return;
}
// pack in the ACK
memcpy(lightACKPacket->getPayload(), &nextACKNumber, sizeof(nextACKNumber));
// have the send queue send off our packet
_sendQueue->sendPacket(*lightACKPacket);
}
SeqNum Connection::nextACK() const {
// TODO: check if we have a loss list
return _largestReceivedSeqNum + 1;
}
void Connection::processReceivedSeqNum(SeqNum seq) {
// If this is not the next sequence number, report loss
if (seq > _largestRecievedSeqNum + 1) {
if (_largestRecievedSeqNum + 1 == seq - 1) {
_lossList.append(_largestRecievedSeqNum + 1);
if (seq > _largestReceivedSeqNum + 1) {
if (_largestReceivedSeqNum + 1 == seq - 1) {
_lossList.append(_largestReceivedSeqNum + 1);
} else {
_lossList.append(_largestRecievedSeqNum + 1, seq - 1);
_lossList.append(_largestReceivedSeqNum + 1, seq - 1);
}
// TODO: Send loss report
}
if (seq > _largestRecievedSeqNum) {
if (seq > _largestReceivedSeqNum) {
// Update largest recieved sequence number
_largestRecievedSeqNum = seq;
_largestReceivedSeqNum = seq;
} else {
// Otherwise, it's a resend, remove it from the loss list
_lossList.remove(seq);
@ -52,13 +137,13 @@ void Connection::processReceivedSeqNum(SeqNum seq) {
void Connection::processControl(unique_ptr<ControlPacket> controlPacket) {
switch (controlPacket->getType()) {
case ControlPacket::Type::ACK:
case ControlPacket::ACK:
break;
case ControlPacket::Type::ACK2:
case ControlPacket::ACK2:
break;
case ControlPacket::Type::NAK:
case ControlPacket::NAK:
break;
case ControlPacket::Type::PacketPair:
case ControlPacket::PacketPair:
break;
}
}

View file

@ -12,6 +12,7 @@
#ifndef hifi_Connection_h
#define hifi_Connection_h
#include <chrono>
#include <memory>
#include "LossList.h"
@ -32,13 +33,26 @@ public:
void send(std::unique_ptr<Packet> packet);
void sendACK(bool wasCausedBySyncTimeout = true);
void sendLightACK() const;
SeqNum nextACK() const;
void processReceivedSeqNum(SeqNum seq);
void processControl(std::unique_ptr<ControlPacket> controlPacket);
private:
LossList _lossList;
SeqNum _largestRecievedSeqNum;
SeqNum _largestReceivedSeqNum; // The largest sequence number received from the peer
SeqNum _lastSentACK; // The last sent ACK
SeqNum _lastReceivedAcknowledgedACK; // The last sent ACK that has been acknowledged via an ACK2 from the peer
SeqNum _currentACKSubSequenceNumber; // The current ACK sub-sequence number (used for Acknowledgment of ACKs)
int32_t _rtt; // RTT, in milliseconds
int32_t _rttVariance; // RTT variance
std::chrono::high_resolution_clock::time_point _lastACKTime;
std::unique_ptr<SendQueue> _sendQueue;
};

View file

@ -13,8 +13,18 @@
using namespace udt;
std::unique_ptr<ControlPacket> ControlPacket::create(Type type, const SequenceNumberList& sequenceNumbers) {
return ControlPacket::create(type, sequenceNumbers);
std::unique_ptr<ControlPacket> ControlPacket::create(Type type, qint64 size) {
std::unique_ptr<ControlPacket> controlPacket;
if (size == -1) {
return ControlPacket::create(type);
} else {
// Fail with invalid size
Q_ASSERT(size >= 0);
return ControlPacket::create(type, size);
}
}
ControlPacket::ControlPacketPair ControlPacket::createPacketPair(quint64 timestamp) {
@ -25,25 +35,33 @@ ControlPacket::ControlPacketPair ControlPacket::createPacketPair(quint64 timesta
}
qint64 ControlPacket::localHeaderSize() {
return sizeof(TypeAndSubSequenceNumber);
return sizeof(ControlBitAndType);
}
qint64 ControlPacket::totalHeadersSize() const {
return BasePacket::totalHeadersSize() + localHeaderSize();
}
ControlPacket::ControlPacket(Type type, const SequenceNumberList& sequenceNumbers) :
BasePacket(localHeaderSize() + (sizeof(Packet::SequenceNumber) * sequenceNumbers.size())),
ControlPacket::ControlPacket(Type type) :
BasePacket(-1),
_type(type)
{
adjustPayloadStartAndCapacity();
open(QIODevice::ReadWrite);
// pack in the sequence numbers
for (auto& sequenceNumber : sequenceNumbers) {
writePrimitive(sequenceNumber);
}
writeControlBitAndType();
}
ControlPacket::ControlPacket(Type type, qint64 size) :
BasePacket(localHeaderSize() + size),
_type(type)
{
adjustPayloadStartAndCapacity();
open(QIODevice::ReadWrite);
writeControlBitAndType();
}
ControlPacket::ControlPacket(quint64 timestamp) :
@ -54,6 +72,8 @@ ControlPacket::ControlPacket(quint64 timestamp) :
open(QIODevice::ReadWrite);
writeControlBitAndType();
// pack in the timestamp
writePrimitive(timestamp);
}
@ -71,3 +91,15 @@ ControlPacket& ControlPacket::operator=(ControlPacket&& other) {
return *this;
}
static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(ControlPacket::ControlBitAndType) - 1);
void ControlPacket::writeControlBitAndType() {
ControlBitAndType* bitAndType = reinterpret_cast<ControlBitAndType*>(_packet.get());
// write the control bit by OR'ing the current value with the CONTROL_BIT_MASK
*bitAndType = (*bitAndType | CONTROL_BIT_MASK);
// write the type by OR'ing the type with the current value & CONTROL_BIT_MASK
*bitAndType = (*bitAndType & CONTROL_BIT_MASK) | (_type << sizeof((ControlPacket::Type) - 1));
}

View file

@ -21,23 +21,21 @@
namespace udt {
using SequenceNumberList = std::vector<Packet::SequenceNumber>;
class ControlPacket : public BasePacket {
Q_OBJECT
public:
using TypeAndSubSequenceNumber = uint32_t;
using ControlBitAndType = uint32_t;
using ControlPacketPair = std::pair<std::unique_ptr<ControlPacket>, std::unique_ptr<ControlPacket>>;
enum class Type : uint16_t {
enum Type : uint16_t {
ACK,
ACK2,
NAK,
PacketPair
};
std::unique_ptr<ControlPacket> create(Type type, const SequenceNumberList& sequenceNumbers);
ControlPacketPair createPacketPair(quint64 timestamp);
static std::unique_ptr<ControlPacket> create(Type type, qint64 size = -1);
static ControlPacketPair createPacketPair(quint64 timestamp);
static qint64 localHeaderSize(); // Current level's header size
virtual qint64 totalHeadersSize() const; // Cumulated size of all the headers
@ -45,7 +43,8 @@ public:
Type getType() const { return _type; }
private:
ControlPacket(Type type, const SequenceNumberList& sequenceNumbers);
ControlPacket(Type type);
ControlPacket(Type type, qint64 size);
ControlPacket(quint64 timestamp);
ControlPacket(ControlPacket&& other);
ControlPacket(const ControlPacket& other) = delete;
@ -53,6 +52,9 @@ private:
ControlPacket& operator=(ControlPacket&& other);
ControlPacket& operator=(const ControlPacket& other) = delete;
// Header writers
void writeControlBitAndType();
Type _type;
};

View file

@ -117,7 +117,6 @@ Packet& Packet::operator=(Packet&& other) {
static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 1);
static const uint32_t RELIABILITY_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 2);
static const uint32_t MESSAGE_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 3);
static const int BIT_FIELD_LENGTH = 3;
static const uint32_t BIT_FIELD_MASK = CONTROL_BIT_MASK | RELIABILITY_BIT_MASK | MESSAGE_BIT_MASK;
void Packet::readIsReliable() {

View file

@ -33,8 +33,6 @@ public:
using MessageNumber = uint32_t;
using MessageNumberAndBitField = uint32_t;
static const uint32_t DEFAULT_SEQUENCE_NUMBER = 0;
static std::unique_ptr<Packet> create(qint64 size = -1, bool isReliable = false, bool isPartOfMessage = false);
static std::unique_ptr<Packet> fromReceivedPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr);

View file

@ -80,8 +80,8 @@ void SendQueue::stop() {
_running = false;
}
void SendQueue::sendPacket(const Packet& packet) {
_socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination);
void SendQueue::sendPacket(const BasePacket& packet) {
_socket->writeUnreliablePacket(packet, _destination);
}
void SendQueue::ack(SeqNum ack) {

View file

@ -26,6 +26,7 @@
namespace udt {
class Socket;
class BasePacket;
class Packet;
class SendQueue : public QObject {
@ -47,7 +48,7 @@ public:
public slots:
void start();
void stop();
void sendPacket(const Packet& packet);
void sendPacket(const BasePacket& packet);
void ack(SeqNum ack);
void nak(std::list<SeqNum> naks);

View file

@ -11,6 +11,8 @@
#include "Socket.h"
#include <QtCore/QThread>
#include "../NetworkLogging.h"
#include "Packet.h"
@ -20,6 +22,12 @@ Socket::Socket(QObject* parent) :
QObject(parent)
{
connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams);
// make sure our synchronization method is called every SYN interval
connect(&_synTimer, &QTimer::timeout, this, &Socket::rateControlSync);
// start our timer for the synchronization time interval
_synTimer.start(_synInterval);
}
void Socket::rebind() {
@ -58,7 +66,7 @@ void Socket::setBufferSizes(int numBytes) {
}
}
qint64 Socket::writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr) {
qint64 Socket::writeUnreliablePacket(const BasePacket& packet, const HifiSockAddr& sockAddr) {
return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr);
}
@ -113,3 +121,14 @@ void Socket::readPendingDatagrams() {
}
}
}
void Socket::rateControlSync() {
// TODO: enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control
if (_synTimer.interval() != _synInterval) {
// if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed)
// then restart it now with the right interval
_synTimer.start(_synInterval);
}
}

View file

@ -18,13 +18,15 @@
#include <unordered_map>
#include <QtCore/QObject>
#include <QtCore/QTimer>
#include <QtNetwork/QUdpSocket>
#include "../HifiSockAddr.h"
namespace udt {
class BasePacket;
class ControlSender;
class Packet;
class SeqNum;
@ -40,7 +42,7 @@ public:
quint16 localPort() const { return _udpSocket.localPort(); }
qint64 writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr);
qint64 writeUnreliablePacket(const BasePacket& packet, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr)
{ return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); }
@ -59,6 +61,7 @@ public:
private slots:
void readPendingDatagrams();
void rateControlSync();
private:
QUdpSocket _udpSocket { this };
@ -68,6 +71,9 @@ private:
std::unordered_map<HifiSockAddr, BasePacketHandler> _unfilteredHandlers;
std::unordered_map<HifiSockAddr, SeqNum> _packetSequenceNumbers;
int32_t _synInterval = 10; // 10ms
QTimer _synTimer;
};
} // namespace udt