leverage udt::Socket in ice-server

This commit is contained in:
Stephen Birarda 2015-07-21 12:27:27 -07:00
parent da761d4c95
commit 4604bc5a3a
6 changed files with 57 additions and 63 deletions

View file

@ -34,8 +34,9 @@ IceServer::IceServer(int argc, char* argv[]) :
qDebug() << "monitoring http endpoint is listening on " << ICE_SERVER_MONITORING_PORT;
_serverSocket.bind(QHostAddress::AnyIPv4, ICE_SERVER_DEFAULT_PORT);
// call our process datagrams slot when the UDP socket has packets ready
connect(&_serverSocket, &QUdpSocket::readyRead, this, &IceServer::processDatagrams);
// set processPacket as the verified packet callback for the udt::Socket
using std::placeholders::_1;
_serverSocket.setVerifiedPacketCallback(std::bind(&IceServer::processPacket, this, _1));
// setup our timer to clear inactive peers
QTimer* inactivePeerTimer = new QTimer(this);
@ -44,58 +45,45 @@ IceServer::IceServer(int argc, char* argv[]) :
}
void IceServer::processDatagrams() {
HifiSockAddr sendingSockAddr;
while (_serverSocket.hasPendingDatagrams()) {
// setup a buffer to read the packet into
int packetSizeWithHeader = _serverSocket.pendingDatagramSize();
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[packetSizeWithHeader]);
_serverSocket.readDatagram(buffer.get(), packetSizeWithHeader,
sendingSockAddr.getAddressPointer(), sendingSockAddr.getPortPointer());
void IceServer::processPacket(std::unique_ptr<udt::Packet> packet) {
PacketType::Value packetType = packet->getType();
if (packetType == PacketType::ICEServerHeartbeat) {
SharedNetworkPeer peer = addOrUpdateHeartbeatingPeer(*packet);
auto packet = udt::Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, sendingSockAddr);
PacketType::Value packetType = packet->getType();
if (packetType == PacketType::ICEServerHeartbeat) {
SharedNetworkPeer peer = addOrUpdateHeartbeatingPeer(*packet);
// so that we can send packets to the heartbeating peer when we need, we need to activate a socket now
peer->activateMatchingOrNewSymmetricSocket(sendingSockAddr);
} else if (packetType == PacketType::ICEServerQuery) {
QDataStream heartbeatStream(packet.get());
// this is a node hoping to connect to a heartbeating peer - do we have the heartbeating peer?
QUuid senderUUID;
heartbeatStream >> senderUUID;
// pull the public and private sock addrs for this peer
HifiSockAddr publicSocket, localSocket;
heartbeatStream >> publicSocket >> localSocket;
// check if this node also included a UUID that they would like to connect to
QUuid connectRequestID;
heartbeatStream >> connectRequestID;
// so that we can send packets to the heartbeating peer when we need, we need to activate a socket now
peer->activateMatchingOrNewSymmetricSocket(packet->getSenderSockAddr());
} else if (packetType == PacketType::ICEServerQuery) {
QDataStream heartbeatStream(packet.get());
// this is a node hoping to connect to a heartbeating peer - do we have the heartbeating peer?
QUuid senderUUID;
heartbeatStream >> senderUUID;
// pull the public and private sock addrs for this peer
HifiSockAddr publicSocket, localSocket;
heartbeatStream >> publicSocket >> localSocket;
// check if this node also included a UUID that they would like to connect to
QUuid connectRequestID;
heartbeatStream >> connectRequestID;
SharedNetworkPeer matchingPeer = _activePeers.value(connectRequestID);
if (matchingPeer) {
SharedNetworkPeer matchingPeer = _activePeers.value(connectRequestID);
if (matchingPeer) {
qDebug() << "Sending information for peer" << connectRequestID << "to peer" << senderUUID;
// we have the peer they want to connect to - send them pack the information for that peer
sendPeerInformationPacket(*(matchingPeer.data()), &sendingSockAddr);
// we also need to send them to the active peer they are hoping to connect to
// create a dummy peer object we can pass to sendPeerInformationPacket
NetworkPeer dummyPeer(senderUUID, publicSocket, localSocket);
sendPeerInformationPacket(dummyPeer, matchingPeer->getActiveSocket());
} else {
qDebug() << "Peer" << senderUUID << "asked for" << connectRequestID << "but no matching peer found";
}
qDebug() << "Sending information for peer" << connectRequestID << "to peer" << senderUUID;
// we have the peer they want to connect to - send them pack the information for that peer
sendPeerInformationPacket(*(matchingPeer.data()), &packet->getSenderSockAddr());
// we also need to send them to the active peer they are hoping to connect to
// create a dummy peer object we can pass to sendPeerInformationPacket
NetworkPeer dummyPeer(senderUUID, publicSocket, localSocket);
sendPeerInformationPacket(dummyPeer, matchingPeer->getActiveSocket());
} else {
qDebug() << "Peer" << senderUUID << "asked for" << connectRequestID << "but no matching peer found";
}
}
}
@ -139,8 +127,7 @@ void IceServer::sendPeerInformationPacket(const NetworkPeer& peer, const HifiSoc
peerPacket->write(peer.toByteArray());
// write the current packet
_serverSocket.writeDatagram(peerPacket->getData(), peerPacket->getDataSize(),
destinationSockAddr->getAddress(), destinationSockAddr->getPort());
_serverSocket.writeUnreliablePacket(*peerPacket, *destinationSockAddr);
}
void IceServer::clearInactivePeers() {

View file

@ -20,6 +20,7 @@
#include <HTTPConnection.h>
#include <HTTPManager.h>
#include <udt/Packet.h>
#include <udt/Socket.h>
typedef QHash<QUuid, SharedNetworkPeer> NetworkPeerHash;
@ -29,15 +30,15 @@ public:
IceServer(int argc, char* argv[]);
bool handleHTTPRequest(HTTPConnection* connection, const QUrl& url, bool skipSubHandler = false);
private slots:
void processDatagrams();
void clearInactivePeers();
private:
void processPacket(std::unique_ptr<udt::Packet> packet);
SharedNetworkPeer addOrUpdateHeartbeatingPeer(udt::Packet& incomingPacket);
void sendPeerInformationPacket(const NetworkPeer& peer, const HifiSockAddr* destinationSockAddr);
QUuid _id;
QUdpSocket _serverSocket;
udt::Socket _serverSocket;
NetworkPeerHash _activePeers;
HTTPManager _httpManager;
};

View file

@ -97,7 +97,7 @@ LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short
// check the local socket right now
updateLocalSockAddr();
// set &PacketReceiver::handleVerifiedPacket as the verified packet function for the udt::Socket
// set &PacketReceiver::handleVerifiedPacket as the verified packet callback for the udt::Socket
using std::placeholders::_1;
_nodeSocket.setVerifiedPacketCallback(std::bind(&PacketReceiver::handleVerifiedPacket, _packetReceiver, _1));
@ -273,16 +273,16 @@ qint64 LimitedNodeList::writePacket(const NLPacket& packet, const HifiSockAddr&
emit dataSent(NodeType::Unassigned, packet.getDataSize());
return writeDatagram(QByteArray::fromRawData(packet.getData(), packet.getDataSize()), destinationSockAddr);
return writePacketAndCollectStats(packet, destinationSockAddr);
}
qint64 LimitedNodeList::writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr) {
qint64 LimitedNodeList::writePacketAndCollectStats(const NLPacket& packet, const HifiSockAddr& destinationSockAddr) {
// XXX can BandwidthRecorder be used for this?
// stat collection for packets
++_numCollectedPackets;
_numCollectedBytes += datagram.size();
_numCollectedBytes += packet.getDataSize();
qint64 bytesWritten = _nodeSocket.writeDatagram(datagram, destinationSockAddr);
qint64 bytesWritten = _nodeSocket.writeUnreliablePacket(packet, destinationSockAddr);
return bytesWritten;
}

View file

@ -251,7 +251,7 @@ protected:
qint64 writePacket(const NLPacket& packet, const Node& destinationNode);
qint64 writePacket(const NLPacket& packet, const HifiSockAddr& destinationSockAddr,
const QUuid& connectionSecret = QUuid());
qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr);
qint64 writePacketAndCollectStats(const NLPacket& packet, const HifiSockAddr& destinationSockAddr);
PacketSequenceNumber getNextSequenceNumberForPacket(const QUuid& nodeUUID, PacketType::Value packetType);

View file

@ -57,6 +57,10 @@ void Socket::setBufferSizes(int numBytes) {
}
}
qint64 Socket::writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr) {
return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr);
}
qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr) {
qint64 bytesWritten = _udpSocket.writeDatagram(datagram, sockAddr.getAddress(), sockAddr.getPort());

View file

@ -34,6 +34,8 @@ public:
quint16 localPort() const { return _udpSocket.localPort(); }
qint64 writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr)
{ return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); }
qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr);