diff --git a/libraries/metavoxels/src/DatagramSequencer.cpp b/libraries/metavoxels/src/DatagramSequencer.cpp index fcbe6b5e87..77f893ccb6 100644 --- a/libraries/metavoxels/src/DatagramSequencer.cpp +++ b/libraries/metavoxels/src/DatagramSequencer.cpp @@ -343,16 +343,32 @@ void CircularBuffer::remove(int length) { } QByteArray CircularBuffer::readBytes(int offset, int length) const { - // write in up to two segments + QByteArray bytes(length, 0); + readBytes(offset, length, bytes.data()); + return bytes; +} + +void CircularBuffer::readBytes(int offset, int length, char* data) const { + // read in up to two segments QByteArray array; int start = (_position + offset) % _data.size(); int firstSegment = qMin(length, _data.size() - start); - array.append(_data.constData() + start, firstSegment); + memcpy(data, _data.constData() + start, firstSegment); int secondSegment = length - firstSegment; if (secondSegment > 0) { - array.append(_data.constData(), secondSegment); + memcpy(data + firstSegment, _data.constData(), secondSegment); + } +} + +void CircularBuffer::writeBytes(int offset, int length, const char* data) { + // write in up to two segments + int start = (_position + offset) % _data.size(); + int firstSegment = qMin(length, _data.size() - start); + memcpy(_data.data() + start, data, firstSegment); + int secondSegment = length - firstSegment; + if (secondSegment > 0) { + memcpy(_data.data(), data + firstSegment, secondSegment); } - return array; } void CircularBuffer::writeToStream(int offset, int length, QDataStream& out) const { @@ -561,7 +577,14 @@ int ReliableChannel::getBytesAvailable() const { } void ReliableChannel::sendMessage(const QVariant& message) { + // write a placeholder for the length, then fill it in when we know what it is + int placeholder = _buffer.pos(); + _dataStream << (quint32)0; _bitstream << message; + _bitstream.flush(); + + quint32 length = _buffer.pos() - placeholder; + _buffer.writeBytes(placeholder, sizeof(quint32), (const char*)&length); } void ReliableChannel::sendClearSharedObjectMessage(int id) { @@ -576,7 +599,8 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o _bitstream(_dataStream), _priority(1.0f), _offset(0), - _writePosition(0) { + _writePosition(0), + _expectingMessage(true) { _buffer.open(output ? QIODevice::WriteOnly : QIODevice::ReadOnly); _dataStream.setByteOrder(QDataStream::LittleEndian); @@ -688,10 +712,32 @@ void ReliableChannel::readData(QDataStream& in) { readSome = true; } } + if (!readSome) { + return; + } - // let listeners know that there's data to read - if (readSome) { - emit _buffer.readyRead(); + forever { + // if we're expecting a message, peek into the buffer to see if we have the whole thing. + // if so, read it in, handle it, and loop back around in case there are more + if (_expectingMessage) { + int available = _buffer.bytesAvailable(); + if (available >= sizeof(quint32)) { + quint32 length; + _buffer.readBytes(_buffer.pos(), sizeof(quint32), (char*)&length); + if (available >= length) { + _dataStream.skipRawData(sizeof(quint32)); + QVariant message; + _bitstream >> message; + _bitstream.reset(); + handleMessage(message); + continue; + } + } + // otherwise, just let whoever's listening know that data is available + } else { + emit _buffer.readyRead(); + } + break; } // prune any read data from the buffer @@ -701,3 +747,11 @@ void ReliableChannel::readData(QDataStream& in) { } } +void ReliableChannel::handleMessage(const QVariant& message) { + if (message.userType() == ClearSharedObjectMessage::Type) { + _bitstream.clearSharedObject(message.value().id); + + } else { + emit receivedMessage(message); + } +} diff --git a/libraries/metavoxels/src/DatagramSequencer.h b/libraries/metavoxels/src/DatagramSequencer.h index 27a4f05379..9adebfbfa4 100644 --- a/libraries/metavoxels/src/DatagramSequencer.h +++ b/libraries/metavoxels/src/DatagramSequencer.h @@ -185,6 +185,12 @@ public: /// Reads part of the data from the buffer. QByteArray readBytes(int offset, int length) const; + /// Reads part of the data from the buffer. + void readBytes(int offset, int length, char* data) const; + + /// Writes to part of the data in the buffer. + void writeBytes(int offset, int length, const char* data); + /// Writes part of the buffer to the supplied stream. void writeToStream(int offset, int length, QDataStream& out) const; @@ -267,8 +273,16 @@ public: int getBytesAvailable() const; + /// Sends a framed message on this channel. void sendMessage(const QVariant& message); + /// For input channels, sets whether the channel is expecting a framed message. + void setExpectingMessage(bool expectingMessage) { _expectingMessage = expectingMessage; } + +signals: + + void receivedMessage(const QVariant& message); + private slots: void sendClearSharedObjectMessage(int id); @@ -286,6 +300,7 @@ private: void spanAcknowledged(const DatagramSequencer::ChannelSpan& span); void readData(QDataStream& in); + void handleMessage(const QVariant& message); int _index; CircularBuffer _buffer; @@ -297,6 +312,7 @@ private: int _offset; int _writePosition; SpanList _acknowledged; + bool _expectingMessage; }; #endif /* defined(__interface__DatagramSequencer__) */ diff --git a/tests/metavoxels/src/MetavoxelTests.cpp b/tests/metavoxels/src/MetavoxelTests.cpp index 15d7463742..49ff5714c0 100644 --- a/tests/metavoxels/src/MetavoxelTests.cpp +++ b/tests/metavoxels/src/MetavoxelTests.cpp @@ -83,8 +83,14 @@ Endpoint::Endpoint(const QByteArray& datagramHeader) : connect(_sequencer, SIGNAL(readyToRead(Bitstream&)), SLOT(readMessage(Bitstream&))); connect(_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleHighPriorityMessage(const QVariant&))); - connect(&_sequencer->getReliableInputChannel()->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel())); - connect(&_sequencer->getReliableInputChannel(1)->getBuffer(), SIGNAL(readyRead()), SLOT(readLowPriorityReliableChannel())); + + ReliableChannel* firstInput = _sequencer->getReliableInputChannel(); + firstInput->setExpectingMessage(false); + connect(&firstInput->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel())); + + ReliableChannel* secondInput = _sequencer->getReliableInputChannel(1); + secondInput->setExpectingMessage(false); + connect(&secondInput->getBuffer(), SIGNAL(readyRead()), SLOT(readLowPriorityReliableChannel())); // enqueue a large amount of data in a low-priority channel ReliableChannel* output = _sequencer->getReliableOutputChannel(1);