change ice-server heartbeat behaviour

This commit is contained in:
Stephen Birarda 2015-05-28 14:05:43 -07:00
parent eb6b80133b
commit 531ef1fa1c
9 changed files with 135 additions and 144 deletions

View file

@ -1357,24 +1357,21 @@ void DomainServer::sendICEPingPackets() {
}
}
void DomainServer::processICEHeartbeatResponse(const QByteArray& packet) {
void DomainServer::processICEPeerInformation(const QByteArray& packet) {
// loop through the packet and pull out network peers
// any peer we don't have we add to the hash, otherwise we update
QDataStream iceResponseStream(packet);
iceResponseStream.skipRawData(numBytesForPacketHeader(packet));
NetworkPeer receivedPeer;
iceResponseStream >> receivedPeer;
while (!iceResponseStream.atEnd()) {
iceResponseStream >> receivedPeer;
if (!_connectedICEPeers.contains(receivedPeer.getUUID())) {
if (!_connectingICEPeers.contains(receivedPeer.getUUID())) {
qDebug() << "New peer requesting connection being added to hash -" << receivedPeer;
}
_connectingICEPeers[receivedPeer.getUUID()] = receivedPeer;
if (!_connectedICEPeers.contains(receivedPeer.getUUID())) {
if (!_connectingICEPeers.contains(receivedPeer.getUUID())) {
qDebug() << "New peer requesting connection being added to hash -" << receivedPeer;
}
_connectingICEPeers[receivedPeer.getUUID()] = receivedPeer;
}
}
@ -1458,8 +1455,8 @@ void DomainServer::processDatagram(const QByteArray& receivedPacket, const HifiS
processICEPingReply(receivedPacket, senderSockAddr);
break;
}
case PacketTypeIceServerHeartbeatResponse:
processICEHeartbeatResponse(receivedPacket);
case PacketTypeIceServerPeerInformation:
processICEPeerInformation(receivedPacket);
break;
default:
break;

View file

@ -79,7 +79,7 @@ private:
void setupAutomaticNetworking();
void sendHeartbeatToDataServer(const QString& networkAddress);
void processICEPingReply(const QByteArray& packet, const HifiSockAddr& senderSockAddr);
void processICEHeartbeatResponse(const QByteArray& packet);
void processICEPeerInformation(const QByteArray& packet);
void processDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr);

View file

@ -33,133 +33,113 @@ IceServer::IceServer(int argc, char* argv[]) :
qDebug() << "ice-server socket is listening on" << ICE_SERVER_DEFAULT_PORT;
qDebug() << "monitoring http endpoint is listening on " << ICE_SERVER_MONITORING_PORT;
_serverSocket.bind(QHostAddress::AnyIPv4, ICE_SERVER_DEFAULT_PORT);
// call our process datagrams slot when the UDP socket has packets ready
connect(&_serverSocket, &QUdpSocket::readyRead, this, &IceServer::processDatagrams);
// setup our timer to clear inactive peers
QTimer* inactivePeerTimer = new QTimer(this);
connect(inactivePeerTimer, &QTimer::timeout, this, &IceServer::clearInactivePeers);
inactivePeerTimer->start(CLEAR_INACTIVE_PEERS_INTERVAL_MSECS);
}
void IceServer::processDatagrams() {
HifiSockAddr sendingSockAddr;
QByteArray incomingPacket;
while (_serverSocket.hasPendingDatagrams()) {
incomingPacket.resize(_serverSocket.pendingDatagramSize());
_serverSocket.readDatagram(incomingPacket.data(), incomingPacket.size(),
sendingSockAddr.getAddressPointer(), sendingSockAddr.getPortPointer());
if (packetTypeForPacket(incomingPacket) == PacketTypeIceServerHeartbeat) {
PacketType packetType = packetTypeForPacket(incomingPacket);
if (packetType == PacketTypeIceServerHeartbeat) {
addOrUpdateHeartbeatingPeer(incomingPacket);
} else if (packetType == PacketTypeIceServerQuery) {
// this is a node hoping to connect to a heartbeating peer - do we have the heartbeating peer?
QUuid senderUUID = uuidFromPacketHeader(incomingPacket);
// pull the public and private sock addrs for this peer
HifiSockAddr publicSocket, localSocket;
QDataStream hearbeatStream(incomingPacket);
hearbeatStream.skipRawData(numBytesForPacketHeader(incomingPacket));
hearbeatStream >> publicSocket >> localSocket;
// make sure we have this sender in our peer hash
SharedNetworkPeer matchingPeer = _activePeers.value(senderUUID);
if (!matchingPeer) {
// if we don't have this sender we need to create them now
matchingPeer = SharedNetworkPeer(new NetworkPeer(senderUUID, publicSocket, localSocket));
_activePeers.insert(senderUUID, matchingPeer);
qDebug() << "Added a new network peer" << *matchingPeer;
} else {
// we already had the peer so just potentially update their sockets
matchingPeer->setPublicSocket(publicSocket);
matchingPeer->setLocalSocket(localSocket);
qDebug() << "Matched hearbeat to existing network peer" << *matchingPeer;
}
// update our last heard microstamp for this network peer to now
matchingPeer->setLastHeardMicrostamp(usecTimestampNow());
// check if this node also included a UUID that they would like to connect to
QUuid connectRequestID;
hearbeatStream >> connectRequestID;
// get the peers asking for connections with this peer
QSet<QUuid>& requestingConnections = _currentConnections[senderUUID];
if (!connectRequestID.isNull()) {
qDebug() << "Peer wants to connect to peer with ID" << uuidStringWithoutCurlyBraces(connectRequestID);
// ensure this peer is in the set of current connections for the peer with ID it wants to connect with
_currentConnections[connectRequestID].insert(senderUUID);
// add the ID of the node they have said they would like to connect to
requestingConnections.insert(connectRequestID);
}
if (requestingConnections.size() > 0) {
// send a heartbeart response based on the set of connections
qDebug() << "Sending a heartbeat response to" << senderUUID << "who has" << requestingConnections.size()
<< "potential connections";
sendHeartbeatResponse(sendingSockAddr, requestingConnections);
SharedNetworkPeer matchingPeer = _activePeers.value(connectRequestID);
if (matchingPeer) {
// we have the peer they want to connect to - send them pack the information for that peer
sendPeerInformationPacket(matchingPeer, sendingSockAddr);
}
}
}
}
void IceServer::sendHeartbeatResponse(const HifiSockAddr& destinationSockAddr, QSet<QUuid>& connections) {
QSet<QUuid>::iterator peerID = connections.begin();
SharedNetworkPeer IceServer::addOrUpdateHeartbeatingPeer(const QByteArray& incomingPacket) {
QUuid senderUUID = uuidFromPacketHeader(incomingPacket);
// pull the public and private sock addrs for this peer
HifiSockAddr publicSocket, localSocket;
QDataStream hearbeatStream(incomingPacket);
hearbeatStream.skipRawData(numBytesForPacketHeader(incomingPacket));
hearbeatStream >> publicSocket >> localSocket;
// make sure we have this sender in our peer hash
SharedNetworkPeer matchingPeer = _activePeers.value(senderUUID);
if (!matchingPeer) {
// if we don't have this sender we need to create them now
matchingPeer = SharedNetworkPeer(new NetworkPeer(senderUUID, publicSocket, localSocket));
_activePeers.insert(senderUUID, matchingPeer);
qDebug() << "Added a new network peer" << *matchingPeer;
} else {
// we already had the peer so just potentially update their sockets
matchingPeer->setPublicSocket(publicSocket);
matchingPeer->setLocalSocket(localSocket);
qDebug() << "Matched hearbeat to existing network peer" << *matchingPeer;
}
// update our last heard microstamp for this network peer to now
matchingPeer->setLastHeardMicrostamp(usecTimestampNow());
return matchingPeer;
}
void IceServer::sendPeerInformationPacket(const SharedNetworkPeer& peer, const HifiSockAddr& destinationSockAddr) {
QByteArray outgoingPacket(MAX_PACKET_SIZE, 0);
int currentPacketSize = populatePacketHeaderWithUUID(outgoingPacket, PacketTypeIceServerHeartbeatResponse, _id);
int currentPacketSize = populatePacketHeaderWithUUID(outgoingPacket, PacketTypeIceServerPeerInformation, _id);
int numHeaderBytes = currentPacketSize;
// go through the connections, sending packets containing connection information for those nodes
while (peerID != connections.end()) {
SharedNetworkPeer matchingPeer = _activePeers.value(*peerID);
// if this node is inactive we remove it from the set
if (!matchingPeer) {
peerID = connections.erase(peerID);
} else {
// get the byte array for this peer
QByteArray peerBytes = matchingPeer->toByteArray();
if (currentPacketSize + peerBytes.size() > MAX_PACKET_SIZE) {
// write the current packet
_serverSocket.writeDatagram(outgoingPacket.data(), currentPacketSize,
destinationSockAddr.getAddress(), destinationSockAddr.getPort());
// reset the packet size to our number of header bytes
currentPacketSize = populatePacketHeaderWithUUID(outgoingPacket, PacketTypeIceServerHeartbeatResponse, _id);
}
// append the current peer bytes
outgoingPacket.insert(currentPacketSize, peerBytes);
currentPacketSize += peerBytes.size();
++peerID;
}
}
if (currentPacketSize > numHeaderBytes) {
// write the last packet, if there is data in it
_serverSocket.writeDatagram(outgoingPacket.data(), currentPacketSize,
destinationSockAddr.getAddress(), destinationSockAddr.getPort());
}
// get the byte array for this peer
QByteArray peerBytes = peer->toByteArray();
outgoingPacket.replace(numHeaderBytes, peerBytes.size(), peerBytes);
currentPacketSize += peerBytes.size();
// write the current packet
_serverSocket.writeDatagram(outgoingPacket.data(), outgoingPacket.size(),
destinationSockAddr.getAddress(), destinationSockAddr.getPort());
}
void IceServer::clearInactivePeers() {
NetworkPeerHash::iterator peerItem = _activePeers.begin();
while (peerItem != _activePeers.end()) {
SharedNetworkPeer peer = peerItem.value();
if ((usecTimestampNow() - peer->getLastHeardMicrostamp()) > (PEER_SILENCE_THRESHOLD_MSECS * 1000)) {
qDebug() << "Removing peer from memory for inactivity -" << *peer;
peerItem = _activePeers.erase(peerItem);
@ -171,11 +151,9 @@ void IceServer::clearInactivePeers() {
}
bool IceServer::handleHTTPRequest(HTTPConnection* connection, const QUrl& url, bool skipSubHandler) {
//
// We need an HTTP handler in order to monitor the health of the ice server
// The correct functioning of the ICE server will be determined by its HTTP availability,
//
if (connection->requestOperation() == QNetworkAccessManager::GetOperation) {
if (url.path() == "/status") {
connection->respond(HTTPConnection::StatusCode200, QByteArray::number(_activePeers.size()));

View file

@ -31,14 +31,14 @@ private slots:
void processDatagrams();
void clearInactivePeers();
private:
void sendHeartbeatResponse(const HifiSockAddr& destinationSockAddr, QSet<QUuid>& connections);
SharedNetworkPeer addOrUpdateHeartbeatingPeer(const QByteArray& incomingPacket);
void sendPeerInformationPacket(const SharedNetworkPeer& peer, const HifiSockAddr& destinationSockAddr);
QUuid _id;
QUdpSocket _serverSocket;
NetworkPeerHash _activePeers;
QHash<QUuid, QSet<QUuid> > _currentConnections;
HTTPManager _httpManager;
};
#endif // hifi_IceServer_h
#endif // hifi_IceServer_h

View file

@ -814,23 +814,30 @@ void LimitedNodeList::updateLocalSockAddr() {
}
}
void LimitedNodeList::sendHeartbeatToIceServer(const HifiSockAddr& iceServerSockAddr,
QUuid headerID, const QUuid& connectionRequestID) {
void LimitedNodeList::sendHeartbeatToIceServer(const HifiSockAddr& iceServerSockAddr) {
sendPacketToIceServer(PacketTypeIceServerHeartbeat, iceServerSockAddr, _sessionUUID);
}
if (headerID.isNull()) {
headerID = _sessionUUID;
}
void LimitedNodeList::sendPeerQueryToIceServer(const HifiSockAddr& iceServerSockAddr, const QUuid& clientID,
const QUuid& peerID) {
sendPacketToIceServer(PacketTypeIceServerQuery, iceServerSockAddr, clientID, peerID);
}
QByteArray iceRequestByteArray = byteArrayWithUUIDPopulatedHeader(PacketTypeIceServerHeartbeat, headerID);
void LimitedNodeList::sendPacketToIceServer(PacketType packetType, const HifiSockAddr& iceServerSockAddr,
const QUuid& headerID, const QUuid& peerID) {
QByteArray iceRequestByteArray = byteArrayWithUUIDPopulatedHeader(packetType, headerID);
QDataStream iceDataStream(&iceRequestByteArray, QIODevice::Append);
iceDataStream << _publicSockAddr << _localSockAddr;
if (!connectionRequestID.isNull()) {
iceDataStream << connectionRequestID;
if (packetType == PacketTypeIceServerQuery) {
assert(!peerID.isNull());
iceDataStream << peerID;
qCDebug(networking) << "Sending packet to ICE server to request connection info for peer with ID"
<< uuidStringWithoutCurlyBraces(connectionRequestID);
<< uuidStringWithoutCurlyBraces(peerID);
}
writeUnverifiedDatagram(iceRequestByteArray, iceServerSockAddr);

View file

@ -89,7 +89,7 @@ public:
SetPublicSocketFromSTUN,
SetICEServerHostname,
SetICEServerSocket,
SendICEServerHearbeat,
SendICEServerQuery,
ReceiveDSPeerInformation,
SendPingsToDS,
SetDomainHostname,
@ -178,8 +178,8 @@ public:
virtual bool processSTUNResponse(const QByteArray& packet);
void sendHeartbeatToIceServer(const HifiSockAddr& iceServerSockAddr,
QUuid headerID = QUuid(), const QUuid& connectRequestID = QUuid());
void sendHeartbeatToIceServer(const HifiSockAddr& iceServerSockAddr);
void sendPeerQueryToIceServer(const HifiSockAddr& iceServerSockAddr, const QUuid& clientID, const QUuid& peerID);
template<typename NodeLambda>
void eachNode(NodeLambda functor) {
@ -276,6 +276,9 @@ protected:
void stopInitialSTUNUpdate(bool success);
void sendPacketToIceServer(PacketType packetType, const HifiSockAddr& iceServerSockAddr, const QUuid& headerID,
const QUuid& peerRequestID = QUuid());
QUuid _sessionUUID;
NodeHash _nodeHash;
QReadWriteLock _nodeMutex;

View file

@ -192,7 +192,7 @@ void NodeList::processNodeData(const HifiSockAddr& senderSockAddr, const QByteAr
_domainHandler.parseDTLSRequirementPacket(packet);
break;
}
case PacketTypeIceServerHeartbeatResponse: {
case PacketTypeIceServerPeerInformation: {
_domainHandler.processICEResponsePacket(packet);
break;
}
@ -482,9 +482,9 @@ void NodeList::handleICEConnectionToDomainServer() {
_domainHandler.getICEPeer().resetConnectionAttempts();
flagTimeForConnectionStep(LimitedNodeList::ConnectionStep::SendICEServerHearbeat);
flagTimeForConnectionStep(LimitedNodeList::ConnectionStep::SendICEServerQuery);
LimitedNodeList::sendHeartbeatToIceServer(_domainHandler.getICEServerSockAddr(),
LimitedNodeList::sendPeerQueryToIceServer(_domainHandler.getICEServerSockAddr(),
_domainHandler.getICEClientID(),
_domainHandler.getICEDomainID());
}

View file

@ -78,6 +78,9 @@ PacketVersion versionForPacketType(PacketType packetType) {
return 2;
case PacketTypeAudioStreamStats:
return 1;
case PacketTypeIceServerHeartbeat:
case PacketTypeIceServerQuery:
return 1;
default:
return 0;
}
@ -125,7 +128,9 @@ QString nameForPacketType(PacketType packetType) {
PACKET_TYPE_NAME_LOOKUP(PacketTypeEntityEditNack);
PACKET_TYPE_NAME_LOOKUP(PacketTypeSignedTransactionPayment);
PACKET_TYPE_NAME_LOOKUP(PacketTypeIceServerHeartbeat);
PACKET_TYPE_NAME_LOOKUP(PacketTypeIceServerHeartbeatResponse);
PACKET_TYPE_NAME_LOOKUP(PacketTypeDomainServerAddedNode);
PACKET_TYPE_NAME_LOOKUP(PacketTypeIceServerQuery);
PACKET_TYPE_NAME_LOOKUP(PacketTypeIceServerPeerInformation);
PACKET_TYPE_NAME_LOOKUP(PacketTypeUnverifiedPing);
PACKET_TYPE_NAME_LOOKUP(PacketTypeUnverifiedPingReply);
PACKET_TYPE_NAME_LOOKUP(PacketTypeEntityAdd);
@ -149,33 +154,33 @@ int populatePacketHeaderWithUUID(QByteArray& packet, PacketType packetType, cons
if (packet.size() < numBytesForPacketHeaderGivenPacketType(packetType)) {
packet.resize(numBytesForPacketHeaderGivenPacketType(packetType));
}
return populatePacketHeaderWithUUID(packet.data(), packetType, connectionUUID);
}
int populatePacketHeaderWithUUID(char* packet, PacketType packetType, const QUuid& connectionUUID) {
int numTypeBytes = packArithmeticallyCodedValue(packetType, packet);
packet[numTypeBytes] = versionForPacketType(packetType);
char* position = packet + numTypeBytes + sizeof(PacketVersion);
QByteArray rfcUUID = connectionUUID.toRfc4122();
memcpy(position, rfcUUID.constData(), NUM_BYTES_RFC4122_UUID);
position += NUM_BYTES_RFC4122_UUID;
if (!NON_VERIFIED_PACKETS.contains(packetType)) {
// pack 16 bytes of zeros where the md5 hash will be placed once data is packed
memset(position, 0, NUM_BYTES_MD5_HASH);
position += NUM_BYTES_MD5_HASH;
}
if (SEQUENCE_NUMBERED_PACKETS.contains(packetType)) {
// Pack zeros for the number of bytes that the sequence number requires.
// The LimitedNodeList will handle packing in the sequence number when sending out the packet.
memset(position, 0, sizeof(PacketSequenceNumber));
position += sizeof(PacketSequenceNumber);
}
// return the number of bytes written for pointer pushing
return position - packet;
}
@ -235,13 +240,13 @@ PacketSequenceNumber sequenceNumberFromHeader(const QByteArray& packet, PacketTy
if (packetType == PacketTypeUnknown) {
packetType = packetTypeForPacket(packet);
}
PacketSequenceNumber result = DEFAULT_SEQUENCE_NUMBER;
if (SEQUENCE_NUMBERED_PACKETS.contains(packetType)) {
memcpy(&result, packet.data() + sequenceNumberOffsetForPacketType(packetType), sizeof(PacketSequenceNumber));
}
return result;
}
@ -249,7 +254,7 @@ void replaceHashInPacket(QByteArray& packet, const QUuid& connectionUUID, Packet
if (packetType == PacketTypeUnknown) {
packetType = packetTypeForPacket(packet);
}
packet.replace(hashOffsetForPacketType(packetType), NUM_BYTES_MD5_HASH,
hashForPacketAndConnectionUUID(packet, connectionUUID));
}
@ -258,7 +263,7 @@ void replaceSequenceNumberInPacket(QByteArray& packet, PacketSequenceNumber sequ
if (packetType == PacketTypeUnknown) {
packetType = packetTypeForPacket(packet);
}
packet.replace(sequenceNumberOffsetForPacketType(packetType),
sizeof(PacketSequenceNumber), reinterpret_cast<char*>(&sequenceNumber), sizeof(PacketSequenceNumber));
}
@ -268,7 +273,7 @@ void replaceHashAndSequenceNumberInPacket(QByteArray& packet, const QUuid& conne
if (packetType == PacketTypeUnknown) {
packetType = packetTypeForPacket(packet);
}
replaceHashInPacket(packet, connectionUUID, packetType);
replaceSequenceNumberInPacket(packet, sequenceNumber, packetType);
}

View file

@ -25,6 +25,7 @@
// NOTE: if adding a new packet packetType, you can replace one marked usable or add at the end
// NOTE: if you want the name of the packet packetType to be available for debugging or logging, update nameForPacketType() as well
enum PacketType {
PacketTypeUnknown, // 0
PacketTypeStunResponse,
@ -50,8 +51,8 @@ enum PacketType {
PacketTypeDomainServerPathQuery,
PacketTypeDomainServerPathResponse,
PacketTypeDomainServerAddedNode,
UNUSED_4,
UNUSED_5, // 25
PacketTypeIceServerPeerInformation,
PacketTypeIceServerQuery, // 25
PacketTypeOctreeStats,
PacketTypeJurisdiction,
PacketTypeJurisdictionRequest,
@ -77,7 +78,6 @@ enum PacketType {
PacketTypeEntityEditNack,
PacketTypeSignedTransactionPayment,
PacketTypeIceServerHeartbeat, // 50
PacketTypeIceServerHeartbeatResponse,
PacketTypeUnverifiedPing,
PacketTypeUnverifiedPingReply,
PacketTypeParticleEntitiesFix
@ -96,8 +96,9 @@ const QSet<PacketType> NON_VERIFIED_PACKETS = QSet<PacketType>()
<< PacketTypeCreateAssignment << PacketTypeRequestAssignment << PacketTypeStunResponse
<< PacketTypeNodeJsonStats << PacketTypeEntityQuery
<< PacketTypeOctreeDataNack << PacketTypeEntityEditNack
<< PacketTypeIceServerHeartbeat << PacketTypeIceServerHeartbeatResponse
<< PacketTypeUnverifiedPing << PacketTypeUnverifiedPingReply << PacketTypeStopNode
<< PacketTypeIceServerHeartbeat << PacketTypeIceServerPeerInformation
<< PacketTypeIceServerQuery << PacketTypeUnverifiedPing
<< PacketTypeUnverifiedPingReply << PacketTypeStopNode
<< PacketTypeDomainServerPathQuery << PacketTypeDomainServerPathResponse
<< PacketTypeDomainServerAddedNode;