Merge pull request #10665 from birarda/feat/replicant-audio-mixer

update audio replication from hack to feature
This commit is contained in:
Stephen Birarda 2017-06-13 12:38:40 -07:00 committed by GitHub
commit 9322637d80
14 changed files with 202 additions and 57 deletions

View file

@ -97,10 +97,9 @@ AudioMixer::AudioMixer(ReceivedMessage& message) :
PacketType::ReplicatedMicrophoneAudioNoEcho,
PacketType::ReplicatedMicrophoneAudioWithEcho,
PacketType::ReplicatedInjectAudio,
PacketType::ReplicatedSilentAudioFrame,
PacketType::ReplicatedNegotiateAudioFormat
PacketType::ReplicatedSilentAudioFrame
},
this, "queueMirroredAudioPacket"
this, "queueReplicatedAudioPacket"
);
connect(nodeList.data(), &NodeList::nodeKilled, this, &AudioMixer::handleNodeKilled);
@ -120,13 +119,13 @@ void AudioMixer::queueReplicatedAudioPacket(QSharedPointer<ReceivedMessage> mess
QUuid nodeID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID));
auto node = nodeList->addOrUpdateNode(nodeID, NodeType::Agent,
message->getSenderSockAddr(), message->getSenderSockAddr());
node->setIsUpstream(true);
node->setIsMirror(true);
node->setLastHeardMicrostamp(usecTimestampNow());
auto replicatedNode = nodeList->addOrUpdateNode(nodeID, NodeType::Agent,
message->getSenderSockAddr(), message->getSenderSockAddr(),
DEFAULT_AGENT_PERMISSIONS, true);
replicatedNode->setLastHeardMicrostamp(usecTimestampNow());
replicatedNode->setIsUpstream(true);
getOrCreateClientData(node.data())->queuePacket(message, node);
getOrCreateClientData(replicatedNode.data())->queuePacket(message, replicatedNode);
}
void AudioMixer::handleMuteEnvironmentPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
@ -532,7 +531,7 @@ void AudioMixer::clearDomainSettings() {
_zoneReverbSettings.clear();
}
void AudioMixer::parseSettingsObject(const QJsonObject &settingsObject) {
void AudioMixer::parseSettingsObject(const QJsonObject& settingsObject) {
qDebug() << "AVX2 Support:" << (cpuSupportsAVX2() ? "enabled" : "disabled");
if (settingsObject.contains(AUDIO_THREADING_GROUP_KEY)) {
@ -750,6 +749,8 @@ void AudioMixer::parseSettingsObject(const QJsonObject &settingsObject) {
}
}
}
parseDownstreamServers(settingsObject, NodeType::AudioMixer);
}
AudioMixer::Timer::Timing::Timing(uint64_t& sum) : _sum(sum) {

View file

@ -73,17 +73,20 @@ void AudioMixerClientData::processPackets() {
case PacketType::ReplicatedMicrophoneAudioWithEcho:
case PacketType::ReplicatedInjectAudio:
case PacketType::ReplicatedSilentAudioFrame: {
if (node->isUpstream() && !_hasSetupCodecForUpstreamNode) {
setupCodecForReplicatedAgent(packet);
}
QMutexLocker lock(&getMutex());
parseData(*packet);
replicatePacket(*packet);
optionallyReplicatePacket(*packet, *node);
break;
}
case PacketType::NegotiateAudioFormat:
case PacketType::ReplicatedNegotiateAudioFormat:
negotiateAudioFormat(*packet, node);
replicatePacket(*packet);
break;
case PacketType::RequestsDomainListData:
parseRequestsDomainListData(*packet);
@ -106,32 +109,71 @@ void AudioMixerClientData::processPackets() {
assert(_packetQueue.empty());
}
void AudioMixerClientData::replicatePacket(ReceivedMessage& message) {
auto nodeList = DependencyManager::get<NodeList>();
if (!nodeList->getMirrorSocket().isNull()) {
bool isReplicatedPacket(PacketType packetType) {
return packetType == PacketType::ReplicatedMicrophoneAudioNoEcho
|| packetType == PacketType::ReplicatedMicrophoneAudioWithEcho
|| packetType == PacketType::ReplicatedInjectAudio
|| packetType == PacketType::ReplicatedSilentAudioFrame;
}
void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, const Node& node) {
// first, make sure that this is a packet from a node we are supposed to replicate
if (node.isReplicated()) {
auto nodeList = DependencyManager::get<NodeList>();
// now make sure it's a packet type that we want to replicate
PacketType mirroredType;
if (message.getType() == PacketType::MicrophoneAudioNoEcho) {
mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho;
} else if (message.getType() == PacketType::MicrophoneAudioWithEcho) {
mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho;
} else if (message.getType() == PacketType::InjectAudio) {
mirroredType = PacketType::ReplicatedInjectAudio;
} else if (message.getType() == PacketType::SilentAudioFrame) {
mirroredType = PacketType::ReplicatedSilentAudioFrame;
} else if (message.getType() == PacketType::NegotiateAudioFormat) {
mirroredType = PacketType::ReplicatedNegotiateAudioFormat;
} else {
return;
switch (message.getType()) {
case PacketType::MicrophoneAudioNoEcho:
case PacketType::ReplicatedMicrophoneAudioNoEcho:
mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho;
break;
case PacketType::MicrophoneAudioWithEcho:
case PacketType::ReplicatedMicrophoneAudioWithEcho:
mirroredType = PacketType::ReplicatedMicrophoneAudioWithEcho;
break;
case PacketType::InjectAudio:
case PacketType::ReplicatedInjectAudio:
mirroredType = PacketType::ReplicatedInjectAudio;
break;
case PacketType::SilentAudioFrame:
case PacketType::ReplicatedSilentAudioFrame:
mirroredType = PacketType::ReplicatedSilentAudioFrame;
break;
default:
return;
}
// construct an NLPacket to send to the replicant that has the contents of the received packet
auto packet = NLPacket::create(mirroredType, message.getSize() + NUM_BYTES_RFC4122_UUID);
packet->write(message.getSourceID().toRfc4122());
packet->write(message.getMessage());
std::unique_ptr<NLPacket> packet;
nodeList->sendPacket(std::move(packet), nodeList->getMirrorSocket());
// enumerate the downstream audio mixers and send them the replicated version of this packet
nodeList->unsafeEachNode([&](const SharedNodePointer& downstreamNode) {
if (downstreamNode->getType() == NodeType::DownstreamAudioMixer) {
// construct the packet only once, if we have any downstream audio mixers to send to
if (!packet) {
// construct an NLPacket to send to the replicant that has the contents of the received packet
packet = NLPacket::create(mirroredType);
if (!isReplicatedPacket(message.getType())) {
// since this packet will be non-sourced, we add the replicated node's ID here
packet->write(node.getUUID().toRfc4122());
// we won't negotiate an audio format with the replicant, because we aren't a listener
// so pack the codec string here so that it can statelessly setup a decoder for this string when it needs
packet->writeString(_selectedCodecName);
}
packet->write(message.getMessage());
}
nodeList->sendUnreliablePacket(*packet, downstreamNode->getPublicSocket());
}
});
}
}
void AudioMixerClientData::negotiateAudioFormat(ReceivedMessage& message, const SharedNodePointer& node) {
@ -249,7 +291,8 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) {
avatarAudioStream->setupCodec(_codec, _selectedCodecName, AudioConstants::MONO);
qDebug() << "creating new AvatarAudioStream... codec:" << _selectedCodecName;
connect(avatarAudioStream, &InboundAudioStream::mismatchedAudioCodec, this, &AudioMixerClientData::handleMismatchAudioFormat);
connect(avatarAudioStream, &InboundAudioStream::mismatchedAudioCodec,
this, &AudioMixerClientData::handleMismatchAudioFormat);
auto emplaced = _audioStreams.emplace(
QUuid(),
@ -307,7 +350,12 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) {
|| packetType == PacketType::ReplicatedMicrophoneAudioNoEcho
|| packetType == PacketType::ReplicatedSilentAudioFrame
|| packetType == PacketType::ReplicatedInjectAudio) {
// skip past source ID for the replicated packet
message.seek(NUM_BYTES_RFC4122_UUID);
// skip past the codec string
message.readString();
}
// check the overflow count before we parse data
@ -656,3 +704,18 @@ bool AudioMixerClientData::shouldIgnore(const SharedNodePointer self, const Shar
return shouldIgnore;
}
void AudioMixerClientData::setupCodecForReplicatedAgent(QSharedPointer<ReceivedMessage> message) {
// first pull the codec string from the packet
// read the string for the codec
auto codecString = message->readString();
qDebug() << "Manually setting codec for replicated agent" << uuidStringWithoutCurlyBraces(getNodeID())
<< "-" << codecString;
const std::pair<QString, CodecPluginPointer> codec = AudioMixer::negotiateCodec({ codecString });
setupCodec(codec.second, codec.first);
_hasSetupCodecForUpstreamNode = true;
}

View file

@ -108,6 +108,8 @@ public:
bool getRequestsDomainListData() { return _requestsDomainListData; }
void setRequestsDomainListData(bool requesting) { _requestsDomainListData = requesting; }
void setupCodecForReplicatedAgent(QSharedPointer<ReceivedMessage> message);
signals:
void injectorStreamFinished(const QUuid& streamIdentifier);
@ -124,7 +126,7 @@ private:
QReadWriteLock _streamsLock;
AudioStreamMap _audioStreams; // microphone stream from avatar is stored under key of null UUID
void replicatePacket(ReceivedMessage& packet);
void optionallyReplicatePacket(ReceivedMessage& packet, const Node& node);
using IgnoreZone = AABox;
class IgnoreZoneMemo {
@ -183,6 +185,8 @@ private:
bool _shouldMuteClient { false };
bool _requestsDomainListData { false };
bool _hasSetupCodecForUpstreamNode { false };
};
#endif // hifi_AudioMixerClientData_h

View file

@ -147,7 +147,8 @@ int InboundAudioStream::parseData(ReceivedMessage& message) {
}
case SequenceNumberStats::OnTime: {
// Packet is on time; parse its data to the ringbuffer
if (message.getType() == PacketType::SilentAudioFrame) {
if (message.getType() == PacketType::SilentAudioFrame
|| message.getType() == PacketType::ReplicatedSilentAudioFrame) {
// If we recieved a SilentAudioFrame from our sender, we might want to drop
// some of the samples in order to catch up to our desired jitter buffer size.
writeDroppableSilentFrames(networkFrames);
@ -168,7 +169,10 @@ int InboundAudioStream::parseData(ReceivedMessage& message) {
// inform others of the mismatch
auto sendingNode = DependencyManager::get<NodeList>()->nodeWithUUID(message.getSourceID());
emit mismatchedAudioCodec(sendingNode, _selectedCodecName, codecInPacket);
if (sendingNode) {
emit mismatchedAudioCodec(sendingNode, _selectedCodecName, codecInPacket);
}
}
}
break;

View file

@ -588,6 +588,9 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t
// we didn't have this node, so add them
Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket, permissions, isReplicated, connectionSecret);
// move the newly constructed node to the LNL thread
newNode->moveToThread(thread());
if (nodeType == NodeType::AudioMixer) {
LimitedNodeList::flagTimeForConnectionStep(LimitedNodeList::AddedAudioMixer);
}
@ -744,7 +747,8 @@ void LimitedNodeList::removeSilentNodes() {
SharedNodePointer node = it->second;
node->getMutex().lock();
if ((usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * USECS_PER_MSEC)) {
if (!NodeType::isDownstream(node->getType())
&& (usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * USECS_PER_MSEC)) {
// call the NodeHash erase to get rid of this node
it = _nodeHash.unsafe_erase(it);

View file

@ -257,6 +257,16 @@ public:
return SharedNodePointer();
}
// This is unsafe because it does not take a lock
// Must only be called when you know that a read lock on the node mutex is held
// and will be held for the duration of your iteration
template<typename NodeLambda>
void unsafeEachNode(NodeLambda functor) {
for (NodeHash::const_iterator it = _nodeHash.cbegin(); it != _nodeHash.cend(); ++it) {
functor(it->second);
}
}
void putLocalPortIntoSharedMemory(const QString key, QObject* parent, quint16 localPort);
bool getLocalServerPortFromSharedMemory(const QString key, quint16& localPort);

View file

@ -42,6 +42,8 @@ void NodeType::init() {
TypeNameHash.insert(NodeType::MessagesMixer, "Messages Mixer");
TypeNameHash.insert(NodeType::AssetServer, "Asset Server");
TypeNameHash.insert(NodeType::EntityScriptServer, "Entity Script Server");
TypeNameHash.insert(NodeType::DownstreamAudioMixer, "Downstream Audio Mixer");
TypeNameHash.insert(NodeType::DownstreamAvatarMixer, "Downstream Avatar Mixer");
TypeNameHash.insert(NodeType::Unassigned, "Unassigned");
}
@ -50,6 +52,21 @@ const QString& NodeType::getNodeTypeName(NodeType_t nodeType) {
return matchedTypeName != TypeNameHash.end() ? matchedTypeName.value() : UNKNOWN_NodeType_t_NAME;
}
bool NodeType::isDownstream(NodeType_t nodeType) {
return nodeType == NodeType::DownstreamAudioMixer || nodeType == NodeType::DownstreamAvatarMixer;
}
NodeType_t NodeType::downstreamType(NodeType_t primaryType) {
switch (primaryType) {
case AudioMixer:
return DownstreamAudioMixer;
case AvatarMixer:
return DownstreamAvatarMixer;
default:
return Unassigned;
}
}
Node::Node(const QUuid& uuid, NodeType_t type, const HifiSockAddr& publicSocket,
const HifiSockAddr& localSocket, const NodePermissions& permissions, bool isReplicated,
const QUuid& connectionSecret, QObject* parent) :

View file

@ -37,6 +37,7 @@
class Node : public NetworkPeer {
Q_OBJECT
public:
Node(const QUuid& uuid, NodeType_t type,
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket,
const NodePermissions& permissions, bool isReplicated, const QUuid& connectionSecret = QUuid(),
@ -86,9 +87,6 @@ public:
bool isIgnoreRadiusEnabled() const { return _ignoreRadiusEnabled; }
bool isMirror() const { return _isMirror; }
void setIsMirror(bool isMirror) { _isMirror = isMirror; }
private:
// privatize copy and assignment operator to disallow Node copying
Node(const Node &otherNode);
@ -109,8 +107,6 @@ private:
mutable QReadWriteLock _ignoredNodeIDSetLock;
std::atomic_bool _ignoreRadiusEnabled;
bool _isMirror { false };
};
Q_DECLARE_METATYPE(Node*)

View file

@ -712,14 +712,20 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) {
}
void NodeList::startNodeHolePunch(const SharedNodePointer& node) {
// connect to the correct signal on this node so we know when to ping it
connect(node.data(), &Node::pingTimerTimeout, this, &NodeList::handleNodePingTimeout);
// start the ping timer for this node
node->startPingTimer();
// we don't hole punch to downstream servers, since it is assumed that we have a direct line to them
// we also don't hole punch to relayed upstream nodes, since we do not communicate directly with them
// ping this node immediately
pingPunchForInactiveNode(node);
if (!NodeType::isDownstream(node->getType()) && !node->isUpstream()) {
// connect to the correct signal on this node so we know when to ping it
connect(node.data(), &Node::pingTimerTimeout, this, &NodeList::handleNodePingTimeout);
// start the ping timer for this node
node->startPingTimer();
// ping this node immediately
pingPunchForInactiveNode(node);
}
}
void NodeList::handleNodePingTimeout() {
@ -762,8 +768,10 @@ void NodeList::stopKeepalivePingTimer() {
}
void NodeList::sendKeepAlivePings() {
// send keep-alive ping packets to nodes of types we care about that are not relayed to us from an upstream node
eachMatchingNode([this](const SharedNodePointer& node)->bool {
return _nodeTypesOfInterest.contains(node->getType());
return !node->isUpstream() && _nodeTypesOfInterest.contains(node->getType());
}, [&](const SharedNodePointer& node) {
sendPacket(constructPingPacket(), *node);
});
@ -1121,4 +1129,4 @@ void NodeList::setRequestsDomainListData(bool isRequesting) {
void NodeList::startThread() {
moveToNewNamedThread(this, "NodeList Thread", QThread::TimeCriticalPriority);
}
}

View file

@ -25,12 +25,15 @@ namespace NodeType {
const NodeType_t AssetServer = 'A';
const NodeType_t MessagesMixer = 'm';
const NodeType_t EntityScriptServer = 'S';
const NodeType_t ReplicantAudioMixer = 'a';
const NodeType_t ReplicantAvatarMixer = 'w';
const NodeType_t DownstreamAudioMixer = 'a';
const NodeType_t DownstreamAvatarMixer = 'w';
const NodeType_t Unassigned = 1;
void init();
const QString& getNodeTypeName(NodeType_t nodeType);
bool isDownstream(NodeType_t nodeType);
NodeType_t downstreamType(NodeType_t primaryType);
}
typedef QSet<NodeType_t> NodeSet;

View file

@ -10,6 +10,7 @@
//
#include <QtCore/QCoreApplication>
#include <QtCore/QJsonArray>
#include <QtCore/QJsonObject>
#include <QtCore/QThread>
#include <QtCore/QTimer>
@ -132,3 +133,37 @@ void ThreadedAssignment::domainSettingsRequestFailed() {
qCDebug(networking) << "Failed to retreive settings object from domain-server. Bailing on assignment.";
setFinished(true);
}
void ThreadedAssignment::parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType) {
static const QString REPLICATION_GROUP_KEY = "replication";
static const QString DOWNSTREAM_SERVERS_SETTING_KEY = "downstream_servers";
if (settingsObject.contains(REPLICATION_GROUP_KEY)) {
const QJsonObject replicationObject = settingsObject[REPLICATION_GROUP_KEY].toObject();
const QJsonArray downstreamServers = replicationObject[DOWNSTREAM_SERVERS_SETTING_KEY].toArray();
auto nodeList = DependencyManager::get<NodeList>();
foreach(const QJsonValue& downstreamServerValue, downstreamServers) {
const QJsonObject downstreamServer = downstreamServerValue.toObject();
static const QString DOWNSTREAM_SERVER_ADDRESS = "address";
static const QString DOWNSTREAM_SERVER_PORT = "port";
static const QString DOWNSTREAM_SERVER_TYPE = "server_type";
// make sure we have the settings we need for this downstream server
if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT)
&& downstreamServer[DOWNSTREAM_SERVER_TYPE].toString() == NodeType::getNodeTypeName(nodeType)) {
// read the address and port and construct a HifiSockAddr from them
HifiSockAddr downstreamServerAddr {
downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(),
(quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt()
};
// manually add the downstream node to our node list
nodeList->addOrUpdateNode(QUuid::createUuid(), NodeType::downstreamType(nodeType),
downstreamServerAddr, downstreamServerAddr);
}
}
}
}

View file

@ -40,6 +40,8 @@ signals:
protected:
void commonInit(const QString& targetName, NodeType_t nodeType);
void parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType);
bool _isFinished;
QTimer _domainServerTimer;
QTimer _statsTimer;

View file

@ -41,8 +41,7 @@ const QSet<PacketType> NON_SOURCED_PACKETS = QSet<PacketType>()
<< PacketType::ICEServerHeartbeatDenied << PacketType::AssignmentClientStatus << PacketType::StopNode
<< PacketType::DomainServerRemovedNode << PacketType::UsernameFromIDReply << PacketType::OctreeFileReplacement
<< PacketType::ReplicatedMicrophoneAudioNoEcho << PacketType::ReplicatedMicrophoneAudioWithEcho
<< PacketType::ReplicatedInjectAudio << PacketType::ReplicatedSilentAudioFrame
<< PacketType::ReplicatedNegotiateAudioFormat;
<< PacketType::ReplicatedInjectAudio << PacketType::ReplicatedSilentAudioFrame;
PacketVersion versionForPacketType(PacketType packetType) {
switch (packetType) {

View file

@ -115,7 +115,6 @@ public:
AdjustAvatarSorting,
OctreeFileReplacement,
CollisionEventChanges,
ReplicatedNegotiateAudioFormat,
ReplicatedMicrophoneAudioNoEcho,
ReplicatedMicrophoneAudioWithEcho,
ReplicatedInjectAudio,