More work on SendQueue (Threading)

This commit is contained in:
Atlante45 2015-07-24 14:33:40 -07:00
parent 797f74f3d9
commit da93301ac6
2 changed files with 68 additions and 27 deletions

View file

@ -11,18 +11,38 @@
#include "SendQueue.h"
#include <algorithm>
#include <QtCore/QThread>
#include <QtCore/QTimer>
#include <SharedUtil.h>
#include "Packet.h"
#include "Socket.h"
namespace udt {
std::unique_ptr<SendQueue> SendQueue::create() {
return std::unique_ptr<SendQueue>(new SendQueue());
std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr dest) {
auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, dest));
// Setup queue private thread
QThread* thread = new QThread(queue.get());
thread->setObjectName("Networking: SendQueue"); // Name thread for easier debug
thread->connect(queue.get(), &QObject::destroyed, thread, &QThread::quit); // Thread auto cleanup
thread->connect(thread, &QThread::finished, thread, &QThread::deleteLater); // Thread auto cleanup
// Move queue to private thread and start it
queue->moveToThread(thread);
thread->start();
return std::move(queue);
}
SendQueue::SendQueue() {
SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) :
_socket(socket),
_destination(dest)
{
_sendTimer.reset(new QTimer(this));
_sendTimer->setSingleShot(true);
QObject::connect(_sendTimer.get(), &QTimer::timeout, this, &SendQueue::sendNextPacket);
@ -32,8 +52,13 @@ SendQueue::SendQueue() {
}
void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
QWriteLocker locker(&_packetsLock);
_packets.push_back(std::move(packet));
{
QWriteLocker locker(&_packetsLock);
_packets.push_back(std::move(packet));
}
if (!_running) {
start();
}
}
void SendQueue::start() {
@ -41,28 +66,37 @@ void SendQueue::start() {
if (thread() != QThread::currentThread()) {
QMetaObject::invokeMethod(this, "start", Qt::QueuedConnection);
}
_running = true;
// This will send a packet and fire the send timer
sendNextPacket();
}
void SendQueue::stop() {
// We need to make sure this is called on the right thread
if (thread() != QThread::currentThread()) {
QMetaObject::invokeMethod(this, "stop", Qt::QueuedConnection);
}
// Stopping the timer will stop the sending of packets
_sendTimer->stop();
_running = false;
}
void SendQueue::sendPacket(const Packet& packet) {
_socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination);
}
void SendQueue::sendNextPacket() {
if (!_running) {
return;
}
// Record timing
auto sendTime = msecTimestampNow(); // msec
_lastSendTimestamp = sendTime;
// TODO send packet
// Insert the packet we have just sent in the sent list
_sentPackets[_nextPacket->readSequenceNumber()].swap(_nextPacket);
Q_ASSERT(!_nextPacket); // There should be no packet where we inserted
if (_nextPacket) {
_nextPacket->writeSequenceNumber(++_currentSeqNum);
sendPacket(*_nextPacket);
// Insert the packet we have just sent in the sent list
_sentPackets[_nextPacket->getSequenceNumber()].swap(_nextPacket);
Q_ASSERT(!_nextPacket); // There should be no packet where we inserted
}
{ // Grab next packet to be sent
QWriteLocker locker(&_packetsLock);
@ -72,11 +106,7 @@ void SendQueue::sendNextPacket() {
// How long before next packet send
auto timeToSleep = (sendTime + _packetSendPeriod) - msecTimestampNow(); // msec
if (timeToSleep > 0) {
_sendTimer->start(timeToSleep);
} else {
_sendTimer->start(0);
}
_sendTimer->start(std::max((quint64)0, timeToSleep));
}
}

View file

@ -18,19 +18,24 @@
#include <QObject>
#include <QReadWriteLock>
#include "Packet.h"
#include "../HifiSockAddr.h"
#include "SeqNum.h"
class QTimer;
namespace udt {
class Socket;
class Packet;
class SendQueue : public QObject {
Q_OBJECT
public:
static const int DEFAULT_SEND_PERIOD = 16; // msec
static std::unique_ptr<SendQueue> create();
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr dest);
void queuePacket(std::unique_ptr<Packet> packet);
int getQueueSize() const { return _packets.size(); }
@ -43,24 +48,30 @@ public:
public slots:
void start();
void stop();
void sendPacket(const Packet& packet);
private slots:
void sendNextPacket();
private:
SendQueue();
SendQueue(Socket* socket, HifiSockAddr dest);
SendQueue(SendQueue& other) = delete;
SendQueue(SendQueue&& other) = delete;
QReadWriteLock _packetsLock; // Protects the packets to be sent list.
std::list<std::unique_ptr<Packet>> _packets; // List of packets to be sent
std::unique_ptr<Packet> _nextPacket;
std::unique_ptr<Packet> _nextPacket; // Next packet to be sent
Socket* _socket { nullptr }; // Socket to send packet on
HifiSockAddr _destination; // Destination addr
SeqNum _currentSeqNum; // Last sequence number sent out
std::unique_ptr<QTimer> _sendTimer; // Send timer
std::atomic<int> _packetSendPeriod; // Interval between two packet send envent in msec
std::atomic<quint64> _lastSendTimestamp; // Record last time of packet departure
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send envent in msec
std::atomic<quint64> _lastSendTimestamp { 0 }; // Record last time of packet departure
std::atomic<bool> _running { false };
std::unordered_map<int, std::unique_ptr<Packet>> _sentPackets; // Packets waiting for ACK.
std::unordered_map<SeqNum, std::unique_ptr<Packet>> _sentPackets; // Packets waiting for ACK.
};
}