Merge pull request #1930 from birarda/authentication

groundwork for more secure node communication
This commit is contained in:
AndrewMeadows 2014-02-07 14:44:23 -08:00
commit 64efdb2ffb
32 changed files with 385 additions and 324 deletions

View file

@ -830,7 +830,7 @@ void AnimationServer::readPendingDatagrams() {
receivedPacket.resize(nodeList->getNodeSocket().pendingDatagramSize()); receivedPacket.resize(nodeList->getNodeSocket().pendingDatagramSize());
nodeList->getNodeSocket().readDatagram(receivedPacket.data(), receivedPacket.size(), nodeList->getNodeSocket().readDatagram(receivedPacket.data(), receivedPacket.size(),
nodeSockAddr.getAddressPointer(), nodeSockAddr.getPortPointer()); nodeSockAddr.getAddressPointer(), nodeSockAddr.getPortPointer());
if (packetVersionMatch(receivedPacket)) { if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
if (packetTypeForPacket(receivedPacket) == PacketTypeJurisdiction) { if (packetTypeForPacket(receivedPacket) == PacketTypeJurisdiction) {
int headerBytes = numBytesForPacketHeader(receivedPacket); int headerBytes = numBytesForPacketHeader(receivedPacket);
// PacketType_JURISDICTION, first byte is the node type... // PacketType_JURISDICTION, first byte is the node type...

View file

@ -31,36 +31,43 @@ Agent::Agent(const QByteArray& packet) :
_scriptEngine.getParticlesScriptingInterface()->setPacketSender(&_particleEditSender); _scriptEngine.getParticlesScriptingInterface()->setPacketSender(&_particleEditSender);
} }
void Agent::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) { void Agent::readPendingDatagrams() {
PacketType datagramPacketType = packetTypeForPacket(dataByteArray); QByteArray receivedPacket;
if (datagramPacketType == PacketTypeJurisdiction) { HifiSockAddr senderSockAddr;
int headerBytes = numBytesForPacketHeader(dataByteArray); NodeList* nodeList = NodeList::getInstance();
QUuid nodeUUID; while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
SharedNodePointer matchedNode = NodeList::getInstance()->nodeWithUUID(nodeUUID); if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
PacketType datagramPacketType = packetTypeForPacket(receivedPacket);
if (matchedNode) { if (datagramPacketType == PacketTypeJurisdiction) {
// PacketType_JURISDICTION, first byte is the node type... int headerBytes = numBytesForPacketHeader(receivedPacket);
switch (dataByteArray[headerBytes]) {
case NodeType::VoxelServer: SharedNodePointer matchedNode = nodeList->sendingNodeForPacket(receivedPacket);
_scriptEngine.getVoxelsScriptingInterface()->getJurisdictionListener()->queueReceivedPacket(matchedNode,
dataByteArray); if (matchedNode) {
break; // PacketType_JURISDICTION, first byte is the node type...
case NodeType::ParticleServer: switch (receivedPacket[headerBytes]) {
_scriptEngine.getParticlesScriptingInterface()->getJurisdictionListener()->queueReceivedPacket(matchedNode, case NodeType::VoxelServer:
dataByteArray); _scriptEngine.getVoxelsScriptingInterface()->getJurisdictionListener()->queueReceivedPacket(matchedNode,
break; receivedPacket);
break;
case NodeType::ParticleServer:
_scriptEngine.getParticlesScriptingInterface()->getJurisdictionListener()->queueReceivedPacket(matchedNode,
receivedPacket);
break;
}
}
} else if (datagramPacketType == PacketTypeParticleAddResponse) {
// this will keep creatorTokenIDs to IDs mapped correctly
Particle::handleAddParticleResponse(receivedPacket);
// also give our local particle tree a chance to remap any internal locally created particles
_particleTree.handleAddParticleResponse(receivedPacket);
} else {
NodeList::getInstance()->processNodeData(senderSockAddr, receivedPacket);
} }
} }
} else if (datagramPacketType == PacketTypeParticleAddResponse) {
// this will keep creatorTokenIDs to IDs mapped correctly
Particle::handleAddParticleResponse(dataByteArray);
// also give our local particle tree a chance to remap any internal locally created particles
_particleTree.handleAddParticleResponse(dataByteArray);
} else {
NodeList::getInstance()->processNodeData(senderSockAddr, dataByteArray);
} }
} }
@ -100,10 +107,6 @@ void Agent::run() {
connect(silentNodeTimer, SIGNAL(timeout()), nodeList, SLOT(removeSilentNodes())); connect(silentNodeTimer, SIGNAL(timeout()), nodeList, SLOT(removeSilentNodes()));
silentNodeTimer->start(NODE_SILENCE_THRESHOLD_USECS / 1000); silentNodeTimer->start(NODE_SILENCE_THRESHOLD_USECS / 1000);
QTimer* pingNodesTimer = new QTimer(this);
connect(pingNodesTimer, SIGNAL(timeout()), nodeList, SLOT(pingInactiveNodes()));
pingNodesTimer->start(PING_INACTIVE_NODE_INTERVAL_USECS / 1000);
// tell our script engine about our local particle tree // tell our script engine about our local particle tree
_scriptEngine.getParticlesScriptingInterface()->setParticleTree(&_particleTree); _scriptEngine.getParticlesScriptingInterface()->setParticleTree(&_particleTree);

View file

@ -35,7 +35,7 @@ public:
public slots: public slots:
void run(); void run();
void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr); void readPendingDatagrams();
signals: signals:
void willSendAudioDataCallback(); void willSendAudioDataCallback();
void willSendVisualDataCallback(); void willSendVisualDataCallback();

View file

@ -90,8 +90,7 @@ AssignmentClient::AssignmentClient(int &argc, char **argv) :
timer->start(ASSIGNMENT_REQUEST_INTERVAL_MSECS); timer->start(ASSIGNMENT_REQUEST_INTERVAL_MSECS);
// connect our readPendingDatagrams method to the readyRead() signal of the socket // connect our readPendingDatagrams method to the readyRead() signal of the socket
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, this, &AssignmentClient::readPendingDatagrams, connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, this, &AssignmentClient::readPendingDatagrams);
Qt::QueuedConnection);
} }
void AssignmentClient::sendAssignmentRequest() { void AssignmentClient::sendAssignmentRequest() {
@ -111,50 +110,45 @@ void AssignmentClient::readPendingDatagrams() {
nodeList->getNodeSocket().readDatagram(receivedPacket.data(), receivedPacket.size(), nodeList->getNodeSocket().readDatagram(receivedPacket.data(), receivedPacket.size(),
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer()); senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
if (packetVersionMatch(receivedPacket)) { if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
if (_currentAssignment) { if (packetTypeForPacket(receivedPacket) == PacketTypeCreateAssignment) {
// have the threaded current assignment handle this datagram // construct the deployed assignment from the packet data
QMetaObject::invokeMethod(_currentAssignment, "processDatagram", Qt::QueuedConnection, _currentAssignment = AssignmentFactory::unpackAssignment(receivedPacket);
Q_ARG(QByteArray, receivedPacket),
Q_ARG(HifiSockAddr, senderSockAddr));
} else if (packetTypeForPacket(receivedPacket) == PacketTypeCreateAssignment) {
if (_currentAssignment) { if (_currentAssignment) {
qDebug() << "Dropping received assignment since we are currently running one."; qDebug() << "Received an assignment -" << *_currentAssignment;
} else {
// construct the deployed assignment from the packet data
_currentAssignment = AssignmentFactory::unpackAssignment(receivedPacket);
if (_currentAssignment) { // switch our nodelist domain IP and port to whoever sent us the assignment
qDebug() << "Received an assignment -" << *_currentAssignment;
nodeList->setDomainSockAddr(senderSockAddr);
// switch our nodelist domain IP and port to whoever sent us the assignment nodeList->setSessionUUID(_currentAssignment->getUUID());
nodeList->setDomainSockAddr(senderSockAddr); qDebug() << "Destination IP for assignment is" << nodeList->getDomainIP().toString();
nodeList->setSessionUUID(_currentAssignment->getUUID());
// start the deployed assignment
qDebug() << "Destination IP for assignment is" << nodeList->getDomainIP().toString(); QThread* workerThread = new QThread(this);
// start the deployed assignment connect(workerThread, SIGNAL(started()), _currentAssignment, SLOT(run()));
QThread* workerThread = new QThread(this);
connect(_currentAssignment, SIGNAL(finished()), this, SLOT(assignmentCompleted()));
connect(workerThread, SIGNAL(started()), _currentAssignment, SLOT(run())); connect(_currentAssignment, SIGNAL(finished()), workerThread, SLOT(quit()));
connect(_currentAssignment, SIGNAL(finished()), _currentAssignment, SLOT(deleteLater()));
connect(_currentAssignment, SIGNAL(finished()), this, SLOT(assignmentCompleted())); connect(workerThread, SIGNAL(finished()), workerThread, SLOT(deleteLater()));
connect(_currentAssignment, SIGNAL(finished()), workerThread, SLOT(quit()));
connect(_currentAssignment, SIGNAL(finished()), _currentAssignment, SLOT(deleteLater())); _currentAssignment->moveToThread(workerThread);
connect(workerThread, SIGNAL(finished()), workerThread, SLOT(deleteLater()));
// move the NodeList to the thread used for the _current assignment
_currentAssignment->moveToThread(workerThread); nodeList->moveToThread(workerThread);
// move the NodeList to the thread used for the _current assignment // let the assignment handle the incoming datagrams for its duration
nodeList->moveToThread(workerThread); disconnect(&nodeList->getNodeSocket(), 0, this, 0);
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, _currentAssignment,
// Starts an event loop, and emits workerThread->started() &ThreadedAssignment::readPendingDatagrams);
workerThread->start();
} else { // Starts an event loop, and emits workerThread->started()
qDebug() << "Received an assignment that could not be unpacked. Re-requesting."; workerThread->start();
} } else {
qDebug() << "Received an assignment that could not be unpacked. Re-requesting.";
} }
} else { } else {
// have the NodeList attempt to handle it // have the NodeList attempt to handle it
@ -170,10 +164,14 @@ void AssignmentClient::assignmentCompleted() {
qDebug("Assignment finished or never started - waiting for new assignment."); qDebug("Assignment finished or never started - waiting for new assignment.");
_currentAssignment = NULL;
NodeList* nodeList = NodeList::getInstance(); NodeList* nodeList = NodeList::getInstance();
// have us handle incoming NodeList datagrams again
disconnect(&nodeList->getNodeSocket(), 0, _currentAssignment, 0);
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, this, &AssignmentClient::readPendingDatagrams);
_currentAssignment = NULL;
// reset our NodeList by switching back to unassigned and clearing the list // reset our NodeList by switching back to unassigned and clearing the list
nodeList->setOwnerType(NodeType::Unassigned); nodeList->setOwnerType(NodeType::Unassigned);
nodeList->reset(); nodeList->reset();

View file

@ -207,29 +207,25 @@ void AudioMixer::prepareMixForListeningNode(Node* node) {
} }
void AudioMixer::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) { void AudioMixer::readPendingDatagrams() {
// pull any new audio data from nodes off of the network stack QByteArray receivedPacket;
PacketType mixerPacketType = packetTypeForPacket(dataByteArray); HifiSockAddr senderSockAddr;
if (mixerPacketType == PacketTypeMicrophoneAudioNoEcho NodeList* nodeList = NodeList::getInstance();
|| mixerPacketType == PacketTypeMicrophoneAudioWithEcho
|| mixerPacketType == PacketTypeInjectAudio) { while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
NodeList* nodeList = NodeList::getInstance(); // pull any new audio data from nodes off of the network stack
PacketType mixerPacketType = packetTypeForPacket(receivedPacket);
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(dataByteArray); if (mixerPacketType == PacketTypeMicrophoneAudioNoEcho
|| mixerPacketType == PacketTypeMicrophoneAudioWithEcho
if (matchingNode) { || mixerPacketType == PacketTypeInjectAudio) {
nodeList->updateNodeWithData(matchingNode.data(), senderSockAddr, dataByteArray);
nodeList->findNodeAndUpdateWithDataFromPacket(receivedPacket);
if (!matchingNode->getActiveSocket()) { } else {
// we don't have an active socket for this node, but they're talking to us // let processNodeData handle it.
// this means they've heard from us and can reply, let's assume public is active nodeList->processNodeData(senderSockAddr, receivedPacket);
matchingNode->activatePublicSocket();
} }
} }
} else {
// let processNodeData handle it.
NodeList::getInstance()->processNodeData(senderSockAddr, dataByteArray);
} }
} }

View file

@ -25,7 +25,7 @@ public slots:
/// threaded run of assignment /// threaded run of assignment
void run(); void run();
void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr); void readPendingDatagrams();
private: private:
/// adds one buffer to the mix for a listening node /// adds one buffer to the mix for a listening node
void addBufferToMixForListeningNodeWithBuffer(PositionalAudioRingBuffer* bufferToAdd, void addBufferToMixForListeningNodeWithBuffer(PositionalAudioRingBuffer* bufferToAdd,

View file

@ -136,55 +136,52 @@ void AvatarMixer::nodeKilled(SharedNodePointer killedNode) {
} }
} }
void AvatarMixer::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) { void AvatarMixer::readPendingDatagrams() {
QByteArray receivedPacket;
HifiSockAddr senderSockAddr;
NodeList* nodeList = NodeList::getInstance(); NodeList* nodeList = NodeList::getInstance();
switch (packetTypeForPacket(dataByteArray)) { while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
case PacketTypeAvatarData: { if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
switch (packetTypeForPacket(receivedPacket)) {
// add or update the node in our list case PacketTypeAvatarData: {
SharedNodePointer avatarNode = nodeList->sendingNodeForPacket(dataByteArray); nodeList->findNodeAndUpdateWithDataFromPacket(receivedPacket);
break;
if (avatarNode) {
// parse positional data from an node
nodeList->updateNodeWithData(avatarNode.data(), senderSockAddr, dataByteArray);
}
break;
}
case PacketTypeAvatarIdentity: {
// check if we have a matching node in our list
SharedNodePointer avatarNode = nodeList->sendingNodeForPacket(dataByteArray);
if (avatarNode && avatarNode->getLinkedData()) {
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(avatarNode->getLinkedData());
if (nodeData->hasIdentityChangedAfterParsing(dataByteArray)
&& !nodeData->hasSentIdentityBetweenKeyFrames()) {
// this avatar changed their identity in some way and we haven't sent a packet in this keyframe
QByteArray identityPacket = byteArrayWithPopluatedHeader(PacketTypeAvatarIdentity);
QByteArray individualByteArray = nodeData->identityByteArray();
individualByteArray.replace(0, NUM_BYTES_RFC4122_UUID, avatarNode->getUUID().toRfc4122());
identityPacket.append(individualByteArray);
nodeData->setHasSentIdentityBetweenKeyFrames(true);
nodeList->broadcastToNodes(identityPacket, NodeSet() << NodeType::Agent);
} }
case PacketTypeAvatarIdentity: {
// check if we have a matching node in our list
SharedNodePointer avatarNode = nodeList->sendingNodeForPacket(receivedPacket);
if (avatarNode && avatarNode->getLinkedData()) {
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(avatarNode->getLinkedData());
if (nodeData->hasIdentityChangedAfterParsing(receivedPacket)
&& !nodeData->hasSentIdentityBetweenKeyFrames()) {
// this avatar changed their identity in some way and we haven't sent a packet in this keyframe
QByteArray identityPacket = byteArrayWithPopluatedHeader(PacketTypeAvatarIdentity);
QByteArray individualByteArray = nodeData->identityByteArray();
individualByteArray.replace(0, NUM_BYTES_RFC4122_UUID, avatarNode->getUUID().toRfc4122());
identityPacket.append(individualByteArray);
nodeData->setHasSentIdentityBetweenKeyFrames(true);
nodeList->broadcastToNodes(identityPacket, NodeSet() << NodeType::Agent);
}
}
}
case PacketTypeKillAvatar: {
nodeList->processKillNode(receivedPacket);
break;
}
default:
// hand this off to the NodeList
nodeList->processNodeData(senderSockAddr, receivedPacket);
break;
} }
} }
case PacketTypeKillAvatar: {
nodeList->processKillNode(dataByteArray);
break;
}
default:
// hand this off to the NodeList
nodeList->processNodeData(senderSockAddr, dataByteArray);
break;
} }
} }
const qint64 AVATAR_IDENTITY_KEYFRAME_MSECS = 5000; const qint64 AVATAR_IDENTITY_KEYFRAME_MSECS = 5000;

View file

@ -23,7 +23,7 @@ public slots:
void nodeAdded(SharedNodePointer nodeAdded); void nodeAdded(SharedNodePointer nodeAdded);
void nodeKilled(SharedNodePointer killedNode); void nodeKilled(SharedNodePointer killedNode);
void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr); void readPendingDatagrams();
}; };
#endif /* defined(__hifi__AvatarMixer__) */ #endif /* defined(__hifi__AvatarMixer__) */

View file

@ -41,15 +41,27 @@ void MetavoxelServer::run() {
_sendTimer.start(SEND_INTERVAL); _sendTimer.start(SEND_INTERVAL);
} }
void MetavoxelServer::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) { void MetavoxelServer::readPendingDatagrams() {
switch (dataByteArray.at(0)) { QByteArray receivedPacket;
case PacketTypeMetavoxelData: HifiSockAddr senderSockAddr;
processData(dataByteArray, senderSockAddr);
break; NodeList* nodeList = NodeList::getInstance();
default: while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
NodeList::getInstance()->processNodeData(senderSockAddr, dataByteArray); if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
break; switch (packetTypeForPacket(receivedPacket)) {
case PacketTypeMetavoxelData: {
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
if (matchingNode) {
processData(receivedPacket, matchingNode);
}
break;
}
default:
NodeList::getInstance()->processNodeData(senderSockAddr, receivedPacket);
break;
}
}
} }
} }
@ -67,10 +79,10 @@ void MetavoxelServer::sendDeltas() {
_sendTimer.start(qMax(0, 2 * SEND_INTERVAL - elapsed)); _sendTimer.start(qMax(0, 2 * SEND_INTERVAL - elapsed));
} }
void MetavoxelServer::processData(const QByteArray& data, const HifiSockAddr& sender) { void MetavoxelServer::processData(const QByteArray& data, const SharedNodePointer& sendingNode) {
// read the session id // read the session id
int headerPlusIDSize; int headerPlusIDSize;
QUuid sessionID = readSessionID(data, sender, headerPlusIDSize); QUuid sessionID = readSessionID(data, sendingNode, headerPlusIDSize);
if (sessionID.isNull()) { if (sessionID.isNull()) {
return; return;
} }
@ -78,18 +90,19 @@ void MetavoxelServer::processData(const QByteArray& data, const HifiSockAddr& se
// forward to session, creating if necessary // forward to session, creating if necessary
MetavoxelSession*& session = _sessions[sessionID]; MetavoxelSession*& session = _sessions[sessionID];
if (!session) { if (!session) {
session = new MetavoxelSession(this, sessionID, QByteArray::fromRawData(data.constData(), headerPlusIDSize), sender); session = new MetavoxelSession(this, sessionID, QByteArray::fromRawData(data.constData(), headerPlusIDSize),
sendingNode);
} }
session->receivedData(data, sender); session->receivedData(data, sendingNode);
} }
MetavoxelSession::MetavoxelSession(MetavoxelServer* server, const QUuid& sessionId, MetavoxelSession::MetavoxelSession(MetavoxelServer* server, const QUuid& sessionId,
const QByteArray& datagramHeader, const HifiSockAddr& sender) : const QByteArray& datagramHeader, const SharedNodePointer& sendingNode) :
QObject(server), QObject(server),
_server(server), _server(server),
_sessionId(sessionId), _sessionId(sessionId),
_sequencer(datagramHeader), _sequencer(datagramHeader),
_sender(sender) { _sendingNode(sendingNode) {
const int TIMEOUT_INTERVAL = 30 * 1000; const int TIMEOUT_INTERVAL = 30 * 1000;
_timeoutTimer.setInterval(TIMEOUT_INTERVAL); _timeoutTimer.setInterval(TIMEOUT_INTERVAL);
@ -105,15 +118,15 @@ MetavoxelSession::MetavoxelSession(MetavoxelServer* server, const QUuid& session
SendRecord record = { 0 }; SendRecord record = { 0 };
_sendRecords.append(record); _sendRecords.append(record);
qDebug() << "Opened session [sessionId=" << _sessionId << ", sender=" << _sender << "]"; qDebug() << "Opened session [sessionId=" << _sessionId << ", sendingNode=" << sendingNode << "]";
} }
void MetavoxelSession::receivedData(const QByteArray& data, const HifiSockAddr& sender) { void MetavoxelSession::receivedData(const QByteArray& data, const SharedNodePointer& sendingNode) {
// reset the timeout timer // reset the timeout timer
_timeoutTimer.start(); _timeoutTimer.start();
// save the most recent sender // save the most recent sender
_sender = sender; _sendingNode = sendingNode;
// process through sequencer // process through sequencer
_sequencer.receivedDatagram(data); _sequencer.receivedDatagram(data);
@ -131,12 +144,12 @@ void MetavoxelSession::sendDelta() {
} }
void MetavoxelSession::timedOut() { void MetavoxelSession::timedOut() {
qDebug() << "Session timed out [sessionId=" << _sessionId << ", sender=" << _sender << "]"; qDebug() << "Session timed out [sessionId=" << _sessionId << ", sendingNode=" << _sendingNode << "]";
_server->removeSession(_sessionId); _server->removeSession(_sessionId);
} }
void MetavoxelSession::sendData(const QByteArray& data) { void MetavoxelSession::sendData(const QByteArray& data) {
NodeList::getInstance()->getNodeSocket().writeDatagram(data, _sender.getAddress(), _sender.getPort()); NodeList::getInstance()->writeDatagram(data, _sendingNode);
} }
void MetavoxelSession::readPacket(Bitstream& in) { void MetavoxelSession::readPacket(Bitstream& in) {
@ -152,7 +165,7 @@ void MetavoxelSession::clearSendRecordsBefore(int index) {
void MetavoxelSession::handleMessage(const QVariant& message) { void MetavoxelSession::handleMessage(const QVariant& message) {
int userType = message.userType(); int userType = message.userType();
if (userType == CloseSessionMessage::Type) { if (userType == CloseSessionMessage::Type) {
qDebug() << "Session closed [sessionId=" << _sessionId << ", sender=" << _sender << "]"; qDebug() << "Session closed [sessionId=" << _sessionId << ", sendingNode=" << _sendingNode << "]";
_server->removeSession(_sessionId); _server->removeSession(_sessionId);
} else if (userType == ClientStateMessage::Type) { } else if (userType == ClientStateMessage::Type) {

View file

@ -39,7 +39,7 @@ public:
virtual void run(); virtual void run();
virtual void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr); virtual void readPendingDatagrams();
private slots: private slots:
@ -47,7 +47,7 @@ private slots:
private: private:
void processData(const QByteArray& data, const HifiSockAddr& sender); void processData(const QByteArray& data, const SharedNodePointer& sendingNode);
QTimer _sendTimer; QTimer _sendTimer;
qint64 _lastSend; qint64 _lastSend;
@ -64,9 +64,9 @@ class MetavoxelSession : public QObject {
public: public:
MetavoxelSession(MetavoxelServer* server, const QUuid& sessionId, MetavoxelSession(MetavoxelServer* server, const QUuid& sessionId,
const QByteArray& datagramHeader, const HifiSockAddr& sender); const QByteArray& datagramHeader, const SharedNodePointer& sendingNode);
void receivedData(const QByteArray& data, const HifiSockAddr& sender); void receivedData(const QByteArray& data, const SharedNodePointer& sendingNode);
void sendDelta(); void sendDelta();
@ -96,7 +96,7 @@ private:
QTimer _timeoutTimer; QTimer _timeoutTimer;
DatagramSequencer _sequencer; DatagramSequencer _sequencer;
HifiSockAddr _sender; SharedNodePointer _sendingNode;
glm::vec3 _position; glm::vec3 _position;

View file

@ -458,42 +458,43 @@ void OctreeServer::parsePayload() {
} }
} }
void OctreeServer::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) { void OctreeServer::readPendingDatagrams() {
NodeList* nodeList = NodeList::getInstance(); QByteArray receivedPacket;
HifiSockAddr senderSockAddr;
PacketType packetType = packetTypeForPacket(dataByteArray);
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(dataByteArray); NodeList* nodeList = NodeList::getInstance();
if (packetType == getMyQueryMessageType()) { while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
bool debug = false; if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
if (debug) { PacketType packetType = packetTypeForPacket(receivedPacket);
qDebug() << "Got PacketType_VOXEL_QUERY at" << usecTimestampNow();
} SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
// If we got a PacketType_VOXEL_QUERY, then we're talking to an NodeType_t_AVATAR, and we if (packetType == getMyQueryMessageType()) {
// need to make sure we have it in our nodeList. bool debug = false;
if (debug) {
qDebug() << "Got PacketTypeVoxelQuery at" << usecTimestampNow();
if (matchingNode) { }
nodeList->updateNodeWithData(matchingNode.data(), senderSockAddr, dataByteArray);
if (!matchingNode->getActiveSocket()) { // If we got a PacketType_VOXEL_QUERY, then we're talking to an NodeType_t_AVATAR, and we
// we don't have an active socket for this node, but they're talking to us // need to make sure we have it in our nodeList.
// this means they've heard from us and can reply, let's assume public is active if (matchingNode) {
matchingNode->activatePublicSocket(); nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket);
}
OctreeQueryNode* nodeData = (OctreeQueryNode*) matchingNode->getLinkedData(); OctreeQueryNode* nodeData = (OctreeQueryNode*) matchingNode->getLinkedData();
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) { if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
nodeData->initializeOctreeSendThread(this, matchingNode->getUUID()); nodeData->initializeOctreeSendThread(this, matchingNode->getUUID());
}
}
} else if (packetType == PacketTypeJurisdictionRequest) {
_jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket);
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {
_octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, receivedPacket);
} else {
// let processNodeData handle it.
NodeList::getInstance()->processNodeData(senderSockAddr, receivedPacket);
} }
} }
} else if (packetType == PacketTypeJurisdictionRequest) {
_jurisdictionSender->queueReceivedPacket(matchingNode, dataByteArray);
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {
_octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, dataByteArray);
} else {
// let processNodeData handle it.
NodeList::getInstance()->processNodeData(senderSockAddr, dataByteArray);
} }
} }
@ -654,8 +655,4 @@ void OctreeServer::run() {
QTimer* silentNodeTimer = new QTimer(this); QTimer* silentNodeTimer = new QTimer(this);
connect(silentNodeTimer, SIGNAL(timeout()), nodeList, SLOT(removeSilentNodes())); connect(silentNodeTimer, SIGNAL(timeout()), nodeList, SLOT(removeSilentNodes()));
silentNodeTimer->start(NODE_SILENCE_THRESHOLD_USECS / 1000); silentNodeTimer->start(NODE_SILENCE_THRESHOLD_USECS / 1000);
QTimer* pingNodesTimer = new QTimer(this);
connect(pingNodesTimer, SIGNAL(timeout()), nodeList, SLOT(pingInactiveNodes()));
pingNodesTimer->start(PING_INACTIVE_NODE_INTERVAL_USECS / 1000);
} }

View file

@ -67,7 +67,7 @@ public:
public slots: public slots:
/// runs the voxel server assignment /// runs the voxel server assignment
void run(); void run();
void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr); void readPendingDatagrams();
protected: protected:
void parsePayload(); void parsePayload();

View file

@ -66,7 +66,7 @@ void DataServer::readPendingDatagrams() {
PacketType requestType = packetTypeForPacket(receivedPacket); PacketType requestType = packetTypeForPacket(receivedPacket);
if ((requestType == PacketTypeDataServerPut || requestType == PacketTypeDataServerGet) && if ((requestType == PacketTypeDataServerPut || requestType == PacketTypeDataServerGet) &&
packetVersionMatch(receivedPacket)) { receivedPacket[numBytesArithmeticCodingFromBuffer(receivedPacket.data())] == versionForPacketType(requestType)) {
QDataStream packetStream(receivedPacket); QDataStream packetStream(receivedPacket);
int numReceivedHeaderBytes = numBytesForPacketHeader(receivedPacket); int numReceivedHeaderBytes = numBytesForPacketHeader(receivedPacket);

View file

@ -236,7 +236,7 @@ void DomainServer::readAvailableDatagrams() {
nodeList->getNodeSocket().readDatagram(receivedPacket.data(), receivedPacket.size(), nodeList->getNodeSocket().readDatagram(receivedPacket.data(), receivedPacket.size(),
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer()); senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
if (packetVersionMatch(receivedPacket)) { if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
PacketType requestType = packetTypeForPacket(receivedPacket); PacketType requestType = packetTypeForPacket(receivedPacket);
if (requestType == PacketTypeDomainListRequest) { if (requestType == PacketTypeDomainListRequest) {

View file

@ -388,7 +388,7 @@ void Audio::handleAudioInput() {
NodeList* nodeList = NodeList::getInstance(); NodeList* nodeList = NodeList::getInstance();
SharedNodePointer audioMixer = nodeList->soloNodeOfType(NodeType::AudioMixer); SharedNodePointer audioMixer = nodeList->soloNodeOfType(NodeType::AudioMixer);
if (audioMixer && nodeList->getNodeActiveSocketOrPing(audioMixer)) { if (audioMixer && audioMixer->getActiveSocket()) {
MyAvatar* interfaceAvatar = Application::getInstance()->getAvatar(); MyAvatar* interfaceAvatar = Application::getInstance()->getAvatar();
glm::vec3 headPosition = interfaceAvatar->getHead().getPosition(); glm::vec3 headPosition = interfaceAvatar->getHead().getPosition();
glm::quat headOrientation = interfaceAvatar->getHead().getOrientation(); glm::quat headOrientation = interfaceAvatar->getHead().getOrientation();

View file

@ -40,7 +40,7 @@ void DatagramProcessor::processDatagrams() {
_packetCount++; _packetCount++;
_byteCount += incomingPacket.size(); _byteCount += incomingPacket.size();
if (packetVersionMatch(incomingPacket)) { if (nodeList->packetVersionAndHashMatch(incomingPacket)) {
// only process this packet if we have a match on the packet version // only process this packet if we have a match on the packet version
switch (packetTypeForPacket(incomingPacket)) { switch (packetTypeForPacket(incomingPacket)) {
case PacketTypeTransmitterData: case PacketTypeTransmitterData:
@ -121,7 +121,7 @@ void DatagramProcessor::processDatagrams() {
DataServerClient::processMessageFromDataServer(incomingPacket); DataServerClient::processMessageFromDataServer(incomingPacket);
break; break;
default: default:
NodeList::getInstance()->processNodeData(senderSockAddr, incomingPacket); nodeList->processNodeData(senderSockAddr, incomingPacket);
break; break;
} }
} }

View file

@ -144,9 +144,9 @@ void MetavoxelSystem::removeClient(const QUuid& uuid) {
delete client; delete client;
} }
void MetavoxelSystem::receivedData(const QByteArray& data, const HifiSockAddr& sender) { void MetavoxelSystem::receivedData(const QByteArray& data, const SharedNodePointer& sendingNode) {
int headerPlusIDSize; int headerPlusIDSize;
QUuid sessionID = readSessionID(data, sender, headerPlusIDSize); QUuid sessionID = readSessionID(data, sendingNode, headerPlusIDSize);
if (sessionID.isNull()) { if (sessionID.isNull()) {
return; return;
} }

View file

@ -52,7 +52,7 @@ private:
Q_INVOKABLE void addClient(const SharedNodePointer& node); Q_INVOKABLE void addClient(const SharedNodePointer& node);
Q_INVOKABLE void removeClient(const QUuid& uuid); Q_INVOKABLE void removeClient(const QUuid& uuid);
Q_INVOKABLE void receivedData(const QByteArray& data, const HifiSockAddr& sender); Q_INVOKABLE void receivedData(const QByteArray& data, const SharedNodePointer& sendingNode);
class Point { class Point {
public: public:

View file

@ -48,7 +48,7 @@ void VoxelPacketProcessor::processPacket(const SharedNodePointer& sendingNode, c
wasStatsPacket = true; wasStatsPacket = true;
if (messageLength > statsMessageLength) { if (messageLength > statsMessageLength) {
mutablePacket = mutablePacket.mid(statsMessageLength); mutablePacket = mutablePacket.mid(statsMessageLength);
if (!packetVersionMatch(packet)) { if (!NodeList::getInstance()->packetVersionAndHashMatch(packet)) {
return; // bail since piggyback data doesn't match our versioning return; // bail since piggyback data doesn't match our versioning
} }
} else { } else {

View file

@ -265,7 +265,7 @@ void VoxelStatsDialog::showOctreeServersOfType(int& serverCount, NodeType_t serv
std::stringstream extraDetails(""); std::stringstream extraDetails("");
std::stringstream linkDetails(""); std::stringstream linkDetails("");
if (nodeList->getNodeActiveSocketOrPing(node)) { if (node->getActiveSocket()) {
serverDetails << "active "; serverDetails << "active ";
} else { } else {
serverDetails << "inactive "; serverDetails << "inactive ";

View file

@ -106,7 +106,7 @@ static QItemEditorCreatorBase* qColorEditorCreator = createQColorEditorCreator()
static QItemEditorCreatorBase* vec3EditorCreator = createVec3EditorCreator(); static QItemEditorCreatorBase* vec3EditorCreator = createVec3EditorCreator();
static QItemEditorCreatorBase* parameterizedURLEditorCreator = createParameterizedURLEditorCreator(); static QItemEditorCreatorBase* parameterizedURLEditorCreator = createParameterizedURLEditorCreator();
QUuid readSessionID(const QByteArray& data, const HifiSockAddr& sender, int& headerPlusIDSize) { QUuid readSessionID(const QByteArray& data, const SharedNodePointer& sendingNode, int& headerPlusIDSize) {
// get the header size // get the header size
int headerSize = numBytesForPacketHeader(data); int headerSize = numBytesForPacketHeader(data);
@ -114,7 +114,7 @@ QUuid readSessionID(const QByteArray& data, const HifiSockAddr& sender, int& hea
const int UUID_BYTES = 16; const int UUID_BYTES = 16;
headerPlusIDSize = headerSize + UUID_BYTES; headerPlusIDSize = headerSize + UUID_BYTES;
if (data.size() < headerPlusIDSize) { if (data.size() < headerPlusIDSize) {
qWarning() << "Metavoxel data too short [size=" << data.size() << ", sender=" << sender << "]\n"; qWarning() << "Metavoxel data too short [size=" << data.size() << ", sendingNode=" << sendingNode << "]\n";
return QUuid(); return QUuid();
} }
return QUuid::fromRfc4122(QByteArray::fromRawData(data.constData() + headerSize, UUID_BYTES)); return QUuid::fromRfc4122(QByteArray::fromRawData(data.constData() + headerSize, UUID_BYTES));

View file

@ -15,6 +15,7 @@
#include <QUuid> #include <QUuid>
#include <QWidget> #include <QWidget>
#include <NodeList.h>
#include <RegisteredMetaTypes.h> #include <RegisteredMetaTypes.h>
#include "Bitstream.h" #include "Bitstream.h"
@ -30,7 +31,7 @@ class NetworkProgram;
/// Reads and returns the session ID from a datagram. /// Reads and returns the session ID from a datagram.
/// \param[out] headerPlusIDSize the size of the header (including the session ID) within the data /// \param[out] headerPlusIDSize the size of the header (including the session ID) within the data
/// \return the session ID, or a null ID if invalid (in which case a warning will be logged) /// \return the session ID, or a null ID if invalid (in which case a warning will be logged)
QUuid readSessionID(const QByteArray& data, const HifiSockAddr& sender, int& headerPlusIDSize); QUuid readSessionID(const QByteArray& data, const SharedNodePointer& sendingNode, int& headerPlusIDSize);
/// A streamable axis-aligned bounding box. /// A streamable axis-aligned bounding box.
class Box { class Box {

View file

@ -45,7 +45,7 @@ bool JurisdictionListener::queueJurisdictionRequest() {
NodeList* nodeList = NodeList::getInstance(); NodeList* nodeList = NodeList::getInstance();
foreach (const SharedNodePointer& node, nodeList->getNodeHash()) { foreach (const SharedNodePointer& node, nodeList->getNodeHash()) {
if (nodeList->getNodeActiveSocketOrPing(node) && node->getType() == getNodeType()) { if (node->getType() == getNodeType() && node->getActiveSocket()) {
_packetSender.queuePacketForSending(node, QByteArray(reinterpret_cast<char*>(bufferOut), sizeOut)); _packetSender.queuePacketForSending(node, QByteArray(reinterpret_cast<char*>(bufferOut), sizeOut));
nodeCount++; nodeCount++;
} }

View file

@ -60,19 +60,17 @@ bool OctreeEditPacketSender::serversExist() const {
foreach (const SharedNodePointer& node, nodeList->getNodeHash()) { foreach (const SharedNodePointer& node, nodeList->getNodeHash()) {
// only send to the NodeTypes that are getMyNodeType() // only send to the NodeTypes that are getMyNodeType()
if (node->getType() == getMyNodeType()) { if (node->getType() == getMyNodeType() && node->getActiveSocket()) {
if (nodeList->getNodeActiveSocketOrPing(node)) { QUuid nodeUUID = node->getUUID();
QUuid nodeUUID = node->getUUID(); // If we've got Jurisdictions set, then check to see if we know the jurisdiction for this server
// If we've got Jurisdictions set, then check to see if we know the jurisdiction for this server if (_serverJurisdictions) {
if (_serverJurisdictions) { // lookup our nodeUUID in the jurisdiction map, if it's missing then we're
// lookup our nodeUUID in the jurisdiction map, if it's missing then we're // missing at least one jurisdiction
// missing at least one jurisdiction if ((*_serverJurisdictions).find(nodeUUID) == (*_serverJurisdictions).end()) {
if ((*_serverJurisdictions).find(nodeUUID) == (*_serverJurisdictions).end()) { atLeastOnJurisdictionMissing = true;
atLeastOnJurisdictionMissing = true;
}
} }
hasServers = true;
} }
hasServers = true;
} }
if (atLeastOnJurisdictionMissing) { if (atLeastOnJurisdictionMissing) {
break; // no point in looking further... break; // no point in looking further...
@ -91,7 +89,7 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned c
// only send to the NodeTypes that are getMyNodeType() // only send to the NodeTypes that are getMyNodeType()
if (node->getType() == getMyNodeType() && if (node->getType() == getMyNodeType() &&
((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))) { ((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))) {
if (nodeList->getNodeActiveSocketOrPing(node)) { if (node->getActiveSocket()) {
queuePacketForSending(node, QByteArray(reinterpret_cast<char*>(buffer), length)); queuePacketForSending(node, QByteArray(reinterpret_cast<char*>(buffer), length));
// debugging output... // debugging output...

View file

@ -21,7 +21,7 @@ quint8 DataServerClient::_sequenceNumber = 0;
const char MULTI_KEY_VALUE_SEPARATOR = '|'; const char MULTI_KEY_VALUE_SEPARATOR = '|';
const char DATA_SERVER_HOSTNAME[] = "data.highfidelity.io"; const char DATA_SERVER_HOSTNAME[] = "localhost";
const unsigned short DATA_SERVER_PORT = 3282; const unsigned short DATA_SERVER_PORT = 3282;
const HifiSockAddr& DataServerClient::dataServerSockAddr() { const HifiSockAddr& DataServerClient::dataServerSockAddr() {

View file

@ -45,8 +45,6 @@ HifiSockAddr::HifiSockAddr(const QString& hostname, quint16 hostOrderPort) {
} }
HifiSockAddr& HifiSockAddr::operator=(const HifiSockAddr& rhsSockAddr) { HifiSockAddr& HifiSockAddr::operator=(const HifiSockAddr& rhsSockAddr) {
//HifiSockAddr temp(rhsSockAddr);
//swap(temp);
_address = rhsSockAddr._address; _address = rhsSockAddr._address;
_port = rhsSockAddr._port; _port = rhsSockAddr._port;

View file

@ -80,30 +80,71 @@ NodeList::~NodeList() {
clear(); clear();
} }
qint64 NodeList::writeDatagram(const QByteArray& datagram, const SharedNodePointer& destinationNode, bool NodeList::packetVersionAndHashMatch(const QByteArray& packet) {
const HifiSockAddr& overridenSockAddr) { // currently this just checks if the version in the packet matches our return from versionForPacketType
// may need to be expanded in the future for types and versions that take > than 1 byte
// setup the MD5 hash for source verification in the header if (packet[1] != versionForPacketType(packetTypeForPacket(packet))
int numBytesPacketHeader = numBytesForPacketHeader(datagram); && packetTypeForPacket(packet) != PacketTypeStunResponse) {
QByteArray dataSecretHash = QCryptographicHash::hash(datagram.mid(numBytesPacketHeader) PacketType mismatchType = packetTypeForPacket(packet);
+ destinationNode->getConnectionSecret().toRfc4122(), int numPacketTypeBytes = arithmeticCodingValueFromBuffer(packet.data());
QCryptographicHash::Md5);
QByteArray datagramWithHash = datagram; qDebug() << "Packet version mismatch on" << packetTypeForPacket(packet) << "- Sender"
datagramWithHash.replace(numBytesPacketHeader - NUM_BYTES_MD5_HASH, NUM_BYTES_MD5_HASH, dataSecretHash); << uuidFromPacketHeader(packet) << "sent" << qPrintable(QString::number(packet[numPacketTypeBytes])) << "but"
<< qPrintable(QString::number(versionForPacketType(mismatchType))) << "expected.";
// if we don't have an ovveriden address, assume they want to send to the node's active socket
const HifiSockAddr* destinationSockAddr = &overridenSockAddr;
if (overridenSockAddr.isNull()) {
if (getNodeActiveSocketOrPing(destinationNode)) {
// use the node's active socket as the destination socket
destinationSockAddr = destinationNode->getActiveSocket();
} else {
// we don't have a socket to send to, return 0
return 0;
}
} }
return _nodeSocket.writeDatagram(datagramWithHash, destinationSockAddr->getAddress(), destinationSockAddr->getPort()); const QSet<PacketType> NON_VERIFIED_PACKETS = QSet<PacketType>() << PacketTypeDomainList
<< PacketTypeDomainListRequest << PacketTypeStunResponse << PacketTypeDataServerConfirm
<< PacketTypeDataServerGet << PacketTypeDataServerPut << PacketTypeDataServerSend
<< PacketTypeCreateAssignment << PacketTypeRequestAssignment;
if (!NON_VERIFIED_PACKETS.contains(packetTypeForPacket(packet))) {
// figure out which node this is from
SharedNodePointer sendingNode = sendingNodeForPacket(packet);
if (sendingNode) {
// check if the md5 hash in the header matches the hash we would expect
if (hashFromPacketHeader(packet) == hashForPacketAndConnectionUUID(packet, sendingNode->getConnectionSecret())) {
return true;
} else {
qDebug() << "Packet hash mismatch on" << packetTypeForPacket(packet) << "- Sender"
<< uuidFromPacketHeader(packet);
}
} else {
qDebug() << "Packet of type" << packetTypeForPacket(packet) << "received from unknown node with UUID"
<< uuidFromPacketHeader(packet);
}
} else {
return true;
}
return false;
}
qint64 NodeList::writeDatagram(const QByteArray& datagram, const SharedNodePointer& destinationNode,
const HifiSockAddr& overridenSockAddr) {
if (destinationNode) {
// if we don't have an ovveriden address, assume they want to send to the node's active socket
const HifiSockAddr* destinationSockAddr = &overridenSockAddr;
if (overridenSockAddr.isNull()) {
if (destinationNode->getActiveSocket()) {
// use the node's active socket as the destination socket
destinationSockAddr = destinationNode->getActiveSocket();
} else {
// we don't have a socket to send to, return 0
return 0;
}
}
QByteArray datagramCopy = datagram;
// setup the MD5 hash for source verification in the header
replaceHashInPacketGivenConnectionUUID(datagramCopy, destinationNode->getConnectionSecret());
return _nodeSocket.writeDatagram(datagramCopy, destinationSockAddr->getAddress(), destinationSockAddr->getPort());
}
// didn't have a destinationNode to send to, return 0
return 0;
} }
qint64 NodeList::writeDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode, qint64 NodeList::writeDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode,
@ -191,8 +232,12 @@ void NodeList::processNodeData(const HifiSockAddr& senderSockAddr, const QByteAr
} }
case PacketTypePing: { case PacketTypePing: {
// send back a reply // send back a reply
QByteArray replyPacket = constructPingReplyPacket(packet); SharedNodePointer matchingNode = sendingNodeForPacket(packet);
writeDatagram(replyPacket, sendingNodeForPacket(packet), senderSockAddr); if (matchingNode) {
QByteArray replyPacket = constructPingReplyPacket(packet);
writeDatagram(replyPacket, matchingNode, senderSockAddr);
}
break; break;
} }
case PacketTypePingReply: { case PacketTypePingReply: {
@ -219,32 +264,28 @@ void NodeList::processNodeData(const HifiSockAddr& senderSockAddr, const QByteAr
} }
} }
int NodeList::updateNodeWithData(Node *node, const HifiSockAddr& senderSockAddr, const QByteArray& packet) { int NodeList::updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, const QByteArray &packet) {
QMutexLocker locker(&node->getMutex()); QMutexLocker locker(&matchingNode->getMutex());
node->setLastHeardMicrostamp(usecTimestampNow()); matchingNode->setLastHeardMicrostamp(usecTimestampNow());
matchingNode->recordBytesReceived(packet.size());
if (!senderSockAddr.isNull() && !node->getActiveSocket()) {
if (senderSockAddr == node->getPublicSocket()) { if (!matchingNode->getLinkedData() && linkedDataCreateCallback) {
node->activatePublicSocket(); linkedDataCreateCallback(matchingNode.data());
} else if (senderSockAddr == node->getLocalSocket()) {
node->activateLocalSocket();
}
} }
return matchingNode->getLinkedData()->parseData(packet);
}
if (node->getActiveSocket() || senderSockAddr.isNull()) { int NodeList::findNodeAndUpdateWithDataFromPacket(const QByteArray& packet) {
node->recordBytesReceived(packet.size()); SharedNodePointer matchingNode = sendingNodeForPacket(packet);
if (!node->getLinkedData() && linkedDataCreateCallback) { if (matchingNode) {
linkedDataCreateCallback(node); updateNodeWithDataFromPacket(matchingNode, packet);
}
int numParsedBytes = node->getLinkedData()->parseData(packet);
return numParsedBytes;
} else {
// we weren't able to match the sender address to the address we have for this node, unlock and don't parse
return 0;
} }
// we weren't able to match the sender address to the address we have for this node, unlock and don't parse
return 0;
} }
SharedNodePointer NodeList::nodeWithUUID(const QUuid& nodeUUID) { SharedNodePointer NodeList::nodeWithUUID(const QUuid& nodeUUID) {
@ -566,6 +607,10 @@ int NodeList::processDomainServerList(const QByteArray& packet) {
packetStream >> connectionUUID; packetStream >> connectionUUID;
node->setConnectionSecret(connectionUUID); node->setConnectionSecret(connectionUUID);
} }
// ping inactive nodes in conjunction with receipt of list from domain-server
// this makes it happen every second and also pings any newly added nodes
pingInactiveNodes();
return readNodes; return readNodes;
} }
@ -639,9 +684,9 @@ SharedNodePointer NodeList::addOrUpdateNode(const QUuid& uuid, char nodeType,
// we didn't have this node, so add them // we didn't have this node, so add them
Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket); Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket);
SharedNodePointer newNodeSharedPointer(newNode, &QObject::deleteLater); SharedNodePointer newNodeSharedPointer(newNode, &QObject::deleteLater);
_nodeHash.insert(newNode->getUUID(), newNodeSharedPointer); _nodeHash.insert(newNode->getUUID(), newNodeSharedPointer);
_nodeHashMutex.unlock(); _nodeHashMutex.unlock();
qDebug() << "Added" << *newNode; qDebug() << "Added" << *newNode;
@ -700,16 +745,6 @@ void NodeList::pingInactiveNodes() {
} }
} }
const HifiSockAddr* NodeList::getNodeActiveSocketOrPing(const SharedNodePointer& node) {
if (node && node->getActiveSocket()) {
return node->getActiveSocket();
} else if (node) {
pingPublicAndLocalSocketsForInactiveNode(node);
}
return NULL;
}
void NodeList::activateSocketFromNodeCommunication(const QByteArray& packet, const SharedNodePointer& sendingNode) { void NodeList::activateSocketFromNodeCommunication(const QByteArray& packet, const SharedNodePointer& sendingNode) {
// deconstruct this ping packet to see if it is a public or local reply // deconstruct this ping packet to see if it is a public or local reply
QDataStream packetStream(packet); QDataStream packetStream(packet);
@ -720,9 +755,9 @@ void NodeList::activateSocketFromNodeCommunication(const QByteArray& packet, con
// if this is a local or public ping then we can activate a socket // if this is a local or public ping then we can activate a socket
// we do nothing with agnostic pings, those are simply for timing // we do nothing with agnostic pings, those are simply for timing
if (pingType == PingType::Local) { if (pingType == PingType::Local && sendingNode->getActiveSocket() != &sendingNode->getLocalSocket()) {
sendingNode->activateLocalSocket(); sendingNode->activateLocalSocket();
} else if (pingType == PingType::Public) { } else if (pingType == PingType::Public && !sendingNode->getActiveSocket()) {
sendingNode->activatePublicSocket(); sendingNode->activatePublicSocket();
} }
} }

View file

@ -82,6 +82,8 @@ public:
QUdpSocket& getNodeSocket() { return _nodeSocket; } QUdpSocket& getNodeSocket() { return _nodeSocket; }
bool packetVersionAndHashMatch(const QByteArray& packet);
qint64 writeDatagram(const QByteArray& datagram, const SharedNodePointer& destinationNode, qint64 writeDatagram(const QByteArray& datagram, const SharedNodePointer& destinationNode,
const HifiSockAddr& overridenSockAddr = HifiSockAddr()); const HifiSockAddr& overridenSockAddr = HifiSockAddr());
qint64 writeDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode, qint64 writeDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode,
@ -112,20 +114,20 @@ public:
SharedNodePointer nodeWithUUID(const QUuid& nodeUUID); SharedNodePointer nodeWithUUID(const QUuid& nodeUUID);
SharedNodePointer sendingNodeForPacket(const QByteArray& packet); SharedNodePointer sendingNodeForPacket(const QByteArray& packet);
SharedNodePointer addOrUpdateNode(const QUuid& uuid, char nodeType, const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket); SharedNodePointer addOrUpdateNode(const QUuid& uuid, char nodeType,
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket);
void processNodeData(const HifiSockAddr& senderSockAddr, const QByteArray& packet); void processNodeData(const HifiSockAddr& senderSockAddr, const QByteArray& packet);
void processKillNode(const QByteArray& datagram); void processKillNode(const QByteArray& datagram);
int updateNodeWithData(Node *node, const HifiSockAddr& senderSockAddr, const QByteArray& packet); int updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, const QByteArray& packet);
int findNodeAndUpdateWithDataFromPacket(const QByteArray& packet);
unsigned broadcastToNodes(const QByteArray& packet, const NodeSet& destinationNodeTypes); unsigned broadcastToNodes(const QByteArray& packet, const NodeSet& destinationNodeTypes);
SharedNodePointer soloNodeOfType(char nodeType); SharedNodePointer soloNodeOfType(char nodeType);
void loadData(QSettings* settings); void loadData(QSettings* settings);
void saveData(QSettings* settings); void saveData(QSettings* settings);
const HifiSockAddr* getNodeActiveSocketOrPing(const SharedNodePointer& node);
public slots: public slots:
void sendDomainServerCheckIn(); void sendDomainServerCheckIn();
void pingInactiveNodes(); void pingInactiveNodes();

View file

@ -46,6 +46,17 @@ PacketVersion versionForPacketType(PacketType type) {
switch (type) { switch (type) {
case PacketTypeParticleData: case PacketTypeParticleData:
return 1; return 1;
case PacketTypeDomainList:
case PacketTypeDomainListRequest:
return 1;
case PacketTypeCreateAssignment:
case PacketTypeRequestAssignment:
return 1;
case PacketTypeDataServerGet:
case PacketTypeDataServerPut:
case PacketTypeDataServerConfirm:
case PacketTypeDataServerSend:
return 1;
default: default:
return 0; return 0;
} }
@ -85,26 +96,6 @@ int populatePacketHeader(char* packet, PacketType type, const QUuid& connectionU
return position - packet; return position - packet;
} }
bool packetVersionMatch(const QByteArray& packet) {
// currently this just checks if the version in the packet matches our return from versionForPacketType
// may need to be expanded in the future for types and versions that take > than 1 byte
if (packet[1] == versionForPacketType(packetTypeForPacket(packet)) || packetTypeForPacket(packet) == PacketTypeStunResponse) {
return true;
} else {
PacketType mismatchType = packetTypeForPacket(packet);
int numPacketTypeBytes = arithmeticCodingValueFromBuffer(packet.data());
QUuid nodeUUID = uuidFromPacketHeader(packet);
qDebug() << "Packet mismatch on" << packetTypeForPacket(packet) << "- Sender"
<< nodeUUID << "sent" << qPrintable(QString::number(packet[numPacketTypeBytes])) << "but"
<< qPrintable(QString::number(versionForPacketType(mismatchType))) << "expected.";
return false;
}
}
int numBytesForPacketHeader(const QByteArray& packet) { int numBytesForPacketHeader(const QByteArray& packet) {
// returns the number of bytes used for the type, version, and UUID // returns the number of bytes used for the type, version, and UUID
return numBytesArithmeticCodingFromBuffer(packet.data()) + NUM_STATIC_HEADER_BYTES; return numBytesArithmeticCodingFromBuffer(packet.data()) + NUM_STATIC_HEADER_BYTES;
@ -124,6 +115,20 @@ QUuid uuidFromPacketHeader(const QByteArray& packet) {
NUM_BYTES_RFC4122_UUID)); NUM_BYTES_RFC4122_UUID));
} }
QByteArray hashFromPacketHeader(const QByteArray& packet) {
return packet.mid(numBytesForPacketHeader(packet) - NUM_BYTES_MD5_HASH, NUM_BYTES_MD5_HASH);
}
QByteArray hashForPacketAndConnectionUUID(const QByteArray& packet, const QUuid& connectionUUID) {
return QCryptographicHash::hash(packet.mid(numBytesForPacketHeader(packet)) + connectionUUID.toRfc4122(),
QCryptographicHash::Md5);
}
void replaceHashInPacketGivenConnectionUUID(QByteArray& packet, const QUuid& connectionUUID) {
packet.replace(numBytesForPacketHeader(packet) - NUM_BYTES_MD5_HASH, NUM_BYTES_MD5_HASH,
hashForPacketAndConnectionUUID(packet, connectionUUID));
}
PacketType packetTypeForPacket(const QByteArray& packet) { PacketType packetTypeForPacket(const QByteArray& packet) {
return (PacketType) arithmeticCodingValueFromBuffer(packet.data()); return (PacketType) arithmeticCodingValueFromBuffer(packet.data());
} }

View file

@ -70,17 +70,20 @@ QByteArray byteArrayWithPopluatedHeader(PacketType type, const QUuid& connection
int populatePacketHeader(QByteArray& packet, PacketType type, const QUuid& connectionUUID = nullUUID); int populatePacketHeader(QByteArray& packet, PacketType type, const QUuid& connectionUUID = nullUUID);
int populatePacketHeader(char* packet, PacketType type, const QUuid& connectionUUID = nullUUID); int populatePacketHeader(char* packet, PacketType type, const QUuid& connectionUUID = nullUUID);
bool packetVersionMatch(const QByteArray& packet);
int numBytesForPacketHeader(const QByteArray& packet); int numBytesForPacketHeader(const QByteArray& packet);
int numBytesForPacketHeader(const char* packet); int numBytesForPacketHeader(const char* packet);
int numBytesForPacketHeaderGivenPacketType(PacketType type); int numBytesForPacketHeaderGivenPacketType(PacketType type);
QUuid uuidFromPacketHeader(const QByteArray& packet); QUuid uuidFromPacketHeader(const QByteArray& packet);
QByteArray hashFromPacketHeader(const QByteArray& packet);
QByteArray hashForPacketAndConnectionUUID(const QByteArray& packet, const QUuid& connectionUUID);
void replaceHashInPacketGivenConnectionUUID(QByteArray& packet, const QUuid& connectionUUID);
PacketType packetTypeForPacket(const QByteArray& packet); PacketType packetTypeForPacket(const QByteArray& packet);
PacketType packetTypeForPacket(const char* packet); PacketType packetTypeForPacket(const char* packet);
int arithmeticCodingValueFromBuffer(const char* checkValue); int arithmeticCodingValueFromBuffer(const char* checkValue);
int numBytesArithmeticCodingFromBuffer(const char* checkValue);
#endif #endif

View file

@ -60,3 +60,16 @@ void ThreadedAssignment::checkInWithDomainServerOrExit() {
NodeList::getInstance()->sendDomainServerCheckIn(); NodeList::getInstance()->sendDomainServerCheckIn();
} }
} }
bool ThreadedAssignment::readAvailableDatagram(QByteArray& destinationByteArray, HifiSockAddr& senderSockAddr) {
NodeList* nodeList = NodeList::getInstance();
if (nodeList->getNodeSocket().hasPendingDatagrams()) {
destinationByteArray.resize(nodeList->getNodeSocket().pendingDatagramSize());
nodeList->getNodeSocket().readDatagram(destinationByteArray.data(), destinationByteArray.size(),
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
return true;
} else {
return false;
}
}

View file

@ -23,8 +23,10 @@ public slots:
virtual void deleteLater(); virtual void deleteLater();
virtual void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) = 0; virtual void readPendingDatagrams() = 0;
protected: protected:
bool readAvailableDatagram(QByteArray& destinationByteArray, HifiSockAddr& senderSockAddr);
void commonInit(const char* targetName, NodeType_t nodeType); void commonInit(const char* targetName, NodeType_t nodeType);
bool _isFinished; bool _isFinished;
private slots: private slots: