Bring reliable delta streaming code over from test.

This commit is contained in:
Andrzej Kapolka 2014-07-07 11:08:43 -07:00
parent da32c6e89b
commit e15f003639
6 changed files with 91 additions and 18 deletions

View file

@ -91,24 +91,56 @@ void MetavoxelServer::sendDeltas() {
MetavoxelSession::MetavoxelSession(const SharedNodePointer& node, MetavoxelServer* server) : MetavoxelSession::MetavoxelSession(const SharedNodePointer& node, MetavoxelServer* server) :
Endpoint(node, new PacketRecord(), NULL), Endpoint(node, new PacketRecord(), NULL),
_server(server) { _server(server),
_reliableDeltaChannel(NULL) {
connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleMessage(const QVariant&))); connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleMessage(const QVariant&)));
connect(&_sequencer, SIGNAL(sendAcknowledged(int)), SLOT(checkReliableDeltaReceived()));
connect(_sequencer.getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&, Bitstream&)), connect(_sequencer.getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&, Bitstream&)),
SLOT(handleMessage(const QVariant&, Bitstream&))); SLOT(handleMessage(const QVariant&, Bitstream&)));
} }
void MetavoxelSession::update() { void MetavoxelSession::update() {
// wait until we have a valid lod // wait until we have a valid lod before sending
if (_lod.isValid()) { if (!_lod.isValid()) {
Endpoint::update(); return;
} }
} // if we're sending a reliable delta, wait until it's acknowledged
if (_reliableDeltaChannel) {
void MetavoxelSession::writeUpdateMessage(Bitstream& out) { Bitstream& out = _sequencer.startPacket();
out << QVariant::fromValue(MetavoxelDeltaPendingMessage());
_sequencer.endPacket();
return;
}
Bitstream& out = _sequencer.startPacket();
out << QVariant::fromValue(MetavoxelDeltaMessage()); out << QVariant::fromValue(MetavoxelDeltaMessage());
PacketRecord* sendRecord = getLastAcknowledgedSendRecord(); PacketRecord* sendRecord = getLastAcknowledgedSendRecord();
_server->getData().writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod); out.setBytesRemaining(_sequencer.getMaxPacketSize());
try {
_server->getData().writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod);
_sequencer.endPacket();
} catch (const ByteLimitExceededException& exception) {
_sequencer.cancelPacket();
// we need to send the delta on the reliable channel
_reliableDeltaChannel = _sequencer.getReliableOutputChannel(RELIABLE_DELTA_CHANNEL_INDEX);
_reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getOutputStream());
_reliableDeltaChannel->startMessage();
_reliableDeltaChannel->getBitstream() << QVariant::fromValue(MetavoxelDeltaMessage());
_server->getData().writeDelta(sendRecord->getData(), sendRecord->getLOD(), _reliableDeltaChannel->getBitstream(), _lod);
_reliableDeltaWriteMappings = _reliableDeltaChannel->getBitstream().getAndResetWriteMappings();
_reliableDeltaChannel->getBitstream().clearPersistentMappings();
_reliableDeltaChannel->endMessage();
_reliableDeltaReceivedOffset = _reliableDeltaChannel->getBytesWritten();
_reliableDeltaData = _server->getData();
_reliableDeltaLOD = _lod;
Bitstream& out = _sequencer.startPacket();
out << QVariant::fromValue(MetavoxelDeltaPendingMessage());
_sequencer.endPacket();
}
} }
void MetavoxelSession::handleMessage(const QVariant& message, Bitstream& in) { void MetavoxelSession::handleMessage(const QVariant& message, Bitstream& in) {
@ -116,7 +148,8 @@ void MetavoxelSession::handleMessage(const QVariant& message, Bitstream& in) {
} }
PacketRecord* MetavoxelSession::maybeCreateSendRecord() const { PacketRecord* MetavoxelSession::maybeCreateSendRecord() const {
return new PacketRecord(_lod, _server->getData()); return _reliableDeltaChannel ? new PacketRecord(_reliableDeltaLOD, _reliableDeltaData) :
new PacketRecord(_lod, _server->getData());
} }
void MetavoxelSession::handleMessage(const QVariant& message) { void MetavoxelSession::handleMessage(const QVariant& message) {
@ -134,3 +167,13 @@ void MetavoxelSession::handleMessage(const QVariant& message) {
} }
} }
} }
void MetavoxelSession::checkReliableDeltaReceived() {
if (!_reliableDeltaChannel || _reliableDeltaChannel->getOffset() < _reliableDeltaReceivedOffset) {
return;
}
_sequencer.getOutputStream().persistWriteMappings(_reliableDeltaWriteMappings);
_reliableDeltaWriteMappings = Bitstream::WriteMappings();
_reliableDeltaData = MetavoxelData();
_reliableDeltaChannel = NULL;
}

View file

@ -63,7 +63,6 @@ public:
protected: protected:
virtual void writeUpdateMessage(Bitstream& out);
virtual void handleMessage(const QVariant& message, Bitstream& in); virtual void handleMessage(const QVariant& message, Bitstream& in);
virtual PacketRecord* maybeCreateSendRecord() const; virtual PacketRecord* maybeCreateSendRecord() const;
@ -71,12 +70,19 @@ protected:
private slots: private slots:
void handleMessage(const QVariant& message); void handleMessage(const QVariant& message);
void checkReliableDeltaReceived();
private: private:
MetavoxelServer* _server; MetavoxelServer* _server;
MetavoxelLOD _lod; MetavoxelLOD _lod;
ReliableChannel* _reliableDeltaChannel;
int _reliableDeltaReceivedOffset;
MetavoxelData _reliableDeltaData;
MetavoxelLOD _reliableDeltaLOD;
Bitstream::WriteMappings _reliableDeltaWriteMappings;
}; };
#endif // hifi_MetavoxelServer_h #endif // hifi_MetavoxelServer_h

View file

@ -24,6 +24,9 @@ class Endpoint : public NodeData {
Q_OBJECT Q_OBJECT
public: public:
/// The index of the input/output channel used to transmit reliable deltas.
static const int RELIABLE_DELTA_CHANNEL_INDEX = 1;
Endpoint(const SharedNodePointer& node, PacketRecord* baselineSendRecord = NULL, Endpoint(const SharedNodePointer& node, PacketRecord* baselineSendRecord = NULL,
PacketRecord* baselineReceiveRecord = NULL); PacketRecord* baselineReceiveRecord = NULL);

View file

@ -86,7 +86,11 @@ void MetavoxelClientManager::updateClient(MetavoxelClient* client) {
MetavoxelClient::MetavoxelClient(const SharedNodePointer& node, MetavoxelClientManager* manager) : MetavoxelClient::MetavoxelClient(const SharedNodePointer& node, MetavoxelClientManager* manager) :
Endpoint(node, new PacketRecord(), new PacketRecord()), Endpoint(node, new PacketRecord(), new PacketRecord()),
_manager(manager) { _manager(manager),
_reliableDeltaChannel(NULL) {
connect(_sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX),
SIGNAL(receivedMessage(const QVariant&, Bitstream&)), SLOT(handleMessage(const QVariant&, Bitstream&)));
} }
void MetavoxelClient::guide(MetavoxelVisitor& visitor) { void MetavoxelClient::guide(MetavoxelVisitor& visitor) {
@ -124,19 +128,34 @@ void MetavoxelClient::readMessage(Bitstream& in) {
} }
void MetavoxelClient::handleMessage(const QVariant& message, Bitstream& in) { void MetavoxelClient::handleMessage(const QVariant& message, Bitstream& in) {
if (message.userType() == MetavoxelDeltaMessage::Type) { int userType = message.userType();
if (userType == MetavoxelDeltaMessage::Type) {
PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord(); PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord();
_data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, getLastAcknowledgedSendRecord()->getLOD()); if (_reliableDeltaChannel) {
_data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, _dataLOD = _reliableDeltaLOD);
_sequencer.getInputStream().persistReadMappings(in.getAndResetReadMappings());
in.clearPersistentMappings();
_reliableDeltaChannel = NULL;
} else {
_data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in,
_dataLOD = getLastAcknowledgedSendRecord()->getLOD());
}
} else if (userType == MetavoxelDeltaPendingMessage::Type) {
if (!_reliableDeltaChannel) {
_reliableDeltaChannel = _sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX);
_reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getInputStream());
_reliableDeltaLOD = getLastAcknowledgedSendRecord()->getLOD();
}
} else { } else {
Endpoint::handleMessage(message, in); Endpoint::handleMessage(message, in);
} }
} }
PacketRecord* MetavoxelClient::maybeCreateSendRecord() const { PacketRecord* MetavoxelClient::maybeCreateSendRecord() const {
return new PacketRecord(_manager->getLOD()); return new PacketRecord(_reliableDeltaChannel ? _reliableDeltaLOD : _manager->getLOD());
} }
PacketRecord* MetavoxelClient::maybeCreateReceiveRecord() const { PacketRecord* MetavoxelClient::maybeCreateReceiveRecord() const {
return new PacketRecord(getLastAcknowledgedSendRecord()->getLOD(), _data); return new PacketRecord(_dataLOD, _data);
} }

View file

@ -70,6 +70,10 @@ private:
MetavoxelClientManager* _manager; MetavoxelClientManager* _manager;
MetavoxelData _data; MetavoxelData _data;
MetavoxelLOD _dataLOD;
ReliableChannel* _reliableDeltaChannel;
MetavoxelLOD _reliableDeltaLOD;
}; };
#endif // hifi_MetavoxelClientManager_h #endif // hifi_MetavoxelClientManager_h

View file

@ -642,8 +642,6 @@ TestReceiveRecord::TestReceiveRecord(const MetavoxelLOD& lod,
_remoteState(remoteState) { _remoteState(remoteState) {
} }
const int RELIABLE_DELTA_CHANNEL_INDEX = 1;
TestEndpoint::TestEndpoint(Mode mode) : TestEndpoint::TestEndpoint(Mode mode) :
Endpoint(SharedNodePointer(), new TestSendRecord(), new TestReceiveRecord()), Endpoint(SharedNodePointer(), new TestSendRecord(), new TestReceiveRecord()),
_mode(mode), _mode(mode),