make ThreadedAssignment subclasses handle QUdpSocket readyRead, closes #1895

This commit is contained in:
Stephen Birarda 2014-02-07 11:52:37 -08:00
parent 66d4eeb805
commit a5636d2dc0
14 changed files with 218 additions and 162 deletions

View file

@ -31,36 +31,43 @@ Agent::Agent(const QByteArray& packet) :
_scriptEngine.getParticlesScriptingInterface()->setPacketSender(&_particleEditSender);
}
void Agent::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) {
PacketType datagramPacketType = packetTypeForPacket(dataByteArray);
if (datagramPacketType == PacketTypeJurisdiction) {
int headerBytes = numBytesForPacketHeader(dataByteArray);
QUuid nodeUUID;
SharedNodePointer matchedNode = NodeList::getInstance()->nodeWithUUID(nodeUUID);
if (matchedNode) {
// PacketType_JURISDICTION, first byte is the node type...
switch (dataByteArray[headerBytes]) {
case NodeType::VoxelServer:
_scriptEngine.getVoxelsScriptingInterface()->getJurisdictionListener()->queueReceivedPacket(matchedNode,
dataByteArray);
break;
case NodeType::ParticleServer:
_scriptEngine.getParticlesScriptingInterface()->getJurisdictionListener()->queueReceivedPacket(matchedNode,
dataByteArray);
break;
void Agent::readPendingDatagrams() {
QByteArray receivedPacket;
HifiSockAddr senderSockAddr;
NodeList* nodeList = NodeList::getInstance();
while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
PacketType datagramPacketType = packetTypeForPacket(receivedPacket);
if (datagramPacketType == PacketTypeJurisdiction) {
int headerBytes = numBytesForPacketHeader(receivedPacket);
SharedNodePointer matchedNode = nodeList->sendingNodeForPacket(receivedPacket);
if (matchedNode) {
// PacketType_JURISDICTION, first byte is the node type...
switch (receivedPacket[headerBytes]) {
case NodeType::VoxelServer:
_scriptEngine.getVoxelsScriptingInterface()->getJurisdictionListener()->queueReceivedPacket(matchedNode,
receivedPacket);
break;
case NodeType::ParticleServer:
_scriptEngine.getParticlesScriptingInterface()->getJurisdictionListener()->queueReceivedPacket(matchedNode,
receivedPacket);
break;
}
}
} else if (datagramPacketType == PacketTypeParticleAddResponse) {
// this will keep creatorTokenIDs to IDs mapped correctly
Particle::handleAddParticleResponse(receivedPacket);
// also give our local particle tree a chance to remap any internal locally created particles
_particleTree.handleAddParticleResponse(receivedPacket);
} else {
NodeList::getInstance()->processNodeData(senderSockAddr, receivedPacket);
}
}
} else if (datagramPacketType == PacketTypeParticleAddResponse) {
// this will keep creatorTokenIDs to IDs mapped correctly
Particle::handleAddParticleResponse(dataByteArray);
// also give our local particle tree a chance to remap any internal locally created particles
_particleTree.handleAddParticleResponse(dataByteArray);
} else {
NodeList::getInstance()->processNodeData(senderSockAddr, dataByteArray);
}
}

View file

@ -35,7 +35,7 @@ public:
public slots:
void run();
void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr);
void readPendingDatagrams();
signals:
void willSendAudioDataCallback();
void willSendVisualDataCallback();

View file

@ -90,8 +90,7 @@ AssignmentClient::AssignmentClient(int &argc, char **argv) :
timer->start(ASSIGNMENT_REQUEST_INTERVAL_MSECS);
// connect our readPendingDatagrams method to the readyRead() signal of the socket
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, this, &AssignmentClient::readPendingDatagrams,
Qt::QueuedConnection);
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, this, &AssignmentClient::readPendingDatagrams);
}
void AssignmentClient::sendAssignmentRequest() {
@ -112,49 +111,44 @@ void AssignmentClient::readPendingDatagrams() {
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
if (_currentAssignment) {
// have the threaded current assignment handle this datagram
QMetaObject::invokeMethod(_currentAssignment, "processDatagram", Qt::QueuedConnection,
Q_ARG(QByteArray, receivedPacket),
Q_ARG(HifiSockAddr, senderSockAddr));
} else if (packetTypeForPacket(receivedPacket) == PacketTypeCreateAssignment) {
if (packetTypeForPacket(receivedPacket) == PacketTypeCreateAssignment) {
// construct the deployed assignment from the packet data
_currentAssignment = AssignmentFactory::unpackAssignment(receivedPacket);
if (_currentAssignment) {
qDebug() << "Dropping received assignment since we are currently running one.";
} else {
// construct the deployed assignment from the packet data
_currentAssignment = AssignmentFactory::unpackAssignment(receivedPacket);
qDebug() << "Received an assignment -" << *_currentAssignment;
if (_currentAssignment) {
qDebug() << "Received an assignment -" << *_currentAssignment;
// switch our nodelist domain IP and port to whoever sent us the assignment
nodeList->setDomainSockAddr(senderSockAddr);
nodeList->setSessionUUID(_currentAssignment->getUUID());
qDebug() << "Destination IP for assignment is" << nodeList->getDomainIP().toString();
// start the deployed assignment
QThread* workerThread = new QThread(this);
connect(workerThread, SIGNAL(started()), _currentAssignment, SLOT(run()));
connect(_currentAssignment, SIGNAL(finished()), this, SLOT(assignmentCompleted()));
connect(_currentAssignment, SIGNAL(finished()), workerThread, SLOT(quit()));
connect(_currentAssignment, SIGNAL(finished()), _currentAssignment, SLOT(deleteLater()));
connect(workerThread, SIGNAL(finished()), workerThread, SLOT(deleteLater()));
_currentAssignment->moveToThread(workerThread);
// move the NodeList to the thread used for the _current assignment
nodeList->moveToThread(workerThread);
// Starts an event loop, and emits workerThread->started()
workerThread->start();
} else {
qDebug() << "Received an assignment that could not be unpacked. Re-requesting.";
}
// switch our nodelist domain IP and port to whoever sent us the assignment
nodeList->setDomainSockAddr(senderSockAddr);
nodeList->setSessionUUID(_currentAssignment->getUUID());
qDebug() << "Destination IP for assignment is" << nodeList->getDomainIP().toString();
// start the deployed assignment
QThread* workerThread = new QThread(this);
connect(workerThread, SIGNAL(started()), _currentAssignment, SLOT(run()));
connect(_currentAssignment, SIGNAL(finished()), this, SLOT(assignmentCompleted()));
connect(_currentAssignment, SIGNAL(finished()), workerThread, SLOT(quit()));
connect(_currentAssignment, SIGNAL(finished()), _currentAssignment, SLOT(deleteLater()));
connect(workerThread, SIGNAL(finished()), workerThread, SLOT(deleteLater()));
_currentAssignment->moveToThread(workerThread);
// move the NodeList to the thread used for the _current assignment
nodeList->moveToThread(workerThread);
// let the assignment handle the incoming datagrams for its duration
disconnect(&nodeList->getNodeSocket(), 0, this, 0);
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, _currentAssignment,
&ThreadedAssignment::readPendingDatagrams);
// Starts an event loop, and emits workerThread->started()
workerThread->start();
} else {
qDebug() << "Received an assignment that could not be unpacked. Re-requesting.";
}
} else {
// have the NodeList attempt to handle it
@ -170,10 +164,14 @@ void AssignmentClient::assignmentCompleted() {
qDebug("Assignment finished or never started - waiting for new assignment.");
_currentAssignment = NULL;
NodeList* nodeList = NodeList::getInstance();
// have us handle incoming NodeList datagrams again
disconnect(&nodeList->getNodeSocket(), 0, _currentAssignment, 0);
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, this, &AssignmentClient::readPendingDatagrams);
_currentAssignment = NULL;
// reset our NodeList by switching back to unassigned and clearing the list
nodeList->setOwnerType(NodeType::Unassigned);
nodeList->reset();

View file

@ -207,17 +207,25 @@ void AudioMixer::prepareMixForListeningNode(Node* node) {
}
void AudioMixer::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) {
// pull any new audio data from nodes off of the network stack
PacketType mixerPacketType = packetTypeForPacket(dataByteArray);
if (mixerPacketType == PacketTypeMicrophoneAudioNoEcho
|| mixerPacketType == PacketTypeMicrophoneAudioWithEcho
|| mixerPacketType == PacketTypeInjectAudio) {
NodeList::getInstance()->findNodeAndUpdateWithDataFromPacket(dataByteArray);
} else {
// let processNodeData handle it.
NodeList::getInstance()->processNodeData(senderSockAddr, dataByteArray);
void AudioMixer::readPendingDatagrams() {
QByteArray receivedPacket;
HifiSockAddr senderSockAddr;
NodeList* nodeList = NodeList::getInstance();
while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
// pull any new audio data from nodes off of the network stack
PacketType mixerPacketType = packetTypeForPacket(receivedPacket);
if (mixerPacketType == PacketTypeMicrophoneAudioNoEcho
|| mixerPacketType == PacketTypeMicrophoneAudioWithEcho
|| mixerPacketType == PacketTypeInjectAudio) {
nodeList->findNodeAndUpdateWithDataFromPacket(receivedPacket);
} else {
// let processNodeData handle it.
nodeList->processNodeData(senderSockAddr, receivedPacket);
}
}
}
}

View file

@ -25,7 +25,7 @@ public slots:
/// threaded run of assignment
void run();
void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr);
void readPendingDatagrams();
private:
/// adds one buffer to the mix for a listening node
void addBufferToMixForListeningNodeWithBuffer(PositionalAudioRingBuffer* bufferToAdd,

View file

@ -136,47 +136,52 @@ void AvatarMixer::nodeKilled(SharedNodePointer killedNode) {
}
}
void AvatarMixer::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) {
void AvatarMixer::readPendingDatagrams() {
QByteArray receivedPacket;
HifiSockAddr senderSockAddr;
NodeList* nodeList = NodeList::getInstance();
switch (packetTypeForPacket(dataByteArray)) {
case PacketTypeAvatarData: {
nodeList->findNodeAndUpdateWithDataFromPacket(dataByteArray);
break;
}
case PacketTypeAvatarIdentity: {
// check if we have a matching node in our list
SharedNodePointer avatarNode = nodeList->sendingNodeForPacket(dataByteArray);
if (avatarNode && avatarNode->getLinkedData()) {
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(avatarNode->getLinkedData());
if (nodeData->hasIdentityChangedAfterParsing(dataByteArray)
&& !nodeData->hasSentIdentityBetweenKeyFrames()) {
// this avatar changed their identity in some way and we haven't sent a packet in this keyframe
QByteArray identityPacket = byteArrayWithPopluatedHeader(PacketTypeAvatarIdentity);
QByteArray individualByteArray = nodeData->identityByteArray();
individualByteArray.replace(0, NUM_BYTES_RFC4122_UUID, avatarNode->getUUID().toRfc4122());
identityPacket.append(individualByteArray);
nodeData->setHasSentIdentityBetweenKeyFrames(true);
nodeList->broadcastToNodes(identityPacket, NodeSet() << NodeType::Agent);
while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
switch (packetTypeForPacket(receivedPacket)) {
case PacketTypeAvatarData: {
nodeList->findNodeAndUpdateWithDataFromPacket(receivedPacket);
break;
}
case PacketTypeAvatarIdentity: {
// check if we have a matching node in our list
SharedNodePointer avatarNode = nodeList->sendingNodeForPacket(receivedPacket);
if (avatarNode && avatarNode->getLinkedData()) {
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(avatarNode->getLinkedData());
if (nodeData->hasIdentityChangedAfterParsing(receivedPacket)
&& !nodeData->hasSentIdentityBetweenKeyFrames()) {
// this avatar changed their identity in some way and we haven't sent a packet in this keyframe
QByteArray identityPacket = byteArrayWithPopluatedHeader(PacketTypeAvatarIdentity);
QByteArray individualByteArray = nodeData->identityByteArray();
individualByteArray.replace(0, NUM_BYTES_RFC4122_UUID, avatarNode->getUUID().toRfc4122());
identityPacket.append(individualByteArray);
nodeData->setHasSentIdentityBetweenKeyFrames(true);
nodeList->broadcastToNodes(identityPacket, NodeSet() << NodeType::Agent);
}
}
}
case PacketTypeKillAvatar: {
nodeList->processKillNode(receivedPacket);
break;
}
default:
// hand this off to the NodeList
nodeList->processNodeData(senderSockAddr, receivedPacket);
break;
}
}
case PacketTypeKillAvatar: {
nodeList->processKillNode(dataByteArray);
break;
}
default:
// hand this off to the NodeList
nodeList->processNodeData(senderSockAddr, dataByteArray);
break;
}
}
const qint64 AVATAR_IDENTITY_KEYFRAME_MSECS = 5000;

View file

@ -23,7 +23,7 @@ public slots:
void nodeAdded(SharedNodePointer nodeAdded);
void nodeKilled(SharedNodePointer killedNode);
void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr);
void readPendingDatagrams();
};
#endif /* defined(__hifi__AvatarMixer__) */

View file

@ -41,15 +41,24 @@ void MetavoxelServer::run() {
_sendTimer.start(SEND_INTERVAL);
}
void MetavoxelServer::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) {
switch (dataByteArray.at(0)) {
case PacketTypeMetavoxelData:
processData(dataByteArray, senderSockAddr);
break;
default:
NodeList::getInstance()->processNodeData(senderSockAddr, dataByteArray);
break;
void MetavoxelServer::readPendingDatagrams() {
QByteArray receivedPacket;
HifiSockAddr senderSockAddr;
NodeList* nodeList = NodeList::getInstance();
while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
switch (packetTypeForPacket(receivedPacket)) {
case PacketTypeMetavoxelData:
processData(receivedPacket, senderSockAddr);
break;
default:
NodeList::getInstance()->processNodeData(senderSockAddr, receivedPacket);
break;
}
}
}
}

View file

@ -39,7 +39,7 @@ public:
virtual void run();
virtual void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr);
virtual void readPendingDatagrams();
private slots:

View file

@ -458,36 +458,43 @@ void OctreeServer::parsePayload() {
}
}
void OctreeServer::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) {
NodeList* nodeList = NodeList::getInstance();
PacketType packetType = packetTypeForPacket(dataByteArray);
void OctreeServer::readPendingDatagrams() {
QByteArray receivedPacket;
HifiSockAddr senderSockAddr;
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(dataByteArray);
if (packetType == getMyQueryMessageType()) {
bool debug = false;
if (debug) {
qDebug() << "Got PacketTypeVoxelQuery at" << usecTimestampNow();
}
// If we got a PacketType_VOXEL_QUERY, then we're talking to an NodeType_t_AVATAR, and we
// need to make sure we have it in our nodeList.
if (matchingNode) {
nodeList->updateNodeWithDataFromPacket(matchingNode, dataByteArray);
NodeList* nodeList = NodeList::getInstance();
while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
PacketType packetType = packetTypeForPacket(receivedPacket);
OctreeQueryNode* nodeData = (OctreeQueryNode*) matchingNode->getLinkedData();
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
nodeData->initializeOctreeSendThread(this, matchingNode->getUUID());
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
if (packetType == getMyQueryMessageType()) {
bool debug = false;
if (debug) {
qDebug() << "Got PacketTypeVoxelQuery at" << usecTimestampNow();
}
// If we got a PacketType_VOXEL_QUERY, then we're talking to an NodeType_t_AVATAR, and we
// need to make sure we have it in our nodeList.
if (matchingNode) {
nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket);
OctreeQueryNode* nodeData = (OctreeQueryNode*) matchingNode->getLinkedData();
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
nodeData->initializeOctreeSendThread(this, matchingNode->getUUID());
}
}
} else if (packetType == PacketTypeJurisdictionRequest) {
_jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket);
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {
_octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, receivedPacket);
} else {
// let processNodeData handle it.
NodeList::getInstance()->processNodeData(senderSockAddr, receivedPacket);
}
}
} else if (packetType == PacketTypeJurisdictionRequest) {
_jurisdictionSender->queueReceivedPacket(matchingNode, dataByteArray);
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {
_octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, dataByteArray);
} else {
// let processNodeData handle it.
NodeList::getInstance()->processNodeData(senderSockAddr, dataByteArray);
}
}

View file

@ -67,7 +67,7 @@ public:
public slots:
/// runs the voxel server assignment
void run();
void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr);
void readPendingDatagrams();
protected:
void parsePayload();

View file

@ -94,7 +94,12 @@ bool NodeList::packetVersionAndHashMatch(const QByteArray& packet) {
<< qPrintable(QString::number(versionForPacketType(mismatchType))) << "expected.";
}
if (packetTypeForPacket(packet) != PacketTypeDomainList && packetTypeForPacket(packet) != PacketTypeDomainListRequest) {
const QSet<PacketType> NON_VERIFIED_PACKETS = QSet<PacketType>() << PacketTypeDomainList
<< PacketTypeDomainListRequest << PacketTypeStunResponse << PacketTypeDataServerConfirm
<< PacketTypeDataServerGet << PacketTypeDataServerPut << PacketTypeDataServerSend
<< PacketTypeCreateAssignment << PacketTypeRequestAssignment;
if (!NON_VERIFIED_PACKETS.contains(packetTypeForPacket(packet))) {
// figure out which node this is from
SharedNodePointer sendingNode = sendingNodeForPacket(packet);
if (sendingNode) {
@ -102,13 +107,15 @@ bool NodeList::packetVersionAndHashMatch(const QByteArray& packet) {
if (hashFromPacketHeader(packet) == hashForPacketAndConnectionUUID(packet, sendingNode->getConnectionSecret())) {
return true;
} else {
qDebug() << "Packet hash mismatch" << packetTypeForPacket(packet) << "received from known node with UUID"
<< uuidFromPacketHeader(packet);
qDebug() << "Packet hash mismatch on" << packetTypeForPacket(packet) << "- Sender"
<< uuidFromPacketHeader(packet);
}
} else {
qDebug() << "Packet of type" << packetTypeForPacket(packet) << "received from unknown node with UUID"
<< uuidFromPacketHeader(packet);
<< uuidFromPacketHeader(packet);
}
} else {
return true;
}
return false;

View file

@ -60,3 +60,16 @@ void ThreadedAssignment::checkInWithDomainServerOrExit() {
NodeList::getInstance()->sendDomainServerCheckIn();
}
}
bool ThreadedAssignment::readAvailableDatagram(QByteArray& destinationByteArray, HifiSockAddr& senderSockAddr) {
NodeList* nodeList = NodeList::getInstance();
if (nodeList->getNodeSocket().hasPendingDatagrams()) {
destinationByteArray.resize(nodeList->getNodeSocket().pendingDatagramSize());
nodeList->getNodeSocket().readDatagram(destinationByteArray.data(), destinationByteArray.size(),
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
return true;
} else {
return false;
}
}

View file

@ -23,8 +23,10 @@ public slots:
virtual void deleteLater();
virtual void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) = 0;
virtual void readPendingDatagrams() = 0;
protected:
bool readAvailableDatagram(QByteArray& destinationByteArray, HifiSockAddr& senderSockAddr);
void commonInit(const char* targetName, NodeType_t nodeType);
bool _isFinished;
private slots: