From 3ea7e7962273ea53fc004ab05581c5ca00a43dd3 Mon Sep 17 00:00:00 2001 From: Andrzej Kapolka Date: Mon, 21 Jul 2014 20:03:50 -0700 Subject: [PATCH] Working on distributing metavoxel session handling over multiple threads. --- .../src/metavoxels/MetavoxelServer.cpp | 84 +++++++++++++++---- .../src/metavoxels/MetavoxelServer.h | 46 ++++++++-- .../metavoxels/src/DatagramSequencer.cpp | 6 +- libraries/metavoxels/src/Endpoint.cpp | 2 +- 4 files changed, 109 insertions(+), 29 deletions(-) diff --git a/assignment-client/src/metavoxels/MetavoxelServer.cpp b/assignment-client/src/metavoxels/MetavoxelServer.cpp index eed0f3a226..cd80650638 100644 --- a/assignment-client/src/metavoxels/MetavoxelServer.cpp +++ b/assignment-client/src/metavoxels/MetavoxelServer.cpp @@ -25,14 +25,19 @@ const int SEND_INTERVAL = 50; MetavoxelServer::MetavoxelServer(const QByteArray& packet) : ThreadedAssignment(packet), - _sendTimer(this) { - - _sendTimer.setSingleShot(true); - connect(&_sendTimer, SIGNAL(timeout()), SLOT(sendDeltas())); + _nextSender(0) { } void MetavoxelServer::applyEdit(const MetavoxelEditMessage& edit) { - edit.apply(_data, SharedObject::getWeakHash()); + MetavoxelData data = _data; + edit.apply(data, SharedObject::getWeakHash()); + setData(data); +} + +void MetavoxelServer::setData(const MetavoxelData& data) { + if (_data != data) { + emit dataChanged(_data = data); + } } const QString METAVOXEL_SERVER_LOGGING_NAME = "metavoxel-server"; @@ -46,12 +51,26 @@ void MetavoxelServer::run() { connect(nodeList, SIGNAL(nodeAdded(SharedNodePointer)), SLOT(maybeAttachSession(const SharedNodePointer&))); connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), SLOT(maybeDeleteSession(const SharedNodePointer&))); - _lastSend = QDateTime::currentMSecsSinceEpoch(); - _sendTimer.start(SEND_INTERVAL); - // initialize Bitstream before using it in multiple threads Bitstream::preThreadingInit(); + // create the senders, each with its own thread + int threadCount = QThread::idealThreadCount(); + if (threadCount == -1) { + const int DEFAULT_THREAD_COUNT = 4; + threadCount = DEFAULT_THREAD_COUNT; + } + qDebug() << "Creating" << threadCount << "sender threads"; + for (int i = 0; i < threadCount; i++) { + QThread* thread = new QThread(this); + MetavoxelSender* sender = new MetavoxelSender(this); + sender->moveToThread(thread); + sender->connect(thread, SIGNAL(finished()), SLOT(deleteLater())); + thread->start(); + QMetaObject::invokeMethod(sender, "start"); + _senders.append(sender); + } + // create the persister and start it in its own thread _persister = new MetavoxelPersister(this); QThread* persistenceThread = new QThread(this); @@ -86,6 +105,11 @@ void MetavoxelServer::readPendingDatagrams() { void MetavoxelServer::aboutToFinish() { QMetaObject::invokeMethod(_persister, "save", Q_ARG(const MetavoxelData&, _data)); + + foreach (MetavoxelSender* sender, _senders) { + sender->thread()->quit(); + sender->thread()->wait(); + } _persister->thread()->quit(); _persister->thread()->wait(); } @@ -93,7 +117,11 @@ void MetavoxelServer::aboutToFinish() { void MetavoxelServer::maybeAttachSession(const SharedNodePointer& node) { if (node->getType() == NodeType::Agent) { QMutexLocker locker(&node->getMutex()); - node->setLinkedData(new MetavoxelSession(node, this)); + MetavoxelSender* sender = _senders.at(_nextSender); + _nextSender = (_nextSender + 1) % _senders.size(); + MetavoxelSession* session = new MetavoxelSession(node, sender); + session->moveToThread(sender->thread()); + node->setLinkedData(session); } } @@ -108,11 +136,30 @@ void MetavoxelServer::maybeDeleteSession(const SharedNodePointer& node) { } } -void MetavoxelServer::sendDeltas() { - // send deltas for all sessions +MetavoxelSender::MetavoxelSender(MetavoxelServer* server) : + _server(server), + _sendTimer(this) { + + _sendTimer.setSingleShot(true); + connect(&_sendTimer, SIGNAL(timeout()), SLOT(sendDeltas())); + + connect(_server, &MetavoxelServer::dataChanged, this, &MetavoxelSender::setData); +} + +void MetavoxelSender::start() { + _lastSend = QDateTime::currentMSecsSinceEpoch(); + _sendTimer.start(SEND_INTERVAL); +} + +void MetavoxelSender::sendDeltas() { + // send deltas for all sessions associated with our thread foreach (const SharedNodePointer& node, NodeList::getInstance()->getNodeHash()) { if (node->getType() == NodeType::Agent) { - static_cast(node->getLinkedData())->update(); + QMutexLocker locker(&node->getMutex()); + MetavoxelSession* session = static_cast(node->getLinkedData()); + if (session && session->thread() == QThread::currentThread()) { + session->update(); + } } } @@ -124,9 +171,9 @@ void MetavoxelServer::sendDeltas() { _sendTimer.start(qMax(0, 2 * SEND_INTERVAL - qMax(elapsed, SEND_INTERVAL))); } -MetavoxelSession::MetavoxelSession(const SharedNodePointer& node, MetavoxelServer* server) : +MetavoxelSession::MetavoxelSession(const SharedNodePointer& node, MetavoxelSender* sender) : Endpoint(node, new PacketRecord(), NULL), - _server(server), + _sender(sender), _reliableDeltaChannel(NULL), _reliableDeltaID(0) { @@ -150,7 +197,7 @@ void MetavoxelSession::update() { int start = _sequencer.getOutputStream().getUnderlying().device()->pos(); out << QVariant::fromValue(MetavoxelDeltaMessage()); PacketRecord* sendRecord = getLastAcknowledgedSendRecord(); - _server->getData().writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod); + _sender->getData().writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod); out.flush(); int end = _sequencer.getOutputStream().getUnderlying().device()->pos(); if (end > _sequencer.getMaxPacketSize()) { @@ -162,7 +209,7 @@ void MetavoxelSession::update() { _reliableDeltaWriteMappings = out.getAndResetWriteMappings(); _reliableDeltaReceivedOffset = _reliableDeltaChannel->getBytesWritten(); - _reliableDeltaData = _server->getData(); + _reliableDeltaData = _sender->getData(); _reliableDeltaLOD = _lod; // go back to the beginning with the current packet and note that there's a delta pending @@ -185,7 +232,7 @@ void MetavoxelSession::handleMessage(const QVariant& message, Bitstream& in) { PacketRecord* MetavoxelSession::maybeCreateSendRecord() const { return _reliableDeltaChannel ? new PacketRecord(_reliableDeltaLOD, _reliableDeltaData) : - new PacketRecord(_lod, _server->getData()); + new PacketRecord(_lod, _sender->getData()); } void MetavoxelSession::handleMessage(const QVariant& message) { @@ -195,7 +242,8 @@ void MetavoxelSession::handleMessage(const QVariant& message) { _lod = state.lod; } else if (userType == MetavoxelEditMessage::Type) { - _server->applyEdit(message.value()); + QMetaObject::invokeMethod(_sender->getServer(), "applyEdit", Q_ARG(const MetavoxelEditMessage&, + message.value())); } else if (userType == QMetaType::QVariantList) { foreach (const QVariant& element, message.toList()) { diff --git a/assignment-client/src/metavoxels/MetavoxelServer.h b/assignment-client/src/metavoxels/MetavoxelServer.h index 42694b03bf..45478a4946 100644 --- a/assignment-client/src/metavoxels/MetavoxelServer.h +++ b/assignment-client/src/metavoxels/MetavoxelServer.h @@ -32,28 +32,60 @@ public: MetavoxelServer(const QByteArray& packet); - void applyEdit(const MetavoxelEditMessage& edit); + Q_INVOKABLE void applyEdit(const MetavoxelEditMessage& edit); const MetavoxelData& getData() const { return _data; } - Q_INVOKABLE void setData(const MetavoxelData& data) { _data = data; } + Q_INVOKABLE void setData(const MetavoxelData& data); virtual void run(); virtual void readPendingDatagrams(); virtual void aboutToFinish(); - + +signals: + + void dataChanged(const MetavoxelData& data); + private slots: void maybeAttachSession(const SharedNodePointer& node); - void maybeDeleteSession(const SharedNodePointer& node); - void sendDeltas(); + void maybeDeleteSession(const SharedNodePointer& node); private: + QVector _senders; + int _nextSender; + MetavoxelPersister* _persister; + MetavoxelData _data; +}; + +/// Handles update sending for one thread. +class MetavoxelSender : public QObject { + Q_OBJECT + +public: + + MetavoxelSender(MetavoxelServer* server); + + MetavoxelServer* getServer() const { return _server; } + + const MetavoxelData& getData() const { return _data; } + + Q_INVOKABLE void start(); + +private slots: + + void setData(const MetavoxelData& data) { _data = data; } + void sendDeltas(); + +private: + + MetavoxelServer* _server; + QTimer _sendTimer; qint64 _lastSend; @@ -66,7 +98,7 @@ class MetavoxelSession : public Endpoint { public: - MetavoxelSession(const SharedNodePointer& node, MetavoxelServer* server); + MetavoxelSession(const SharedNodePointer& node, MetavoxelSender* sender); virtual void update(); @@ -85,7 +117,7 @@ private: void sendPacketGroup(int alreadySent = 0); - MetavoxelServer* _server; + MetavoxelSender* _sender; MetavoxelLOD _lod; diff --git a/libraries/metavoxels/src/DatagramSequencer.cpp b/libraries/metavoxels/src/DatagramSequencer.cpp index c757e131bb..be3d684a3d 100644 --- a/libraries/metavoxels/src/DatagramSequencer.cpp +++ b/libraries/metavoxels/src/DatagramSequencer.cpp @@ -29,7 +29,7 @@ const float DEFAULT_SLOW_START_THRESHOLD = 1000.0f; DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader, QObject* parent) : QObject(parent), _outgoingPacketStream(&_outgoingPacketData, QIODevice::WriteOnly), - _outputStream(_outgoingPacketStream), + _outputStream(_outgoingPacketStream, Bitstream::NO_METADATA, Bitstream::NO_GENERICS, this), _incomingDatagramStream(&_incomingDatagramBuffer), _datagramHeaderSize(datagramHeader.size()), _outgoingPacketNumber(0), @@ -38,7 +38,7 @@ DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader, QObject* _outgoingDatagramStream(&_outgoingDatagramBuffer), _incomingPacketNumber(0), _incomingPacketStream(&_incomingPacketData, QIODevice::ReadOnly), - _inputStream(_incomingPacketStream), + _inputStream(_incomingPacketStream, Bitstream::NO_METADATA, Bitstream::NO_GENERICS, this), _receivedHighPriorityMessages(0), _maxPacketSize(DEFAULT_MAX_PACKET_SIZE), _packetsPerGroup(1.0f), @@ -752,7 +752,7 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o _index(index), _output(output), _dataStream(&_buffer), - _bitstream(_dataStream), + _bitstream(_dataStream, Bitstream::NO_METADATA, Bitstream::NO_GENERICS, this), _priority(1.0f), _offset(0), _writePosition(0), diff --git a/libraries/metavoxels/src/Endpoint.cpp b/libraries/metavoxels/src/Endpoint.cpp index 9672b245cc..65e088c75e 100644 --- a/libraries/metavoxels/src/Endpoint.cpp +++ b/libraries/metavoxels/src/Endpoint.cpp @@ -15,7 +15,7 @@ Endpoint::Endpoint(const SharedNodePointer& node, PacketRecord* baselineSendRecord, PacketRecord* baselineReceiveRecord) : _node(node), - _sequencer(byteArrayWithPopulatedHeader(PacketTypeMetavoxelData)) { + _sequencer(byteArrayWithPopulatedHeader(PacketTypeMetavoxelData), this) { connect(&_sequencer, SIGNAL(readyToWrite(const QByteArray&)), SLOT(sendDatagram(const QByteArray&))); connect(&_sequencer, SIGNAL(readyToRead(Bitstream&)), SLOT(readMessage(Bitstream&)));