Merge pull request #6405 from ZappoMan/messaging

Messaging Improvements
This commit is contained in:
Brad Davis 2015-11-17 16:41:35 -08:00
commit d87b8a749f
7 changed files with 53 additions and 56 deletions

View file

@ -37,7 +37,9 @@ MessagesMixer::~MessagesMixer() {
}
void MessagesMixer::nodeKilled(SharedNodePointer killedNode) {
// FIXME - remove the node from the subscription maps
for (auto& channel : _channelSubscribers) {
channel.remove(killedNode->getUUID());
}
}
void MessagesMixer::handleMessages(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) {

View file

@ -12,32 +12,28 @@ Script.update.connect(function (deltaTime) {
subscribedForTime += deltaTime;
}
if (totalTime > 5) {
// if we've been unsubscribed for SWITCH_SUBSCRIPTION_TIME seconds, subscribe
if (!subscribed && unsubscribedForTime > SWITCH_SUBSCRIPTION_TIME) {
print("---- subscribing ----");
Messages.subscribe(channel);
subscribed = true;
subscribedForTime = 0;
}
// if we've been subscribed for SWITCH_SUBSCRIPTION_TIME seconds, unsubscribe
if (subscribed && subscribedForTime > SWITCH_SUBSCRIPTION_TIME) {
print("---- unsubscribing ----");
Messages.unsubscribe(channel);
subscribed = false;
unsubscribedForTime = 0;
}
// Even if not subscribed, still publish
var message = "update() deltaTime:" + deltaTime;
//print(message);
Messages.sendMessage(channel, message);
// if we've been unsubscribed for SWITCH_SUBSCRIPTION_TIME seconds, subscribe
if (!subscribed && unsubscribedForTime > SWITCH_SUBSCRIPTION_TIME) {
print("---- subscribing ----");
Messages.subscribe(channel);
subscribed = true;
subscribedForTime = 0;
}
// if we've been subscribed for SWITCH_SUBSCRIPTION_TIME seconds, unsubscribe
if (subscribed && subscribedForTime > SWITCH_SUBSCRIPTION_TIME) {
print("---- unsubscribing ----");
Messages.unsubscribe(channel);
subscribed = false;
unsubscribedForTime = 0;
}
// Even if not subscribed, still publish
var message = "update() deltaTime:" + deltaTime;
Messages.sendMessage(channel, message);
});
Messages.messageReceived.connect(function (channel, message) {
print("message received on channel:" + channel + ", message:" + message);
Messages.messageReceived.connect(function (channel, message, senderID) {
print("message received on channel:" + channel + ", message:" + message + ", senderID:" + senderID);
});

View file

@ -1,22 +1,7 @@
var totalTime = 0;
var subscribed = false;
var WAIT_FOR_SUBSCRIPTION_TIME = 10;
function myUpdate(deltaTime) {
var channel = "example";
totalTime += deltaTime;
print("---- subscribing ----");
Messages.subscribe("example");
if (totalTime > WAIT_FOR_SUBSCRIPTION_TIME && !subscribed) {
print("---- subscribing ----");
Messages.subscribe(channel);
subscribed = true;
Script.update.disconnect(myUpdate);
}
}
Script.update.connect(myUpdate);
Messages.messageReceived.connect(function (channel, message) {
print("message received on channel:" + channel + ", message:" + message);
Messages.messageReceived.connect(function (channel, message, senderID) {
print("message received on channel:" + channel + ", message:" + message + ", senderID:" + senderID);
});

View file

@ -530,11 +530,21 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t
qCDebug(networking) << "Added" << *newNode;
emit nodeAdded(newNodePointer);
if (newNodePointer->getActiveSocket()) {
emit nodeActivated(newNodePointer);
} else {
connect(newNodePointer.data(), &NetworkPeer::socketActivated, this, [=] {
emit nodeActivated(newNodePointer);
disconnect(newNodePointer.data(), &NetworkPeer::socketActivated, this, 0);
});
}
return newNodePointer;
}
}
std::unique_ptr<NLPacket> LimitedNodeList::constructPingPacket(PingType_t pingType) {
int packetSize = sizeof(PingType_t) + sizeof(quint64);

View file

@ -239,6 +239,7 @@ signals:
void uuidChanged(const QUuid& ownerUUID, const QUuid& oldUUID);
void nodeAdded(SharedNodePointer);
void nodeKilled(SharedNodePointer);
void nodeActivated(SharedNodePointer);
void localSockAddrChanged(const HifiSockAddr& localSockAddr);
void publicSockAddrChanged(const HifiSockAddr& publicSockAddr);

View file

@ -27,7 +27,7 @@ MessagesClient::MessagesClient() {
auto nodeList = DependencyManager::get<NodeList>();
auto& packetReceiver = nodeList->getPacketReceiver();
packetReceiver.registerMessageListener(PacketType::MessagesData, this, "handleMessagesPacket");
connect(nodeList.data(), &LimitedNodeList::nodeKilled, this, &MessagesClient::handleNodeKilled);
connect(nodeList.data(), &LimitedNodeList::nodeActivated, this, &MessagesClient::handleNodeActivated);
}
void MessagesClient::init() {
@ -51,7 +51,7 @@ void MessagesClient::handleMessagesPacket(QSharedPointer<NLPacketList> packetLis
auto messageData = packet.read(messageLength);
QString message = QString::fromUtf8(messageData);
emit messageReceived(channel, message);
emit messageReceived(channel, message, senderNode->getUUID());
}
void MessagesClient::sendMessage(const QString& channel, const QString& message) {
@ -75,10 +75,8 @@ void MessagesClient::sendMessage(const QString& channel, const QString& message)
}
}
// FIXME - we should keep track of the channels we are subscribed to locally, and
// in the event that they mixer goes away and/or comes back we should automatically
// resubscribe to those channels
void MessagesClient::subscribe(const QString& channel) {
_subscribedChannels << channel;
auto nodeList = DependencyManager::get<NodeList>();
SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer);
@ -90,6 +88,7 @@ void MessagesClient::subscribe(const QString& channel) {
}
void MessagesClient::unsubscribe(const QString& channel) {
_subscribedChannels.remove(channel);
auto nodeList = DependencyManager::get<NodeList>();
SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer);
@ -100,9 +99,10 @@ void MessagesClient::unsubscribe(const QString& channel) {
}
}
void MessagesClient::handleNodeKilled(SharedNodePointer node) {
if (node->getType() != NodeType::MessagesMixer) {
return;
void MessagesClient::handleNodeActivated(SharedNodePointer node) {
if (node->getType() == NodeType::MessagesMixer) {
for (const auto& channel : _subscribedChannels) {
subscribe(channel);
}
}
// FIXME - do we need to do any special bookkeeping for when the messages mixer is no longer available
}
}

View file

@ -33,11 +33,14 @@ public:
Q_INVOKABLE void unsubscribe(const QString& channel);
signals:
void messageReceived(const QString& channel, const QString& message);
void messageReceived(const QString& channel, const QString& message, const QUuid& senderUUID);
private slots:
void handleMessagesPacket(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode);
void handleNodeKilled(SharedNodePointer node);
void handleNodeActivated(SharedNodePointer node);
protected:
QSet<QString> _subscribedChannels;
};
#endif