diff options
Diffstat (limited to 'third_party/libwebrtc/examples/peerconnection/client/peer_connection_client.cc')
-rw-r--r-- | third_party/libwebrtc/examples/peerconnection/client/peer_connection_client.cc | 489 |
1 files changed, 489 insertions, 0 deletions
diff --git a/third_party/libwebrtc/examples/peerconnection/client/peer_connection_client.cc b/third_party/libwebrtc/examples/peerconnection/client/peer_connection_client.cc new file mode 100644 index 0000000000..2746752d80 --- /dev/null +++ b/third_party/libwebrtc/examples/peerconnection/client/peer_connection_client.cc @@ -0,0 +1,489 @@ +/* + * Copyright 2012 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "examples/peerconnection/client/peer_connection_client.h" + +#include "api/units/time_delta.h" +#include "examples/peerconnection/client/defaults.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" +#include "rtc_base/net_helpers.h" + +namespace { + +// This is our magical hangup signal. +constexpr char kByeMessage[] = "BYE"; +// Delay between server connection retries, in milliseconds +constexpr webrtc::TimeDelta kReconnectDelay = webrtc::TimeDelta::Seconds(2); + +rtc::Socket* CreateClientSocket(int family) { + rtc::Thread* thread = rtc::Thread::Current(); + RTC_DCHECK(thread != NULL); + return thread->socketserver()->CreateSocket(family, SOCK_STREAM); +} + +} // namespace + +PeerConnectionClient::PeerConnectionClient() + : callback_(NULL), resolver_(NULL), state_(NOT_CONNECTED), my_id_(-1) {} + +PeerConnectionClient::~PeerConnectionClient() = default; + +void PeerConnectionClient::InitSocketSignals() { + RTC_DCHECK(control_socket_.get() != NULL); + RTC_DCHECK(hanging_get_.get() != NULL); + control_socket_->SignalCloseEvent.connect(this, + &PeerConnectionClient::OnClose); + hanging_get_->SignalCloseEvent.connect(this, &PeerConnectionClient::OnClose); + control_socket_->SignalConnectEvent.connect(this, + &PeerConnectionClient::OnConnect); + hanging_get_->SignalConnectEvent.connect( + this, &PeerConnectionClient::OnHangingGetConnect); + control_socket_->SignalReadEvent.connect(this, &PeerConnectionClient::OnRead); + hanging_get_->SignalReadEvent.connect( + this, &PeerConnectionClient::OnHangingGetRead); +} + +int PeerConnectionClient::id() const { + return my_id_; +} + +bool PeerConnectionClient::is_connected() const { + return my_id_ != -1; +} + +const Peers& PeerConnectionClient::peers() const { + return peers_; +} + +void PeerConnectionClient::RegisterObserver( + PeerConnectionClientObserver* callback) { + RTC_DCHECK(!callback_); + callback_ = callback; +} + +void PeerConnectionClient::Connect(const std::string& server, + int port, + const std::string& client_name) { + RTC_DCHECK(!server.empty()); + RTC_DCHECK(!client_name.empty()); + + if (state_ != NOT_CONNECTED) { + RTC_LOG(LS_WARNING) + << "The client must not be connected before you can call Connect()"; + callback_->OnServerConnectionFailure(); + return; + } + + if (server.empty() || client_name.empty()) { + callback_->OnServerConnectionFailure(); + return; + } + + if (port <= 0) + port = kDefaultServerPort; + + server_address_.SetIP(server); + server_address_.SetPort(port); + client_name_ = client_name; + + if (server_address_.IsUnresolvedIP()) { + state_ = RESOLVING; + resolver_ = new rtc::AsyncResolver(); + resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult); + resolver_->Start(server_address_); + } else { + DoConnect(); + } +} + +void PeerConnectionClient::OnResolveResult( + rtc::AsyncResolverInterface* resolver) { + if (resolver_->GetError() != 0) { + callback_->OnServerConnectionFailure(); + resolver_->Destroy(false); + resolver_ = NULL; + state_ = NOT_CONNECTED; + } else { + server_address_ = resolver_->address(); + DoConnect(); + } +} + +void PeerConnectionClient::DoConnect() { + control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family())); + hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family())); + InitSocketSignals(); + char buffer[1024]; + snprintf(buffer, sizeof(buffer), "GET /sign_in?%s HTTP/1.0\r\n\r\n", + client_name_.c_str()); + onconnect_data_ = buffer; + + bool ret = ConnectControlSocket(); + if (ret) + state_ = SIGNING_IN; + if (!ret) { + callback_->OnServerConnectionFailure(); + } +} + +bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) { + if (state_ != CONNECTED) + return false; + + RTC_DCHECK(is_connected()); + RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED); + if (!is_connected() || peer_id == -1) + return false; + + char headers[1024]; + snprintf(headers, sizeof(headers), + "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n" + "Content-Length: %zu\r\n" + "Content-Type: text/plain\r\n" + "\r\n", + my_id_, peer_id, message.length()); + onconnect_data_ = headers; + onconnect_data_ += message; + return ConnectControlSocket(); +} + +bool PeerConnectionClient::SendHangUp(int peer_id) { + return SendToPeer(peer_id, kByeMessage); +} + +bool PeerConnectionClient::IsSendingMessage() { + return state_ == CONNECTED && + control_socket_->GetState() != rtc::Socket::CS_CLOSED; +} + +bool PeerConnectionClient::SignOut() { + if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT) + return true; + + if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED) + hanging_get_->Close(); + + if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) { + state_ = SIGNING_OUT; + + if (my_id_ != -1) { + char buffer[1024]; + snprintf(buffer, sizeof(buffer), + "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_); + onconnect_data_ = buffer; + return ConnectControlSocket(); + } else { + // Can occur if the app is closed before we finish connecting. + return true; + } + } else { + state_ = SIGNING_OUT_WAITING; + } + + return true; +} + +void PeerConnectionClient::Close() { + control_socket_->Close(); + hanging_get_->Close(); + onconnect_data_.clear(); + peers_.clear(); + if (resolver_ != NULL) { + resolver_->Destroy(false); + resolver_ = NULL; + } + my_id_ = -1; + state_ = NOT_CONNECTED; +} + +bool PeerConnectionClient::ConnectControlSocket() { + RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED); + int err = control_socket_->Connect(server_address_); + if (err == SOCKET_ERROR) { + Close(); + return false; + } + return true; +} + +void PeerConnectionClient::OnConnect(rtc::Socket* socket) { + RTC_DCHECK(!onconnect_data_.empty()); + size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length()); + RTC_DCHECK(sent == onconnect_data_.length()); + onconnect_data_.clear(); +} + +void PeerConnectionClient::OnHangingGetConnect(rtc::Socket* socket) { + char buffer[1024]; + snprintf(buffer, sizeof(buffer), "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", + my_id_); + int len = static_cast<int>(strlen(buffer)); + int sent = socket->Send(buffer, len); + RTC_DCHECK(sent == len); +} + +void PeerConnectionClient::OnMessageFromPeer(int peer_id, + const std::string& message) { + if (message.length() == (sizeof(kByeMessage) - 1) && + message.compare(kByeMessage) == 0) { + callback_->OnPeerDisconnected(peer_id); + } else { + callback_->OnMessageFromPeer(peer_id, message); + } +} + +bool PeerConnectionClient::GetHeaderValue(const std::string& data, + size_t eoh, + const char* header_pattern, + size_t* value) { + RTC_DCHECK(value != NULL); + size_t found = data.find(header_pattern); + if (found != std::string::npos && found < eoh) { + *value = atoi(&data[found + strlen(header_pattern)]); + return true; + } + return false; +} + +bool PeerConnectionClient::GetHeaderValue(const std::string& data, + size_t eoh, + const char* header_pattern, + std::string* value) { + RTC_DCHECK(value != NULL); + size_t found = data.find(header_pattern); + if (found != std::string::npos && found < eoh) { + size_t begin = found + strlen(header_pattern); + size_t end = data.find("\r\n", begin); + if (end == std::string::npos) + end = eoh; + value->assign(data.substr(begin, end - begin)); + return true; + } + return false; +} + +bool PeerConnectionClient::ReadIntoBuffer(rtc::Socket* socket, + std::string* data, + size_t* content_length) { + char buffer[0xffff]; + do { + int bytes = socket->Recv(buffer, sizeof(buffer), nullptr); + if (bytes <= 0) + break; + data->append(buffer, bytes); + } while (true); + + bool ret = false; + size_t i = data->find("\r\n\r\n"); + if (i != std::string::npos) { + RTC_LOG(LS_INFO) << "Headers received"; + if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) { + size_t total_response_size = (i + 4) + *content_length; + if (data->length() >= total_response_size) { + ret = true; + std::string should_close; + const char kConnection[] = "\r\nConnection: "; + if (GetHeaderValue(*data, i, kConnection, &should_close) && + should_close.compare("close") == 0) { + socket->Close(); + // Since we closed the socket, there was no notification delivered + // to us. Compensate by letting ourselves know. + OnClose(socket, 0); + } + } else { + // We haven't received everything. Just continue to accept data. + } + } else { + RTC_LOG(LS_ERROR) << "No content length field specified by the server."; + } + } + return ret; +} + +void PeerConnectionClient::OnRead(rtc::Socket* socket) { + size_t content_length = 0; + if (ReadIntoBuffer(socket, &control_data_, &content_length)) { + size_t peer_id = 0, eoh = 0; + bool ok = + ParseServerResponse(control_data_, content_length, &peer_id, &eoh); + if (ok) { + if (my_id_ == -1) { + // First response. Let's store our server assigned ID. + RTC_DCHECK(state_ == SIGNING_IN); + my_id_ = static_cast<int>(peer_id); + RTC_DCHECK(my_id_ != -1); + + // The body of the response will be a list of already connected peers. + if (content_length) { + size_t pos = eoh + 4; + while (pos < control_data_.size()) { + size_t eol = control_data_.find('\n', pos); + if (eol == std::string::npos) + break; + int id = 0; + std::string name; + bool connected; + if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id, + &connected) && + id != my_id_) { + peers_[id] = name; + callback_->OnPeerConnected(id, name); + } + pos = eol + 1; + } + } + RTC_DCHECK(is_connected()); + callback_->OnSignedIn(); + } else if (state_ == SIGNING_OUT) { + Close(); + callback_->OnDisconnected(); + } else if (state_ == SIGNING_OUT_WAITING) { + SignOut(); + } + } + + control_data_.clear(); + + if (state_ == SIGNING_IN) { + RTC_DCHECK(hanging_get_->GetState() == rtc::Socket::CS_CLOSED); + state_ = CONNECTED; + hanging_get_->Connect(server_address_); + } + } +} + +void PeerConnectionClient::OnHangingGetRead(rtc::Socket* socket) { + RTC_LOG(LS_INFO) << __FUNCTION__; + size_t content_length = 0; + if (ReadIntoBuffer(socket, ¬ification_data_, &content_length)) { + size_t peer_id = 0, eoh = 0; + bool ok = + ParseServerResponse(notification_data_, content_length, &peer_id, &eoh); + + if (ok) { + // Store the position where the body begins. + size_t pos = eoh + 4; + + if (my_id_ == static_cast<int>(peer_id)) { + // A notification about a new member or a member that just + // disconnected. + int id = 0; + std::string name; + bool connected = false; + if (ParseEntry(notification_data_.substr(pos), &name, &id, + &connected)) { + if (connected) { + peers_[id] = name; + callback_->OnPeerConnected(id, name); + } else { + peers_.erase(id); + callback_->OnPeerDisconnected(id); + } + } + } else { + OnMessageFromPeer(static_cast<int>(peer_id), + notification_data_.substr(pos)); + } + } + + notification_data_.clear(); + } + + if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED && + state_ == CONNECTED) { + hanging_get_->Connect(server_address_); + } +} + +bool PeerConnectionClient::ParseEntry(const std::string& entry, + std::string* name, + int* id, + bool* connected) { + RTC_DCHECK(name != NULL); + RTC_DCHECK(id != NULL); + RTC_DCHECK(connected != NULL); + RTC_DCHECK(!entry.empty()); + + *connected = false; + size_t separator = entry.find(','); + if (separator != std::string::npos) { + *id = atoi(&entry[separator + 1]); + name->assign(entry.substr(0, separator)); + separator = entry.find(',', separator + 1); + if (separator != std::string::npos) { + *connected = atoi(&entry[separator + 1]) ? true : false; + } + } + return !name->empty(); +} + +int PeerConnectionClient::GetResponseStatus(const std::string& response) { + int status = -1; + size_t pos = response.find(' '); + if (pos != std::string::npos) + status = atoi(&response[pos + 1]); + return status; +} + +bool PeerConnectionClient::ParseServerResponse(const std::string& response, + size_t content_length, + size_t* peer_id, + size_t* eoh) { + int status = GetResponseStatus(response.c_str()); + if (status != 200) { + RTC_LOG(LS_ERROR) << "Received error from server"; + Close(); + callback_->OnDisconnected(); + return false; + } + + *eoh = response.find("\r\n\r\n"); + RTC_DCHECK(*eoh != std::string::npos); + if (*eoh == std::string::npos) + return false; + + *peer_id = -1; + + // See comment in peer_channel.cc for why we use the Pragma header. + GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id); + + return true; +} + +void PeerConnectionClient::OnClose(rtc::Socket* socket, int err) { + RTC_LOG(LS_INFO) << __FUNCTION__; + + socket->Close(); + +#ifdef WIN32 + if (err != WSAECONNREFUSED) { +#else + if (err != ECONNREFUSED) { +#endif + if (socket == hanging_get_.get()) { + if (state_ == CONNECTED) { + hanging_get_->Close(); + hanging_get_->Connect(server_address_); + } + } else { + callback_->OnMessageSent(err); + } + } else { + if (socket == control_socket_.get()) { + RTC_LOG(LS_WARNING) << "Connection refused; retrying in 2 seconds"; + rtc::Thread::Current()->PostDelayedTask( + SafeTask(safety_.flag(), [this] { DoConnect(); }), kReconnectDelay); + } else { + Close(); + callback_->OnDisconnected(); + } + } +} |