Abstract WebRTCDataChannel into a QUdpSocket-style WebRTCSocket

This commit is contained in:
David Rowe 2021-06-26 21:20:26 +12:00
parent 5b937a1580
commit d65ecead9f
5 changed files with 322 additions and 32 deletions

View file

@ -165,10 +165,6 @@ bool DomainServer::forwardMetaverseAPIRequest(HTTPConnection* connection,
DomainServer::DomainServer(int argc, char* argv[]) :
QCoreApplication(argc, argv),
_gatekeeper(this),
#ifdef WEBRTC_DATA_CHANNELS
_webrtcSignalingServer(QHostAddress::AnyIPv4, DEFAULT_DOMAIN_SERVER_WS_PORT, this),
_webrtcDataChannels(NodeType::DomainServer, this),
#endif
_httpManager(QHostAddress::AnyIPv4, DOMAIN_SERVER_HTTP_PORT,
QString("%1/resources/web/").arg(QCoreApplication::applicationDirPath()), this)
{
@ -252,8 +248,6 @@ DomainServer::DomainServer(int argc, char* argv[]) :
updateDownstreamNodes();
updateUpstreamNodes();
setUpWebRTC();
if (_type != NonMetaverse) {
// if we have a metaverse domain, we'll use an access token for API calls
resetAccountManagerAccessToken();
@ -3139,20 +3133,6 @@ void DomainServer::updateUpstreamNodes() {
updateReplicationNodes(Upstream);
}
void DomainServer::setUpWebRTC() {
#ifdef WEBRTC_DATA_CHANNELS
// Inbound WebRTC signaling messages received from a client.
connect(&_webrtcSignalingServer, &WebRTCSignalingServer::messageReceived,
&_webrtcDataChannels, &WebRTCDataChannels::onSignalingMessage);
// Outbound WebRTC signaling messages being sent to a client.
connect(&_webrtcDataChannels, &WebRTCDataChannels::signalingMessage,
&_webrtcSignalingServer, &WebRTCSignalingServer::sendMessage);
#endif
}
void DomainServer::initializeExporter() {
static const QString ENABLE_EXPORTER = "monitoring.enable_prometheus_exporter";
static const QString EXPORTER_PORT = "monitoring.prometheus_exporter_port";

View file

@ -27,11 +27,6 @@
#include <Assignment.h>
#include <HTTPSConnection.h>
#include <LimitedNodeList.h>
#include <shared/WebRTC.h>
#if defined(WEBRTC_DATA_CHANNELS)
#include <webrtc/WebRTCDataChannels.h>
#include <webrtc/WebRTCSignalingServer.h>
#endif
#include "AssetsBackupHandler.h"
#include "DomainGatekeeper.h"
@ -147,8 +142,6 @@ private slots:
void updateDownstreamNodes();
void updateUpstreamNodes();
void setUpWebRTC();
void initializeExporter();
void initializeMetadataExporter();
@ -319,11 +312,6 @@ private:
std::unordered_map<int, std::unique_ptr<QTemporaryFile>> _pendingContentFiles;
QThread _assetClientThread;
#ifdef WEBRTC_DATA_CHANNELS
WebRTCSignalingServer _webrtcSignalingServer;
WebRTCDataChannels _webrtcDataChannels;
#endif
};

View file

@ -26,6 +26,8 @@ namespace udt {
static const int CONNECTION_SEND_BUFFER_SIZE_PACKETS = 8192;
static const int UDP_SEND_BUFFER_SIZE_BYTES = 1048576;
static const int UDP_RECEIVE_BUFFER_SIZE_BYTES = 1048576;
static const int WEBRTC_SEND_BUFFER_SIZE_BYTES = 1048576;
static const int WEBRTC_RECEIVE_BUFFER_SIZE_BYTES = 1048576;
static const int DEFAULT_SYN_INTERVAL_USECS = 10 * 1000;

View file

@ -0,0 +1,158 @@
//
// WebRTCSocket.cpp
// libraries/networking/src/webrtc
//
// Created by David Rowe on 21 Jun 2021.
// Copyright 2021 Vircadia contributors.
//
#include "WebRTCSocket.h"
#if defined(WEBRTC_DATA_CHANNELS)
#include "../NetworkLogging.h"
#include "../udt/Constants.h"
WebRTCSocket::WebRTCSocket(QObject* parent, NodeType_t nodeType) :
QObject(parent),
_parent(parent),
_signalingServer(this /*, QHostAddress::AnyIPv4, DEFAULT_DOMAIN_SERVER_WS_PORT*/),
_dataChannels(this, nodeType)
{
// Connect WebRTC signaling server and data channels.
connect(&_signalingServer, &WebRTCSignalingServer::messageReceived,
&_dataChannels, &WebRTCDataChannels::onSignalingMessage);
connect(&_dataChannels, &WebRTCDataChannels::signalingMessage,
&_signalingServer, &WebRTCSignalingServer::sendMessage);
// Route received data channel messages.
connect(&_dataChannels, &WebRTCDataChannels::dataMessage, this, &WebRTCSocket::onDataChannelReceivedMessage);
}
void WebRTCSocket::setSocketOption(QAbstractSocket::SocketOption option, const QVariant& value) {
clearError();
switch (option) {
case QAbstractSocket::SocketOption::ReceiveBufferSizeSocketOption:
case QAbstractSocket::SocketOption::SendBufferSizeSocketOption:
// WebRTC doesn't provide access to setting these buffer sizes.
break;
default:
setError(QAbstractSocket::SocketError::UnsupportedSocketOperationError, "Failed to set socket option");
qCCritical(networking_webrtc) << "WebRTCSocket::setSocketOption() not implemented for option:" << option;
}
}
QVariant WebRTCSocket::socketOption(QAbstractSocket::SocketOption option) {
clearError();
switch (option) {
case QAbstractSocket::SocketOption::ReceiveBufferSizeSocketOption:
// WebRTC doesn't provide access to the receive buffer size. Just use the default buffer size.
return udt::WEBRTC_RECEIVE_BUFFER_SIZE_BYTES;
case QAbstractSocket::SocketOption::SendBufferSizeSocketOption:
// WebRTC doesn't provide access to the send buffer size though it's probably 16MB. Just use the default buffer size.
return udt::WEBRTC_SEND_BUFFER_SIZE_BYTES;
default:
setError(QAbstractSocket::SocketError::UnsupportedSocketOperationError, "Failed to get socket option");
qCCritical(networking_webrtc) << "WebRTCSocket::getSocketOption() not implemented for option:" << option;
}
return QVariant();
}
bool WebRTCSocket::bind(const QHostAddress& address, quint16 port, QAbstractSocket::BindMode mode) {
// WebRTC data channels aren't bound to ports so just treat this as a successful operation.
auto wasBound = _isBound;
_isBound = _signalingServer.bind(address, port);
if (_isBound != wasBound) {
emit stateChanged(_isBound ? QAbstractSocket::BoundState : QAbstractSocket::UnconnectedState);
}
return _isBound;
}
QAbstractSocket::SocketState WebRTCSocket::state() const {
return _isBound ? QAbstractSocket::BoundState : QAbstractSocket::UnconnectedState;
}
void WebRTCSocket::abort() {
_dataChannels.reset();
}
qint64 WebRTCSocket::writeDatagram(const QByteArray& datagram, quint16 port) {
clearError();
if (_dataChannels.sendDataMessage(port, datagram)) {
return datagram.length();
}
setError(QAbstractSocket::SocketError::UnknownSocketError, "Failed to write datagram");
return -1;
}
qint64 WebRTCSocket::bytesToWrite(quint16 port) const {
return _dataChannels.getBufferedAmount(port);
}
bool WebRTCSocket::hasPendingDatagrams() const {
return _receivedQueue.length() > 0;
}
qint64 WebRTCSocket::pendingDatagramSize() const {
if (_receivedQueue.length() > 0) {
return _receivedQueue.head().second.length();
}
return -1;
}
qint64 WebRTCSocket::readDatagram(char* data, qint64 maxSize, QHostAddress* address, quint16* port) {
clearError();
if (_receivedQueue.length() > 0) {
auto datagram = _receivedQueue.dequeue();
auto length = std::min((qint64)datagram.second.length(), maxSize);
if (data) {
memcpy(data, datagram.second.constData(), length);
}
if (address) {
// WEBRTC TODO: Use signaling channel's remote WebSocket address? Or remote data channel address?
*address = QHostAddress::AnyIPv4;
}
if (port) {
*port = datagram.first;
}
return length;
}
setError(QAbstractSocket::SocketError::UnknownSocketError, "Failed to read datagram");
return -1;
}
QAbstractSocket::SocketError WebRTCSocket::error() const {
return _lastErrorType;
}
QString WebRTCSocket::errorString() const {
return _lastErrorString;
}
void WebRTCSocket::setError(QAbstractSocket::SocketError errorType, QString errorString) {
_lastErrorType = errorType;
}
void WebRTCSocket::clearError() {
_lastErrorType = QAbstractSocket::SocketError();
_lastErrorString = QString();
}
void WebRTCSocket::onDataChannelReceivedMessage(int dataChannelID, const QByteArray& message) {
_receivedQueue.enqueue(QPair<int, QByteArray>(dataChannelID, message));
emit readyRead();
}
#endif // WEBRTC_DATA_CHANNELS

View file

@ -0,0 +1,162 @@
//
// WebRTCSocket.h
// libraries/networking/src/webrtc
//
// Created by David Rowe on 21 Jun 2021.
// Copyright 2021 Vircadia contributors.
//
#ifndef vircadia_WebRTCSocket_h
#define vircadia_WebRTCSocket_h
#include <shared/WebRTC.h>
#if defined(WEBRTC_DATA_CHANNELS)
#include <QAbstractSocket>
#include <QObject>
#include <QQueue>
#include "WebRTCDataChannels.h"
#include "WebRTCSignalingServer.h"
/// @addtogroup Networking
/// @{
/// @brief Provides a QUdpSocket-style interface for using WebRTCDataChannels.
class WebRTCSocket : public QObject {
Q_OBJECT
public:
/// @brief Constructs a new WebRTCSocket object.
/// @param parent Qt parent object.
/// @param nodeType The type of node that the WebRTCsocket object is being used in.
WebRTCSocket(QObject* parent, NodeType_t nodeType);
/// @brief Nominally sets the value of a socket option.
/// @details Only <code>SendBufferSizeSocketOption</code> and <code>ReceiveBufferSizeSocketOption</code> options are handled
/// and for these no action is taken because these buffer sizes are not configurable in WebRTC.
/// Included for compatibility with the QUdpSocket interface.
/// @param option The socket option.
/// @param value The value of the socket option.
void setSocketOption(QAbstractSocket::SocketOption option, const QVariant& value);
/// @brief Nominally gets the value of a socket option.
/// @details Only <code>SendBufferSizeSocketOption</code> and <code>ReceiveBufferSizeSocketOption</code> options are handled
/// and for these only default values are returned because these buffer sizes are not configurable in WebRTC.
/// Included for compatibility with the QUdpSocket interface.
/// @param option The socket option.
/// @return The value of the socket option.
QVariant socketOption(QAbstractSocket::SocketOption option);
/// @brief Binds the WebRTC socket's signaling server to an address and port.
/// @details Note: WebRTC data connections aren't bound to an address or port. Their ports are negotiated as part of the
/// WebRTC peer connection process.
/// @param address The address to use for the signaling server.
/// @param port The port to use for the signaling server.
/// @param mode The bind mode. (Not used: included for compatibility with the QUdpSocket interface.)
/// @return <code>true</code> if the signaling server was successfully bound, <code>false</code> if it wasn't.
bool bind(const QHostAddress& address, quint16 port = 0, QAbstractSocket::BindMode mode
= QAbstractSocket::DefaultForPlatform);
/// @brief Gets the state of the socket.
/// @details In particular, QAbstractSocket::BoundState is returned if the socket is bound,
/// QAbstractSocket::UnconnectedState if it isn't.
/// @return The state of the socket.
QAbstractSocket::SocketState state() const;
/// @brief Immediately closes all connections and resets the socket.
void abort();
/// @brief Nominally gets the host port number.
/// @details
/// Included for compatibility with the QUdpSocket interface.
/// @return <code>0</code>
quint16 localPort() const { return 0; }
/// @brief Nominally gets the socket descriptor.
/// @details
/// Included for compatibility with the QUdpSocket interface.
/// @return <code>-1</code>
qintptr socketDescriptor() const { return -1; }
/// @brief Sends a datagram to the host on a data channel.
/// @param datagram The datagram to send.
/// @param port The data channel ID.
/// @return The number of bytes if successfully sent, otherwise <code>-1</code>.
qint64 writeDatagram(const QByteArray& datagram, quint16 port);
/// @brief Gets the number of bytes waiting to be written.
/// @param port The data channel ID.
/// @return The number of bytes waiting to be written.
qint64 bytesToWrite(quint16 port) const;
/// @brief Gets whether there's a datagram waiting to be read.
/// @return <code>true</code> if there's a datagram waiting to be read, <code>false</code> if there isn't.
bool hasPendingDatagrams() const;
/// @brief Gets the size of the first pending datagram.
/// @return the size of the first pending datagram; <code>-1</code> if there is no pending datagram.
qint64 pendingDatagramSize() const;
/// @brief Reads the next datagram, up to a maximum number of bytes.
/// @details Any remaining data in the datagram is lost.
/// @param data The destination to read the datagram into.
/// @param maxSize The maximum number of bytes to read.
/// @param address The destination to put the IP address that the datagram was read from. (Not currently set.)
/// @param port The destination to put the data channel ID that the datagram was read from.
/// @return The number of bytes read on success; <code>-1</code> if reading unsuccessful.
qint64 readDatagram(char* data, qint64 maxSize, QHostAddress* address = nullptr, quint16* port = nullptr);
/// @brief Gets the type of error that last occurred.
/// @return The type of error that last occurred.
QAbstractSocket::SocketError error() const;
/// @brief Gets the description of the error that last occurred.
/// @return The description of the error that last occurred.
QString errorString() const;
public slots:
/// @brief Handles the WebRTC data channel receiving a message.
/// @param dataChannelID The data channel that the message was received on.
/// @param message The message that was received.
/// @detail Queues the message to be read via readDatagram.
void onDataChannelReceivedMessage(int dataChannelID, const QByteArray& message);
signals:
/// @brief Emitted when the state of the socket changes.
void stateChanged(QAbstractSocket::SocketState socketState);
/// @brief Emitted each time new data becomes available for reading.
void readyRead();
private:
void setError(QAbstractSocket::SocketError errorType, QString errorString);
void clearError();
QObject* _parent;
WebRTCSignalingServer _signalingServer;
WebRTCDataChannels _dataChannels;
bool _isBound { false };
QQueue<QPair<int, QByteArray>> _receivedQueue; // Messages received are queued for reading from the "socket".
QAbstractSocket::SocketError _lastErrorType { QAbstractSocket::UnknownSocketError };
QString _lastErrorString;
};
/// @}
#endif // WEBRTC_DATA_CHANNELS
#endif // vircadia_WebRTCSocket_h