Merge pull request #3141 from ey6es/metavoxels

Fixes, congestion control for metavoxel streaming.
This commit is contained in:
Philip Rosedale 2014-07-08 21:30:13 -07:00
commit 32eb51a224
10 changed files with 72 additions and 23 deletions

View file

@ -87,13 +87,14 @@ void MetavoxelServer::sendDeltas() {
int elapsed = now - _lastSend; int elapsed = now - _lastSend;
_lastSend = now; _lastSend = now;
_sendTimer.start(qMax(0, 2 * SEND_INTERVAL - elapsed)); _sendTimer.start(qMax(0, 2 * SEND_INTERVAL - qMax(elapsed, SEND_INTERVAL)));
} }
MetavoxelSession::MetavoxelSession(const SharedNodePointer& node, MetavoxelServer* server) : MetavoxelSession::MetavoxelSession(const SharedNodePointer& node, MetavoxelServer* server) :
Endpoint(node, new PacketRecord(), NULL), Endpoint(node, new PacketRecord(), NULL),
_server(server), _server(server),
_reliableDeltaChannel(NULL) { _reliableDeltaChannel(NULL),
_reliableDeltaID(0) {
connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleMessage(const QVariant&))); connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleMessage(const QVariant&)));
connect(&_sequencer, SIGNAL(sendAcknowledged(int)), SLOT(checkReliableDeltaReceived())); connect(&_sequencer, SIGNAL(sendAcknowledged(int)), SLOT(checkReliableDeltaReceived()));
@ -108,9 +109,7 @@ void MetavoxelSession::update() {
} }
// if we're sending a reliable delta, wait until it's acknowledged // if we're sending a reliable delta, wait until it's acknowledged
if (_reliableDeltaChannel) { if (_reliableDeltaChannel) {
Bitstream& out = _sequencer.startPacket(); sendPacketGroup();
out << QVariant::fromValue(MetavoxelDeltaPendingMessage());
_sequencer.endPacket();
return; return;
} }
Bitstream& out = _sequencer.startPacket(); Bitstream& out = _sequencer.startPacket();
@ -134,12 +133,16 @@ void MetavoxelSession::update() {
// go back to the beginning with the current packet and note that there's a delta pending // go back to the beginning with the current packet and note that there's a delta pending
_sequencer.getOutputStream().getUnderlying().device()->seek(start); _sequencer.getOutputStream().getUnderlying().device()->seek(start);
out << QVariant::fromValue(MetavoxelDeltaPendingMessage()); MetavoxelDeltaPendingMessage msg = { ++_reliableDeltaID };
out << QVariant::fromValue(msg);
_sequencer.endPacket(); _sequencer.endPacket();
} else { } else {
_sequencer.endPacket(); _sequencer.endPacket();
} }
// perhaps send additional packets to fill out the group
sendPacketGroup(1);
} }
void MetavoxelSession::handleMessage(const QVariant& message, Bitstream& in) { void MetavoxelSession::handleMessage(const QVariant& message, Bitstream& in) {
@ -176,3 +179,17 @@ void MetavoxelSession::checkReliableDeltaReceived() {
_reliableDeltaData = MetavoxelData(); _reliableDeltaData = MetavoxelData();
_reliableDeltaChannel = NULL; _reliableDeltaChannel = NULL;
} }
void MetavoxelSession::sendPacketGroup(int alreadySent) {
int additionalPackets = _sequencer.notePacketGroup() - alreadySent;
for (int i = 0; i < additionalPackets; i++) {
Bitstream& out = _sequencer.startPacket();
if (_reliableDeltaChannel) {
MetavoxelDeltaPendingMessage msg = { _reliableDeltaID };
out << QVariant::fromValue(msg);
} else {
out << QVariant();
}
_sequencer.endPacket();
}
}

View file

@ -74,6 +74,8 @@ private slots:
private: private:
void sendPacketGroup(int alreadySent = 0);
MetavoxelServer* _server; MetavoxelServer* _server;
MetavoxelLOD _lod; MetavoxelLOD _lod;
@ -83,6 +85,7 @@ private:
MetavoxelData _reliableDeltaData; MetavoxelData _reliableDeltaData;
MetavoxelLOD _reliableDeltaLOD; MetavoxelLOD _reliableDeltaLOD;
Bitstream::WriteMappings _reliableDeltaWriteMappings; Bitstream::WriteMappings _reliableDeltaWriteMappings;
int _reliableDeltaID;
}; };
#endif // hifi_MetavoxelServer_h #endif // hifi_MetavoxelServer_h

View file

@ -79,7 +79,7 @@ ReliableChannel* DatagramSequencer::getReliableInputChannel(int index) {
return channel; return channel;
} }
int DatagramSequencer::startPacketGroup(int desiredPackets) { int DatagramSequencer::notePacketGroup(int desiredPackets) {
// figure out how much data we have enqueued and increase the number of packets desired // figure out how much data we have enqueued and increase the number of packets desired
int totalAvailable = 0; int totalAvailable = 0;
foreach (ReliableChannel* channel, _reliableOutputChannels) { foreach (ReliableChannel* channel, _reliableOutputChannels) {

View file

@ -108,10 +108,10 @@ public:
/// Returns the intput channel at the specified index, creating it if necessary. /// Returns the intput channel at the specified index, creating it if necessary.
ReliableChannel* getReliableInputChannel(int index = 0); ReliableChannel* getReliableInputChannel(int index = 0);
/// Starts a packet group. /// Notes that we're sending a group of packets.
/// \param desiredPackets the number of packets we'd like to write in the group /// \param desiredPackets the number of packets we'd like to write in the group
/// \return the number of packets to write in the group /// \return the number of packets to write in the group
int startPacketGroup(int desiredPackets = 1); int notePacketGroup(int desiredPackets = 1);
/// Starts a new packet for transmission. /// Starts a new packet for transmission.
/// \return a reference to the Bitstream to use for writing to the packet /// \return a reference to the Bitstream to use for writing to the packet

View file

@ -39,9 +39,12 @@ Endpoint::~Endpoint() {
} }
void Endpoint::update() { void Endpoint::update() {
Bitstream& out = _sequencer.startPacket(); int packetsToSend = _sequencer.notePacketGroup();
writeUpdateMessage(out); for (int i = 0; i < packetsToSend; i++) {
_sequencer.endPacket(); Bitstream& out = _sequencer.startPacket();
writeUpdateMessage(out);
_sequencer.endPacket();
}
} }
int Endpoint::parseData(const QByteArray& packet) { int Endpoint::parseData(const QByteArray& packet) {

View file

@ -87,7 +87,8 @@ void MetavoxelClientManager::updateClient(MetavoxelClient* client) {
MetavoxelClient::MetavoxelClient(const SharedNodePointer& node, MetavoxelClientManager* manager) : MetavoxelClient::MetavoxelClient(const SharedNodePointer& node, MetavoxelClientManager* manager) :
Endpoint(node, new PacketRecord(), new PacketRecord()), Endpoint(node, new PacketRecord(), new PacketRecord()),
_manager(manager), _manager(manager),
_reliableDeltaChannel(NULL) { _reliableDeltaChannel(NULL),
_reliableDeltaID(0) {
connect(_sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX), connect(_sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX),
SIGNAL(receivedMessage(const QVariant&, Bitstream&)), SLOT(handleMessage(const QVariant&, Bitstream&))); SIGNAL(receivedMessage(const QVariant&, Bitstream&)), SLOT(handleMessage(const QVariant&, Bitstream&)));
@ -139,10 +140,16 @@ void MetavoxelClient::handleMessage(const QVariant& message, Bitstream& in) {
} }
} }
} else if (userType == MetavoxelDeltaPendingMessage::Type) { } else if (userType == MetavoxelDeltaPendingMessage::Type) {
if (!_reliableDeltaChannel) { // check the id to make sure this is not a delta we've already processed
int id = message.value<MetavoxelDeltaPendingMessage>().id;
if (id > _reliableDeltaID) {
_reliableDeltaID = id;
_reliableDeltaChannel = _sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX); _reliableDeltaChannel = _sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX);
_reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getInputStream()); _reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getInputStream());
_reliableDeltaLOD = getLastAcknowledgedSendRecord()->getLOD(); _reliableDeltaLOD = getLastAcknowledgedSendRecord()->getLOD();
PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord();
_remoteDataLOD = receiveRecord->getLOD();
_remoteData = receiveRecord->getData();
} }
} else { } else {
Endpoint::handleMessage(message, in); Endpoint::handleMessage(message, in);

View file

@ -74,6 +74,7 @@ private:
ReliableChannel* _reliableDeltaChannel; ReliableChannel* _reliableDeltaChannel;
MetavoxelLOD _reliableDeltaLOD; MetavoxelLOD _reliableDeltaLOD;
int _reliableDeltaID;
}; };
#endif // hifi_MetavoxelClientManager_h #endif // hifi_MetavoxelClientManager_h

View file

@ -64,6 +64,10 @@ DECLARE_STREAMABLE_METATYPE(MetavoxelDeltaMessage)
/// A message indicating that metavoxel delta information is being sent on a reliable channel. /// A message indicating that metavoxel delta information is being sent on a reliable channel.
class MetavoxelDeltaPendingMessage { class MetavoxelDeltaPendingMessage {
STREAMABLE STREAMABLE
public:
STREAM int id;
}; };
DECLARE_STREAMABLE_METATYPE(MetavoxelDeltaPendingMessage) DECLARE_STREAMABLE_METATYPE(MetavoxelDeltaPendingMessage)

View file

@ -647,7 +647,8 @@ TestEndpoint::TestEndpoint(Mode mode) :
_mode(mode), _mode(mode),
_highPriorityMessagesToSend(0.0f), _highPriorityMessagesToSend(0.0f),
_reliableMessagesToSend(0.0f), _reliableMessagesToSend(0.0f),
_reliableDeltaChannel(NULL) { _reliableDeltaChannel(NULL),
_reliableDeltaID(0) {
connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)),
SLOT(handleHighPriorityMessage(const QVariant&))); SLOT(handleHighPriorityMessage(const QVariant&)));
@ -858,7 +859,7 @@ bool TestEndpoint::simulate(int iterationNumber) {
bytesReceived += datagram.size(); bytesReceived += datagram.size();
_remainingPipelineCapacity += datagram.size(); _remainingPipelineCapacity += datagram.size();
} }
int packetCount = _sequencer.startPacketGroup(); int packetCount = _sequencer.notePacketGroup();
groupsSent++; groupsSent++;
maxPacketsPerGroup = qMax(maxPacketsPerGroup, packetCount); maxPacketsPerGroup = qMax(maxPacketsPerGroup, packetCount);
for (int i = 0; i < packetCount; i++) { for (int i = 0; i < packetCount; i++) {
@ -908,7 +909,8 @@ bool TestEndpoint::simulate(int iterationNumber) {
// if we're sending a reliable delta, wait until it's acknowledged // if we're sending a reliable delta, wait until it's acknowledged
if (_reliableDeltaChannel) { if (_reliableDeltaChannel) {
Bitstream& out = _sequencer.startPacket(); Bitstream& out = _sequencer.startPacket();
out << QVariant::fromValue(MetavoxelDeltaPendingMessage()); MetavoxelDeltaPendingMessage msg = { _reliableDeltaID };
out << QVariant::fromValue(msg);
_sequencer.endPacket(); _sequencer.endPacket();
return false; return false;
} }
@ -932,7 +934,8 @@ bool TestEndpoint::simulate(int iterationNumber) {
_reliableDeltaLOD = _lod; _reliableDeltaLOD = _lod;
_sequencer.getOutputStream().getUnderlying().device()->seek(start); _sequencer.getOutputStream().getUnderlying().device()->seek(start);
out << QVariant::fromValue(MetavoxelDeltaPendingMessage()); MetavoxelDeltaPendingMessage msg = { ++_reliableDeltaID };
out << QVariant::fromValue(msg);
_sequencer.endPacket(); _sequencer.endPacket();
} else { } else {
@ -1081,15 +1084,22 @@ void TestEndpoint::handleMessage(const QVariant& message, Bitstream& in) {
} else if (userType == MetavoxelDeltaMessage::Type) { } else if (userType == MetavoxelDeltaMessage::Type) {
PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord(); PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord();
_data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, _remoteData.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in,
_dataLOD = getLastAcknowledgedSendRecord()->getLOD()); _remoteDataLOD = getLastAcknowledgedSendRecord()->getLOD());
in.reset();
_data = _remoteData;
compareMetavoxelData(); compareMetavoxelData();
} else if (userType == MetavoxelDeltaPendingMessage::Type) { } else if (userType == MetavoxelDeltaPendingMessage::Type) {
if (!_reliableDeltaChannel) { int id = message.value<MetavoxelDeltaPendingMessage>().id;
if (id > _reliableDeltaID) {
_reliableDeltaID = id;
_reliableDeltaChannel = _sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX); _reliableDeltaChannel = _sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX);
_reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getInputStream()); _reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getInputStream());
_reliableDeltaLOD = getLastAcknowledgedSendRecord()->getLOD(); _reliableDeltaLOD = getLastAcknowledgedSendRecord()->getLOD();
PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord();
_remoteDataLOD = receiveRecord->getLOD();
_remoteData = receiveRecord->getData();
} }
} else if (userType == QMetaType::QVariantList) { } else if (userType == QMetaType::QVariantList) {
foreach (const QVariant& element, message.toList()) { foreach (const QVariant& element, message.toList()) {
@ -1107,7 +1117,7 @@ PacketRecord* TestEndpoint::maybeCreateSendRecord() const {
} }
PacketRecord* TestEndpoint::maybeCreateReceiveRecord() const { PacketRecord* TestEndpoint::maybeCreateReceiveRecord() const {
return new TestReceiveRecord(_dataLOD, (_mode == METAVOXEL_SERVER_MODE) ? MetavoxelData() : _data, _remoteState); return new TestReceiveRecord(_remoteDataLOD, _remoteData, _remoteState);
} }
void TestEndpoint::handleHighPriorityMessage(const QVariant& message) { void TestEndpoint::handleHighPriorityMessage(const QVariant& message) {
@ -1127,9 +1137,10 @@ void TestEndpoint::handleHighPriorityMessage(const QVariant& message) {
void TestEndpoint::handleReliableMessage(const QVariant& message, Bitstream& in) { void TestEndpoint::handleReliableMessage(const QVariant& message, Bitstream& in) {
if (message.userType() == MetavoxelDeltaMessage::Type) { if (message.userType() == MetavoxelDeltaMessage::Type) {
PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord(); PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord();
_data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, _dataLOD = _reliableDeltaLOD); _remoteData.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, _remoteDataLOD = _reliableDeltaLOD);
_sequencer.getInputStream().persistReadMappings(in.getAndResetReadMappings()); _sequencer.getInputStream().persistReadMappings(in.getAndResetReadMappings());
in.clearPersistentMappings(); in.clearPersistentMappings();
_data = _remoteData;
compareMetavoxelData(); compareMetavoxelData();
_reliableDeltaChannel = NULL; _reliableDeltaChannel = NULL;
return; return;

View file

@ -79,6 +79,8 @@ private:
MetavoxelData _data; MetavoxelData _data;
MetavoxelLOD _dataLOD; MetavoxelLOD _dataLOD;
MetavoxelData _remoteData;
MetavoxelLOD _remoteDataLOD;
MetavoxelLOD _lod; MetavoxelLOD _lod;
SharedObjectPointer _sphere; SharedObjectPointer _sphere;
@ -104,6 +106,7 @@ private:
MetavoxelData _reliableDeltaData; MetavoxelData _reliableDeltaData;
MetavoxelLOD _reliableDeltaLOD; MetavoxelLOD _reliableDeltaLOD;
Bitstream::WriteMappings _reliableDeltaWriteMappings; Bitstream::WriteMappings _reliableDeltaWriteMappings;
int _reliableDeltaID;
}; };
/// A simple shared object. /// A simple shared object.