diff --git a/libraries/metavoxels/src/Bitstream.h b/libraries/metavoxels/src/Bitstream.h index 6122c5c3be..fe8b296f61 100644 --- a/libraries/metavoxels/src/Bitstream.h +++ b/libraries/metavoxels/src/Bitstream.h @@ -69,6 +69,8 @@ public: int takePersistentID(T value) { return _persistentIDs.take(value); } + void removePersistentValue(int id) { _persistentValues.remove(id); } + RepeatedValueStreamer& operator<<(T value); RepeatedValueStreamer& operator>>(T& value); @@ -227,6 +229,9 @@ public: /// Persists a set of read mappings recorded earlier. void persistReadMappings(const ReadMappings& mappings); + /// Removes a shared object from the read mappings. + void clearSharedObject(int id) { _sharedObjectStreamer.removePersistentValue(id); } + Bitstream& operator<<(bool value); Bitstream& operator>>(bool& value); diff --git a/libraries/metavoxels/src/DatagramSequencer.cpp b/libraries/metavoxels/src/DatagramSequencer.cpp index b51f573712..e3e3d65838 100644 --- a/libraries/metavoxels/src/DatagramSequencer.cpp +++ b/libraries/metavoxels/src/DatagramSequencer.cpp @@ -11,6 +11,7 @@ #include #include "DatagramSequencer.h" +#include "MetavoxelMessages.h" const int MAX_DATAGRAM_SIZE = 1500; @@ -41,10 +42,6 @@ DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader) : memcpy(_outgoingDatagram.data(), datagramHeader.constData(), _datagramHeaderSize); } -void DatagramSequencer::sendReliableMessage(const QVariant& data, int channel) { - -} - void DatagramSequencer::sendHighPriorityMessage(const QVariant& data) { HighPriorityMessage message = { data, _outgoingPacketNumber + 1 }; _highPriorityMessages.append(message); @@ -53,7 +50,7 @@ void DatagramSequencer::sendHighPriorityMessage(const QVariant& data) { ReliableChannel* DatagramSequencer::getReliableOutputChannel(int index) { ReliableChannel*& channel = _reliableOutputChannels[index]; if (!channel) { - channel = new ReliableChannel(this); + channel = new ReliableChannel(this, index); } return channel; } @@ -61,7 +58,7 @@ ReliableChannel* DatagramSequencer::getReliableOutputChannel(int index) { ReliableChannel* DatagramSequencer::getReliableInputChannel(int index) { ReliableChannel*& channel = _reliableInputChannels[index]; if (!channel) { - channel = new ReliableChannel(this); + channel = new ReliableChannel(this, index); } return channel; } @@ -88,14 +85,15 @@ void DatagramSequencer::endPacket() { // if we have space remaining, send some data from our reliable channels int remaining = _maxPacketSize - _outgoingPacketStream.device()->pos(); - const int MINIMUM_RELIABLE_SIZE = sizeof(quint32) * 4; // count, channel number, offset, size + const int MINIMUM_RELIABLE_SIZE = sizeof(quint32) * 5; // count, channel number, segment count, offset, size + QVector spans; if (remaining > MINIMUM_RELIABLE_SIZE) { - appendReliableData(remaining); + appendReliableData(remaining, spans); } else { _outgoingPacketStream << (quint32)0; } - sendPacket(QByteArray::fromRawData(_outgoingPacketData.constData(), _outgoingPacketStream.device()->pos())); + sendPacket(QByteArray::fromRawData(_outgoingPacketData.constData(), _outgoingPacketStream.device()->pos()), spans); _outgoingPacketStream.device()->seek(0); } @@ -180,7 +178,7 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) { QVariant data; _inputStream >> data; if (i >= _receivedHighPriorityMessages) { - emit receivedHighPriorityMessage(data); + handleHighPriorityMessage(data); } } _receivedHighPriorityMessages = highPriorityMessageCount; @@ -206,7 +204,9 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) { } void DatagramSequencer::sendClearSharedObjectMessage(int id) { - qDebug() << "cleared " << id; + // for now, high priority + ClearSharedObjectMessage message = { id }; + sendHighPriorityMessage(QVariant::fromValue(message)); } void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) { @@ -227,18 +227,46 @@ void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) { break; } } -} - -void DatagramSequencer::appendReliableData(int bytes) { - _outgoingPacketStream << (quint32)0; - - for (QHash::const_iterator it = _reliableOutputChannels.constBegin(); - it != _reliableOutputChannels.constEnd(); it++) { - + + // acknowledge the received spans + foreach (const ChannelSpan& span, record.spans) { + getReliableOutputChannel(span.channel)->spanAcknowledged(span); } } -void DatagramSequencer::sendPacket(const QByteArray& packet) { +void DatagramSequencer::appendReliableData(int bytes, QVector& spans) { + // gather total number of bytes to write, priority + int totalBytes = 0; + float totalPriority = 0.0f; + int totalChannels = 0; + foreach (ReliableChannel* channel, _reliableOutputChannels) { + int channelBytes = channel->getBytesAvailable(); + if (channelBytes > 0) { + totalBytes += channelBytes; + totalPriority += channel->getPriority(); + totalChannels++; + } + } + _outgoingPacketStream << (quint32)totalChannels; + if (totalChannels == 0) { + return; + } + totalBytes = qMin(bytes, totalBytes); + + foreach (ReliableChannel* channel, _reliableOutputChannels) { + int channelBytes = channel->getBytesAvailable(); + if (channelBytes == 0) { + continue; + } + _outgoingPacketStream << (quint32)channel->getIndex(); + channelBytes = qMin(channelBytes, (int)(totalBytes * channel->getPriority() / totalPriority)); + channel->writeData(_outgoingPacketStream, channelBytes, spans); + totalBytes -= channelBytes; + totalPriority -= channel->getPriority(); + } +} + +void DatagramSequencer::sendPacket(const QByteArray& packet, const QVector& spans) { QIODeviceOpener opener(&_outgoingDatagramBuffer, QIODevice::WriteOnly); // increment the packet number @@ -246,7 +274,7 @@ void DatagramSequencer::sendPacket(const QByteArray& packet) { // record the send SendRecord record = { _outgoingPacketNumber, _receiveRecords.isEmpty() ? 0 : _receiveRecords.last().packetNumber, - _outputStream.getAndResetWriteMappings() }; + _outputStream.getAndResetWriteMappings(), spans }; _sendRecords.append(record); // write the sequence number and size, which are the same between all fragments @@ -271,18 +299,36 @@ void DatagramSequencer::sendPacket(const QByteArray& packet) { } while(offset < packet.size()); } +void DatagramSequencer::handleHighPriorityMessage(const QVariant& data) { + if (data.userType() == ClearSharedObjectMessage::Type) { + _inputStream.clearSharedObject(data.value().id); + + } else { + emit receivedHighPriorityMessage(data); + } +} + +int ReliableChannel::getBytesAvailable() const { + return _buffer.pos() - _sent; +} + void ReliableChannel::sendMessage(const QVariant& message) { _bitstream << message; } void ReliableChannel::sendClearSharedObjectMessage(int id) { + ClearSharedObjectMessage message = { id }; + sendMessage(QVariant::fromValue(message)); } -ReliableChannel::ReliableChannel(DatagramSequencer* sequencer) : +ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index) : QObject(sequencer), + _index(index), _dataStream(&_buffer), _bitstream(_dataStream), - _priority(1.0f) { + _priority(1.0f), + _offset(0), + _sent(0) { _buffer.open(QIODevice::WriteOnly); _dataStream.setByteOrder(QDataStream::LittleEndian); @@ -290,10 +336,61 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer) : connect(&_bitstream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int))); } -void ReliableChannel::readData(QDataStream& in) { - quint32 offset, size; - in >> offset >> size; +void ReliableChannel::writeData(QDataStream& out, int bytes, QVector& spans) { + // determine how many spans we can send + int remainingBytes = bytes; + int position = 0; + int spanCount = 0; + foreach (const RemainingSpan& remainingSpan, _remainingSpans) { + if (remainingBytes == 0) { + break; + } + int spanBytes = qMin(remainingSpan.unacknowledged, remainingBytes); + remainingBytes -= spanBytes; + spanCount++; + position += remainingSpan.unacknowledged + remainingSpan.acknowledged; + } + if (remainingBytes > 0 && position < _buffer.pos()) { + spanCount++; + } + out << (quint32)spanCount; - in.skipRawData(size); + remainingBytes = bytes; + position = 0; + foreach (const RemainingSpan& remainingSpan, _remainingSpans) { + if (remainingBytes == 0) { + break; + } + int spanBytes = qMin(remainingSpan.unacknowledged, remainingBytes); + writeSpan(out, position, spanBytes, spans); + remainingBytes -= spanBytes; + position += remainingSpan.unacknowledged + remainingSpan.acknowledged; + } + if (remainingBytes > 0 && position < _buffer.pos()) { + int spanBytes = qMin((int)_buffer.pos() - position, remainingBytes); + writeSpan(out, position, spanBytes, spans); + } } + +void ReliableChannel::writeSpan(QDataStream& out, int position, int length, QVector& spans) { + DatagramSequencer::ChannelSpan span = { _index, _offset + position, length }; + spans.append(span); + out << (quint32)span.offset; + out << (quint32)length; + out.writeRawData(_buffer.data().constData() + position, length); +} + +void ReliableChannel::spanAcknowledged(const DatagramSequencer::ChannelSpan& span) { +} + +void ReliableChannel::readData(QDataStream& in) { + quint32 segments; + in >> segments; + for (int i = 0; i < segments; i++) { + quint32 offset, size; + in >> offset >> size; + in.skipRawData(size); + } +} + diff --git a/libraries/metavoxels/src/DatagramSequencer.h b/libraries/metavoxels/src/DatagramSequencer.h index e3c884938c..b53ace2b82 100644 --- a/libraries/metavoxels/src/DatagramSequencer.h +++ b/libraries/metavoxels/src/DatagramSequencer.h @@ -14,6 +14,7 @@ #include #include #include +#include #include "Bitstream.h" @@ -42,9 +43,6 @@ public: /// Returns the packet number of the sent packet at the specified index. int getSentPacketNumber(int index) const { return _sendRecords.at(index).packetNumber; } - /// Sends a normal-priority reliable message. - void sendReliableMessage(const QVariant& data, int channel = 0); - /// Adds a message to the high priority queue. Will be sent with every outgoing packet until received. void sendHighPriorityMessage(const QVariant& data); @@ -96,14 +94,24 @@ signals: private slots: void sendClearSharedObjectMessage(int id); - + private: + friend class ReliableChannel; + + class ChannelSpan { + public: + int channel; + int offset; + int length; + }; + class SendRecord { public: int packetNumber; int lastReceivedPacketNumber; Bitstream::WriteMappings mappings; + QVector spans; }; class ReceiveRecord { @@ -119,11 +127,13 @@ private: void sendRecordAcknowledged(const SendRecord& record); /// Appends some reliable data to the outgoing packet. - void appendReliableData(int bytes); + void appendReliableData(int bytes, QVector& spans); /// Sends a packet to the other party, fragmenting it into multiple datagrams (and emitting /// readyToWrite) as necessary. - void sendPacket(const QByteArray& packet); + void sendPacket(const QByteArray& packet, const QVector& spans); + + void handleHighPriorityMessage(const QVariant& data); QList _sendRecords; QList _receiveRecords; @@ -163,12 +173,16 @@ class ReliableChannel : public QObject { public: + int getIndex() const { return _index; } + QDataStream& getDataStream() { return _dataStream; } Bitstream& getBitstream() { return _bitstream; } void setPriority(float priority) { _priority = priority; } float getPriority() const { return _priority; } + int getBytesAvailable() const; + void sendMessage(const QVariant& message); private slots: @@ -179,14 +193,30 @@ private: friend class DatagramSequencer; - ReliableChannel(DatagramSequencer* sequencer); + class RemainingSpan { + public: + int unacknowledged; + int acknowledged; + }; + + ReliableChannel(DatagramSequencer* sequencer, int index); + + void writeData(QDataStream& out, int bytes, QVector& spans); + void writeSpan(QDataStream& out, int position, int length, QVector& spans); + + void spanAcknowledged(const DatagramSequencer::ChannelSpan& span); void readData(QDataStream& in); + int _index; QBuffer _buffer; QDataStream _dataStream; Bitstream _bitstream; float _priority; + + int _offset; + int _sent; + QList _remainingSpans; }; #endif /* defined(__interface__DatagramSequencer__) */