add writeDatagram helper to NodeList to insert md5 hash

This commit is contained in:
Stephen Birarda 2014-02-06 14:28:58 -08:00
parent cb6316bf4d
commit a823722d27
27 changed files with 142 additions and 107 deletions

View file

@ -835,7 +835,13 @@ void AnimationServer::readPendingDatagrams() {
int headerBytes = numBytesForPacketHeader(receivedPacket); int headerBytes = numBytesForPacketHeader(receivedPacket);
// PacketType_JURISDICTION, first byte is the node type... // PacketType_JURISDICTION, first byte is the node type...
if (receivedPacket.data()[headerBytes] == NodeType::VoxelServer && ::jurisdictionListener) { if (receivedPacket.data()[headerBytes] == NodeType::VoxelServer && ::jurisdictionListener) {
::jurisdictionListener->queueReceivedPacket(nodeSockAddr, receivedPacket); QUuid nodeUUID;
deconstructPacketHeader(receivedPacket, nodeUUID);
SharedNodePointer matchedNode = NodeList::getInstance()->nodeWithUUID(nodeUUID);
if (matchedNode) {
::jurisdictionListener->queueReceivedPacket(matchedNode, receivedPacket);
}
} }
} }
NodeList::getInstance()->processNodeData(nodeSockAddr, receivedPacket); NodeList::getInstance()->processNodeData(nodeSockAddr, receivedPacket);

View file

@ -35,17 +35,24 @@ void Agent::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr&
PacketType datagramPacketType = packetTypeForPacket(dataByteArray); PacketType datagramPacketType = packetTypeForPacket(dataByteArray);
if (datagramPacketType == PacketTypeJurisdiction) { if (datagramPacketType == PacketTypeJurisdiction) {
int headerBytes = numBytesForPacketHeader(dataByteArray); int headerBytes = numBytesForPacketHeader(dataByteArray);
// PacketType_JURISDICTION, first byte is the node type...
switch (dataByteArray[headerBytes]) { QUuid nodeUUID;
case NodeType::VoxelServer: SharedNodePointer matchedNode = NodeList::getInstance()->nodeWithUUID(nodeUUID);
_scriptEngine.getVoxelsScriptingInterface()->getJurisdictionListener()->queueReceivedPacket(senderSockAddr,
dataByteArray); if (matchedNode) {
break; // PacketType_JURISDICTION, first byte is the node type...
case NodeType::ParticleServer: switch (dataByteArray[headerBytes]) {
_scriptEngine.getParticlesScriptingInterface()->getJurisdictionListener()->queueReceivedPacket(senderSockAddr, case NodeType::VoxelServer:
dataByteArray); _scriptEngine.getVoxelsScriptingInterface()->getJurisdictionListener()->queueReceivedPacket(matchedNode,
break; dataByteArray);
break;
case NodeType::ParticleServer:
_scriptEngine.getParticlesScriptingInterface()->getJurisdictionListener()->queueReceivedPacket(matchedNode,
dataByteArray);
break;
}
} }
} else if (datagramPacketType == PacketTypeParticleAddResponse) { } else if (datagramPacketType == PacketTypeParticleAddResponse) {
// this will keep creatorTokenIDs to IDs mapped correctly // this will keep creatorTokenIDs to IDs mapped correctly
Particle::handleAddParticleResponse(dataByteArray); Particle::handleAddParticleResponse(dataByteArray);

View file

@ -279,9 +279,7 @@ void AudioMixer::run() {
prepareMixForListeningNode(node.data()); prepareMixForListeningNode(node.data());
memcpy(clientPacket + numBytesPacketHeader, _clientSamples, sizeof(_clientSamples)); memcpy(clientPacket + numBytesPacketHeader, _clientSamples, sizeof(_clientSamples));
nodeList->getNodeSocket().writeDatagram((char*) clientPacket, sizeof(clientPacket), nodeList->writeDatagram((char*) clientPacket, sizeof(clientPacket), node);
node->getActiveSocket()->getAddress(),
node->getActiveSocket()->getPort());
} }
} }

View file

@ -73,9 +73,7 @@ void broadcastAvatarData() {
avatarByteArray.append(nodeData->toByteArray()); avatarByteArray.append(nodeData->toByteArray());
if (avatarByteArray.size() + mixedAvatarByteArray.size() > MAX_PACKET_SIZE) { if (avatarByteArray.size() + mixedAvatarByteArray.size() > MAX_PACKET_SIZE) {
nodeList->getNodeSocket().writeDatagram(mixedAvatarByteArray, nodeList->writeDatagram(mixedAvatarByteArray, node);
node->getActiveSocket()->getAddress(),
node->getActiveSocket()->getPort());
// reset the packet // reset the packet
mixedAvatarByteArray.resize(numPacketHeaderBytes); mixedAvatarByteArray.resize(numPacketHeaderBytes);
@ -86,9 +84,7 @@ void broadcastAvatarData() {
} }
} }
nodeList->getNodeSocket().writeDatagram(mixedAvatarByteArray, nodeList->writeDatagram(mixedAvatarByteArray, node);
node->getActiveSocket()->getAddress(),
node->getActiveSocket()->getPort());
} }
} }
} }

View file

@ -142,15 +142,11 @@ int OctreeSendThread::handlePacketSend(Node* node, OctreeQueryNode* nodeData, in
} }
// actually send it // actually send it
NodeList::getInstance()->getNodeSocket().writeDatagram((char*) statsMessage, statsMessageLength, NodeList::getInstance()->writeDatagram((char*) statsMessage, statsMessageLength, SharedNodePointer(node));
nodeAddress->getAddress(),
nodeAddress->getPort());
packetSent = true; packetSent = true;
} else { } else {
// not enough room in the packet, send two packets // not enough room in the packet, send two packets
NodeList::getInstance()->getNodeSocket().writeDatagram((char*) statsMessage, statsMessageLength, NodeList::getInstance()->writeDatagram((char*) statsMessage, statsMessageLength, SharedNodePointer(node));
nodeAddress->getAddress(),
nodeAddress->getPort());
// since a stats message is only included on end of scene, don't consider any of these bytes "wasted", since // since a stats message is only included on end of scene, don't consider any of these bytes "wasted", since
// there was nothing else to send. // there was nothing else to send.
@ -168,9 +164,8 @@ int OctreeSendThread::handlePacketSend(Node* node, OctreeQueryNode* nodeData, in
truePacketsSent++; truePacketsSent++;
packetsSent++; packetsSent++;
NodeList::getInstance()->getNodeSocket().writeDatagram((char*) nodeData->getPacket(), nodeData->getPacketLength(), NodeList::getInstance()->writeDatagram((char*) nodeData->getPacket(), nodeData->getPacketLength(),
nodeAddress->getAddress(), SharedNodePointer(node));
nodeAddress->getPort());
packetSent = true; packetSent = true;
@ -189,9 +184,8 @@ int OctreeSendThread::handlePacketSend(Node* node, OctreeQueryNode* nodeData, in
// If there's actually a packet waiting, then send it. // If there's actually a packet waiting, then send it.
if (nodeData->isPacketWaiting()) { if (nodeData->isPacketWaiting()) {
// just send the voxel packet // just send the voxel packet
NodeList::getInstance()->getNodeSocket().writeDatagram((char*) nodeData->getPacket(), nodeData->getPacketLength(), NodeList::getInstance()->writeDatagram((char*) nodeData->getPacket(), nodeData->getPacketLength(),
nodeAddress->getAddress(), SharedNodePointer(node));
nodeAddress->getPort());
packetSent = true; packetSent = true;
int thisWastedBytes = MAX_PACKET_SIZE - nodeData->getPacketLength(); int thisWastedBytes = MAX_PACKET_SIZE - nodeData->getPacketLength();

View file

@ -462,6 +462,11 @@ void OctreeServer::processDatagram(const QByteArray& dataByteArray, const HifiSo
NodeList* nodeList = NodeList::getInstance(); NodeList* nodeList = NodeList::getInstance();
PacketType packetType = packetTypeForPacket(dataByteArray); PacketType packetType = packetTypeForPacket(dataByteArray);
QUuid nodeUUID;
deconstructPacketHeader(dataByteArray, nodeUUID);
SharedNodePointer matchingNode = nodeList->nodeWithUUID(nodeUUID);
if (packetType == getMyQueryMessageType()) { if (packetType == getMyQueryMessageType()) {
bool debug = false; bool debug = false;
@ -471,27 +476,24 @@ void OctreeServer::processDatagram(const QByteArray& dataByteArray, const HifiSo
// If we got a PacketType_VOXEL_QUERY, then we're talking to an NodeType_t_AVATAR, and we // If we got a PacketType_VOXEL_QUERY, then we're talking to an NodeType_t_AVATAR, and we
// need to make sure we have it in our nodeList. // need to make sure we have it in our nodeList.
QUuid nodeUUID;
deconstructPacketHeader(dataByteArray, nodeUUID);
SharedNodePointer node = nodeList->nodeWithUUID(nodeUUID);
if (node) { if (matchingNode) {
nodeList->updateNodeWithData(node.data(), senderSockAddr, dataByteArray); nodeList->updateNodeWithData(matchingNode.data(), senderSockAddr, dataByteArray);
if (!node->getActiveSocket()) { if (!matchingNode->getActiveSocket()) {
// we don't have an active socket for this node, but they're talking to us // we don't have an active socket for this node, but they're talking to us
// this means they've heard from us and can reply, let's assume public is active // this means they've heard from us and can reply, let's assume public is active
node->activatePublicSocket(); matchingNode->activatePublicSocket();
} }
OctreeQueryNode* nodeData = (OctreeQueryNode*) node->getLinkedData(); OctreeQueryNode* nodeData = (OctreeQueryNode*) matchingNode->getLinkedData();
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) { if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
nodeData->initializeOctreeSendThread(this, nodeUUID); nodeData->initializeOctreeSendThread(this, nodeUUID);
} }
} }
} else if (packetType == PacketTypeJurisdictionRequest) { } else if (packetType == PacketTypeJurisdictionRequest) {
_jurisdictionSender->queueReceivedPacket(senderSockAddr, dataByteArray); _jurisdictionSender->queueReceivedPacket(matchingNode, dataByteArray);
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) { } else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {
_octreeInboundPacketProcessor->queueReceivedPacket(senderSockAddr, dataByteArray); _octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, dataByteArray);
} else { } else {
// let processNodeData handle it. // let processNodeData handle it.
NodeList::getInstance()->processNodeData(senderSockAddr, dataByteArray); NodeList::getInstance()->processNodeData(senderSockAddr, dataByteArray);

View file

@ -63,9 +63,7 @@ void ParticleServer::particleCreated(const Particle& newParticle, Node* node) {
copyAt += sizeof(particleID); copyAt += sizeof(particleID);
packetLength += sizeof(particleID); packetLength += sizeof(particleID);
NodeList::getInstance()->getNodeSocket().writeDatagram((char*) outputBuffer, packetLength, NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node));
node->getActiveSocket()->getAddress(),
node->getActiveSocket()->getPort());
} }
@ -104,9 +102,7 @@ int ParticleServer::sendSpecialPacket(Node* node) {
//qDebug() << "sending PacketType_PARTICLE_ERASE packetLength:" << packetLength; //qDebug() << "sending PacketType_PARTICLE_ERASE packetLength:" << packetLength;
NodeList::getInstance()->getNodeSocket().writeDatagram((char*) outputBuffer, packetLength, NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node));
node->getActiveSocket()->getAddress(),
node->getActiveSocket()->getPort());
} }
nodeData->setLastDeletedParticlesSentAt(deletePacketSentAt); nodeData->setLastDeletedParticlesSentAt(deletePacketSentAt);

View file

@ -46,9 +46,7 @@ int VoxelServer::sendSpecialPacket(Node* node) {
envPacketLength += getEnvironmentData(i)->getBroadcastData(_tempOutputBuffer + envPacketLength); envPacketLength += getEnvironmentData(i)->getBroadcastData(_tempOutputBuffer + envPacketLength);
} }
NodeList::getInstance()->getNodeSocket().writeDatagram((char*) _tempOutputBuffer, envPacketLength, NodeList::getInstance()->writeDatagram((char*) _tempOutputBuffer, envPacketLength, SharedNodePointer(node));
node->getActiveSocket()->getAddress(),
node->getActiveSocket()->getPort());
return envPacketLength; return envPacketLength;
} }

View file

@ -2543,10 +2543,7 @@ void Application::queryOctree(NodeType_t serverType, PacketType packetType, Node
int packetLength = endOfVoxelQueryPacket - voxelQueryPacket; int packetLength = endOfVoxelQueryPacket - voxelQueryPacket;
// make sure we still have an active socket // make sure we still have an active socket
if (node->getActiveSocket()) { nodeList->writeDatagram(reinterpret_cast<const char*>(voxelQueryPacket), packetLength, node);
nodeList->getNodeSocket().writeDatagram((char*) voxelQueryPacket, packetLength,
node->getActiveSocket()->getAddress(), node->getActiveSocket()->getPort());
}
// Feed number of bytes to corresponding channel of the bandwidth meter // Feed number of bytes to corresponding channel of the bandwidth meter
_bandwidthMeter.outputStream(BandwidthMeter::VOXELS).updateValue(packetLength); _bandwidthMeter.outputStream(BandwidthMeter::VOXELS).updateValue(packetLength);

View file

@ -409,10 +409,9 @@ void Audio::handleAudioInput() {
memcpy(currentPacketPtr, &headOrientation, sizeof(headOrientation)); memcpy(currentPacketPtr, &headOrientation, sizeof(headOrientation));
currentPacketPtr += sizeof(headOrientation); currentPacketPtr += sizeof(headOrientation);
nodeList->getNodeSocket().writeDatagram(monoAudioDataPacket, nodeList->writeDatagram(monoAudioDataPacket,
NETWORK_BUFFER_LENGTH_BYTES_PER_CHANNEL + leadingBytes, NETWORK_BUFFER_LENGTH_BYTES_PER_CHANNEL + leadingBytes,
audioMixer->getActiveSocket()->getAddress(), audioMixer);
audioMixer->getActiveSocket()->getPort());
Application::getInstance()->getBandwidthMeter()->outputStream(BandwidthMeter::AUDIO) Application::getInstance()->getBandwidthMeter()->outputStream(BandwidthMeter::AUDIO)
.updateValue(NETWORK_BUFFER_LENGTH_BYTES_PER_CHANNEL + leadingBytes); .updateValue(NETWORK_BUFFER_LENGTH_BYTES_PER_CHANNEL + leadingBytes);

View file

@ -84,8 +84,16 @@ void DatagramProcessor::processDatagrams() {
printf("got PacketType_VOXEL_DATA, sequence:%d flightTime:%d\n", sequence, flightTime); printf("got PacketType_VOXEL_DATA, sequence:%d flightTime:%d\n", sequence, flightTime);
} }
// add this packet to our list of voxel packets and process them on the voxel processing QUuid nodeUUID;
application->_voxelProcessor.queueReceivedPacket(senderSockAddr, incomingPacket); deconstructPacketHeader(incomingPacket, nodeUUID);
SharedNodePointer matchedNode = NodeList::getInstance()->nodeWithUUID(nodeUUID);
if (matchedNode) {
// add this packet to our list of voxel packets and process them on the voxel processing
application->_voxelProcessor.queueReceivedPacket(matchedNode, incomingPacket);
}
break; break;
} }
case PacketTypeMetavoxelData: case PacketTypeMetavoxelData:

View file

@ -226,10 +226,7 @@ void MetavoxelClient::receivedData(const QByteArray& data) {
void MetavoxelClient::sendData(const QByteArray& data) { void MetavoxelClient::sendData(const QByteArray& data) {
QMutexLocker locker(&_node->getMutex()); QMutexLocker locker(&_node->getMutex());
const HifiSockAddr* address = _node->getActiveSocket(); NodeList::getInstance()->writeDatagram(data, _node);
if (address) {
NodeList::getInstance()->getNodeSocket().writeDatagram(data, address->getAddress(), address->getPort());
}
} }
void MetavoxelClient::readPacket(Bitstream& in) { void MetavoxelClient::readPacket(Bitstream& in) {

View file

@ -95,9 +95,7 @@ void AudioInjector::injectAudio() {
if (audioMixer && nodeList->getNodeActiveSocketOrPing(audioMixer.data())) { if (audioMixer && nodeList->getNodeActiveSocketOrPing(audioMixer.data())) {
// send off this audio packet // send off this audio packet
nodeList->getNodeSocket().writeDatagram(injectAudioPacket, nodeList->writeDatagram(injectAudioPacket, audioMixer);
audioMixer->getActiveSocket()->getAddress(),
audioMixer->getActiveSocket()->getPort());
} }
currentSendPosition += bytesToCopy; currentSendPosition += bytesToCopy;

View file

@ -74,7 +74,7 @@ class AvatarData : public NodeData {
Q_PROPERTY(float headPitch READ getHeadPitch WRITE setHeadPitch) Q_PROPERTY(float headPitch READ getHeadPitch WRITE setHeadPitch)
Q_PROPERTY(QUrl faceModelURL READ getFaceModelURL WRITE setFaceModelURL) Q_PROPERTY(QUrl faceModelURL READ getFaceModelURL WRITE setFaceModelURL)
Q_PROPERTY(QURl skeletonModelURL READ getSkeletonModelURL WRITE setSkeletonModelURL) Q_PROPERTY(QUrl skeletonModelURL READ getSkeletonModelURL WRITE setSkeletonModelURL)
public: public:
AvatarData(); AvatarData();
~AvatarData(); ~AvatarData();

View file

@ -46,8 +46,7 @@ bool JurisdictionListener::queueJurisdictionRequest() {
foreach (const SharedNodePointer& node, nodeList->getNodeHash()) { foreach (const SharedNodePointer& node, nodeList->getNodeHash()) {
if (nodeList->getNodeActiveSocketOrPing(node.data()) && node->getType() == getNodeType()) { if (nodeList->getNodeActiveSocketOrPing(node.data()) && node->getType() == getNodeType()) {
const HifiSockAddr* nodeAddress = node->getActiveSocket(); _packetSender.queuePacketForSending(node, QByteArray(reinterpret_cast<char*>(bufferOut), sizeOut));
_packetSender.queuePacketForSending(*nodeAddress, QByteArray(reinterpret_cast<char*>(bufferOut), sizeOut));
nodeCount++; nodeCount++;
} }
} }

View file

@ -65,8 +65,7 @@ bool JurisdictionSender::process() {
SharedNodePointer node = NodeList::getInstance()->nodeWithUUID(nodeUUID); SharedNodePointer node = NodeList::getInstance()->nodeWithUUID(nodeUUID);
if (node && node->getActiveSocket() != NULL) { if (node && node->getActiveSocket() != NULL) {
const HifiSockAddr* nodeAddress = node->getActiveSocket(); _packetSender.queuePacketForSending(node, QByteArray(reinterpret_cast<char *>(bufferOut), sizeOut));
_packetSender.queuePacketForSending(*nodeAddress, QByteArray(reinterpret_cast<char *>(bufferOut), sizeOut));
nodeCount++; nodeCount++;
} }
} }

View file

@ -92,8 +92,7 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned c
if (node->getType() == getMyNodeType() && if (node->getType() == getMyNodeType() &&
((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))) { ((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))) {
if (nodeList->getNodeActiveSocketOrPing(node.data())) { if (nodeList->getNodeActiveSocketOrPing(node.data())) {
const HifiSockAddr* nodeAddress = node->getActiveSocket(); queuePacketForSending(node, QByteArray(reinterpret_cast<char*>(buffer), length));
queuePacketForSending(*nodeAddress, QByteArray(reinterpret_cast<char*>(buffer), length));
// debugging output... // debugging output...
bool wantDebugging = false; bool wantDebugging = false;

View file

@ -14,9 +14,9 @@
#include "NetworkPacket.h" #include "NetworkPacket.h"
void NetworkPacket::copyContents(const HifiSockAddr& sockAddr, const QByteArray& packet) { void NetworkPacket::copyContents(const SharedNodePointer& destinationNode, const QByteArray& packet) {
if (packet.size() && packet.size() <= MAX_PACKET_SIZE) { if (packet.size() && packet.size() <= MAX_PACKET_SIZE) {
_sockAddr = sockAddr; _destinationNode = destinationNode;
_byteArray = packet; _byteArray = packet;
} else { } else {
qDebug(">>> NetworkPacket::copyContents() unexpected length = %d", packet.size()); qDebug(">>> NetworkPacket::copyContents() unexpected length = %d", packet.size());
@ -24,28 +24,28 @@ void NetworkPacket::copyContents(const HifiSockAddr& sockAddr, const QByteArray&
} }
NetworkPacket::NetworkPacket(const NetworkPacket& packet) { NetworkPacket::NetworkPacket(const NetworkPacket& packet) {
copyContents(packet.getSockAddr(), packet.getByteArray()); copyContents(packet.getDestinationNode(), packet.getByteArray());
} }
NetworkPacket::NetworkPacket(const HifiSockAddr& sockAddr, const QByteArray& packet) { NetworkPacket::NetworkPacket(const SharedNodePointer& destinationNode, const QByteArray& packet) {
copyContents(sockAddr, packet); copyContents(destinationNode, packet);
}; };
// copy assignment // copy assignment
NetworkPacket& NetworkPacket::operator=(NetworkPacket const& other) { NetworkPacket& NetworkPacket::operator=(NetworkPacket const& other) {
copyContents(other.getSockAddr(), other.getByteArray()); copyContents(other.getDestinationNode(), other.getByteArray());
return *this; return *this;
} }
#ifdef HAS_MOVE_SEMANTICS #ifdef HAS_MOVE_SEMANTICS
// move, same as copy, but other packet won't be used further // move, same as copy, but other packet won't be used further
NetworkPacket::NetworkPacket(NetworkPacket && packet) { NetworkPacket::NetworkPacket(NetworkPacket && packet) {
copyContents(packet.getAddress(), packet.getByteArray()); copyContents(packet.getDestinationNode(), packet.getByteArray());
} }
// move assignment // move assignment
NetworkPacket& NetworkPacket::operator=(NetworkPacket&& other) { NetworkPacket& NetworkPacket::operator=(NetworkPacket&& other) {
copyContents(other.getAddress(), other.getByteArray()); copyContents(other.getDestinationNode(), other.getByteArray());
return *this; return *this;
} }
#endif #endif

View file

@ -33,15 +33,15 @@ public:
NetworkPacket& operator= (NetworkPacket&& other); // move assignment NetworkPacket& operator= (NetworkPacket&& other); // move assignment
#endif #endif
NetworkPacket(const HifiSockAddr& sockAddr, const QByteArray& byteArray); NetworkPacket(const SharedNodePointer& destinationNode, const QByteArray& byteArray);
const HifiSockAddr& getSockAddr() const { return _sockAddr; } const SharedNodePointer& getDestinationNode() const { return _destinationNode; }
const QByteArray& getByteArray() const { return _byteArray; } const QByteArray& getByteArray() const { return _byteArray; }
private: private:
void copyContents(const HifiSockAddr& sockAddr, const QByteArray& byteArray); void copyContents(const SharedNodePointer& destinationNode, const QByteArray& byteArray);
HifiSockAddr _sockAddr; SharedNodePointer _destinationNode;
QByteArray _byteArray; QByteArray _byteArray;
}; };

View file

@ -80,6 +80,37 @@ NodeList::~NodeList() {
clear(); clear();
} }
qint64 NodeList::writeDatagram(const QByteArray& datagram, const SharedNodePointer& destinationNode,
const HifiSockAddr& overridenSockAddr) {
// setup the MD5 hash for source verification in the header
int numBytesPacketHeader = numBytesForPacketHeader(datagram);
QByteArray dataSecretHash = QCryptographicHash::hash(datagram.mid(numBytesPacketHeader)
+ destinationNode->getConnectionSecret().toRfc4122(),
QCryptographicHash::Md5);
QByteArray datagramWithHash = datagram;
datagramWithHash.replace(numBytesPacketHeader - NUM_BYTES_MD5_HASH, NUM_BYTES_MD5_HASH, dataSecretHash);
// if we don't have an ovveriden address, assume they want to send to the node's active socket
const HifiSockAddr* destinationSockAddr = &overridenSockAddr;
if (overridenSockAddr.isNull()) {
if (destinationNode->getActiveSocket()) {
// use the node's active socket as the destination socket
destinationSockAddr = destinationNode->getActiveSocket();
} else {
// we don't have a socket to send to, return 0
return 0;
}
}
return _nodeSocket.writeDatagram(datagramWithHash, destinationSockAddr->getAddress(), destinationSockAddr->getPort());
}
qint64 NodeList::writeDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode,
const HifiSockAddr& overridenSockAddr) {
return writeDatagram(QByteArray(data, size), destinationNode, overridenSockAddr);
}
void NodeList::setDomainHostname(const QString& domainHostname) { void NodeList::setDomainHostname(const QString& domainHostname) {
if (domainHostname != _domainHostname) { if (domainHostname != _domainHostname) {

View file

@ -76,6 +76,11 @@ public:
void setSessionUUID(const QUuid& sessionUUID); void setSessionUUID(const QUuid& sessionUUID);
QUdpSocket& getNodeSocket() { return _nodeSocket; } QUdpSocket& getNodeSocket() { return _nodeSocket; }
qint64 writeDatagram(const QByteArray& datagram, const SharedNodePointer& destinationNode,
const HifiSockAddr& overridenSockAddr = HifiSockAddr());
qint64 writeDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode,
const HifiSockAddr& overridenSockAddr = HifiSockAddr());
void(*linkedDataCreateCallback)(Node *); void(*linkedDataCreateCallback)(Node *);

View file

@ -69,13 +69,20 @@ int populatePacketHeader(char* packet, PacketType type, const QUuid& connectionU
int numTypeBytes = packArithmeticallyCodedValue(type, packet); int numTypeBytes = packArithmeticallyCodedValue(type, packet);
packet[numTypeBytes] = versionForPacketType(type); packet[numTypeBytes] = versionForPacketType(type);
char* position = packet + numTypeBytes + sizeof(PacketVersion);
QUuid packUUID = connectionUUID.isNull() ? NodeList::getInstance()->getSessionUUID() : connectionUUID; QUuid packUUID = connectionUUID.isNull() ? NodeList::getInstance()->getSessionUUID() : connectionUUID;
QByteArray rfcUUID = packUUID.toRfc4122(); QByteArray rfcUUID = packUUID.toRfc4122();
memcpy(packet + numTypeBytes + sizeof(PacketVersion), rfcUUID.constData(), NUM_BYTES_RFC4122_UUID); memcpy(position, rfcUUID.constData(), NUM_BYTES_RFC4122_UUID);
position += NUM_BYTES_RFC4122_UUID;
// pack 16 bytes of zeros where the md5 hash will be placed one data is packed
memset(position, 0, NUM_BYTES_MD5_HASH);
position += NUM_BYTES_MD5_HASH;
// return the number of bytes written for pointer pushing // return the number of bytes written for pointer pushing
return numTypeBytes + sizeof(PacketVersion) + NUM_BYTES_RFC4122_UUID; return position - packet;
} }
bool packetVersionMatch(const QByteArray& packet) { bool packetVersionMatch(const QByteArray& packet) {
@ -101,16 +108,16 @@ bool packetVersionMatch(const QByteArray& packet) {
int numBytesForPacketHeader(const QByteArray& packet) { int numBytesForPacketHeader(const QByteArray& packet) {
// returns the number of bytes used for the type, version, and UUID // returns the number of bytes used for the type, version, and UUID
return numBytesArithmeticCodingFromBuffer(packet.data()) + sizeof(PacketVersion) + NUM_BYTES_RFC4122_UUID; return numBytesArithmeticCodingFromBuffer(packet.data()) + NUM_STATIC_HEADER_BYTES;
} }
int numBytesForPacketHeader(const char* packet) { int numBytesForPacketHeader(const char* packet) {
// returns the number of bytes used for the type, version, and UUID // returns the number of bytes used for the type, version, and UUID
return numBytesArithmeticCodingFromBuffer(packet) + sizeof(PacketVersion) + NUM_BYTES_RFC4122_UUID; return numBytesArithmeticCodingFromBuffer(packet) + NUM_STATIC_HEADER_BYTES;
} }
int numBytesForPacketHeaderGivenPacketType(PacketType type) { int numBytesForPacketHeaderGivenPacketType(PacketType type) {
return (int) ceilf((float)type / 255) + sizeof(PacketVersion) + NUM_BYTES_RFC4122_UUID; return (int) ceilf((float)type / 255) + NUM_STATIC_HEADER_BYTES;
} }
void deconstructPacketHeader(const QByteArray& packet, QUuid& senderUUID) { void deconstructPacketHeader(const QByteArray& packet, QUuid& senderUUID) {

View file

@ -12,6 +12,7 @@
#ifndef hifi_PacketHeaders_h #ifndef hifi_PacketHeaders_h
#define hifi_PacketHeaders_h #define hifi_PacketHeaders_h
#include <QtCore/QCryptographicHash>
#include <QtCore/QUuid> #include <QtCore/QUuid>
#include "UUID.h" #include "UUID.h"
@ -57,7 +58,9 @@ enum PacketType {
typedef char PacketVersion; typedef char PacketVersion;
const int MAX_PACKET_HEADER_BYTES = sizeof(PacketType) + sizeof(PacketVersion) + NUM_BYTES_RFC4122_UUID;; const int NUM_BYTES_MD5_HASH = 16;
const int NUM_STATIC_HEADER_BYTES = sizeof(PacketVersion) + NUM_BYTES_RFC4122_UUID + NUM_BYTES_MD5_HASH;
const int MAX_PACKET_HEADER_BYTES = sizeof(PacketType) + NUM_STATIC_HEADER_BYTES;
PacketVersion versionForPacketType(PacketType type); PacketVersion versionForPacketType(PacketType type);

View file

@ -47,8 +47,8 @@ PacketSender::~PacketSender() {
} }
void PacketSender::queuePacketForSending(const HifiSockAddr& address, const QByteArray& packet) { void PacketSender::queuePacketForSending(const SharedNodePointer& destinationNode, const QByteArray& packet) {
NetworkPacket networkPacket(address, packet); NetworkPacket networkPacket(destinationNode, packet);
lock(); lock();
_packets.push_back(networkPacket); _packets.push_back(networkPacket);
unlock(); unlock();
@ -263,9 +263,7 @@ bool PacketSender::nonThreadedProcess() {
unlock(); unlock();
// send the packet through the NodeList... // send the packet through the NodeList...
NodeList::getInstance()->getNodeSocket().writeDatagram(temporary.getByteArray(), NodeList::getInstance()->writeDatagram(temporary.getByteArray(), temporary.getDestinationNode());
temporary.getSockAddr().getAddress(),
temporary.getSockAddr().getPort());
packetsSentThisCall++; packetsSentThisCall++;
_packetsOverCheckInterval++; _packetsOverCheckInterval++;
_totalPacketsSent++; _totalPacketsSent++;

View file

@ -37,7 +37,7 @@ public:
/// \param packetData pointer to data /// \param packetData pointer to data
/// \param ssize_t packetLength size of data /// \param ssize_t packetLength size of data
/// \thread any thread, typically the application thread /// \thread any thread, typically the application thread
void queuePacketForSending(const HifiSockAddr& address, const QByteArray& packet); void queuePacketForSending(const SharedNodePointer& destinationNode, const QByteArray& packet);
void setPacketsPerSecond(int packetsPerSecond); void setPacketsPerSecond(int packetsPerSecond);
int getPacketsPerSecond() const { return _packetsPerSecond; } int getPacketsPerSecond() const { return _packetsPerSecond; }

View file

@ -16,14 +16,11 @@ ReceivedPacketProcessor::ReceivedPacketProcessor() {
_dontSleep = false; _dontSleep = false;
} }
void ReceivedPacketProcessor::queueReceivedPacket(const HifiSockAddr& address, const QByteArray& packet) { void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& destinationNode, const QByteArray& packet) {
// Make sure our Node and NodeList knows we've heard from this node. // Make sure our Node and NodeList knows we've heard from this node.
SharedNodePointer node = NodeList::getInstance()->nodeWithAddress(address); destinationNode->setLastHeardMicrostamp(usecTimestampNow());
if (node) {
node->setLastHeardMicrostamp(usecTimestampNow());
}
NetworkPacket networkPacket(address, packet); NetworkPacket networkPacket(destinationNode, packet);
lock(); lock();
_packets.push_back(networkPacket); _packets.push_back(networkPacket);
unlock(); unlock();
@ -44,7 +41,8 @@ bool ReceivedPacketProcessor::process() {
NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us
_packets.erase(_packets.begin()); // remove the oldest packet _packets.erase(_packets.begin()); // remove the oldest packet
unlock(); // let others add to the packets unlock(); // let others add to the packets
processPacket(temporary.getSockAddr(), temporary.getByteArray()); // process our temporary copy processPacket(*temporary.getDestinationNode()->getActiveSocket(),
temporary.getByteArray()); // process our temporary copy
} }
return isStillRunning(); // keep running till they terminate us return isStillRunning(); // keep running till they terminate us
} }

View file

@ -24,7 +24,7 @@ public:
/// \param packetData pointer to received data /// \param packetData pointer to received data
/// \param ssize_t packetLength size of received data /// \param ssize_t packetLength size of received data
/// \thread network receive thread /// \thread network receive thread
void queueReceivedPacket(const HifiSockAddr& senderSockAddr, const QByteArray& packet); void queueReceivedPacket(const SharedNodePointer& destinationNode, const QByteArray& packet);
/// Are there received packets waiting to be processed /// Are there received packets waiting to be processed
bool hasPacketsToProcess() const { return _packets.size() > 0; } bool hasPacketsToProcess() const { return _packets.size() > 0; }