Working on distributing metavoxel session handling over multiple threads.

This commit is contained in:
Andrzej Kapolka 2014-07-21 20:03:50 -07:00
parent 0ddd3e650c
commit 3ea7e79622
4 changed files with 109 additions and 29 deletions

View file

@ -25,14 +25,19 @@ const int SEND_INTERVAL = 50;
MetavoxelServer::MetavoxelServer(const QByteArray& packet) :
ThreadedAssignment(packet),
_sendTimer(this) {
_sendTimer.setSingleShot(true);
connect(&_sendTimer, SIGNAL(timeout()), SLOT(sendDeltas()));
_nextSender(0) {
}
void MetavoxelServer::applyEdit(const MetavoxelEditMessage& edit) {
edit.apply(_data, SharedObject::getWeakHash());
MetavoxelData data = _data;
edit.apply(data, SharedObject::getWeakHash());
setData(data);
}
void MetavoxelServer::setData(const MetavoxelData& data) {
if (_data != data) {
emit dataChanged(_data = data);
}
}
const QString METAVOXEL_SERVER_LOGGING_NAME = "metavoxel-server";
@ -46,12 +51,26 @@ void MetavoxelServer::run() {
connect(nodeList, SIGNAL(nodeAdded(SharedNodePointer)), SLOT(maybeAttachSession(const SharedNodePointer&)));
connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), SLOT(maybeDeleteSession(const SharedNodePointer&)));
_lastSend = QDateTime::currentMSecsSinceEpoch();
_sendTimer.start(SEND_INTERVAL);
// initialize Bitstream before using it in multiple threads
Bitstream::preThreadingInit();
// create the senders, each with its own thread
int threadCount = QThread::idealThreadCount();
if (threadCount == -1) {
const int DEFAULT_THREAD_COUNT = 4;
threadCount = DEFAULT_THREAD_COUNT;
}
qDebug() << "Creating" << threadCount << "sender threads";
for (int i = 0; i < threadCount; i++) {
QThread* thread = new QThread(this);
MetavoxelSender* sender = new MetavoxelSender(this);
sender->moveToThread(thread);
sender->connect(thread, SIGNAL(finished()), SLOT(deleteLater()));
thread->start();
QMetaObject::invokeMethod(sender, "start");
_senders.append(sender);
}
// create the persister and start it in its own thread
_persister = new MetavoxelPersister(this);
QThread* persistenceThread = new QThread(this);
@ -86,6 +105,11 @@ void MetavoxelServer::readPendingDatagrams() {
void MetavoxelServer::aboutToFinish() {
QMetaObject::invokeMethod(_persister, "save", Q_ARG(const MetavoxelData&, _data));
foreach (MetavoxelSender* sender, _senders) {
sender->thread()->quit();
sender->thread()->wait();
}
_persister->thread()->quit();
_persister->thread()->wait();
}
@ -93,7 +117,11 @@ void MetavoxelServer::aboutToFinish() {
void MetavoxelServer::maybeAttachSession(const SharedNodePointer& node) {
if (node->getType() == NodeType::Agent) {
QMutexLocker locker(&node->getMutex());
node->setLinkedData(new MetavoxelSession(node, this));
MetavoxelSender* sender = _senders.at(_nextSender);
_nextSender = (_nextSender + 1) % _senders.size();
MetavoxelSession* session = new MetavoxelSession(node, sender);
session->moveToThread(sender->thread());
node->setLinkedData(session);
}
}
@ -108,11 +136,30 @@ void MetavoxelServer::maybeDeleteSession(const SharedNodePointer& node) {
}
}
void MetavoxelServer::sendDeltas() {
// send deltas for all sessions
MetavoxelSender::MetavoxelSender(MetavoxelServer* server) :
_server(server),
_sendTimer(this) {
_sendTimer.setSingleShot(true);
connect(&_sendTimer, SIGNAL(timeout()), SLOT(sendDeltas()));
connect(_server, &MetavoxelServer::dataChanged, this, &MetavoxelSender::setData);
}
void MetavoxelSender::start() {
_lastSend = QDateTime::currentMSecsSinceEpoch();
_sendTimer.start(SEND_INTERVAL);
}
void MetavoxelSender::sendDeltas() {
// send deltas for all sessions associated with our thread
foreach (const SharedNodePointer& node, NodeList::getInstance()->getNodeHash()) {
if (node->getType() == NodeType::Agent) {
static_cast<MetavoxelSession*>(node->getLinkedData())->update();
QMutexLocker locker(&node->getMutex());
MetavoxelSession* session = static_cast<MetavoxelSession*>(node->getLinkedData());
if (session && session->thread() == QThread::currentThread()) {
session->update();
}
}
}
@ -124,9 +171,9 @@ void MetavoxelServer::sendDeltas() {
_sendTimer.start(qMax(0, 2 * SEND_INTERVAL - qMax(elapsed, SEND_INTERVAL)));
}
MetavoxelSession::MetavoxelSession(const SharedNodePointer& node, MetavoxelServer* server) :
MetavoxelSession::MetavoxelSession(const SharedNodePointer& node, MetavoxelSender* sender) :
Endpoint(node, new PacketRecord(), NULL),
_server(server),
_sender(sender),
_reliableDeltaChannel(NULL),
_reliableDeltaID(0) {
@ -150,7 +197,7 @@ void MetavoxelSession::update() {
int start = _sequencer.getOutputStream().getUnderlying().device()->pos();
out << QVariant::fromValue(MetavoxelDeltaMessage());
PacketRecord* sendRecord = getLastAcknowledgedSendRecord();
_server->getData().writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod);
_sender->getData().writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod);
out.flush();
int end = _sequencer.getOutputStream().getUnderlying().device()->pos();
if (end > _sequencer.getMaxPacketSize()) {
@ -162,7 +209,7 @@ void MetavoxelSession::update() {
_reliableDeltaWriteMappings = out.getAndResetWriteMappings();
_reliableDeltaReceivedOffset = _reliableDeltaChannel->getBytesWritten();
_reliableDeltaData = _server->getData();
_reliableDeltaData = _sender->getData();
_reliableDeltaLOD = _lod;
// go back to the beginning with the current packet and note that there's a delta pending
@ -185,7 +232,7 @@ void MetavoxelSession::handleMessage(const QVariant& message, Bitstream& in) {
PacketRecord* MetavoxelSession::maybeCreateSendRecord() const {
return _reliableDeltaChannel ? new PacketRecord(_reliableDeltaLOD, _reliableDeltaData) :
new PacketRecord(_lod, _server->getData());
new PacketRecord(_lod, _sender->getData());
}
void MetavoxelSession::handleMessage(const QVariant& message) {
@ -195,7 +242,8 @@ void MetavoxelSession::handleMessage(const QVariant& message) {
_lod = state.lod;
} else if (userType == MetavoxelEditMessage::Type) {
_server->applyEdit(message.value<MetavoxelEditMessage>());
QMetaObject::invokeMethod(_sender->getServer(), "applyEdit", Q_ARG(const MetavoxelEditMessage&,
message.value<MetavoxelEditMessage>()));
} else if (userType == QMetaType::QVariantList) {
foreach (const QVariant& element, message.toList()) {

View file

@ -32,28 +32,60 @@ public:
MetavoxelServer(const QByteArray& packet);
void applyEdit(const MetavoxelEditMessage& edit);
Q_INVOKABLE void applyEdit(const MetavoxelEditMessage& edit);
const MetavoxelData& getData() const { return _data; }
Q_INVOKABLE void setData(const MetavoxelData& data) { _data = data; }
Q_INVOKABLE void setData(const MetavoxelData& data);
virtual void run();
virtual void readPendingDatagrams();
virtual void aboutToFinish();
signals:
void dataChanged(const MetavoxelData& data);
private slots:
void maybeAttachSession(const SharedNodePointer& node);
void maybeDeleteSession(const SharedNodePointer& node);
void sendDeltas();
void maybeDeleteSession(const SharedNodePointer& node);
private:
QVector<MetavoxelSender*> _senders;
int _nextSender;
MetavoxelPersister* _persister;
MetavoxelData _data;
};
/// Handles update sending for one thread.
class MetavoxelSender : public QObject {
Q_OBJECT
public:
MetavoxelSender(MetavoxelServer* server);
MetavoxelServer* getServer() const { return _server; }
const MetavoxelData& getData() const { return _data; }
Q_INVOKABLE void start();
private slots:
void setData(const MetavoxelData& data) { _data = data; }
void sendDeltas();
private:
MetavoxelServer* _server;
QTimer _sendTimer;
qint64 _lastSend;
@ -66,7 +98,7 @@ class MetavoxelSession : public Endpoint {
public:
MetavoxelSession(const SharedNodePointer& node, MetavoxelServer* server);
MetavoxelSession(const SharedNodePointer& node, MetavoxelSender* sender);
virtual void update();
@ -85,7 +117,7 @@ private:
void sendPacketGroup(int alreadySent = 0);
MetavoxelServer* _server;
MetavoxelSender* _sender;
MetavoxelLOD _lod;

View file

@ -29,7 +29,7 @@ const float DEFAULT_SLOW_START_THRESHOLD = 1000.0f;
DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader, QObject* parent) :
QObject(parent),
_outgoingPacketStream(&_outgoingPacketData, QIODevice::WriteOnly),
_outputStream(_outgoingPacketStream),
_outputStream(_outgoingPacketStream, Bitstream::NO_METADATA, Bitstream::NO_GENERICS, this),
_incomingDatagramStream(&_incomingDatagramBuffer),
_datagramHeaderSize(datagramHeader.size()),
_outgoingPacketNumber(0),
@ -38,7 +38,7 @@ DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader, QObject*
_outgoingDatagramStream(&_outgoingDatagramBuffer),
_incomingPacketNumber(0),
_incomingPacketStream(&_incomingPacketData, QIODevice::ReadOnly),
_inputStream(_incomingPacketStream),
_inputStream(_incomingPacketStream, Bitstream::NO_METADATA, Bitstream::NO_GENERICS, this),
_receivedHighPriorityMessages(0),
_maxPacketSize(DEFAULT_MAX_PACKET_SIZE),
_packetsPerGroup(1.0f),
@ -752,7 +752,7 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o
_index(index),
_output(output),
_dataStream(&_buffer),
_bitstream(_dataStream),
_bitstream(_dataStream, Bitstream::NO_METADATA, Bitstream::NO_GENERICS, this),
_priority(1.0f),
_offset(0),
_writePosition(0),

View file

@ -15,7 +15,7 @@
Endpoint::Endpoint(const SharedNodePointer& node, PacketRecord* baselineSendRecord, PacketRecord* baselineReceiveRecord) :
_node(node),
_sequencer(byteArrayWithPopulatedHeader(PacketTypeMetavoxelData)) {
_sequencer(byteArrayWithPopulatedHeader(PacketTypeMetavoxelData), this) {
connect(&_sequencer, SIGNAL(readyToWrite(const QByteArray&)), SLOT(sendDatagram(const QByteArray&)));
connect(&_sequencer, SIGNAL(readyToRead(Bitstream&)), SLOT(readMessage(Bitstream&)));