diff --git a/libraries/metavoxels/src/DatagramSequencer.cpp b/libraries/metavoxels/src/DatagramSequencer.cpp index 9c300113c6..26354bc9f5 100644 --- a/libraries/metavoxels/src/DatagramSequencer.cpp +++ b/libraries/metavoxels/src/DatagramSequencer.cpp @@ -311,6 +311,174 @@ void DatagramSequencer::handleHighPriorityMessage(const QVariant& data) { } } +const int INITIAL_CIRCULAR_BUFFER_CAPACITY = 16; + +CircularBuffer::CircularBuffer(QObject* parent) : + QIODevice(parent), + _data(INITIAL_CIRCULAR_BUFFER_CAPACITY, 0), + _position(0), + _size(0), + _offset(0) { +} + +void CircularBuffer::append(const char* data, int length) { + // resize to fit + int oldSize = _size; + resize(_size + length); + + // write our data in up to two segments: one from the position to the end, one from the beginning + int end = (_position + oldSize) % _data.size(); + int firstSegment = qMin(length, _data.size() - end); + memcpy(_data.data() + end, data, firstSegment); + int secondSegment = length - firstSegment; + if (secondSegment > 0) { + memcpy(_data.data(), data + firstSegment, secondSegment); + } +} + +void CircularBuffer::remove(int length) { + _position = (_position + length) % _data.size(); + _size -= length; +} + +QByteArray CircularBuffer::readBytes(int offset, int length) const { + // write in up to two segments + QByteArray array; + int start = (_position + offset) % _data.size(); + int firstSegment = qMin(length, _data.size() - start); + array.append(_data.constData() + start, firstSegment); + int secondSegment = length - firstSegment; + if (secondSegment > 0) { + array.append(_data.constData(), secondSegment); + } + return array; +} + +void CircularBuffer::writeToStream(int offset, int length, QDataStream& out) const { + // write in up to two segments + int start = (_position + offset) % _data.size(); + int firstSegment = qMin(length, _data.size() - start); + out.writeRawData(_data.constData() + start, firstSegment); + int secondSegment = length - firstSegment; + if (secondSegment > 0) { + out.writeRawData(_data.constData(), secondSegment); + } +} + +void CircularBuffer::readFromStream(int offset, int length, QDataStream& in) { + // resize to fit + int requiredSize = offset + length; + if (requiredSize > _size) { + resize(requiredSize); + } + + // read in up to two segments + int start = (_position + offset) % _data.size(); + int firstSegment = qMin(length, _data.size() - start); + in.readRawData(_data.data() + start, firstSegment); + int secondSegment = length - firstSegment; + if (secondSegment > 0) { + in.readRawData(_data.data(), secondSegment); + } +} + +void CircularBuffer::appendToBuffer(int offset, int length, CircularBuffer& buffer) const { + // append in up to two segments + int start = (_position + offset) % _data.size(); + int firstSegment = qMin(length, _data.size() - start); + buffer.append(_data.constData() + start, firstSegment); + int secondSegment = length - firstSegment; + if (secondSegment > 0) { + buffer.append(_data.constData(), secondSegment); + } +} + +bool CircularBuffer::atEnd() const { + return _offset >= _size; +} + +bool CircularBuffer::canReadLine() const { + for (int offset = _offset; offset < _size; offset++) { + if (_data.at((_position + offset) % _data.size()) == '\n') { + return true; + } + } + return false; +} + +bool CircularBuffer::open(OpenMode flags) { + return QIODevice::open(flags | QIODevice::Unbuffered); +} + +qint64 CircularBuffer::pos() const { + return _offset; +} + +bool CircularBuffer::seek(qint64 pos) { + if (pos < 0 || pos > _size) { + return false; + } + _offset = pos; + return true; +} + +qint64 CircularBuffer::size() const { + return _size; +} + +qint64 CircularBuffer::readData(char* data, qint64 length) { + int readable = qMin((int)length, _size - _offset); + + // read in up to two segments + int start = (_position + _offset) % _data.size(); + int firstSegment = qMin((int)length, _data.size() - start); + memcpy(data, _data.constData() + start, firstSegment); + int secondSegment = length - firstSegment; + if (secondSegment > 0) { + memcpy(data + firstSegment, _data.constData(), secondSegment); + } + _offset += readable; + return readable; +} + +qint64 CircularBuffer::writeData(const char* data, qint64 length) { + // resize to fit + int requiredSize = _offset + length; + if (requiredSize > _size) { + resize(requiredSize); + } + + // write in up to two segments + int start = (_position + _offset) % _data.size(); + int firstSegment = qMin((int)length, _data.size() - start); + memcpy(_data.data() + start, data, firstSegment); + int secondSegment = length - firstSegment; + if (secondSegment > 0) { + memcpy(_data.data(), data + firstSegment, secondSegment); + } + _offset += length; + return length; +} + +void CircularBuffer::resize(int size) { + if (size > _data.size()) { + // double our capacity until we can fit the desired length + int newCapacity = _data.size(); + do { + newCapacity *= 2; + } while (size > newCapacity); + + int oldCapacity = _data.size(); + _data.resize(newCapacity); + + int trailing = _position + _size - oldCapacity; + if (trailing > 0) { + memcpy(_data.data() + oldCapacity, _data.constData(), trailing); + } + } + _size = size; +} + SpanList::SpanList() : _totalSet(0) { } @@ -472,16 +640,16 @@ int ReliableChannel::writeSpan(QDataStream& out, bool& first, int position, int spans.append(span); out << (quint32)span.offset; out << (quint32)length; - out.writeRawData(_buffer.data().constData() + position, length); + _buffer.writeToStream(position, length, out); return length; } void ReliableChannel::spanAcknowledged(const DatagramSequencer::ChannelSpan& span) { int advancement = _acknowledged.set(span.offset - _offset, span.length); if (advancement > 0) { - // TODO: better way of pruning buffer - _buffer.buffer() = _buffer.buffer().right(_buffer.size() - advancement); + _buffer.remove(advancement); _buffer.seek(_buffer.size()); + _offset += advancement; _writePosition = qMax(_writePosition - advancement, 0); } @@ -490,38 +658,40 @@ void ReliableChannel::spanAcknowledged(const DatagramSequencer::ChannelSpan& spa void ReliableChannel::readData(QDataStream& in) { quint32 segments; in >> segments; + bool readSome = false; for (int i = 0; i < segments; i++) { quint32 offset, size; in >> offset >> size; int position = offset - _offset; int end = position + size; - if (_assemblyBuffer.size() < end) { - _assemblyBuffer.resize(end); - } if (end <= 0) { in.skipRawData(size); + } else if (position < 0) { in.skipRawData(-position); - in.readRawData(_assemblyBuffer.data(), size + position); + _assemblyBuffer.readFromStream(0, end, in); + } else { - in.readRawData(_assemblyBuffer.data() + position, size); + _assemblyBuffer.readFromStream(position, size, in); } int advancement = _acknowledged.set(position, size); if (advancement > 0) { - // TODO: better way of pruning buffer - _buffer.buffer().append(_assemblyBuffer.constData(), advancement); - emit _buffer.readyRead(); - _assemblyBuffer = _assemblyBuffer.right(_assemblyBuffer.size() - advancement); + _assemblyBuffer.appendToBuffer(0, advancement, _buffer); + _assemblyBuffer.remove(advancement); _offset += advancement; + readSome = true; } } - // when the read head is sufficiently advanced into the buffer, prune it off. this along - // with other buffer usages should be replaced with a circular buffer - const int PRUNE_SIZE = 8192; - if (_buffer.pos() > PRUNE_SIZE) { - _buffer.buffer() = _buffer.buffer().right(_buffer.size() - _buffer.pos()); + // let listeners know that there's data to read + if (readSome) { + emit _buffer.readyRead(); + } + + // prune any read data from the buffer + if (_buffer.pos() > 0) { + _buffer.remove((int)_buffer.pos()); _buffer.seek(0); } } diff --git a/libraries/metavoxels/src/DatagramSequencer.h b/libraries/metavoxels/src/DatagramSequencer.h index 0d9fc0d2b7..5afd5175a2 100644 --- a/libraries/metavoxels/src/DatagramSequencer.h +++ b/libraries/metavoxels/src/DatagramSequencer.h @@ -167,6 +167,55 @@ private: QHash _reliableInputChannels; }; +/// A circular buffer, where one may efficiently append data to the end or remove data from the beginning. +class CircularBuffer : public QIODevice { +public: + + CircularBuffer(QObject* parent = NULL); + + /// Appends data to the end of the buffer. + void append(const QByteArray& data) { append(data.constData(), data.size()); } + + /// Appends data to the end of the buffer. + void append(const char* data, int length); + + /// Removes data from the beginning of the buffer. + void remove(int length); + + /// Reads part of the data from the buffer. + QByteArray readBytes(int offset, int length) const; + + /// Writes part of the buffer to the supplied stream. + void writeToStream(int offset, int length, QDataStream& out) const; + + /// Reads part of the buffer from the supplied stream. + void readFromStream(int offset, int length, QDataStream& in); + + /// Appends part of the buffer to the supplied other buffer. + void appendToBuffer(int offset, int length, CircularBuffer& buffer) const; + + virtual bool atEnd() const; + virtual bool canReadLine() const; + virtual bool open(OpenMode flags); + virtual qint64 pos() const; + virtual bool seek(qint64 pos); + virtual qint64 size() const; + +protected: + + virtual qint64 readData(char* data, qint64 length); + virtual qint64 writeData(const char* data, qint64 length); + +private: + + void resize(int size); + + QByteArray _data; + int _position; + int _size; + int _offset; +}; + /// A list of contiguous spans, alternating between set and unset. Conceptually, the list is preceeded by a set /// span of infinite length and followed by an unset span of infinite length. Within those bounds, it alternates /// between unset and set. @@ -208,7 +257,7 @@ public: int getIndex() const { return _index; } - QBuffer& getBuffer() { return _buffer; } + CircularBuffer& getBuffer() { return _buffer; } QDataStream& getDataStream() { return _dataStream; } Bitstream& getBitstream() { return _bitstream; } @@ -238,8 +287,8 @@ private: void readData(QDataStream& in); int _index; - QBuffer _buffer; - QByteArray _assemblyBuffer; + CircularBuffer _buffer; + CircularBuffer _assemblyBuffer; QDataStream _dataStream; Bitstream _bitstream; float _priority; diff --git a/tests/metavoxels/src/MetavoxelTests.cpp b/tests/metavoxels/src/MetavoxelTests.cpp index edfc45ac0d..7c09e60ef9 100644 --- a/tests/metavoxels/src/MetavoxelTests.cpp +++ b/tests/metavoxels/src/MetavoxelTests.cpp @@ -8,7 +8,6 @@ #include -#include #include #include "MetavoxelTests.h" @@ -88,9 +87,9 @@ Endpoint::Endpoint(const QByteArray& datagramHeader) : output->setPriority(0.25f); const int MIN_LOW_PRIORITY_DATA = 100000; const int MAX_LOW_PRIORITY_DATA = 200000; - _lowPriorityDataStreamed = createRandomBytes(MIN_LOW_PRIORITY_DATA, MAX_LOW_PRIORITY_DATA); - output->getBuffer().write(_lowPriorityDataStreamed); - lowPriorityStreamedBytesSent += _lowPriorityDataStreamed.size(); + _lowPriorityDataStreamed.append(createRandomBytes(MIN_LOW_PRIORITY_DATA, MAX_LOW_PRIORITY_DATA)); + //output->getBuffer().write(_lowPriorityDataStreamed); + //lowPriorityStreamedBytesSent += _lowPriorityDataStreamed.size(); } static QVariant createRandomMessage() { @@ -210,26 +209,26 @@ void Endpoint::readMessage(Bitstream& in) { } void Endpoint::readReliableChannel() { - QByteArray bytes = _sequencer->getReliableInputChannel()->getBuffer().readAll(); + CircularBuffer& buffer = _sequencer->getReliableInputChannel()->getBuffer(); + QByteArray bytes = buffer.read(buffer.bytesAvailable()); if (_other->_dataStreamed.size() < bytes.size()) { throw QString("Received unsent/already sent streamed data."); } - QByteArray compare = _other->_dataStreamed; - _other->_dataStreamed = _other->_dataStreamed.mid(bytes.size()); - compare.truncate(bytes.size()); + QByteArray compare = _other->_dataStreamed.readBytes(0, bytes.size()); + _other->_dataStreamed.remove(bytes.size()); if (compare != bytes) { throw QString("Sent/received streamed data mismatch."); } } void Endpoint::readLowPriorityReliableChannel() { - QByteArray bytes = _sequencer->getReliableInputChannel(1)->getBuffer().readAll(); + CircularBuffer& buffer = _sequencer->getReliableInputChannel(1)->getBuffer(); + QByteArray bytes = buffer.read(buffer.bytesAvailable()); if (_other->_lowPriorityDataStreamed.size() < bytes.size()) { throw QString("Received unsent/already sent low-priority streamed data."); } - QByteArray compare = _other->_lowPriorityDataStreamed; - _other->_lowPriorityDataStreamed = _other->_lowPriorityDataStreamed.mid(bytes.size()); - compare.truncate(bytes.size()); + QByteArray compare = _other->_lowPriorityDataStreamed.readBytes(0, bytes.size()); + _other->_lowPriorityDataStreamed.remove(bytes.size()); if (compare != bytes) { throw QString("Sent/received low-priority streamed data mismatch."); } diff --git a/tests/metavoxels/src/MetavoxelTests.h b/tests/metavoxels/src/MetavoxelTests.h index 7c2bd29b2f..372718afc4 100644 --- a/tests/metavoxels/src/MetavoxelTests.h +++ b/tests/metavoxels/src/MetavoxelTests.h @@ -12,9 +12,8 @@ #include #include -#include +#include -class DatagramSequencer; class SequencedTestMessage; /// Tests various aspects of the metavoxel library. @@ -59,8 +58,8 @@ private: float _highPriorityMessagesToSend; QVariantList _highPriorityMessagesSent; QList _unreliableMessagesSent; - QByteArray _dataStreamed; - QByteArray _lowPriorityDataStreamed; + CircularBuffer _dataStreamed; + CircularBuffer _lowPriorityDataStreamed; }; /// A simple test message.