cleanup SendQueue start and UDTTest bind

This commit is contained in:
Stephen Birarda 2015-07-30 17:59:27 -07:00
parent 8e786cb953
commit a38e7b0431
3 changed files with 23 additions and 26 deletions

View file

@ -31,6 +31,9 @@ std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr dest)
// Setup queue private thread // Setup queue private thread
QThread* thread = new QThread(); QThread* thread = new QThread();
thread->setObjectName("Networking: SendQueue " + dest.objectName()); // Name thread for easier debug thread->setObjectName("Networking: SendQueue " + dest.objectName()); // Name thread for easier debug
connect(thread, &QThread::started, queue.get(), &SendQueue::run);
connect(queue.get(), &QObject::destroyed, thread, &QThread::quit); // Thread auto cleanup connect(queue.get(), &QObject::destroyed, thread, &QThread::quit); // Thread auto cleanup
connect(thread, &QThread::finished, thread, &QThread::deleteLater); // Thread auto cleanup connect(thread, &QThread::finished, thread, &QThread::deleteLater); // Thread auto cleanup
@ -45,7 +48,7 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) :
_socket(socket), _socket(socket),
_destination(dest) _destination(dest)
{ {
_packetSendPeriod = DEFAULT_SEND_PERIOD;
} }
void SendQueue::queuePacket(std::unique_ptr<Packet> packet) { void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
@ -53,8 +56,8 @@ void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
QWriteLocker locker(&_packetsLock); QWriteLocker locker(&_packetsLock);
_packets.push_back(std::move(packet)); _packets.push_back(std::move(packet));
} }
if (!_isRunning) { if (!this->thread()->isRunning()) {
run(); this->thread()->start();
} }
} }
@ -116,13 +119,6 @@ SequenceNumber SendQueue::getNextSequenceNumber() {
} }
void SendQueue::run() { void SendQueue::run() {
// We need to make sure this is called on the right thread
if (thread() != QThread::currentThread()) {
QMetaObject::invokeMethod(this, "run", Qt::QueuedConnection);
return;
}
_isRunning = true; _isRunning = true;
while (_isRunning) { while (_isRunning) {
@ -166,7 +162,6 @@ void SendQueue::run() {
} }
if (nextPacket) { if (nextPacket) {
qDebug() << "the next packet is" << nextPacket->getDataSize() << "bytes";
bool shouldSendSecondOfPair = false; bool shouldSendSecondOfPair = false;
if (!hasResend) { if (!hasResend) {
@ -183,11 +178,13 @@ void SendQueue::run() {
nextPacket->writeSequenceNumber(sequenceNumber); nextPacket->writeSequenceNumber(sequenceNumber);
sendPacket(*nextPacket); sendPacket(*nextPacket);
// Insert the packet we have just sent in the sent list {
QWriteLocker locker(&_sentLock); // Insert the packet we have just sent in the sent list
_sentPackets[nextPacket->getSequenceNumber()].swap(nextPacket); QWriteLocker locker(&_sentLock);
Q_ASSERT_X(!nextPacket, _sentPackets[nextPacket->getSequenceNumber()].swap(nextPacket);
"SendQueue::sendNextPacket()", "Overriden packet in sent list"); Q_ASSERT_X(!nextPacket,
"SendQueue::sendNextPacket()", "Overriden packet in sent list");
}
emit packetSent(); emit packetSent();
@ -206,18 +203,18 @@ void SendQueue::run() {
pairedPacket->writeSequenceNumber(getNextSequenceNumber()); pairedPacket->writeSequenceNumber(getNextSequenceNumber());
sendPacket(*pairedPacket); sendPacket(*pairedPacket);
// add the paired packet to the sent list {
QWriteLocker locker(&_sentLock); // add the paired packet to the sent list
_sentPackets[pairedPacket->getSequenceNumber()].swap(pairedPacket); QWriteLocker locker(&_sentLock);
Q_ASSERT_X(!pairedPacket, _sentPackets[pairedPacket->getSequenceNumber()].swap(pairedPacket);
"SendQueue::sendNextPacket()", "Overriden packet in sent list"); Q_ASSERT_X(!pairedPacket,
"SendQueue::sendNextPacket()", "Overriden packet in sent list");
}
emit packetSent(); emit packetSent();
} }
} }
} }
} }
// since we're a while loop, give the thread a chance to process events // since we're a while loop, give the thread a chance to process events

View file

@ -37,7 +37,7 @@ class SendQueue : public QObject {
Q_OBJECT Q_OBJECT
public: public:
static const int DEFAULT_SEND_PERIOD = 16 * 1000; // 16ms, in microseconds static const int DEFAULT_SEND_PERIOD = 1; // in microseconds
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr dest); static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr dest);
@ -86,7 +86,7 @@ private:
SequenceNumber _currentSequenceNumber; // Last sequence number sent out SequenceNumber _currentSequenceNumber; // Last sequence number sent out
std::atomic<uint32_t> _atomicCurrentSequenceNumber; // Atomic for last sequence number sent out std::atomic<uint32_t> _atomicCurrentSequenceNumber; // Atomic for last sequence number sent out
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds std::atomic<int> _packetSendPeriod { DEFAULT_SEND_PERIOD }; // Interval between two packet send event in microseconds
std::chrono::high_resolution_clock::time_point _lastSendTimestamp; // Record last time of packet departure std::chrono::high_resolution_clock::time_point _lastSendTimestamp; // Record last time of packet departure
std::atomic<bool> _isRunning { false }; std::atomic<bool> _isRunning { false };

View file

@ -49,7 +49,7 @@ UDTTest::UDTTest(int& argc, char** argv) :
// randomize the seed for packet size randomization // randomize the seed for packet size randomization
srand(time(NULL)); srand(time(NULL));
_socket.bind(QHostAddress::LocalHost, _argumentParser.value(PORT_OPTION).toUInt()); _socket.bind(QHostAddress::AnyIPv4, _argumentParser.value(PORT_OPTION).toUInt());
qDebug() << "Test socket is listening on" << _socket.localPort(); qDebug() << "Test socket is listening on" << _socket.localPort();
if (_argumentParser.isSet(TARGET_OPTION)) { if (_argumentParser.isSet(TARGET_OPTION)) {