re-work AvatarMixer to new QCA infrastructure, closes #1300

This commit is contained in:
Stephen Birarda 2013-12-03 10:58:04 -08:00
parent 606eaa579a
commit ed854dcb7b
2 changed files with 118 additions and 90 deletions

View file

@ -10,6 +10,9 @@
// The avatar mixer receives head, hand and positional data from all connected
// nodes, and broadcasts that data back to them, every BROADCAST_INTERVAL ms.
#include <QtCore/QCoreApplication>
#include <QtCore/QTimer>
#include <Logging.h>
#include <NodeList.h>
#include <PacketHeaders.h>
@ -22,6 +25,14 @@
const char AVATAR_MIXER_LOGGING_NAME[] = "avatar-mixer";
const unsigned int AVATAR_DATA_SEND_INTERVAL_USECS = (1 / 60.0) * 1000 * 1000;
AvatarMixer::AvatarMixer(const unsigned char* dataBuffer, int numBytes) :
ThreadedAssignment(dataBuffer, numBytes)
{
}
unsigned char* addNodeToBroadcastPacket(unsigned char *currentPosition, Node *nodeToAdd) {
QByteArray rfcUUID = nodeToAdd->getUUID().toRfc4122();
memcpy(currentPosition, rfcUUID.constData(), rfcUUID.size());
@ -46,7 +57,7 @@ void attachAvatarDataToNode(Node* newNode) {
// 3) if we need to rate limit the amount of data we send, we can use a distance weighted "semi-random" function to
// determine which avatars are included in the packet stream
// 4) we should optimize the avatar data format to be more compact (100 bytes is pretty wasteful).
void broadcastAvatarData(NodeList* nodeList, const QUuid& receiverUUID, const HifiSockAddr& receiverSockAddr) {
void broadcastAvatarData() {
static unsigned char broadcastPacketBuffer[MAX_PACKET_SIZE];
static unsigned char avatarDataBuffer[MAX_PACKET_SIZE];
unsigned char* broadcastPacket = (unsigned char*)&broadcastPacketBuffer[0];
@ -55,40 +66,91 @@ void broadcastAvatarData(NodeList* nodeList, const QUuid& receiverUUID, const Hi
int packetLength = currentBufferPosition - broadcastPacket;
int packetsSent = 0;
// send back a packet with other active node data to this node
NodeList* nodeList = NodeList::getInstance();
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getLinkedData() && node->getUUID() != receiverUUID) {
unsigned char* avatarDataEndpoint = addNodeToBroadcastPacket((unsigned char*)&avatarDataBuffer[0], &*node);
int avatarDataLength = avatarDataEndpoint - (unsigned char*)&avatarDataBuffer;
if (avatarDataLength + packetLength <= MAX_PACKET_SIZE) {
memcpy(currentBufferPosition, &avatarDataBuffer[0], avatarDataLength);
packetLength += avatarDataLength;
currentBufferPosition += avatarDataLength;
} else {
packetsSent++;
//printf("packetsSent=%d packetLength=%d\n", packetsSent, packetLength);
nodeList->getNodeSocket().writeDatagram((char*) broadcastPacket, currentBufferPosition - broadcastPacket,
receiverSockAddr.getAddress(), receiverSockAddr.getPort());
// reset the packet
currentBufferPosition = broadcastPacket + numHeaderBytes;
packetLength = currentBufferPosition - broadcastPacket;
// copy the avatar that didn't fit into the next packet
memcpy(currentBufferPosition, &avatarDataBuffer[0], avatarDataLength);
packetLength += avatarDataLength;
currentBufferPosition += avatarDataLength;
if (node->getLinkedData() && node->getType() == NODE_TYPE_AGENT && node->getActiveSocket()) {
// this is an AGENT we have received head data from
// send back a packet with other active node data to this node
for (NodeList::iterator otherNode = nodeList->begin(); otherNode != nodeList->end(); otherNode++) {
if (otherNode->getLinkedData() && otherNode->getUUID() != node->getUUID()) {
unsigned char* avatarDataEndpoint = addNodeToBroadcastPacket((unsigned char*)&avatarDataBuffer[0], &*node);
int avatarDataLength = avatarDataEndpoint - (unsigned char*)&avatarDataBuffer;
if (avatarDataLength + packetLength <= MAX_PACKET_SIZE) {
memcpy(currentBufferPosition, &avatarDataBuffer[0], avatarDataLength);
packetLength += avatarDataLength;
currentBufferPosition += avatarDataLength;
} else {
packetsSent++;
//printf("packetsSent=%d packetLength=%d\n", packetsSent, packetLength);
nodeList->getNodeSocket().writeDatagram((char*) broadcastPacket, currentBufferPosition - broadcastPacket,
node->getActiveSocket()->getAddress(),
node->getActiveSocket()->getPort());
// reset the packet
currentBufferPosition = broadcastPacket + numHeaderBytes;
packetLength = currentBufferPosition - broadcastPacket;
// copy the avatar that didn't fit into the next packet
memcpy(currentBufferPosition, &avatarDataBuffer[0], avatarDataLength);
packetLength += avatarDataLength;
currentBufferPosition += avatarDataLength;
}
}
}
packetsSent++;
//printf("packetsSent=%d packetLength=%d\n", packetsSent, packetLength);
NodeList::getInstance()->getNodeSocket().writeDatagram((char*) broadcastPacket, currentBufferPosition - broadcastPacket,
node->getActiveSocket()->getAddress(),
node->getActiveSocket()->getPort());
}
}
packetsSent++;
//printf("packetsSent=%d packetLength=%d\n", packetsSent, packetLength);
nodeList->getNodeSocket().writeDatagram((char*) broadcastPacket, currentBufferPosition - broadcastPacket,
receiverSockAddr.getAddress(), receiverSockAddr.getPort());
}
AvatarMixer::AvatarMixer(const unsigned char* dataBuffer, int numBytes) : Assignment(dataBuffer, numBytes) {
void AvatarMixer::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) {
NodeList* nodeList = NodeList::getInstance();
switch (dataByteArray.data()[0]) {
case PACKET_TYPE_HEAD_DATA: {
QUuid nodeUUID = QUuid::fromRfc4122(dataByteArray.mid(numBytesForPacketHeader((unsigned char*) dataByteArray.data()),
NUM_BYTES_RFC4122_UUID));
// add or update the node in our list
Node* avatarNode = nodeList->nodeWithUUID(nodeUUID);
if (avatarNode) {
// parse positional data from an node
nodeList->updateNodeWithData(avatarNode, senderSockAddr,
(unsigned char*) dataByteArray.data(), dataByteArray.size());
} else {
break;
}
}
case PACKET_TYPE_KILL_NODE:
case PACKET_TYPE_AVATAR_URLS:
case PACKET_TYPE_AVATAR_FACE_VIDEO: {
QUuid nodeUUID = QUuid::fromRfc4122(dataByteArray.mid(numBytesForPacketHeader((unsigned char*) dataByteArray.data()),
NUM_BYTES_RFC4122_UUID));
// let everyone else know about the update
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getActiveSocket() && node->getUUID() != nodeUUID) {
nodeList->getNodeSocket().writeDatagram(dataByteArray,
node->getActiveSocket()->getAddress(),
node->getActiveSocket()->getPort());
}
}
// let node kills fall through to default behavior
}
default:
// hand this off to the NodeList
nodeList->processNodeData(senderSockAddr, (unsigned char*) dataByteArray.data(), dataByteArray.size());
break;
}
}
@ -103,76 +165,39 @@ void AvatarMixer::run() {
nodeList->linkedDataCreateCallback = attachAvatarDataToNode;
nodeList->startSilentNodeRemovalThread();
QTimer* domainServerTimer = new QTimer(this);
connect(domainServerTimer, SIGNAL(timeout()), this, SLOT(checkInWithDomainServerOrExit()));
domainServerTimer->start(DOMAIN_SERVER_CHECK_IN_USECS / 1000);
HifiSockAddr nodeSockAddr;
ssize_t receivedBytes = 0;
QTimer* pingNodesTimer = new QTimer(this);
connect(pingNodesTimer, SIGNAL(timeout()), nodeList, SLOT(pingInactiveNodes()));
pingNodesTimer->start(PING_INACTIVE_NODE_INTERVAL_USECS / 1000);
unsigned char packetData[MAX_PACKET_SIZE];
QTimer* silentNodeTimer = new QTimer(this);
connect(silentNodeTimer, SIGNAL(timeout()), nodeList, SLOT(removeSilentNodes()));
silentNodeTimer->start(NODE_SILENCE_THRESHOLD_USECS / 1000);
QUuid nodeUUID;
Node* avatarNode = NULL;
int nextFrame = 0;
timeval startTime;
timeval lastDomainServerCheckIn = {};
gettimeofday(&startTime, NULL);
while (true) {
while (!_isFinished) {
if (NodeList::getInstance()->getNumNoReplyDomainCheckIns() == MAX_SILENT_DOMAIN_SERVER_CHECK_INS) {
QCoreApplication::processEvents();
if (_isFinished) {
break;
}
// send a check in packet to the domain server if DOMAIN_SERVER_CHECK_IN_USECS has elapsed
if (usecTimestampNow() - usecTimestamp(&lastDomainServerCheckIn) >= DOMAIN_SERVER_CHECK_IN_USECS) {
gettimeofday(&lastDomainServerCheckIn, NULL);
NodeList::getInstance()->sendDomainServerCheckIn();
}
broadcastAvatarData();
nodeList->possiblyPingInactiveNodes();
int usecToSleep = usecTimestamp(&startTime) + (++nextFrame * AVATAR_DATA_SEND_INTERVAL_USECS) - usecTimestampNow();
if (nodeList->getNodeSocket().hasPendingDatagrams() &&
(receivedBytes = nodeList->getNodeSocket().readDatagram((char*) packetData, MAX_PACKET_SIZE,
nodeSockAddr.getAddressPointer(),
nodeSockAddr.getPortPointer())) &&
packetVersionMatch(packetData)) {
switch (packetData[0]) {
case PACKET_TYPE_HEAD_DATA:
nodeUUID = QUuid::fromRfc4122(QByteArray((char*) packetData + numBytesForPacketHeader(packetData),
NUM_BYTES_RFC4122_UUID));
// add or update the node in our list
avatarNode = nodeList->nodeWithUUID(nodeUUID);
if (avatarNode) {
// parse positional data from an node
nodeList->updateNodeWithData(avatarNode, nodeSockAddr, packetData, receivedBytes);
} else {
break;
}
case PACKET_TYPE_INJECT_AUDIO:
broadcastAvatarData(nodeList, nodeUUID, nodeSockAddr);
break;
case PACKET_TYPE_KILL_NODE:
case PACKET_TYPE_AVATAR_URLS:
case PACKET_TYPE_AVATAR_FACE_VIDEO:
nodeUUID = QUuid::fromRfc4122(QByteArray((char*) packetData + numBytesForPacketHeader(packetData),
NUM_BYTES_RFC4122_UUID));
// let everyone else know about the update
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getActiveSocket() && node->getUUID() != nodeUUID) {
nodeList->getNodeSocket().writeDatagram((char*) packetData, receivedBytes,
node->getActiveSocket()->getAddress(),
node->getActiveSocket()->getPort());
}
}
// let node kills fall through to default behavior
default:
// hand this off to the NodeList
nodeList->processNodeData(nodeSockAddr, packetData, receivedBytes);
break;
}
if (usecToSleep > 0) {
usleep(usecToSleep);
} else {
qDebug() << "Took too much time, not sleeping!\n";
}
}
nodeList->stopSilentNodeRemovalThread();
}

View file

@ -9,15 +9,18 @@
#ifndef __hifi__AvatarMixer__
#define __hifi__AvatarMixer__
#include <Assignment.h>
#include "../ThreadedAssignment.h"
/// Handles assignments of type AvatarMixer - distribution of avatar data to various clients
class AvatarMixer : public Assignment {
class AvatarMixer : public ThreadedAssignment {
public:
AvatarMixer(const unsigned char* dataBuffer, int numBytes);
public slots:
/// runs the avatar mixer
void run();
void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr);
};
#endif /* defined(__hifi__AvatarMixer__) */