From 8112b3b57e57d95558caa2a77fba8fa76ed78241 Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Tue, 17 Nov 2015 14:16:22 -0800 Subject: [PATCH 1/7] add senderUUID to the messageReceived signal --- examples/example/messagesExample.js | 4 ++-- examples/example/messagesReceiverExample.js | 5 ++--- libraries/networking/src/MessagesClient.cpp | 2 +- libraries/networking/src/MessagesClient.h | 2 +- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/examples/example/messagesExample.js b/examples/example/messagesExample.js index 11827f019f..f5d8dea2d3 100644 --- a/examples/example/messagesExample.js +++ b/examples/example/messagesExample.js @@ -38,6 +38,6 @@ Script.update.connect(function (deltaTime) { }); -Messages.messageReceived.connect(function (channel, message) { - print("message received on channel:" + channel + ", message:" + message); +Messages.messageReceived.connect(function (channel, message, senderID) { + print("message received on channel:" + channel + ", message:" + message + ", senderID:" + senderID); }); \ No newline at end of file diff --git a/examples/example/messagesReceiverExample.js b/examples/example/messagesReceiverExample.js index 31020a4c8a..caab270783 100644 --- a/examples/example/messagesReceiverExample.js +++ b/examples/example/messagesReceiverExample.js @@ -16,7 +16,6 @@ function myUpdate(deltaTime) { Script.update.connect(myUpdate); -Messages.messageReceived.connect(function (channel, message) { - print("message received on channel:" + channel + ", message:" + message); +Messages.messageReceived.connect(function (channel, message, senderID) { + print("message received on channel:" + channel + ", message:" + message + ", senderID:" + senderID); }); - diff --git a/libraries/networking/src/MessagesClient.cpp b/libraries/networking/src/MessagesClient.cpp index ac2bf55033..81e9c3abdf 100644 --- a/libraries/networking/src/MessagesClient.cpp +++ b/libraries/networking/src/MessagesClient.cpp @@ -51,7 +51,7 @@ void MessagesClient::handleMessagesPacket(QSharedPointer packetLis auto messageData = packet.read(messageLength); QString message = QString::fromUtf8(messageData); - emit messageReceived(channel, message); + emit messageReceived(channel, message, senderNode->getUUID()); } void MessagesClient::sendMessage(const QString& channel, const QString& message) { diff --git a/libraries/networking/src/MessagesClient.h b/libraries/networking/src/MessagesClient.h index 13e908e129..4eb63c3e74 100644 --- a/libraries/networking/src/MessagesClient.h +++ b/libraries/networking/src/MessagesClient.h @@ -33,7 +33,7 @@ public: Q_INVOKABLE void unsubscribe(const QString& channel); signals: - void messageReceived(const QString& channel, const QString& message); + void messageReceived(const QString& channel, const QString& message, const QUuid& senderUUID); private slots: void handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode); From 6dfcc53c27c175e21f03863a7ffa783562072573 Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Tue, 17 Nov 2015 14:19:13 -0800 Subject: [PATCH 2/7] properly handle removing subscribers from channels when the subscriber node disconnects --- assignment-client/src/messages/MessagesMixer.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/assignment-client/src/messages/MessagesMixer.cpp b/assignment-client/src/messages/MessagesMixer.cpp index 21e3fdc4c5..99798b2d4f 100644 --- a/assignment-client/src/messages/MessagesMixer.cpp +++ b/assignment-client/src/messages/MessagesMixer.cpp @@ -37,7 +37,9 @@ MessagesMixer::~MessagesMixer() { } void MessagesMixer::nodeKilled(SharedNodePointer killedNode) { - // FIXME - remove the node from the subscription maps + for (auto& channel : _channelSubscribers) { + channel.remove(killedNode->getUUID()); + } } void MessagesMixer::handleMessages(QSharedPointer packetList, SharedNodePointer senderNode) { From 85aa3b3f83d9412aaf4dc7dffb07fdfff789e7fa Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Tue, 17 Nov 2015 14:28:51 -0800 Subject: [PATCH 3/7] handle subscribe when messages mixer is not available --- examples/example/messagesExample.js | 40 ++++++++++----------- examples/example/messagesReceiverExample.js | 20 ++--------- libraries/networking/src/MessagesClient.cpp | 16 ++++----- libraries/networking/src/MessagesClient.h | 5 ++- 4 files changed, 33 insertions(+), 48 deletions(-) diff --git a/examples/example/messagesExample.js b/examples/example/messagesExample.js index f5d8dea2d3..390549a135 100644 --- a/examples/example/messagesExample.js +++ b/examples/example/messagesExample.js @@ -12,29 +12,25 @@ Script.update.connect(function (deltaTime) { subscribedForTime += deltaTime; } - if (totalTime > 5) { - - // if we've been unsubscribed for SWITCH_SUBSCRIPTION_TIME seconds, subscribe - if (!subscribed && unsubscribedForTime > SWITCH_SUBSCRIPTION_TIME) { - print("---- subscribing ----"); - Messages.subscribe(channel); - subscribed = true; - subscribedForTime = 0; - } - - // if we've been subscribed for SWITCH_SUBSCRIPTION_TIME seconds, unsubscribe - if (subscribed && subscribedForTime > SWITCH_SUBSCRIPTION_TIME) { - print("---- unsubscribing ----"); - Messages.unsubscribe(channel); - subscribed = false; - unsubscribedForTime = 0; - } - - // Even if not subscribed, still publish - var message = "update() deltaTime:" + deltaTime; - //print(message); - Messages.sendMessage(channel, message); + // if we've been unsubscribed for SWITCH_SUBSCRIPTION_TIME seconds, subscribe + if (!subscribed && unsubscribedForTime > SWITCH_SUBSCRIPTION_TIME) { + print("---- subscribing ----"); + Messages.subscribe(channel); + subscribed = true; + subscribedForTime = 0; } + + // if we've been subscribed for SWITCH_SUBSCRIPTION_TIME seconds, unsubscribe + if (subscribed && subscribedForTime > SWITCH_SUBSCRIPTION_TIME) { + print("---- unsubscribing ----"); + Messages.unsubscribe(channel); + subscribed = false; + unsubscribedForTime = 0; + } + + // Even if not subscribed, still publish + var message = "update() deltaTime:" + deltaTime; + Messages.sendMessage(channel, message); }); diff --git a/examples/example/messagesReceiverExample.js b/examples/example/messagesReceiverExample.js index caab270783..2239ab1fc3 100644 --- a/examples/example/messagesReceiverExample.js +++ b/examples/example/messagesReceiverExample.js @@ -1,21 +1,7 @@ -var totalTime = 0; -var subscribed = false; -var WAIT_FOR_SUBSCRIPTION_TIME = 10; -function myUpdate(deltaTime) { - var channel = "example"; - totalTime += deltaTime; - - if (totalTime > WAIT_FOR_SUBSCRIPTION_TIME && !subscribed) { - - print("---- subscribing ----"); - Messages.subscribe(channel); - subscribed = true; - Script.update.disconnect(myUpdate); - } -} - -Script.update.connect(myUpdate); +print("---- subscribing ----"); +Messages.subscribe(channel); Messages.messageReceived.connect(function (channel, message, senderID) { print("message received on channel:" + channel + ", message:" + message + ", senderID:" + senderID); }); + diff --git a/libraries/networking/src/MessagesClient.cpp b/libraries/networking/src/MessagesClient.cpp index 81e9c3abdf..88d64adcdd 100644 --- a/libraries/networking/src/MessagesClient.cpp +++ b/libraries/networking/src/MessagesClient.cpp @@ -27,7 +27,7 @@ MessagesClient::MessagesClient() { auto nodeList = DependencyManager::get(); auto& packetReceiver = nodeList->getPacketReceiver(); packetReceiver.registerMessageListener(PacketType::MessagesData, this, "handleMessagesPacket"); - connect(nodeList.data(), &LimitedNodeList::nodeKilled, this, &MessagesClient::handleNodeKilled); + connect(nodeList.data(), &LimitedNodeList::nodeAdded, this, &MessagesClient::handleNodeAdded); } void MessagesClient::init() { @@ -75,10 +75,8 @@ void MessagesClient::sendMessage(const QString& channel, const QString& message) } } -// FIXME - we should keep track of the channels we are subscribed to locally, and -// in the event that they mixer goes away and/or comes back we should automatically -// resubscribe to those channels void MessagesClient::subscribe(const QString& channel) { + _subscribedChannels << channel; auto nodeList = DependencyManager::get(); SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer); @@ -90,6 +88,7 @@ void MessagesClient::subscribe(const QString& channel) { } void MessagesClient::unsubscribe(const QString& channel) { + _subscribedChannels.remove(channel); auto nodeList = DependencyManager::get(); SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer); @@ -100,9 +99,10 @@ void MessagesClient::unsubscribe(const QString& channel) { } } -void MessagesClient::handleNodeKilled(SharedNodePointer node) { - if (node->getType() != NodeType::MessagesMixer) { - return; +void MessagesClient::handleNodeAdded(SharedNodePointer node) { + if (node->getType() == NodeType::MessagesMixer) { + for (const auto& channel : _subscribedChannels) { + subscribe(channel); + } } - // FIXME - do we need to do any special bookkeeping for when the messages mixer is no longer available } diff --git a/libraries/networking/src/MessagesClient.h b/libraries/networking/src/MessagesClient.h index 4eb63c3e74..695e7e789e 100644 --- a/libraries/networking/src/MessagesClient.h +++ b/libraries/networking/src/MessagesClient.h @@ -37,7 +37,10 @@ signals: private slots: void handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode); - void handleNodeKilled(SharedNodePointer node); + void handleNodeAdded(SharedNodePointer node); + +protected: + QSet _subscribedChannels; }; #endif From d8a3927311ba83a9dab81f0ccc6bfec6418f300c Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Tue, 17 Nov 2015 14:43:06 -0800 Subject: [PATCH 4/7] debug the late connect case --- examples/example/messagesReceiverExample.js | 2 +- libraries/networking/src/MessagesClient.cpp | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/example/messagesReceiverExample.js b/examples/example/messagesReceiverExample.js index 2239ab1fc3..b9f9e54bca 100644 --- a/examples/example/messagesReceiverExample.js +++ b/examples/example/messagesReceiverExample.js @@ -1,5 +1,5 @@ print("---- subscribing ----"); -Messages.subscribe(channel); +Messages.subscribe("example"); Messages.messageReceived.connect(function (channel, message, senderID) { print("message received on channel:" + channel + ", message:" + message + ", senderID:" + senderID); diff --git a/libraries/networking/src/MessagesClient.cpp b/libraries/networking/src/MessagesClient.cpp index 88d64adcdd..6c6ccbf25c 100644 --- a/libraries/networking/src/MessagesClient.cpp +++ b/libraries/networking/src/MessagesClient.cpp @@ -101,7 +101,9 @@ void MessagesClient::unsubscribe(const QString& channel) { void MessagesClient::handleNodeAdded(SharedNodePointer node) { if (node->getType() == NodeType::MessagesMixer) { + qDebug() << "messages-mixer node type added..."; for (const auto& channel : _subscribedChannels) { + qDebug() << "subscribing to channel:" << channel; subscribe(channel); } } From 3efbcb7062eab473b1a3889168dc74a3e14f0d4c Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Tue, 17 Nov 2015 14:47:59 -0800 Subject: [PATCH 5/7] debug the late connect case --- libraries/networking/src/MessagesClient.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/libraries/networking/src/MessagesClient.cpp b/libraries/networking/src/MessagesClient.cpp index 6c6ccbf25c..51cfba263c 100644 --- a/libraries/networking/src/MessagesClient.cpp +++ b/libraries/networking/src/MessagesClient.cpp @@ -84,6 +84,7 @@ void MessagesClient::subscribe(const QString& channel) { auto packetList = NLPacketList::create(PacketType::MessagesSubscribe, QByteArray(), true, true); packetList->write(channel.toUtf8()); nodeList->sendPacketList(std::move(packetList), *messagesMixer); + qDebug() << "sending MessagesSubscribe for channel:" << channel; } } From 073215d067134f360b43f5d3c453554f5ce4b458 Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Tue, 17 Nov 2015 15:55:41 -0800 Subject: [PATCH 6/7] handle socketActivated --- libraries/networking/src/MessagesClient.cpp | 19 ++++++++++++++----- libraries/networking/src/MessagesClient.h | 2 ++ 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/libraries/networking/src/MessagesClient.cpp b/libraries/networking/src/MessagesClient.cpp index 51cfba263c..f17ea4555a 100644 --- a/libraries/networking/src/MessagesClient.cpp +++ b/libraries/networking/src/MessagesClient.cpp @@ -84,7 +84,6 @@ void MessagesClient::subscribe(const QString& channel) { auto packetList = NLPacketList::create(PacketType::MessagesSubscribe, QByteArray(), true, true); packetList->write(channel.toUtf8()); nodeList->sendPacketList(std::move(packetList), *messagesMixer); - qDebug() << "sending MessagesSubscribe for channel:" << channel; } } @@ -102,10 +101,20 @@ void MessagesClient::unsubscribe(const QString& channel) { void MessagesClient::handleNodeAdded(SharedNodePointer node) { if (node->getType() == NodeType::MessagesMixer) { - qDebug() << "messages-mixer node type added..."; - for (const auto& channel : _subscribedChannels) { - qDebug() << "subscribing to channel:" << channel; - subscribe(channel); + if (!node->getActiveSocket()) { + connect(node.data(), &NetworkPeer::socketActivated, this, &MessagesClient::socketActivated); + } else { + resubscribeToAll(); } } } + +void MessagesClient::socketActivated(const HifiSockAddr& sockAddr) { + resubscribeToAll(); +} + +void MessagesClient::resubscribeToAll() { + for (const auto& channel : _subscribedChannels) { + subscribe(channel); + } +} diff --git a/libraries/networking/src/MessagesClient.h b/libraries/networking/src/MessagesClient.h index 695e7e789e..b5f590bc0d 100644 --- a/libraries/networking/src/MessagesClient.h +++ b/libraries/networking/src/MessagesClient.h @@ -38,8 +38,10 @@ signals: private slots: void handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode); void handleNodeAdded(SharedNodePointer node); + void socketActivated(const HifiSockAddr& sockAddr); protected: + void resubscribeToAll(); QSet _subscribedChannels; }; From 293914b84f16b0e12c7495e10d532a22ff5f5704 Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Tue, 17 Nov 2015 16:31:34 -0800 Subject: [PATCH 7/7] added NodeActivated signal to make it easier for users to know when a recently added node has an active socket --- libraries/networking/src/LimitedNodeList.cpp | 10 +++++++++ libraries/networking/src/LimitedNodeList.h | 1 + libraries/networking/src/MessagesClient.cpp | 22 +++++--------------- libraries/networking/src/MessagesClient.h | 4 +--- 4 files changed, 17 insertions(+), 20 deletions(-) diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 333cdb99f0..4a7844ecc7 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -530,11 +530,21 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t qCDebug(networking) << "Added" << *newNode; emit nodeAdded(newNodePointer); + if (newNodePointer->getActiveSocket()) { + emit nodeActivated(newNodePointer); + } else { + connect(newNodePointer.data(), &NetworkPeer::socketActivated, this, [=] { + emit nodeActivated(newNodePointer); + disconnect(newNodePointer.data(), &NetworkPeer::socketActivated, this, 0); + }); + } return newNodePointer; } } + + std::unique_ptr LimitedNodeList::constructPingPacket(PingType_t pingType) { int packetSize = sizeof(PingType_t) + sizeof(quint64); diff --git a/libraries/networking/src/LimitedNodeList.h b/libraries/networking/src/LimitedNodeList.h index 1aacd27572..26e648421a 100644 --- a/libraries/networking/src/LimitedNodeList.h +++ b/libraries/networking/src/LimitedNodeList.h @@ -239,6 +239,7 @@ signals: void uuidChanged(const QUuid& ownerUUID, const QUuid& oldUUID); void nodeAdded(SharedNodePointer); void nodeKilled(SharedNodePointer); + void nodeActivated(SharedNodePointer); void localSockAddrChanged(const HifiSockAddr& localSockAddr); void publicSockAddrChanged(const HifiSockAddr& publicSockAddr); diff --git a/libraries/networking/src/MessagesClient.cpp b/libraries/networking/src/MessagesClient.cpp index f17ea4555a..2e8b3bbb09 100644 --- a/libraries/networking/src/MessagesClient.cpp +++ b/libraries/networking/src/MessagesClient.cpp @@ -27,7 +27,7 @@ MessagesClient::MessagesClient() { auto nodeList = DependencyManager::get(); auto& packetReceiver = nodeList->getPacketReceiver(); packetReceiver.registerMessageListener(PacketType::MessagesData, this, "handleMessagesPacket"); - connect(nodeList.data(), &LimitedNodeList::nodeAdded, this, &MessagesClient::handleNodeAdded); + connect(nodeList.data(), &LimitedNodeList::nodeActivated, this, &MessagesClient::handleNodeActivated); } void MessagesClient::init() { @@ -99,22 +99,10 @@ void MessagesClient::unsubscribe(const QString& channel) { } } -void MessagesClient::handleNodeAdded(SharedNodePointer node) { +void MessagesClient::handleNodeActivated(SharedNodePointer node) { if (node->getType() == NodeType::MessagesMixer) { - if (!node->getActiveSocket()) { - connect(node.data(), &NetworkPeer::socketActivated, this, &MessagesClient::socketActivated); - } else { - resubscribeToAll(); + for (const auto& channel : _subscribedChannels) { + subscribe(channel); } } -} - -void MessagesClient::socketActivated(const HifiSockAddr& sockAddr) { - resubscribeToAll(); -} - -void MessagesClient::resubscribeToAll() { - for (const auto& channel : _subscribedChannels) { - subscribe(channel); - } -} +} \ No newline at end of file diff --git a/libraries/networking/src/MessagesClient.h b/libraries/networking/src/MessagesClient.h index b5f590bc0d..a1ae4cb5ba 100644 --- a/libraries/networking/src/MessagesClient.h +++ b/libraries/networking/src/MessagesClient.h @@ -37,11 +37,9 @@ signals: private slots: void handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode); - void handleNodeAdded(SharedNodePointer node); - void socketActivated(const HifiSockAddr& sockAddr); + void handleNodeActivated(SharedNodePointer node); protected: - void resubscribeToAll(); QSet _subscribedChannels; };