// // Socket.cpp // libraries/networking/src/udt // // Created by Stephen Birarda on 2015-07-20. // Copyright 2015 High Fidelity, Inc. // // Distributed under the Apache License, Version 2.0. // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // #include "Socket.h" #include #include #include "../NetworkLogging.h" #include "Connection.h" #include "ControlPacket.h" #include "Packet.h" #include "../NLPacket.h" using namespace udt; Socket::Socket(QObject* parent) : QObject(parent) { connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams); // make sure our synchronization method is called every SYN interval connect(&_synTimer, &QTimer::timeout, this, &Socket::rateControlSync); // start our timer for the synchronization time interval _synTimer.start(_synInterval); } void Socket::rebind() { quint16 oldPort = _udpSocket.localPort(); _udpSocket.close(); bind(QHostAddress::AnyIPv4, oldPort); } void Socket::setSystemBufferSizes() { for (int i = 0; i < 2; i++) { QAbstractSocket::SocketOption bufferOpt; QString bufferTypeString; int numBytes = 0; if (i == 0) { bufferOpt = QAbstractSocket::SendBufferSizeSocketOption; numBytes = udt::UDP_SEND_BUFFER_SIZE_BYTES; bufferTypeString = "send"; } else { bufferOpt = QAbstractSocket::ReceiveBufferSizeSocketOption; numBytes = udt::UDP_RECEIVE_BUFFER_SIZE_BYTES; bufferTypeString = "receive"; } int oldBufferSize = _udpSocket.socketOption(bufferOpt).toInt(); if (oldBufferSize < numBytes) { _udpSocket.setSocketOption(bufferOpt, QVariant(numBytes)); int newBufferSize = _udpSocket.socketOption(bufferOpt).toInt(); qCDebug(networking) << "Changed socket" << bufferTypeString << "buffer size from" << oldBufferSize << "to" << newBufferSize << "bytes"; } else { // don't make the buffer smaller qCDebug(networking) << "Did not change socket" << bufferTypeString << "buffer size from" << oldBufferSize << "since it is larger than desired size of" << numBytes; } } } qint64 Socket::writeBasePacket(const udt::BasePacket& packet, const HifiSockAddr &sockAddr) { // Since this is a base packet we have no way to know if this is reliable or not - we just fire it off // this should not be called with an instance of Packet Q_ASSERT_X(!dynamic_cast(&packet), "Socket::writeBasePacket", "Cannot send a Packet/NLPacket via writeBasePacket"); return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr); } qint64 Socket::writePacket(const Packet& packet, const HifiSockAddr& sockAddr) { Q_ASSERT_X(!packet.isReliable(), "Socket::writePacket", "Cannot send a reliable packet unreliably"); // write the correct sequence number to the Packet here packet.writeSequenceNumber(++_unreliableSequenceNumbers[sockAddr]); return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr); } qint64 Socket::writePacket(std::unique_ptr packet, const HifiSockAddr& sockAddr) { if (packet->isReliable()) { findOrCreateConnection(sockAddr).sendReliablePacket(move(packet)); return 0; } return writePacket(*packet, sockAddr); } qint64 Socket::writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr) { return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); } qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr) { qint64 bytesWritten = _udpSocket.writeDatagram(datagram, sockAddr.getAddress(), sockAddr.getPort()); if (bytesWritten < 0) { // when saturating a link this isn't an uncommon message - suppress it so it doesn't bomb the debug static const QString WRITE_ERROR_REGEX = "Socket::writeDatagram QAbstractSocket::NetworkError - Unable to send a message"; static QString repeatedMessage = LogHandler::getInstance().addRepeatedMessageRegex(WRITE_ERROR_REGEX); qCDebug(networking) << "Socket::writeDatagram" << _udpSocket.error() << "-" << qPrintable(_udpSocket.errorString()); } return bytesWritten; } Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { auto it = _connectionsHash.find(sockAddr); if (it == _connectionsHash.end()) { auto connection = std::unique_ptr(new Connection(this, sockAddr, _ccFactory->create())); it = _connectionsHash.insert(it, std::make_pair(sockAddr, std::move(connection))); } return *it->second; } void Socket::readPendingDatagrams() { int packetSizeWithHeader = -1; while ((packetSizeWithHeader = _udpSocket.pendingDatagramSize()) != -1) { // setup a HifiSockAddr to read into HifiSockAddr senderSockAddr; // setup a buffer to read the packet into auto buffer = std::unique_ptr(new char[packetSizeWithHeader]); // pull the datagram _udpSocket.readDatagram(buffer.get(), packetSizeWithHeader, senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer()); auto it = _unfilteredHandlers.find(senderSockAddr); if (it != _unfilteredHandlers.end()) { // we have a registered unfiltered handler for this HifiSockAddr - call that and return if (it->second) { auto basePacket = BasePacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr); it->second(std::move(basePacket)); } return; } // check if this was a control packet or a data packet bool isControlPacket = *reinterpret_cast(buffer.get()) & CONTROL_BIT_MASK; if (isControlPacket) { // setup a control packet from the data we just read auto controlPacket = ControlPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr); // move this control packet to the matching connection auto& connection = findOrCreateConnection(senderSockAddr); connection.processControl(move(controlPacket)); } else { // setup a Packet from the data we just read auto packet = Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr); // call our verification operator to see if this packet is verified if (!_packetFilterOperator || _packetFilterOperator(*packet)) { if (packet->isReliable()) { // if this was a reliable packet then signal the matching connection with the sequence number auto& connection = findOrCreateConnection(senderSockAddr); connection.processReceivedSequenceNumber(packet->getSequenceNumber(), packet->getDataSize(), packet->getPayloadSize()); } if (_packetHandler) { // call the verified packet callback to let it handle this packet _packetHandler(std::move(packet)); } } } } } void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot) { auto it = _connectionsHash.find(destinationAddr); if (it != _connectionsHash.end()) { connect(it->second.get(), SIGNAL(packetSent()), receiver, slot); } } void Socket::rateControlSync() { // enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control for (auto& connection : _connectionsHash) { connection.second->sync(); } if (_synTimer.interval() != _synInterval) { // if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed) // then restart it now with the right interval _synTimer.start(_synInterval); } } void Socket::setCongestionControlFactory(std::unique_ptr ccFactory) { // swap the current unique_ptr for the new factory _ccFactory.swap(ccFactory); // update the _synInterval to the value from the factory _synInterval = _ccFactory->synInterval(); } ConnectionStats::Stats Socket::sampleStatsForConnection(const HifiSockAddr& destination) { Q_ASSERT_X(thread() == QThread::currentThread(), "Socket::sampleStatsForConnection", "Stats sampling for connection must be on socket thread"); auto it = _connectionsHash.find(destination); if (it != _connectionsHash.end()) { return it->second->sampleStats(); } else { return ConnectionStats::Stats(); } } std::vector Socket::getSockAddr() { std::vector addr; addr.reserve(_connectionsHash.size()); for (const auto& connectionPair : _connectionsHash) { addr.push_back(connectionPair.first); } return std::move(addr); }