change SendQueue timing to use usleep and std::chrono

This commit is contained in:
Stephen Birarda 2015-07-28 17:16:20 -07:00
parent 05d9845077
commit 988bd226ca
4 changed files with 95 additions and 83 deletions

View file

@ -11,6 +11,8 @@
#include "Connection.h"
#include <QtCore/QThread>
#include <NumericalConstants.h>
#include "../HifiSockAddr.h"
@ -28,6 +30,19 @@ Connection::Connection(Socket* parentSocket, HifiSockAddr destination) :
{
}
Connection::~Connection() {
if (_sendQueue) {
// tell our send queue to stop and wait until its send thread is done
QThread* sendQueueThread = _sendQueue->thread();
_sendQueue->stop();
_sendQueue->deleteLater();
sendQueueThread->quit();
sendQueueThread->wait();
}
}
void Connection::sendReliablePacket(unique_ptr<Packet> packet) {
Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");

View file

@ -33,6 +33,7 @@ public:
using SentACKMap = std::unordered_map<SequenceNumber, SequenceNumberTimePair>;
Connection(Socket* parentSocket, HifiSockAddr destination);
~Connection();
void sendReliablePacket(std::unique_ptr<Packet> packet);

View file

@ -13,6 +13,7 @@
#include <algorithm>
#include <QtCore/QCoreApplication>
#include <QtCore/QThread>
#include <SharedUtil.h>
@ -21,6 +22,7 @@
#include "Socket.h"
using namespace udt;
using namespace std::chrono;
std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr dest) {
auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, dest));
@ -42,17 +44,7 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) :
_socket(socket),
_destination(dest)
{
_sendTimer.reset(new QTimer(this));
_sendTimer->setSingleShot(true);
QObject::connect(_sendTimer.get(), &QTimer::timeout, this, &SendQueue::sendNextPacket);
_packetSendPeriod = DEFAULT_SEND_PERIOD;
_lastSendTimestamp = 0;
}
SendQueue::~SendQueue() {
assert(thread() == QThread::currentThread());
_sendTimer->stop();
}
void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
@ -60,24 +52,24 @@ void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
QWriteLocker locker(&_packetsLock);
_packets.push_back(std::move(packet));
}
if (!_running) {
start();
if (!_isRunning) {
run();
}
}
void SendQueue::start() {
void SendQueue::run() {
// We need to make sure this is called on the right thread
if (thread() != QThread::currentThread()) {
QMetaObject::invokeMethod(this, "start", Qt::QueuedConnection);
QMetaObject::invokeMethod(this, "run", Qt::QueuedConnection);
}
_running = true;
_isRunning = true;
// This will send a packet and fire the send timer
sendNextPacket();
// This will loop and sleep to send packets
loop();
}
void SendQueue::stop() {
_running = false;
_isRunning = false;
}
void SendQueue::sendPacket(const BasePacket& packet) {
@ -128,62 +120,69 @@ SequenceNumber SendQueue::getNextSequenceNumber() {
return _currentSequenceNumber;
}
void SendQueue::sendNextPacket() {
if (!_running) {
return;
}
// Record timing
auto sendTime = msecTimestampNow(); // msec
_lastSendTimestamp = sendTime;
if (_nextPacket) {
// Write packet's sequence number and send it off
_nextPacket->setSequenceNumber(getNextSequenceNumber());
sendPacket(*_nextPacket);
void SendQueue::loop() {
while (_isRunning) {
// Record timing
_lastSendTimestamp = high_resolution_clock::now();
// Insert the packet we have just sent in the sent list
QWriteLocker locker(&_sentLock);
_sentPackets[_nextPacket->getSequenceNumber()].swap(_nextPacket);
Q_ASSERT_X(!_nextPacket,
"SendQueue::sendNextPacket()", "Overriden packet in sent list");
}
bool hasResend = false;
SequenceNumber seqNum;
{
// Check nak list for packet to resend
QWriteLocker locker(&_naksLock);
if (!_naks.empty()) {
hasResend = true;
seqNum = _naks.front();
_naks.pop_front();
if (_nextPacket) {
// Write packet's sequence number and send it off
_nextPacket->setSequenceNumber(getNextSequenceNumber());
sendPacket(*_nextPacket);
// Insert the packet we have just sent in the sent list
QWriteLocker locker(&_sentLock);
_sentPackets[_nextPacket->getSequenceNumber()].swap(_nextPacket);
Q_ASSERT_X(!_nextPacket,
"SendQueue::sendNextPacket()", "Overriden packet in sent list");
}
bool hasResend = false;
SequenceNumber seqNum;
{
// Check nak list for packet to resend
QWriteLocker locker(&_naksLock);
if (!_naks.empty()) {
hasResend = true;
seqNum = _naks.front();
_naks.pop_front();
}
}
// Find packet in sent list using SequenceNumber
if (hasResend) {
QWriteLocker locker(&_sentLock);
auto it = _sentPackets.find(seqNum);
Q_ASSERT_X(it != _sentPackets.end(),
"SendQueue::sendNextPacket()", "Couldn't find NAKed packet to resend");
if (it != _sentPackets.end()) {
it->second.swap(_nextPacket);
_sentPackets.erase(it);
}
}
// If there is no packet to resend, grab the next one in the list
if (!_nextPacket) {
QWriteLocker locker(&_packetsLock);
_nextPacket.swap(_packets.front());
_packets.pop_front();
}
// since we're a while loop, give the thread a chance to process events
QCoreApplication::processEvents();
// we just processed events so check now if we were just told to stop
if (!_isRunning) {
break;
}
// sleep as long as we need until next packet send, if we can
auto now = high_resolution_clock::now();
auto microsecondDuration = (_lastSendTimestamp + microseconds(_packetSendPeriod)) - now;
if (microsecondDuration.count() > 0) {
usleep(microsecondDuration.count());
}
}
// Find packet in sent list using SequenceNumber
if (hasResend) {
QWriteLocker locker(&_sentLock);
auto it = _sentPackets.find(seqNum);
Q_ASSERT_X(it != _sentPackets.end(),
"SendQueue::sendNextPacket()", "Couldn't find NAKed packet to resend");
if (it != _sentPackets.end()) {
it->second.swap(_nextPacket);
_sentPackets.erase(it);
}
}
// If there is no packet to resend, grab the next one in the list
if (!_nextPacket) {
QWriteLocker locker(&_packetsLock);
_nextPacket.swap(_packets.front());
_packets.pop_front();
}
// check if we need to fire off a packet pair - we do this
// How long before next packet send
auto timeToSleep = (sendTime + _packetSendPeriod) - msecTimestampNow(); // msec
_sendTimer->start(std::max((quint64)0, timeToSleep));
}

View file

@ -12,6 +12,7 @@
#ifndef hifi_SendQueue_h
#define hifi_SendQueue_h
#include <chrono>
#include <list>
#include <unordered_map>
@ -33,15 +34,13 @@ class SendQueue : public QObject {
Q_OBJECT
public:
static const int DEFAULT_SEND_PERIOD = 16; // msec
static const int DEFAULT_SEND_PERIOD = 16 * 1000; // 16ms, in microseconds
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr dest);
void queuePacket(std::unique_ptr<Packet> packet);
int getQueueSize() const { QReadLocker locker(&_packetsLock); return _packets.size(); }
quint64 getLastSendTimestamp() const { return _lastSendTimestamp; }
SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); }
int getPacketSendPeriod() const { return _packetSendPeriod; }
@ -51,14 +50,14 @@ public:
void sendPacket(const BasePacket& packet);
public slots:
void start();
void run();
void stop();
void ack(SequenceNumber ack);
void nak(std::list<SequenceNumber> naks);
private slots:
void sendNextPacket();
void loop();
private:
friend struct std::default_delete<SendQueue>;
@ -66,7 +65,6 @@ private:
SendQueue(Socket* socket, HifiSockAddr dest);
SendQueue(SendQueue& other) = delete;
SendQueue(SendQueue&& other) = delete;
~SendQueue();
// Increments current sequence number and return it
SequenceNumber getNextSequenceNumber();
@ -82,10 +80,9 @@ private:
SequenceNumber _currentSequenceNumber; // Last sequence number sent out
std::atomic<uint32_t> _atomicCurrentSequenceNumber; // Atomic for last sequence number sent out
std::unique_ptr<QTimer> _sendTimer; // Send timer
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send envent in msec
std::atomic<quint64> _lastSendTimestamp { 0 }; // Record last time of packet departure
std::atomic<bool> _running { false };
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds
std::chrono::high_resolution_clock::time_point _lastSendTimestamp; // Record last time of packet departure
std::atomic<bool> _isRunning { false };
mutable QReadWriteLock _naksLock; // Protects the naks list.
std::list<SequenceNumber> _naks; // Sequence numbers of packets to resend