Merge pull request #8797 from birarda/slow-localhost

fixes for various networking/ATP stuck download failure cases
This commit is contained in:
Clément Brisset 2016-10-15 18:11:02 -07:00 committed by GitHub
commit 281074f1d3
14 changed files with 265 additions and 99 deletions

View file

@ -519,7 +519,7 @@ void DomainServer::setupNodeListAndAssignments() {
// add whatever static assignments that have been parsed to the queue
addStaticAssignmentsToQueue();
// set a custum packetVersionMatch as the verify packet operator for the udt::Socket
// set a custom packetVersionMatch as the verify packet operator for the udt::Socket
nodeList->setPacketFilterOperator(&DomainServer::packetVersionMatch);
}

View file

@ -45,6 +45,8 @@ AssetClient::AssetClient() {
packetReceiver.registerListener(PacketType::AssetUploadReply, this, "handleAssetUploadReply");
connect(nodeList.data(), &LimitedNodeList::nodeKilled, this, &AssetClient::handleNodeKilled);
connect(nodeList.data(), &LimitedNodeList::clientConnectionToNodeReset,
this, &AssetClient::handleNodeClientConnectionReset);
}
void AssetClient::init() {
@ -233,15 +235,15 @@ MessageID AssetClient::getAsset(const QString& hash, DataOffset start, DataOffse
packet->writePrimitive(start);
packet->writePrimitive(end);
nodeList->sendPacket(std::move(packet), *assetServer);
if (nodeList->sendPacket(std::move(packet), *assetServer) != -1) {
_pendingRequests[assetServer][messageID] = { QSharedPointer<ReceivedMessage>(), callback, progressCallback };
_pendingRequests[assetServer][messageID] = { QSharedPointer<ReceivedMessage>(), callback, progressCallback };
return messageID;
} else {
callback(false, AssetServerError::NoError, QByteArray());
return INVALID_MESSAGE_ID;
return messageID;
}
}
callback(false, AssetServerError::NoError, QByteArray());
return INVALID_MESSAGE_ID;
}
MessageID AssetClient::getAssetInfo(const QString& hash, GetInfoCallback callback) {
@ -259,15 +261,15 @@ MessageID AssetClient::getAssetInfo(const QString& hash, GetInfoCallback callbac
packet->writePrimitive(messageID);
packet->write(QByteArray::fromHex(hash.toLatin1()));
nodeList->sendPacket(std::move(packet), *assetServer);
if (nodeList->sendPacket(std::move(packet), *assetServer) != -1) {
_pendingInfoRequests[assetServer][messageID] = callback;
_pendingInfoRequests[assetServer][messageID] = callback;
return messageID;
} else {
callback(false, AssetServerError::NoError, { "", 0 });
return INVALID_MESSAGE_ID;
return messageID;
}
}
callback(false, AssetServerError::NoError, { "", 0 });
return INVALID_MESSAGE_ID;
}
void AssetClient::handleAssetGetInfoReply(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
@ -453,15 +455,15 @@ MessageID AssetClient::getAssetMapping(const AssetPath& path, MappingOperationCa
packetList->writeString(path);
nodeList->sendPacketList(std::move(packetList), *assetServer);
if (nodeList->sendPacketList(std::move(packetList), *assetServer) != -1) {
_pendingMappingRequests[assetServer][messageID] = callback;
_pendingMappingRequests[assetServer][messageID] = callback;
return messageID;
} else {
callback(false, AssetServerError::NoError, QSharedPointer<ReceivedMessage>());
return INVALID_MESSAGE_ID;
return messageID;
}
}
callback(false, AssetServerError::NoError, QSharedPointer<ReceivedMessage>());
return INVALID_MESSAGE_ID;
}
MessageID AssetClient::getAllAssetMappings(MappingOperationCallback callback) {
@ -478,15 +480,15 @@ MessageID AssetClient::getAllAssetMappings(MappingOperationCallback callback) {
packetList->writePrimitive(AssetMappingOperationType::GetAll);
nodeList->sendPacketList(std::move(packetList), *assetServer);
if (nodeList->sendPacketList(std::move(packetList), *assetServer) != -1) {
_pendingMappingRequests[assetServer][messageID] = callback;
_pendingMappingRequests[assetServer][messageID] = callback;
return messageID;
} else {
callback(false, AssetServerError::NoError, QSharedPointer<ReceivedMessage>());
return INVALID_MESSAGE_ID;
return messageID;
}
}
callback(false, AssetServerError::NoError, QSharedPointer<ReceivedMessage>());
return INVALID_MESSAGE_ID;
}
MessageID AssetClient::deleteAssetMappings(const AssetPathList& paths, MappingOperationCallback callback) {
@ -507,15 +509,15 @@ MessageID AssetClient::deleteAssetMappings(const AssetPathList& paths, MappingOp
packetList->writeString(path);
}
nodeList->sendPacketList(std::move(packetList), *assetServer);
if (nodeList->sendPacketList(std::move(packetList), *assetServer) != -1) {
_pendingMappingRequests[assetServer][messageID] = callback;
_pendingMappingRequests[assetServer][messageID] = callback;
return messageID;
} else {
callback(false, AssetServerError::NoError, QSharedPointer<ReceivedMessage>());
return INVALID_MESSAGE_ID;
return messageID;
}
}
callback(false, AssetServerError::NoError, QSharedPointer<ReceivedMessage>());
return INVALID_MESSAGE_ID;
}
MessageID AssetClient::setAssetMapping(const QString& path, const AssetHash& hash, MappingOperationCallback callback) {
@ -535,15 +537,15 @@ MessageID AssetClient::setAssetMapping(const QString& path, const AssetHash& has
packetList->writeString(path);
packetList->write(QByteArray::fromHex(hash.toUtf8()));
nodeList->sendPacketList(std::move(packetList), *assetServer);
if (nodeList->sendPacketList(std::move(packetList), *assetServer) != -1) {
_pendingMappingRequests[assetServer][messageID] = callback;
_pendingMappingRequests[assetServer][messageID] = callback;
return messageID;
} else {
callback(false, AssetServerError::NoError, QSharedPointer<ReceivedMessage>());
return INVALID_MESSAGE_ID;
return messageID;
}
}
callback(false, AssetServerError::NoError, QSharedPointer<ReceivedMessage>());
return INVALID_MESSAGE_ID;
}
MessageID AssetClient::renameAssetMapping(const AssetPath& oldPath, const AssetPath& newPath, MappingOperationCallback callback) {
@ -561,16 +563,16 @@ MessageID AssetClient::renameAssetMapping(const AssetPath& oldPath, const AssetP
packetList->writeString(oldPath);
packetList->writeString(newPath);
nodeList->sendPacketList(std::move(packetList), *assetServer);
if (nodeList->sendPacketList(std::move(packetList), *assetServer) != -1) {
_pendingMappingRequests[assetServer][messageID] = callback;
_pendingMappingRequests[assetServer][messageID] = callback;
return messageID;
return messageID;
} else {
callback(false, AssetServerError::NoError, QSharedPointer<ReceivedMessage>());
return INVALID_MESSAGE_ID;
}
}
callback(false, AssetServerError::NoError, QSharedPointer<ReceivedMessage>());
return INVALID_MESSAGE_ID;
}
bool AssetClient::cancelMappingRequest(MessageID id) {
@ -646,15 +648,15 @@ MessageID AssetClient::uploadAsset(const QByteArray& data, UploadResultCallback
packetList->writePrimitive(size);
packetList->write(data.constData(), size);
nodeList->sendPacketList(std::move(packetList), *assetServer);
if (nodeList->sendPacketList(std::move(packetList), *assetServer) != -1) {
_pendingUploads[assetServer][messageID] = callback;
_pendingUploads[assetServer][messageID] = callback;
return messageID;
} else {
callback(false, AssetServerError::NoError, QString());
return INVALID_MESSAGE_ID;
return messageID;
}
}
callback(false, AssetServerError::NoError, QString());
return INVALID_MESSAGE_ID;
}
void AssetClient::handleAssetUploadReply(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
@ -704,6 +706,34 @@ void AssetClient::handleNodeKilled(SharedNodePointer node) {
return;
}
forceFailureOfPendingRequests(node);
{
auto messageMapIt = _pendingUploads.find(node);
if (messageMapIt != _pendingUploads.end()) {
for (const auto& value : messageMapIt->second) {
value.second(false, AssetServerError::NoError, "");
}
messageMapIt->second.clear();
}
}
}
void AssetClient::handleNodeClientConnectionReset(SharedNodePointer node) {
// a client connection to a Node was reset
// if it was an AssetServer we need to cause anything pending to fail so it is re-attempted
if (node->getType() != NodeType::AssetServer) {
return;
}
qCDebug(asset_client) << "AssetClient detected client connection reset handshake with Asset Server - failing any pending requests";
forceFailureOfPendingRequests(node);
}
void AssetClient::forceFailureOfPendingRequests(SharedNodePointer node) {
{
auto messageMapIt = _pendingRequests.find(node);
if (messageMapIt != _pendingRequests.end()) {
@ -731,16 +761,6 @@ void AssetClient::handleNodeKilled(SharedNodePointer node) {
}
}
{
auto messageMapIt = _pendingUploads.find(node);
if (messageMapIt != _pendingUploads.end()) {
for (const auto& value : messageMapIt->second) {
value.second(false, AssetServerError::NoError, "");
}
messageMapIt->second.clear();
}
}
{
auto messageMapIt = _pendingMappingRequests.find(node);
if (messageMapIt != _pendingMappingRequests.end()) {

View file

@ -75,6 +75,7 @@ private slots:
void handleAssetUploadReply(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleNodeKilled(SharedNodePointer node);
void handleNodeClientConnectionReset(SharedNodePointer node);
private:
MessageID getAssetMapping(const AssetHash& hash, MappingOperationCallback callback);
@ -96,6 +97,8 @@ private:
void handleProgressCallback(const QWeakPointer<Node>& node, MessageID messageID, qint64 size, DataOffset length);
void handleCompleteCallback(const QWeakPointer<Node>& node, MessageID messageID);
void forceFailureOfPendingRequests(SharedNodePointer node);
struct GetAssetRequestData {
QSharedPointer<ReceivedMessage> message;
ReceivedAssetCallback completeCallback;

View file

@ -32,6 +32,7 @@
#include <UUID.h>
#include "AccountManager.h"
#include "AssetClient.h"
#include "Assignment.h"
#include "HifiSockAddr.h"
#include "NetworkLogging.h"
@ -41,7 +42,8 @@ static Setting::Handle<quint16> LIMITED_NODELIST_LOCAL_PORT("LimitedNodeList.Loc
const std::set<NodeType_t> SOLO_NODE_TYPES = {
NodeType::AvatarMixer,
NodeType::AudioMixer
NodeType::AudioMixer,
NodeType::AssetServer
};
LimitedNodeList::LimitedNodeList(int socketListenPort, int dtlsListenPort) :
@ -113,6 +115,12 @@ LimitedNodeList::LimitedNodeList(int socketListenPort, int dtlsListenPort) :
using std::placeholders::_1;
_nodeSocket.setPacketFilterOperator(std::bind(&LimitedNodeList::isPacketVerified, this, _1));
// set our socketBelongsToNode method as the connection creation filter operator for the udt::Socket
_nodeSocket.setConnectionCreationFilterOperator(std::bind(&LimitedNodeList::sockAddrBelongsToNode, this, _1));
// handle when a socket connection has its receiver side reset - might need to emit clientConnectionToNodeReset
connect(&_nodeSocket, &udt::Socket::clientHandshakeRequestComplete, this, &LimitedNodeList::clientConnectionToSockAddrReset);
_packetStatTimer.start();
if (_stunSockAddr.getAddress().isNull()) {
@ -317,6 +325,8 @@ void LimitedNodeList::fillPacketHeader(const NLPacket& packet, const QUuid& conn
}
}
static const qint64 ERROR_SENDING_PACKET_BYTES = -1;
qint64 LimitedNodeList::sendUnreliablePacket(const NLPacket& packet, const Node& destinationNode) {
Q_ASSERT(!packet.isPartOfMessage());
@ -353,7 +363,7 @@ qint64 LimitedNodeList::sendPacket(std::unique_ptr<NLPacket> packet, const Node&
return sendPacket(std::move(packet), *activeSocket, destinationNode.getConnectionSecret());
} else {
qCDebug(networking) << "LimitedNodeList::sendPacket called without active socket for node" << destinationNode << "- not sending";
return 0;
return ERROR_SENDING_PACKET_BYTES;
}
}
@ -392,7 +402,7 @@ qint64 LimitedNodeList::sendPacketList(NLPacketList& packetList, const Node& des
} else {
qCDebug(networking) << "LimitedNodeList::sendPacketList called without active socket for node" << destinationNode
<< " - not sending.";
return 0;
return ERROR_SENDING_PACKET_BYTES;
}
}
@ -438,7 +448,7 @@ qint64 LimitedNodeList::sendPacketList(std::unique_ptr<NLPacketList> packetList,
return _nodeSocket.writePacketList(std::move(packetList), *activeSocket);
} else {
qCDebug(networking) << "LimitedNodeList::sendPacketList called without active socket for node. Not sending.";
return 0;
return ERROR_SENDING_PACKET_BYTES;
}
}
@ -446,7 +456,7 @@ qint64 LimitedNodeList::sendPacket(std::unique_ptr<NLPacket> packet, const Node&
const HifiSockAddr& overridenSockAddr) {
if (overridenSockAddr.isNull() && !destinationNode.getActiveSocket()) {
qCDebug(networking) << "LimitedNodeList::sendPacket called without active socket for node. Not sending.";
return 0;
return ERROR_SENDING_PACKET_BYTES;
}
// use the node's active socket as the destination socket if there is no overriden socket address
@ -1136,3 +1146,23 @@ void LimitedNodeList::flagTimeForConnectionStep(ConnectionStep connectionStep, q
}
}
}
void LimitedNodeList::clientConnectionToSockAddrReset(const HifiSockAddr& sockAddr) {
// for certain reliable channels higher level classes may need to know if the udt::Connection has been reset
auto matchingNode = findNodeWithAddr(sockAddr);
if (matchingNode) {
emit clientConnectionToNodeReset(matchingNode);
}
}
#if (PR_BUILD || DEV_BUILD)
void LimitedNodeList::sendFakedHandshakeRequestToNode(SharedNodePointer node) {
if (node && node->getActiveSocket()) {
_nodeSocket.sendFakedHandshakeRequest(*node->getActiveSocket());
}
}
#endif

View file

@ -41,6 +41,7 @@
#include "NLPacketList.h"
#include "PacketReceiver.h"
#include "ReceivedMessage.h"
#include "udt/ControlPacket.h"
#include "udt/PacketHeaders.h"
#include "udt/Socket.h"
#include "UUIDHasher.h"
@ -235,6 +236,9 @@ public:
static void makeSTUNRequestPacket(char* stunRequestPacket);
#if (PR_BUILD || DEV_BUILD)
void sendFakedHandshakeRequestToNode(SharedNodePointer node);
#endif
public slots:
void reset();
@ -262,6 +266,8 @@ signals:
void nodeKilled(SharedNodePointer);
void nodeActivated(SharedNodePointer);
void clientConnectionToNodeReset(SharedNodePointer);
void localSockAddrChanged(const HifiSockAddr& localSockAddr);
void publicSockAddrChanged(const HifiSockAddr& publicSockAddr);
@ -274,6 +280,8 @@ signals:
protected slots:
void connectedForLocalSocketTest();
void errorTestingLocalSocket();
void clientConnectionToSockAddrReset(const HifiSockAddr& sockAddr);
protected:
LimitedNodeList(int socketListenPort = INVALID_PORT, int dtlsListenPort = INVALID_PORT);
@ -299,6 +307,8 @@ protected:
void sendPacketToIceServer(PacketType packetType, const HifiSockAddr& iceServerSockAddr, const QUuid& clientID,
const QUuid& peerRequestID = QUuid());
bool sockAddrBelongsToNode(const HifiSockAddr& sockAddr) { return findNodeWithAddr(sockAddr) != SharedNodePointer(); }
QUuid _sessionUUID;
NodeHash _nodeHash;
mutable QReadWriteLock _nodeMutex;

View file

@ -104,6 +104,10 @@ NodeList::NodeList(char newOwnerType, int socketListenPort, int dtlsListenPort)
connect(&_domainHandler, SIGNAL(connectedToDomain(QString)), &_keepAlivePingTimer, SLOT(start()));
connect(&_domainHandler, &DomainHandler::disconnectedFromDomain, &_keepAlivePingTimer, &QTimer::stop);
// set our sockAddrBelongsToDomainOrNode method as the connection creation filter for the udt::Socket
using std::placeholders::_1;
_nodeSocket.setConnectionCreationFilterOperator(std::bind(&NodeList::sockAddrBelongsToDomainOrNode, this, _1));
// we definitely want STUN to update our public socket, so call the LNL to kick that off
startSTUNPublicSocketUpdate();
@ -703,6 +707,10 @@ void NodeList::sendKeepAlivePings() {
});
}
bool NodeList::sockAddrBelongsToDomainOrNode(const HifiSockAddr& sockAddr) {
return _domainHandler.getSockAddr() == sockAddr || LimitedNodeList::sockAddrBelongsToNode(sockAddr);
}
void NodeList::ignoreNodeBySessionID(const QUuid& nodeID) {
// enumerate the nodes to send a reliable ignore packet to each that can leverage it

View file

@ -132,6 +132,8 @@ private:
void pingPunchForInactiveNode(const SharedNodePointer& node);
bool sockAddrBelongsToDomainOrNode(const HifiSockAddr& sockAddr);
NodeType_t _ownerType;
NodeSet _nodeTypesOfInterest;
DomainHandler _domainHandler;

View file

@ -426,18 +426,25 @@ SequenceNumber Connection::nextACK() const {
}
}
void Connection::sendHandshakeRequest() {
auto handshakeRequestPacket = ControlPacket::create(ControlPacket::HandshakeRequest, 0);
_parentSocket->writeBasePacket(*handshakeRequestPacket, _destination);
_didRequestHandshake = true;
}
bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize) {
if (!_hasReceivedHandshake) {
// Refuse to process any packets until we've received the handshake
// Send handshake request to re-request a handshake
auto handshakeRequestPacket = ControlPacket::create(ControlPacket::HandshakeRequest, 0);
_parentSocket->writeBasePacket(*handshakeRequestPacket, _destination);
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "Received packet before receiving handshake, sending HandshakeRequest";
#endif
sendHandshakeRequest();
return false;
}
@ -789,6 +796,11 @@ void Connection::processHandshake(ControlPacketPointer controlPacket) {
// indicate that handshake has been received
_hasReceivedHandshake = true;
if (_didRequestHandshake) {
emit receiverHandshakeRequestComplete(_destination);
_didRequestHandshake = false;
}
}
void Connection::processHandshakeACK(ControlPacketPointer controlPacket) {

View file

@ -79,10 +79,13 @@ public:
void setMaxBandwidth(int maxBandwidth);
void sendHandshakeRequest();
signals:
void packetSent();
void connectionInactive(const HifiSockAddr& sockAddr);
void receiverHandshakeRequestComplete(const HifiSockAddr& sockAddr);
private slots:
void recordSentPackets(int payload, int total);
void recordRetransmission();
@ -129,6 +132,7 @@ private:
bool _hasReceivedHandshake { false }; // flag for receipt of handshake from server
bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client
bool _didRequestHandshake { false }; // flag for request of handshake from server
p_high_resolution_clock::time_point _connectionStart = p_high_resolution_clock::now(); // holds the time_point for creation of this connection
p_high_resolution_clock::time_point _lastReceiveTime; // holds the last time we received anything from sender

View file

@ -97,6 +97,9 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) :
_currentSequenceNumber = _initialSequenceNumber - 1;
_atomicCurrentSequenceNumber = uint32_t(_currentSequenceNumber);
_lastACKSequenceNumber = uint32_t(_currentSequenceNumber) - 1;
// default the last receiver response to the current time
_lastReceiverResponse = QDateTime::currentMSecsSinceEpoch();
}
void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
@ -166,7 +169,7 @@ void SendQueue::ack(SequenceNumber ack) {
void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
// this is a response from the client, re-set our timeout expiry
_lastReceiverResponse = QDateTime::currentMSecsSinceEpoch();
_lastReceiverResponse = QDateTime::currentMSecsSinceEpoch();
{
std::lock_guard<std::mutex> nakLocker(_naksLock);
@ -520,7 +523,6 @@ bool SendQueue::isInactive(bool attemptedToSendPacket) {
if (sinceLastResponse > 0 &&
sinceLastResponse >= int64_t(NUM_TIMEOUTS_BEFORE_INACTIVE * (_estimatedTimeout / USECS_PER_MSEC)) &&
_lastReceiverResponse > 0 &&
sinceLastResponse > MIN_MS_BEFORE_INACTIVE) {
// If the flow window has been full for over CONSIDER_INACTIVE_AFTER,
// then signal the queue is inactive and return so it can be cleaned up

View file

@ -171,11 +171,28 @@ qint64 Socket::writePacketList(std::unique_ptr<PacketList> packetList, const Hif
}
void Socket::writeReliablePacket(Packet* packet, const HifiSockAddr& sockAddr) {
findOrCreateConnection(sockAddr).sendReliablePacket(std::unique_ptr<Packet>(packet));
auto connection = findOrCreateConnection(sockAddr);
if (connection) {
connection->sendReliablePacket(std::unique_ptr<Packet>(packet));
}
#ifdef UDT_CONNECTION_DEBUG
else {
qCDebug(networking) << "Socket::writeReliablePacket refusing to send packet - no connection was created";
}
#endif
}
void Socket::writeReliablePacketList(PacketList* packetList, const HifiSockAddr& sockAddr) {
findOrCreateConnection(sockAddr).sendReliablePacketList(std::unique_ptr<PacketList>(packetList));
auto connection = findOrCreateConnection(sockAddr);
if (connection) {
connection->sendReliablePacketList(std::unique_ptr<PacketList>(packetList));
}
#ifdef UDT_CONNECTION_DEBUG
else {
qCDebug(networking) << "Socket::writeReliablePacketList refusing to send packet list - no connection was created";
}
#endif
}
qint64 Socket::writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr) {
@ -198,25 +215,40 @@ qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& soc
return bytesWritten;
}
Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
Connection* Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
auto it = _connectionsHash.find(sockAddr);
if (it == _connectionsHash.end()) {
auto congestionControl = _ccFactory->create();
congestionControl->setMaxBandwidth(_maxBandwidth);
auto connection = std::unique_ptr<Connection>(new Connection(this, sockAddr, std::move(congestionControl)));
// we did not have a matching connection, time to see if we should make one
// we queue the connection to cleanup connection in case it asks for it during its own rate control sync
QObject::connect(connection.get(), &Connection::connectionInactive, this, &Socket::cleanupConnection);
if (_connectionCreationFilterOperator && !_connectionCreationFilterOperator(sockAddr)) {
// the connection creation filter did not allow us to create a new connection
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "Socket::findOrCreateConnection refusing to create connection for" << sockAddr
<< "due to connection creation filter";
#endif
return nullptr;
} else {
auto congestionControl = _ccFactory->create();
congestionControl->setMaxBandwidth(_maxBandwidth);
auto connection = std::unique_ptr<Connection>(new Connection(this, sockAddr, std::move(congestionControl)));
// we queue the connection to cleanup connection in case it asks for it during its own rate control sync
QObject::connect(connection.get(), &Connection::connectionInactive, this, &Socket::cleanupConnection);
// allow higher-level classes to find out when connections have completed a handshake
QObject::connect(connection.get(), &Connection::receiverHandshakeRequestComplete,
this, &Socket::clientHandshakeRequestComplete);
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "Creating new connection to" << sockAddr;
qCDebug(networking) << "Creating new connection to" << sockAddr;
#endif
it = _connectionsHash.insert(it, std::make_pair(sockAddr, std::move(connection)));
it = _connectionsHash.insert(it, std::make_pair(sockAddr, std::move(connection)));
}
}
return *it->second;
return it->second.get();
}
void Socket::clearConnections() {
@ -292,9 +324,12 @@ void Socket::readPendingDatagrams() {
// setup a control packet from the data we just read
auto controlPacket = ControlPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
// move this control packet to the matching connection
auto& connection = findOrCreateConnection(senderSockAddr);
connection.processControl(move(controlPacket));
// move this control packet to the matching connection, if there is one
auto connection = findOrCreateConnection(senderSockAddr);
if (connection) {
connection->processControl(move(controlPacket));
}
} else {
// setup a Packet from the data we just read
@ -304,19 +339,21 @@ void Socket::readPendingDatagrams() {
if (!_packetFilterOperator || _packetFilterOperator(*packet)) {
if (packet->isReliable()) {
// if this was a reliable packet then signal the matching connection with the sequence number
auto& connection = findOrCreateConnection(senderSockAddr);
auto connection = findOrCreateConnection(senderSockAddr);
if (!connection.processReceivedSequenceNumber(packet->getSequenceNumber(),
packet->getDataSize(),
packet->getPayloadSize())) {
// the connection indicated that we should not continue processing this packet
if (!connection || !connection->processReceivedSequenceNumber(packet->getSequenceNumber(),
packet->getDataSize(),
packet->getPayloadSize())) {
// the connection could not be created or indicated that we should not continue processing this packet
continue;
}
}
if (packet->isPartOfMessage()) {
auto& connection = findOrCreateConnection(senderSockAddr);
connection.queueReceivedMessagePacket(std::move(packet));
auto connection = findOrCreateConnection(senderSockAddr);
if (connection) {
connection->queueReceivedMessagePacket(std::move(packet));
}
} else if (_packetHandler) {
// call the verified packet callback to let it handle this packet
_packetHandler(std::move(packet));
@ -427,3 +464,14 @@ void Socket::handleStateChanged(QAbstractSocket::SocketState socketState) {
qCWarning(networking) << "udt::Socket state changed - state is now" << socketState;
}
}
#if (PR_BUILD || DEV_BUILD)
void Socket::sendFakedHandshakeRequest(const HifiSockAddr& sockAddr) {
auto connection = findOrCreateConnection(sockAddr);
if (connection) {
connection->sendHandshakeRequest();
}
}
#endif

View file

@ -37,6 +37,7 @@ class PacketList;
class SequenceNumber;
using PacketFilterOperator = std::function<bool(const Packet&)>;
using ConnectionCreationFilterOperator = std::function<bool(const HifiSockAddr&)>;
using BasePacketHandler = std::function<void(std::unique_ptr<BasePacket>)>;
using PacketHandler = std::function<void(std::unique_ptr<Packet>)>;
@ -68,6 +69,8 @@ public:
void setPacketHandler(PacketHandler handler) { _packetHandler = handler; }
void setMessageHandler(MessageHandler handler) { _messageHandler = handler; }
void setMessageFailureHandler(MessageFailureHandler handler) { _messageFailureHandler = handler; }
void setConnectionCreationFilterOperator(ConnectionCreationFilterOperator filterOperator)
{ _connectionCreationFilterOperator = filterOperator; }
void addUnfilteredHandler(const HifiSockAddr& senderSockAddr, BasePacketHandler handler)
{ _unfilteredHandlers[senderSockAddr] = handler; }
@ -80,6 +83,13 @@ public:
StatsVector sampleStatsForAllConnections();
#if (PR_BUILD || DEV_BUILD)
void sendFakedHandshakeRequest(const HifiSockAddr& sockAddr);
#endif
signals:
void clientHandshakeRequestComplete(const HifiSockAddr& sockAddr);
public slots:
void cleanupConnection(HifiSockAddr sockAddr);
void clearConnections();
@ -93,7 +103,8 @@ private slots:
private:
void setSystemBufferSizes();
Connection& findOrCreateConnection(const HifiSockAddr& sockAddr);
Connection* findOrCreateConnection(const HifiSockAddr& sockAddr);
bool socketMatchesNodeOrDomain(const HifiSockAddr& sockAddr);
// privatized methods used by UDTTest - they are private since they must be called on the Socket thread
ConnectionStats::Stats sampleStatsForConnection(const HifiSockAddr& destination);
@ -109,6 +120,7 @@ private:
PacketHandler _packetHandler;
MessageHandler _messageHandler;
MessageFailureHandler _messageFailureHandler;
ConnectionCreationFilterOperator _connectionCreationFilterOperator;
std::unordered_map<HifiSockAddr, BasePacketHandler> _unfilteredHandlers;
std::unordered_map<HifiSockAddr, SequenceNumber> _unreliableSequenceNumbers;

View file

@ -17,6 +17,7 @@
#include <AssetUpload.h>
#include <MappingRequest.h>
#include <NetworkLogging.h>
#include <NodeList.h>
AssetScriptingInterface::AssetScriptingInterface(QScriptEngine* engine) :
_engine(engine)
@ -86,3 +87,13 @@ void AssetScriptingInterface::downloadData(QString urlString, QScriptValue callb
assetRequest->start();
}
#if (PR_BUILD || DEV_BUILD)
void AssetScriptingInterface::sendFakedHandshake() {
auto nodeList = DependencyManager::get<NodeList>();
SharedNodePointer assetServer = nodeList->soloNodeOfType(NodeType::AssetServer);
nodeList->sendFakedHandshakeRequestToNode(assetServer);
}
#endif

View file

@ -28,6 +28,10 @@ public:
Q_INVOKABLE void downloadData(QString url, QScriptValue downloadComplete);
Q_INVOKABLE void setMapping(QString path, QString hash, QScriptValue callback);
#if (PR_BUILD || DEV_BUILD)
Q_INVOKABLE void sendFakedHandshake();
#endif
protected:
QSet<AssetRequest*> _pendingRequests;
QScriptEngine* _engine;