More work on Connnections

This commit is contained in:
Atlante45 2015-07-27 15:57:41 -07:00
parent 41a0f6b980
commit 2b29f5c130
6 changed files with 73 additions and 11 deletions

View file

@ -23,9 +23,34 @@ Connection::Connection(Socket* parentSocket, HifiSockAddr destination) {
}
void Connection::send(std::unique_ptr<Packet> packet) {
if (_sendQueue) {
_sendQueue->queuePacket(std::move(packet));
}
}
void Connection::processReceivedSeqNum(SeqNum seq) {
if (udt::seqcmp(seq, _largestRecievedSeqNum + 1) > 0) {
// TODO: Add range to loss list
// TODO: Send loss report
}
if (seq > _largestRecievedSeqNum) {
_largestRecievedSeqNum = seq;
} else {
// TODO: Remove seq from loss list
}
}
void Connection::processControl(std::unique_ptr<ControlPacket> controlPacket) {
switch (controlPacket->getType()) {
case ControlPacket::Type::ACK:
break;
case ControlPacket::Type::ACK2:
break;
case ControlPacket::Type::NAK:
break;
case ControlPacket::Type::PacketPair:
break;
}
}

View file

@ -30,9 +30,13 @@ public:
Connection(Socket* parentSocket, HifiSockAddr destination);
void send(std::unique_ptr<Packet> packet);
void processReceivedSeqNum(SeqNum seq);
void processControl(std::unique_ptr<ControlPacket> controlPacket);
private:
SeqNum _largestRecievedSeqNum;
std::unique_ptr<SendQueue> _sendQueue;
};

View file

@ -42,6 +42,8 @@ public:
static qint64 localHeaderSize(); // Current level's header size
virtual qint64 totalHeadersSize() const; // Cumulated size of all the headers
Type getType() const { return _type; }
private:
ControlPacket(Type type, const SequenceNumberList& sequenceNumbers);
ControlPacket(quint64 timestamp);

View file

@ -28,8 +28,8 @@ std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr dest)
// Setup queue private thread
QThread* thread = new QThread(queue.get());
thread->setObjectName("Networking: SendQueue"); // Name thread for easier debug
thread->connect(queue.get(), &QObject::destroyed, thread, &QThread::quit); // Thread auto cleanup
thread->connect(thread, &QThread::finished, thread, &QThread::deleteLater); // Thread auto cleanup
connect(queue.get(), &QObject::destroyed, thread, &QThread::quit); // Thread auto cleanup
connect(thread, &QThread::finished, thread, &QThread::deleteLater); // Thread auto cleanup
// Move queue to private thread and start it
queue->moveToThread(thread);
@ -50,6 +50,11 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) :
_lastSendTimestamp = 0;
}
SendQueue::~SendQueue() {
assert(thread() == QThread::currentThread());
_sendTimer->stop();
}
void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
{
QWriteLocker locker(&_packetsLock);

View file

@ -37,7 +37,7 @@ public:
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr dest);
void queuePacket(std::unique_ptr<Packet> packet);
int getQueueSize() const { return _packets.size(); }
int getQueueSize() const { QReadLocker locker(&_packetsLock); return _packets.size(); }
quint64 getLastSendTimestamp() const { return _lastSendTimestamp; }
@ -56,11 +56,14 @@ private slots:
void sendNextPacket();
private:
friend struct std::default_delete<SendQueue>;
SendQueue(Socket* socket, HifiSockAddr dest);
SendQueue(SendQueue& other) = delete;
SendQueue(SendQueue&& other) = delete;
~SendQueue();
QReadWriteLock _packetsLock; // Protects the packets to be sent list.
mutable QReadWriteLock _packetsLock; // Protects the packets to be sent list.
std::list<std::unique_ptr<Packet>> _packets; // List of packets to be sent
std::unique_ptr<Packet> _nextPacket; // Next packet to be sent
@ -74,10 +77,10 @@ private:
std::atomic<quint64> _lastSendTimestamp { 0 }; // Record last time of packet departure
std::atomic<bool> _running { false };
QReadWriteLock _naksLock; // Protects the naks list.
mutable QReadWriteLock _naksLock; // Protects the naks list.
std::list<SeqNum> _naks; // Sequence numbers of packets to resend
QReadWriteLock _sentLock; // Protects the sent packet list
mutable QReadWriteLock _sentLock; // Protects the sent packet list
std::unordered_map<SeqNum, std::unique_ptr<Packet>> _sentPackets; // Packets waiting for ACK.
};

View file

@ -74,6 +74,11 @@ public:
return _value != other._value;
}
friend bool operator<(const SeqNum& a, const SeqNum& b);
friend bool operator>(const SeqNum& a, const SeqNum& b);
friend bool operator<=(const SeqNum& a, const SeqNum& b);
friend bool operator>=(const SeqNum& a, const SeqNum& b);
friend SeqNum operator+(const SeqNum a, const Type& b);
friend SeqNum operator+(const Type& a, const SeqNum b);
friend SeqNum operator-(const SeqNum a, const Type& b);
@ -89,22 +94,40 @@ private:
friend struct std::hash<SeqNum>;
};
SeqNum operator+(SeqNum a, const SeqNum::Type& b) {
inline bool operator<(const SeqNum& a, const SeqNum& b) {
}
inline bool operator>(const SeqNum& a, const SeqNum& b) {
}
inline bool operator<=(const SeqNum& a, const SeqNum& b) {
}
inline bool operator>=(const SeqNum& a, const SeqNum& b) {
}
inline SeqNum operator+(SeqNum a, const SeqNum::Type& b) {
a += b;
return a;
}
SeqNum operator+(const SeqNum::Type& a, SeqNum b) {
inline SeqNum operator+(const SeqNum::Type& a, SeqNum b) {
b += a;
return b;
}
SeqNum operator-(SeqNum a, const SeqNum::Type& b) {
inline SeqNum operator-(SeqNum a, const SeqNum::Type& b) {
a -= b;
return a;
}
SeqNum operator-(const SeqNum::Type& a, SeqNum b) {
inline SeqNum operator-(const SeqNum::Type& a, SeqNum b) {
b -= a;
return b;
}