summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2021-12-04 06:51:36 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2021-12-04 07:12:46 +0000
commit3da41867bf5d4fefd44d32cded5e532f3ef68214 (patch)
tree392ba9b8e55113fd66bc7d6c85b2db6332f9c59e
parentInitial commit. (diff)
downloadwebssh-3da41867bf5d4fefd44d32cded5e532f3ef68214.tar.xz
webssh-3da41867bf5d4fefd44d32cded5e532f3ef68214.zip
Adding upstream version 1.5.3+20210903+dfsg.upstream/1.5.3+20210903+dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
-rw-r--r--.coveragerc13
-rw-r--r--.github/FUNDING.yml5
-rw-r--r--.gitignore65
-rw-r--r--.travis.yml22
-rw-r--r--Dockerfile12
-rw-r--r--LICENSE21
-rw-r--r--MANIFEST.in13
-rw-r--r--README.md212
-rw-r--r--README.rst246
-rw-r--r--docker-compose.yml6
-rw-r--r--preview/login.pngbin0 -> 60907 bytes
-rw-r--r--preview/terminal.pngbin0 -> 98917 bytes
-rw-r--r--requirements.txt3
-rw-r--r--run.py5
-rw-r--r--setup.cfg9
-rw-r--r--setup.py40
-rw-r--r--tests/__init__.py0
-rw-r--r--tests/data/cert.crt21
-rw-r--r--tests/data/cert.key28
-rw-r--r--tests/data/fonts/.gitignore0
-rw-r--r--tests/data/fonts/fake-font0
-rw-r--r--tests/data/known_hosts_example1
-rw-r--r--tests/data/known_hosts_example21
-rw-r--r--tests/data/known_hosts_example31
-rw-r--r--tests/data/test_ed25519.key8
-rw-r--r--tests/data/test_ed25519_password.key8
-rw-r--r--tests/data/test_known_hosts1
-rw-r--r--tests/data/test_new_dsa.key21
-rw-r--r--tests/data/test_new_rsa_password.key39
-rw-r--r--tests/data/test_rsa.key15
-rw-r--r--tests/data/test_rsa_password.key18
-rw-r--r--tests/data/user_rsa_key15
-rw-r--r--tests/sshserver.py233
-rw-r--r--tests/test_app.py792
-rw-r--r--tests/test_handler.py279
-rw-r--r--tests/test_main.py22
-rw-r--r--tests/test_policy.py123
-rw-r--r--tests/test_settings.py187
-rw-r--r--tests/test_utils.py127
-rw-r--r--tests/utils.py52
-rw-r--r--webssh/__init__.py10
-rw-r--r--webssh/_version.py2
-rw-r--r--webssh/handler.py583
-rw-r--r--webssh/main.py58
-rw-r--r--webssh/policy.py86
-rw-r--r--webssh/settings.py198
-rw-r--r--webssh/static/css/fonts/.gitignore0
-rw-r--r--webssh/static/img/favicon.pngbin0 -> 5953 bytes
-rw-r--r--webssh/static/js/main.js858
-rw-r--r--webssh/templates/index.html101
-rw-r--r--webssh/utils.py145
-rw-r--r--webssh/worker.py125
52 files changed, 4830 insertions, 0 deletions
diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 0000000..8b32fc7
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,13 @@
+[run]
+branch = true
+source = webssh
+
+[report]
+exclude_lines =
+ if self.debug:
+ pragma: no cover
+ raise NotImplementedError
+ if __name__ == .__main__.:
+ignore_errors = True
+omit =
+ tests/*
diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml
new file mode 100644
index 0000000..eebdb7e
--- /dev/null
+++ b/.github/FUNDING.yml
@@ -0,0 +1,5 @@
+# These are supported funding model platforms
+
+# github: huashengdun
+ko_fi: huashengdun
+custom: https://bit.ly/2XmXXIP
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..661e920
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,65 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.cache
+.pytest_cache/
+nosetests.xml
+coverage.xml
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# database file
+*.sqlite
+*.sqlite3
+*.db
+
+# temporary file
+*.swp
+
+# known_hosts file
+known_hosts
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..c526ef2
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,22 @@
+dist: xenial
+language: python
+
+python:
+ - "2.7"
+ - "3.4"
+ - "3.5"
+ - "3.6"
+ - "3.7"
+ - "3.8"
+
+install:
+ - pip install -r requirements.txt
+ - pip install 'pytest>=4.6' pytest-cov codecov flake8
+ - if [[ $TRAVIS_PYTHON_VERSION == '2.7' ]]; then pip install mock; fi
+
+script:
+ - pytest --cov=webssh
+ - flake8
+
+after_success:
+ - codecov
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..38dd2e0
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,12 @@
+FROM python:3.7-slim
+ADD . /code
+WORKDIR /code
+RUN \
+ groupadd -r webssh && \
+ useradd -r -s /bin/false -g webssh webssh && \
+ chown -R webssh:webssh /code && \
+ pip install -r requirements.txt
+
+EXPOSE 8888/tcp
+USER webssh
+CMD ["python", "run.py"]
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..8d1036e
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2017 Shengdun Hua
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..bb85ddd
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,13 @@
+include LICENSE
+
+recursive-include tests *
+prune tests/__pycache__
+prune tests/.pytest_cache
+
+recursive-include webssh *
+prune webssh/__pycache__
+prune webssh/.pytest_cache
+
+global-exclude *.pyc
+global-exclude *.log
+global-exclude .coverage
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ad77175
--- /dev/null
+++ b/README.md
@@ -0,0 +1,212 @@
+## WebSSH
+
+[![Build Status](https://travis-ci.org/huashengdun/webssh.svg?branch=master)](https://travis-ci.org/huashengdun/webssh)
+[![codecov](https://codecov.io/gh/huashengdun/webssh/branch/master/graph/badge.svg)](https://codecov.io/gh/huashengdun/webssh)
+![PyPI - Python Version](https://img.shields.io/pypi/pyversions/webssh.svg)
+![PyPI](https://img.shields.io/pypi/v/webssh.svg)
+
+
+### Introduction
+
+A simple web application to be used as an ssh client to connect to your ssh servers. It is written in Python, base on tornado, paramiko and xterm.js.
+
+### Features
+
+* SSH password authentication supported, including empty password.
+* SSH public-key authentication supported, including DSA RSA ECDSA Ed25519 keys.
+* Encrypted keys supported.
+* Two-Factor Authentication (time-based one-time password) supported.
+* Fullscreen terminal supported.
+* Terminal window resizable.
+* Auto detect the ssh server's default encoding.
+* Modern browsers including Chrome, Firefox, Safari, Edge, Opera supported.
+
+
+### Preview
+
+![Login](preview/login.png)
+![Terminal](preview/terminal.png)
+
+
+### How it works
+```
++---------+ http +--------+ ssh +-----------+
+| browser | <==========> | webssh | <=======> | ssh server|
++---------+ websocket +--------+ ssh +-----------+
+```
+
+### Requirements
+
+* Python 2.7/3.4+
+
+
+### Quickstart
+
+1. Install this app, run command `pip install webssh`
+2. Start a webserver, run command `wssh`
+3. Open your browser, navigate to `127.0.0.1:8888`
+4. Input your data, submit the form.
+
+
+### Server options
+
+```bash
+# start a http server with specified listen address and listen port
+wssh --address='2.2.2.2' --port=8000
+
+# start a https server, certfile and keyfile must be passed
+wssh --certfile='/path/to/cert.crt' --keyfile='/path/to/cert.key'
+
+# missing host key policy
+wssh --policy=reject
+
+# logging level
+wssh --logging=debug
+
+# log to file
+wssh --log-file-prefix=main.log
+
+# more options
+wssh --help
+```
+
+### Browser console
+
+```javascript
+// connect to your ssh server
+wssh.connect(hostname, port, username, password, privatekey, passphrase, totp);
+
+// pass an object to wssh.connect
+var opts = {
+ hostname: 'hostname',
+ port: 'port',
+ username: 'username',
+ password: 'password',
+ privatekey: 'the private key text',
+ passphrase: 'passphrase',
+ totp: 'totp'
+};
+wssh.connect(opts);
+
+// without an argument, wssh will use the form data to connect
+wssh.connect();
+
+// set a new encoding for client to use
+wssh.set_encoding(encoding);
+
+// reset encoding to use the default one
+wssh.reset_encoding();
+
+// send a command to the server
+wssh.send('ls -l');
+```
+
+### Custom Font
+
+To use custom font, put your font file in the directory `webssh/static/css/fonts/` and restart the server.
+
+### URL Arguments
+
+Support passing arguments by url (query or fragment) like following examples:
+
+Passing form data (password must be encoded in base64, privatekey not supported)
+```bash
+http://localhost:8888/?hostname=xx&username=yy&password=str_base64_encoded
+```
+
+Passing a terminal background color
+```bash
+http://localhost:8888/#bgcolor=green
+```
+
+Passing a terminal font color
+```bash
+http://localhost:8888/#fontcolor=red
+```
+
+Passing a user defined title
+```bash
+http://localhost:8888/?title=my-ssh-server
+```
+
+Passing an encoding
+```bash
+http://localhost:8888/#encoding=gbk
+```
+
+Passing a font size
+```bash
+http://localhost:8888/#fontsize=24
+```
+
+Passing a command executed right after login
+```bash
+http://localhost:8888/?command=pwd
+```
+
+Passing a terminal type
+```bash
+http://localhost:8888/?term=xterm-256color
+```
+
+### Use Docker
+
+Start up the app
+```
+docker-compose up
+```
+
+Tear down the app
+```
+docker-compose down
+```
+
+### Tests
+
+Requirements
+```
+pip install pytest pytest-cov codecov flake8 mock
+```
+
+Use unittest to run all tests
+```
+python -m unittest discover tests
+```
+
+Use pytest to run all tests
+```
+python -m pytest tests
+```
+
+### Deployment
+
+Running behind an Nginx server
+
+```bash
+wssh --address='127.0.0.1' --port=8888 --policy=reject
+```
+```nginx
+# Nginx config example
+location / {
+ proxy_pass http://127.0.0.1:8888;
+ proxy_http_version 1.1;
+ proxy_read_timeout 300;
+ proxy_set_header Upgrade $http_upgrade;
+ proxy_set_header Connection "upgrade";
+ proxy_set_header Host $http_host;
+ proxy_set_header X-Real-IP $remote_addr;
+ proxy_set_header X-Real-PORT $remote_port;
+}
+```
+
+Running as a standalone server
+```bash
+wssh --port=8080 --sslport=4433 --certfile='cert.crt' --keyfile='cert.key' --xheaders=False --policy=reject
+```
+
+
+### Tips
+
+* For whatever deployment choice you choose, don't forget to enable SSL.
+* By default plain http requests from a public network will be either redirected or blocked and being redirected takes precedence over being blocked.
+* Try to use reject policy as the missing host key policy along with your verified known_hosts, this will prevent man-in-the-middle attacks. The idea is that it checks the system host keys file("~/.ssh/known_hosts") and the application host keys file("./known_hosts") in order, if the ssh server's hostname is not found or the key is not matched, the connection will be aborted.
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..f435d15
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,246 @@
+WebSSH
+------
+
+|Build Status| |codecov| |PyPI - Python Version| |PyPI|
+
+Introduction
+~~~~~~~~~~~~
+
+A simple web application to be used as an ssh client to connect to your
+ssh servers. It is written in Python, base on tornado, paramiko and
+xterm.js.
+
+Features
+~~~~~~~~
+
+- SSH password authentication supported, including empty password.
+- SSH public-key authentication supported, including DSA RSA ECDSA
+ Ed25519 keys.
+- Encrypted keys supported.
+- Two-Factor Authentication (time-based one-time password) supported.
+- Fullscreen terminal supported.
+- Terminal window resizable.
+- Auto detect the ssh server's default encoding.
+- Modern browsers including Chrome, Firefox, Safari, Edge, Opera
+ supported.
+
+Preview
+~~~~~~~
+
+|Login| |Terminal|
+
+How it works
+~~~~~~~~~~~~
+
+::
+
+ +---------+ http +--------+ ssh +-----------+
+ | browser | <==========> | webssh | <=======> | ssh server|
+ +---------+ websocket +--------+ ssh +-----------+
+
+Requirements
+~~~~~~~~~~~~
+
+- Python 2.7/3.4+
+
+Quickstart
+~~~~~~~~~~
+
+1. Install this app, run command ``pip install webssh``
+2. Start a webserver, run command ``wssh``
+3. Open your browser, navigate to ``127.0.0.1:8888``
+4. Input your data, submit the form.
+
+Server options
+~~~~~~~~~~~~~~
+
+.. code:: bash
+
+ # start a http server with specified listen address and listen port
+ wssh --address='2.2.2.2' --port=8000
+
+ # start a https server, certfile and keyfile must be passed
+ wssh --certfile='/path/to/cert.crt' --keyfile='/path/to/cert.key'
+
+ # missing host key policy
+ wssh --policy=reject
+
+ # logging level
+ wssh --logging=debug
+
+ # log to file
+ wssh --log-file-prefix=main.log
+
+ # more options
+ wssh --help
+
+Browser console
+~~~~~~~~~~~~~~~
+
+.. code:: javascript
+
+ // connect to your ssh server
+ wssh.connect(hostname, port, username, password, privatekey, passphrase, totp);
+
+ // pass an object to wssh.connect
+ var opts = {
+ hostname: 'hostname',
+ port: 'port',
+ username: 'username',
+ password: 'password',
+ privatekey: 'the private key text',
+ passphrase: 'passphrase',
+ totp: 'totp'
+ };
+ wssh.connect(opts);
+
+ // without an argument, wssh will use the form data to connect
+ wssh.connect();
+
+ // set a new encoding for client to use
+ wssh.set_encoding(encoding);
+
+ // reset encoding to use the default one
+ wssh.reset_encoding();
+
+ // send a command to the server
+ wssh.send('ls -l');
+
+Custom Font
+~~~~~~~~~~~
+
+To use custom font, put your font file in the directory
+``webssh/static/css/fonts/`` and restart the server.
+
+URL Arguments
+~~~~~~~~~~~~~
+
+Support passing arguments by url (query or fragment) like following
+examples:
+
+Passing form data (password must be encoded in base64, privatekey not
+supported)
+
+.. code:: bash
+
+ http://localhost:8888/?hostname=xx&username=yy&password=str_base64_encoded
+
+Passing a terminal background color
+
+.. code:: bash
+
+ http://localhost:8888/#bgcolor=green
+
+Passing a user defined title
+
+.. code:: bash
+
+ http://localhost:8888/?title=my-ssh-server
+
+Passing an encoding
+
+.. code:: bash
+
+ http://localhost:8888/#encoding=gbk
+
+Passing a command executed right after login
+
+.. code:: bash
+
+ http://localhost:8888/?command=pwd
+
+Passing a terminal type
+
+.. code:: bash
+
+ http://localhost:8888/?term=xterm-256color
+
+Use Docker
+~~~~~~~~~~
+
+Start up the app
+
+::
+
+ docker-compose up
+
+Tear down the app
+
+::
+
+ docker-compose down
+
+Tests
+~~~~~
+
+Requirements
+
+::
+
+ pip install pytest pytest-cov codecov flake8 mock
+
+Use unittest to run all tests
+
+::
+
+ python -m unittest discover tests
+
+Use pytest to run all tests
+
+::
+
+ python -m pytest tests
+
+Deployment
+~~~~~~~~~~
+
+Running behind an Nginx server
+
+.. code:: bash
+
+ wssh --address='127.0.0.1' --port=8888 --policy=reject
+
+.. code:: nginx
+
+ # Nginx config example
+ location / {
+ proxy_pass http://127.0.0.1:8888;
+ proxy_http_version 1.1;
+ proxy_read_timeout 300;
+ proxy_set_header Upgrade $http_upgrade;
+ proxy_set_header Connection "upgrade";
+ proxy_set_header Host $http_host;
+ proxy_set_header X-Real-IP $remote_addr;
+ proxy_set_header X-Real-PORT $remote_port;
+ }
+
+Running as a standalone server
+
+.. code:: bash
+
+ wssh --port=8080 --sslport=4433 --certfile='cert.crt' --keyfile='cert.key' --xheaders=False --policy=reject
+
+Tips
+~~~~
+
+- For whatever deployment choice you choose, don't forget to enable
+ SSL.
+- By default plain http requests from a public network will be either
+ redirected or blocked and being redirected takes precedence over
+ being blocked.
+- Try to use reject policy as the missing host key policy along with
+ your verified known\_hosts, this will prevent man-in-the-middle
+ attacks. The idea is that it checks the system host keys
+ file("~/.ssh/known\_hosts") and the application host keys
+ file("./known\_hosts") in order, if the ssh server's hostname is not
+ found or the key is not matched, the connection will be aborted.
+
+.. |Build Status| image:: https://travis-ci.org/huashengdun/webssh.svg?branch=master
+ :target: https://travis-ci.org/huashengdun/webssh
+.. |codecov| image:: https://codecov.io/gh/huashengdun/webssh/branch/master/graph/badge.svg
+ :target: https://codecov.io/gh/huashengdun/webssh
+.. |PyPI - Python Version| image:: https://img.shields.io/pypi/pyversions/webssh.svg
+.. |PyPI| image:: https://img.shields.io/pypi/v/webssh.svg
+.. |Login| image:: https://github.com/huashengdun/webssh/raw/master/preview/login.png
+.. |Terminal| image:: https://github.com/huashengdun/webssh/raw/master/preview/terminal.png
+
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..315cee6
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,6 @@
+version: '3'
+services:
+ web:
+ build: .
+ ports:
+ - "8888:8888"
diff --git a/preview/login.png b/preview/login.png
new file mode 100644
index 0000000..b83d49c
--- /dev/null
+++ b/preview/login.png
Binary files differ
diff --git a/preview/terminal.png b/preview/terminal.png
new file mode 100644
index 0000000..0e616d1
--- /dev/null
+++ b/preview/terminal.png
Binary files differ
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..aae78dd
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+paramiko==2.7.2
+tornado==5.1.1; python_version < '3.5'
+tornado==6.0.4; python_version >= '3.5'
diff --git a/run.py b/run.py
new file mode 100644
index 0000000..0585233
--- /dev/null
+++ b/run.py
@@ -0,0 +1,5 @@
+from webssh.main import main
+
+
+if __name__ == '__main__':
+ main()
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..b798b26
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,9 @@
+[wheel]
+universal = 1
+
+[metadata]
+license_file = LICENSE
+
+[flake8]
+exclude = .git,build,dist,tests, __init__.py
+max-line-length = 79
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..eb1c6a4
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,40 @@
+import codecs
+from setuptools import setup
+from webssh._version import __version__ as version
+
+
+with codecs.open('README.rst', encoding='utf-8') as f:
+ long_description = f.read()
+
+
+setup(
+ name='webssh',
+ version=version,
+ description='Web based ssh client',
+ long_description=long_description,
+ author='Shengdun Hua',
+ author_email='webmaster0115@gmail.com',
+ url='https://github.com/huashengdun/webssh',
+ packages=['webssh'],
+ entry_points='''
+ [console_scripts]
+ wssh = webssh.main:main
+ ''',
+ license='MIT',
+ include_package_data=True,
+ classifiers=[
+ 'Programming Language :: Python',
+ 'Programming Language :: Python :: 2',
+ 'Programming Language :: Python :: 2.7',
+ 'Programming Language :: Python :: 3',
+ 'Programming Language :: Python :: 3.4',
+ 'Programming Language :: Python :: 3.5',
+ 'Programming Language :: Python :: 3.6',
+ 'Programming Language :: Python :: 3.7',
+ 'Programming Language :: Python :: 3.8',
+ ],
+ install_requires=[
+ 'tornado>=4.5.0',
+ 'paramiko>=2.3.1',
+ ],
+)
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/data/cert.crt b/tests/data/cert.crt
new file mode 100644
index 0000000..a72be81
--- /dev/null
+++ b/tests/data/cert.crt
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDYDCCAkigAwIBAgIJAPPORA/o2Zd4MA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQwHhcNMTgxMDE0MDgwNTQzWhcNMjExMDEzMDgwNTQzWjBF
+MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50
+ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
+CgKCAQEAvSFaffq6ExFCPN4cApRopGEqVIipAYb6Ky3VHVu4pW0tOdrdKafGGYkN
+GWQdsLV0AAzzxmCAPpXmmAx0m0mgtPaJp3iW8NUibkISxdEO/QJOA7y8O9iWhDdb
+l9ghjwPI5AwURQkDkXbcBBBzQksYDaYseL2NGDGXkKCUQQoLzV0H+SV3vCPrbOXH
+t50HKgKzEOGoT8LcI7BRCTXk1xTlK0b/4ylKUwKIsfNPH0a9RkukBjMFkpXG/2CV
+VWb89+TkMzQwhcpIVn6rUCJQW5pHVRYLACP32Zki7xPUJb9OfF7XDK54v6Cwo3Fi
+aZWxN6rYhnn8wRTufY3PYzv5f3XiZwIDAQABo1MwUTAdBgNVHQ4EFgQUq0kfpU/m
+WQwNk3ymwm7fuVwYhJ0wHwYDVR0jBBgwFoAUq0kfpU/mWQwNk3ymwm7fuVwYhJ0w
+DwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAf2xudhAeOTUpNpw+
+XZWLBXBKZXINd7PrUDgEG4bB0/0kYZN+T7bMJEtmv6+9t57y6jSni9sQzpbvT2tJ
+TrbZgwhDvyTm3mw5n5RpAB9ZK+lnMcasa5N4qSd6wmpXjkC+kcEs7oQ8PwgIf3xT
+/aGdoswNTWCz0W8vs8yRynLB4MKx1d20IMlDkfGu5n7wXhNK0ymcT8pa6iqEYl6X
+bhPVTlELl8bM/OKktFc42VXoRghLRnfl8yM/9t7HVHKfHXZrLpIdtEOvnKwtzX5r
+fBMs4IPa0OIPHGCcbLGT4rIbSvSaI8yOPA93G1XXbMF1VKdKyzdGjMS6aFKfbrhV
+lnaUOA==
+-----END CERTIFICATE-----
diff --git a/tests/data/cert.key b/tests/data/cert.key
new file mode 100644
index 0000000..f453068
--- /dev/null
+++ b/tests/data/cert.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC9IVp9+roTEUI8
+3hwClGikYSpUiKkBhvorLdUdW7ilbS052t0pp8YZiQ0ZZB2wtXQADPPGYIA+leaY
+DHSbSaC09omneJbw1SJuQhLF0Q79Ak4DvLw72JaEN1uX2CGPA8jkDBRFCQORdtwE
+EHNCSxgNpix4vY0YMZeQoJRBCgvNXQf5JXe8I+ts5ce3nQcqArMQ4ahPwtwjsFEJ
+NeTXFOUrRv/jKUpTAoix808fRr1GS6QGMwWSlcb/YJVVZvz35OQzNDCFykhWfqtQ
+IlBbmkdVFgsAI/fZmSLvE9Qlv058XtcMrni/oLCjcWJplbE3qtiGefzBFO59jc9j
+O/l/deJnAgMBAAECggEAZSwcblvbgiuvVUQzk6W0PIrFzCa20dxUoxiHcocIRWYb
+1WEhAhF/xVUtLrIBt++5N/W1yh8BO3mQuzGehxth3qwrguzdQcOiAX1S8YMeE3ZS
+KWmjABiim+PJGXdCrHCH3IYhqbRitkPw+jOalJH7MgH8tDIh8hlFTNa5t/kZyybW
+uGFbqF6OFmyHSDIPvjPALzSlmd5po+EywnA5oa3sObj4n5xuaFB2l/IaF3ix38vT
+geo517L15cCuAa7x42i1cAGn5H/hdeO/Dw+MGk+0sXRRPooCMBzKztxpsB+7kNhk
+jbsVHmTkE5UG/T7Uc0PsthZNjFwouPOrQQVUFYTnwQKBgQDwBvpmc9vX4gnADa7p
+L2lgMVo6KccPFeFr4DIAYmwS0Vl0sB2j6nPVEBg3PatGLKGNMCIlcj+A3z6KQ+4o
+n7pnekRwX+2+m3OPX4Rbw8c/+E0CiRPtmYp9BISKNgPoSRGsI6s/L3wzagsDsQ3v
+xhKCohvfyY8JwUEPX6Hosmu/UQKBgQDJt0/ihWn0g/2uOKnXlXthxvkXFoR45sO7
+lY/yoyJB+Z4yGAjJlbyra+5xnReqYyBnf34/2AoddjT45dPCaFucMInQFINdMGF1
+NeVNzC6xa/7jjbgwf4kGqHsLC85Mrq3wyK5hwhMmfEPmRs6w+CRzM/Q78Bsr5P/T
+zEa13jFINwKBgQC50L0ieUjVDKD9s9oXnWOXWz19T4BRtl+nco1i7M67lqQJCJo5
+njQD2ozUnwIrtjtuoLeeg56Ttr+krEf/3P+iQe4fjLPxXkiM0qYVoC9s311GvDXY
+N4gVllzA3mYR+hcbSxW0OZ+N8ecK+ZNPbug/hx3LFi+MnrYuH5upGA7/sQKBgCRk
+nlUQHP2wkqRMNNhgb9JEQ8yWk2/8snO1mDL+m7+reY8wJuW3zkJfRrXY0dw75izG
+I9EA+VI3cXc2f+4jReP4HeUczlaR1AOBpc1TeVkpUuNbPlABsocw/oIPrzjGiztV
++aBJk4ruAJIbVE85ddoTFY161Gwm9MERqfBGFj4hAoGAN/ry0KC9/QkLkuPjs3uL
+AU3xjBJt1SMB7KZq1yt8mBo8M4q/E3ulynBK7G3f+hS2aj7OAhU4IcPRPGqjsLO1
+dZTIOMeVyOAr0TAaioCCIyvf8hEjA7cXddnWBJYi3WiUpOc6J0uINoSlrAX2UXtw
+/Aq5PmJKn4D4a75f+ue2Sw8=
+-----END PRIVATE KEY-----
diff --git a/tests/data/fonts/.gitignore b/tests/data/fonts/.gitignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/data/fonts/.gitignore
diff --git a/tests/data/fonts/fake-font b/tests/data/fonts/fake-font
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/data/fonts/fake-font
diff --git a/tests/data/known_hosts_example b/tests/data/known_hosts_example
new file mode 100644
index 0000000..66ee240
--- /dev/null
+++ b/tests/data/known_hosts_example
@@ -0,0 +1 @@
+192.168.1.199 ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAINwZGQmNFADnAAlm5uFLQTrdxqpNxHdgg4JPbB3sR2kr
diff --git a/tests/data/known_hosts_example2 b/tests/data/known_hosts_example2
new file mode 100644
index 0000000..f4c7aab
--- /dev/null
+++ b/tests/data/known_hosts_example2
@@ -0,0 +1 @@
+192.168.1.196 ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAINwZGQmNFADnAAlm5uFLQTrdxqpNxHdgg4JPbB3sR2kr
diff --git a/tests/data/known_hosts_example3 b/tests/data/known_hosts_example3
new file mode 100644
index 0000000..530b4ad
--- /dev/null
+++ b/tests/data/known_hosts_example3
@@ -0,0 +1 @@
+192.168.1.196 ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAINwZGQmNFADnAAlm5uFLQTrdxqpNxHdgg4JPbB3sR2jr
diff --git a/tests/data/test_ed25519.key b/tests/data/test_ed25519.key
new file mode 100644
index 0000000..eb9f94c
--- /dev/null
+++ b/tests/data/test_ed25519.key
@@ -0,0 +1,8 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
+QyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXwAAAKhjwAdrY8AH
+awAAAAtzc2gtZWQyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXw
+AAAEA9tGQi2IrprbOSbDCF+RmAHd6meNSXBUQ2ekKXm4/8xnr1K9komH/1WBIvQbbtvnFV
+hryd62EfcgRFuLRiokNfAAAAI2FsZXhfZ2F5bm9yQEFsZXhzLU1hY0Jvb2stQWlyLmxvY2
+FsAQI=
+-----END OPENSSH PRIVATE KEY-----
diff --git a/tests/data/test_ed25519_password.key b/tests/data/test_ed25519_password.key
new file mode 100644
index 0000000..d178aaa
--- /dev/null
+++ b/tests/data/test_ed25519_password.key
@@ -0,0 +1,8 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jYmMAAAAGYmNyeXB0AAAAGAAAABDaKD4ac7
+kieb+UfXaLaw68AAAAEAAAAAEAAAAzAAAAC3NzaC1lZDI1NTE5AAAAIOQn7fjND5ozMSV3
+CvbEtIdT73hWCMRjzS/lRdUDw50xAAAAsE8kLGyYBnl9ihJNqv378y6mO3SkzrDbWXOnK6
+ij0vnuTAvcqvWHAnyu6qBbplu/W2m55ZFeAItgaEcV2/V76sh/sAKlERqrLFyXylN0xoOW
+NU5+zU08aTlbSKGmeNUU2xE/xfJq12U9XClIRuVUkUpYANxNPbmTRpVrbD3fgXMhK97Jrb
+DEn8ca1IqMPiYmd/hpe5+tq3OxyRljXjCUFWTnqkp9VvUdzSTdSGZHsW9i
+-----END OPENSSH PRIVATE KEY-----
diff --git a/tests/data/test_known_hosts b/tests/data/test_known_hosts
new file mode 100644
index 0000000..f1413d8
--- /dev/null
+++ b/tests/data/test_known_hosts
@@ -0,0 +1 @@
+[127.0.0.1]:2222 ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAINwZGQmNFADnAAlm5uFLQTrdxqpNxHdgg4JPbB3sR2kr
diff --git a/tests/data/test_new_dsa.key b/tests/data/test_new_dsa.key
new file mode 100644
index 0000000..e4d2223
--- /dev/null
+++ b/tests/data/test_new_dsa.key
@@ -0,0 +1,21 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABsQAAAAdzc2gtZH
+NzAAAAgQC5Y5rQ1EN+eWQUFv/9K/DLfPgjGC0mwyqvKsKyv6RLpKLc0vi0VDj8lY0WUcuG
+CzdYnhIOSa9aB0buGe10gIjU2vAxkhqv1yaR+Zuj3dLDHQk6jpAAgNHciKlQSf1zho/seL
+7nehYq/waXfU8/iJuXqywQgqpMLfaHOnIl/tPLGQAAABUArINMjWcrsmEgLmzf6k+sroko
+5GkAAACAMQsRQjOtQGQA8/XI7vOWnEMCVntwt1Xi4RsLH5+4GpUMUcm4CvqjfFfSF4CufH
+pjlywFhrAC2/ouQIpGJPGToWotk7dt5zWckGX5DscMiRVON7fxdpUMn16IO6DdUctXlWa9
+SY+NdfRESKoUCjgH5nlM8k7N2MwCK5phHHkoPu8AAACADgxrRWeNqX3gmZUM1qhrDO0mOH
+oHJFrBuvJCdQ6+S1GvjuBI0rNm225+gcaAhia9k/LGk8NwCbWG1FbpesuNaNFt/FxS9LVS
+qEaZoXtKuY+CUCn1BfBWF97/u0oMPwanXKIJEAhU81f5TXZM8Ui7OEIyTx1t9qgva+5/gF
+cL48kAAAHoLtDYCy7Q2AsAAAAHc3NoLWRzcwAAAIEAuWOa0NRDfnlkFBb//Svwy3z4Ixgt
+JsMqryrCsr+kS6Si3NL4tFQ4/JWNFlHLhgs3WJ4SDkmvWgdG7hntdICI1NrwMZIar9cmkf
+mbo93Swx0JOo6QAIDR3IipUEn9c4aP7Hi+53oWKv8Gl31PP4ibl6ssEIKqTC32hzpyJf7T
+yxkAAAAVAKyDTI1nK7JhIC5s3+pPrK6JKORpAAAAgDELEUIzrUBkAPP1yO7zlpxDAlZ7cL
+dV4uEbCx+fuBqVDFHJuAr6o3xX0heArnx6Y5csBYawAtv6LkCKRiTxk6FqLZO3bec1nJBl
++Q7HDIkVTje38XaVDJ9eiDug3VHLV5VmvUmPjXX0REiqFAo4B+Z5TPJOzdjMAiuaYRx5KD
+7vAAAAgA4Ma0Vnjal94JmVDNaoawztJjh6ByRawbryQnUOvktRr47gSNKzZttufoHGgIYm
+vZPyxpPDcAm1htRW6XrLjWjRbfxcUvS1UqhGmaF7SrmPglAp9QXwVhfe/7tKDD8Gp1yiCR
+AIVPNX+U12TPFIuzhCMk8dbfaoL2vuf4BXC+PJAAAAFBVcac1iVzrWVnLglRZRenUhlKLr
+AAAADHNoZW5nQHNlcnZlcgECAwQFBgc=
+-----END OPENSSH PRIVATE KEY-----
diff --git a/tests/data/test_new_rsa_password.key b/tests/data/test_new_rsa_password.key
new file mode 100644
index 0000000..68ae4c1
--- /dev/null
+++ b/tests/data/test_new_rsa_password.key
@@ -0,0 +1,39 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jdHIAAAAGYmNyeXB0AAAAGAAAABASFMDZtr
+vMq0+bs9xBVRMOAAAAEAAAAAEAAAGXAAAAB3NzaC1yc2EAAAADAQABAAABgQCpYgFiRc6d
+etTng/gKoHzfZrgsr+0dqsfVkrsTAl/w+2OsZbR6MCbcY94fEcE7WMTWSYUY2qv+35nlQn
+MT/8Q8Y8TTMbcQLIOaNhLQ2dFH8wn2e7+DbUT8giOOEICBjdUZx3tEH7PcFTzQ9ivHVIkb
+Rk8UHbj3vznvBvNEgQK+jj0ZI3+deOOFlPbnq9R3dJNgdVXAEnSt0cEfjteJQwT4PcaA2N
+fQvQAQtspC0EfEixvBH+yJsvjPDZwnYyejVGbGwKMdqAJJVka4QRkCJNoi5eyngDj/pzC7
+OhGeqNwlG+D28Zz885HXIZ5eEKYNy9YJlff1WlWH8/+1fb9eVdGEXd2/fpzc/+r2QW88aX
+L3bg2o46qswi+5F/yYbw8AOPCq1P62ZbsVxxWTYvG947AvxfH9ycZoOItizLofOluBELQV
+0P/0ooa0kPJpWQXuTAY7YSzo4vgw1F+O+8b1g33mWftUu6OHp7Rb2N3yRUiGVq9dVYeFhR
+8ycyFPWjoNvwMAAAWAfnTLRACzZl9T9m7oZXtRn/OFKsr/Z8mKfkeTb4PQ+cFT/Bi2adNq
+2JTsBhfGXAXiKLVVOBgBRmY5c+x0oWyrC1agoOEWkz1LhnKlJ2ETbmJBfDeRsMy5COQDmh
+Wnfj8noLzv59+MrPcIEfHSdC4Rai2JgFH54m5G5vaGR6SGbQ27E1ZPYnzzG9qrEB2UY30S
+1gCs8G4ppX/clIVq0eToKAHseV7UG/FDwuaiPOvk61pyUjefj+bexggZxUOJANdB5pWfl7
+BnEM3q9nD4QF74yrWZL38897Izku9l2Iupn64DMVs2+T/9WsfR7kDgJDoL2Noa/57w4ien
+Wt6WtKBnISmh9Bm5zbRG5fhPEMtCgrV3TAPgzj1VQ8Vy91D16CnWucqBpdDys46gUodiVZ
+Z6idCV6z24hHIJc7joR2mCNmqitCGcyrf4cO8tzug1DZVMeSkKSqL85oH9u/EOR/uWWNQi
+GAlehn8gmmlborYsLybau68EfyHSwYJ8XaLrELDfvM9L1CHDDacJ4svFa93r0y380Fek5P
+CqOLH4IqhpLHWWRoWSr23AjO6p0ZihrHzSveIzmuuTNr6uJmFt76jPKcpmLycCKhD8gKtk
+ZRjh+y5mEruTg/BJixCWhbl88rPYRSGNGjR9e91esw8Yj8BGYEvbvhkG0pQQpv937dbJuh
+n+CtnpvGr+8Mhw+mB2OW2c38XaAouwugLSoWV16xcwWx3z0ez0EAyeWjHev2XxjW5bigWg
+edmDPiYN+1I+OmG7d5NctKqNABb0qpwavL1uRJO96cC1drwucu5aTBrMRv1HlDQpsPHSRf
+u4FVruLE0wDaL2saowkZDJF5GoxjMdpzOpeVmjREuU3NwCrQr8t/AvDxzXl4x8BZ3jJTwe
+RA0yTGwSAZDzeN3KV2FLn+0K7xB+XvKqtKR5/IOlGviCt2w73nJpReAuSgMk95M/9imm5J
+r/AEcmkXKUT8gjPIT6B1xs44nnWvyf+CZreUZthAjYAjXn4ncKT51WX8q1dUuCKt9XQC7b
+pKH20WrP7BB/AoPPyaKtRbDBIy3Y9YA8KDsYoR9kC+hqIttL5IWxXwc15HzkU4fdKLQ4n1
+VTfzaz5Ns2gsfsSAYdyJKZ8JkP/tHR2bFN7m1rWqfzL8hrGv+BF/+rR7/3+BDOD0aZCep6
+u6mO4OD9hEuOP2rK5EVjJAoON7nYmjdfDpXRmp/p2f0Y+pA4R7CN+4xnel1gxlE7tBdQ7z
+Zu2O+NPToHXGLhzwUKUIqVhYb5cwdMIzaFQwyvOTyjNVMH0AqcsF2VuDWkgSqALg1CCSz3
+7Vinx6/tyPYZ1kHm+j0dNijSdvHZrwsmvxPfYspzB7K+Vi5cNsOw6pQGIBgBTBIU09FqB+
+MRBfNmLfVgVYsiU1jz/s/7H3J8DTNIC1XS4LRUXVlwddGSP/dXLgO6EJX3OvdduBD04HSZ
+wWggXDgWo1snhB8O2w6YSk6ocd801gPesebXGBWm+54oirWrpDr3E9y2RS7oaDFAMUV6rV
+IG/gc4rEFUNKX+0RwKJyArmYYJOhYgfoH0fEs01OKs6NzcsknXKVLPAXUaXV77nGlc4xsa
+G62+K3rLdaMFSWf/TFaIrl2Bma3p4tx993hsjNQewRhnrWdyEqP8CLcKq8Wc/fl4LlytWA
+PhjtjWxAp0RQKvjEu4Ul0SbFoiC+hbh+pWhVoQjPTXZePBWgI1M8CHX4fvcoRk0Ay1VMwx
+AZzHoZZl6v4arok4/nqwv5kYo7HhRbJrPBbNAJcGkE0Hnbh/4DxtcOLsSgwACTw03qavji
+wvu8wv0L5oQ6Q0H6LCUMQl/2eTuUt9uVtFXWRPmYolqmIKR5ZejYACI3XVyfaYJR6SuSx8
+PR/8/w==
+-----END OPENSSH PRIVATE KEY-----
diff --git a/tests/data/test_rsa.key b/tests/data/test_rsa.key
new file mode 100644
index 0000000..f50e9c5
--- /dev/null
+++ b/tests/data/test_rsa.key
@@ -0,0 +1,15 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
+oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
+d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
+gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
+EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
+soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
+tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
+avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
+4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
+H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
+qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
+HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
+nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
+-----END RSA PRIVATE KEY-----
diff --git a/tests/data/test_rsa_password.key b/tests/data/test_rsa_password.key
new file mode 100644
index 0000000..7713049
--- /dev/null
+++ b/tests/data/test_rsa_password.key
@@ -0,0 +1,18 @@
+-----BEGIN RSA PRIVATE KEY-----
+Proc-Type: 4,ENCRYPTED
+DEK-Info: DES-EDE3-CBC,DAA422E8A5A8EFB7
+
++nssHGmWl91IcmGiE6DdCIqGvAP04tuLh60wLjWBvdjtF9CjztPnF57xe+6pBk7o
+YgF/Ry3ik9ZV9rHNcRXifDKM9crxtYlpUlkM2C0SP89sXaO0P1Q1yCnrtZUwDIKO
+BNV8et5X7+AGMFsy/nmv0NFMrbpoG03Dppsloecd29NTRlIXwxHRFyHxy6BdEib/
+Dn0mEVbwg3dTvKrd/sODWR9hRwpDGM9nkEbUNJCh7vMwFKkIZZF8yqFvmGckuO5C
+HZkDJ6RkEDYrSZJAavQaiOPF5bu3cHughRfnrIKVrQuTTDiWjwX9Ny8e4p4k7dy7
+rLpbPhtxUOUbpOF7T1QxljDi1Tcq3Ebk3kN/ZLPRFnDrJfyUx+m9BXmAa78Wxs/l
+KaS8DTkYykd3+EGOeJFjZg2bvgqil4V+5JIt/+MQ5pZ/ui7i4GcH2bvZyGAbrXzP
+3LipSAdN5RG+fViLe3HUtfCx4ZAgtU78TWJrLk2FwKQGglFxKLnswp+IKZb09rZV
+uxmG4pPLUnH+mMYdiy5ugzj+5C8iZ0/IstpHVmO6GWROfedpJ82eMztTOtdhfMep
+8Z3HwAwkDtksL7Gq9klb0Wq5+uRlBWetixddAvnmqXNzYhaANWcAF/2a2Hz06Rb0
+e6pe/g0Ek5KV+6YI+D+oEblG0Sr+d4NtxtDTmIJKNVkmzlhI2s53bHp6txCb5JWJ
+S8mKLPBBBzaNXYd3odDvGXguuxUntWSsD11KyR6B9DXMIfWQW5dT7hp5kTMGlXWJ
+lD2hYab13DCCuAkwVTdpzhHYLZyxLYoSu05W6z8SAOs=
+-----END RSA PRIVATE KEY-----
diff --git a/tests/data/user_rsa_key b/tests/data/user_rsa_key
new file mode 100644
index 0000000..ee64f23
--- /dev/null
+++ b/tests/data/user_rsa_key
@@ -0,0 +1,15 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICXQIBAAKBgQDI7iK3d8eWYZlYloat94c5VjtFY7c/0zuGl8C7uMnZ3t6i2G99
+66hEW0nCFSZkOW5F0XKEVj+EUCHvo8koYC6wiohAqWQnEwIoOoh7GSAcB8gP/qaq
++adIl/Rvlby/mHakj+y05LBND6nFWHAn1y1gOFFKUXSJNRZPXSFy47gqzwIBIwKB
+gQCbANjz7q/pCXZLp1Hz6tYHqOvlEmjK1iabB1oqafrMpJ0eibUX/u+FMHq6StR5
+M5413BaDWHokPdEJUnabfWXXR3SMlBUKrck0eAer1O8m78yxu3OEdpRk+znVo4DL
+guMeCdJB/qcF0kEsx+Q8HP42MZU1oCmk3PbfXNFwaHbWuwJBAOQ/ry/hLD7AqB8x
+DmCM82A9E59ICNNlHOhxpJoh6nrNTPCsBAEu/SmqrL8mS6gmbRKUaya5Lx1pkxj2
+s/kWOokCQQDhXCcYXjjWiIfxhl6Rlgkk1vmI0l6785XSJNv4P7pXjGmShXfIzroh
+S8uWK3tL0GELY7+UAKDTUEVjjQdGxYSXAkEA3bo1JzKCwJ3lJZ1ebGuqmADRO6UP
+40xH977aadfN1mEI6cusHmgpISl0nG5YH7BMsvaT+bs1FUH8m+hXDzoqOwJBAK3Z
+X/za+KV/REya2z0b+GzgWhkXUGUa/owrEBdHGriQ47osclkUgPUdNqcLmaDilAF4
+1Z4PHPrI5RJIONAx+JECQQC/fChqjBgFpk6iJ+BOdSexQpgfxH/u/457W10Y43HR
+soS+8btbHqjQkowQ/2NTlUfWvqIlfxs6ZbFsIp/HrhZL
+-----END RSA PRIVATE KEY-----
diff --git a/tests/sshserver.py b/tests/sshserver.py
new file mode 100644
index 0000000..de4deae
--- /dev/null
+++ b/tests/sshserver.py
@@ -0,0 +1,233 @@
+#!/usr/bin/env python
+
+# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+import random
+import socket
+# import sys
+import threading
+# import traceback
+import paramiko
+
+from binascii import hexlify
+from paramiko.py3compat import u, decodebytes
+from tests.utils import make_tests_data_path
+
+
+# setup logging
+paramiko.util.log_to_file(make_tests_data_path('sshserver.log'))
+
+host_key = paramiko.RSAKey(filename=make_tests_data_path('test_rsa.key'))
+# host_key = paramiko.DSSKey(filename='test_dss.key')
+
+print('Read key: ' + u(hexlify(host_key.get_fingerprint())))
+
+banner = u'\r\n\u6b22\u8fce\r\n'
+event_timeout = 5
+
+
+class Server(paramiko.ServerInterface):
+ # 'data' is the output of base64.b64encode(key)
+ # (using the "user_rsa_key" files)
+ data = (b'AAAAB3NzaC1yc2EAAAABIwAAAIEAyO4it3fHlmGZWJaGrfeHOVY7RWO3P9M7hp'
+ b'fAu7jJ2d7eothvfeuoRFtJwhUmZDluRdFyhFY/hFAh76PJKGAusIqIQKlkJxMC'
+ b'KDqIexkgHAfID/6mqvmnSJf0b5W8v5h2pI/stOSwTQ+pxVhwJ9ctYDhRSlF0iT'
+ b'UWT10hcuO4Ks8=')
+ good_pub_key = paramiko.RSAKey(data=decodebytes(data))
+
+ commands = [
+ b'$SHELL -ilc "locale charmap"',
+ b'$SHELL -ic "locale charmap"'
+ ]
+ encodings = ['UTF-8', 'GBK', 'UTF-8\r\n', 'GBK\r\n']
+
+ def __init__(self, encodings=[]):
+ self.shell_event = threading.Event()
+ self.exec_event = threading.Event()
+ self.cmd_to_enc = self.get_cmd2enc(encodings)
+ self.password_verified = False
+ self.key_verified = False
+
+ def get_cmd2enc(self, encodings):
+ n = len(self.commands)
+ while len(encodings) < n:
+ encodings.append(random.choice(self.encodings))
+ return dict(zip(self.commands, encodings[0:n]))
+
+ def check_channel_request(self, kind, chanid):
+ if kind == 'session':
+ return paramiko.OPEN_SUCCEEDED
+ return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
+
+ def check_auth_password(self, username, password):
+ print('Auth attempt with username: {!r} & password: {!r}'.format(username, password)) # noqa
+ if (username in ['robey', 'bar', 'foo']) and (password == 'foo'):
+ return paramiko.AUTH_SUCCESSFUL
+ return paramiko.AUTH_FAILED
+
+ def check_auth_publickey(self, username, key):
+ print('Auth attempt with username: {!r} & key: {!r}'.format(username, u(hexlify(key.get_fingerprint())))) # noqa
+ if (username in ['robey', 'keyonly']) and (key == self.good_pub_key):
+ return paramiko.AUTH_SUCCESSFUL
+ if username == 'pkey2fa' and key == self.good_pub_key:
+ self.key_verified = True
+ return paramiko.AUTH_PARTIALLY_SUCCESSFUL
+ return paramiko.AUTH_FAILED
+
+ def check_auth_interactive(self, username, submethods):
+ if username in ['pass2fa', 'pkey2fa']:
+ self.username = username
+ prompt = 'Verification code: ' if self.password_verified else 'Password: ' # noqa
+ print(username, prompt)
+ return paramiko.InteractiveQuery('', '', prompt)
+ return paramiko.AUTH_FAILED
+
+ def check_auth_interactive_response(self, responses):
+ if self.username in ['pass2fa', 'pkey2fa']:
+ if not self.password_verified:
+ if responses[0] == 'password':
+ print('password verified')
+ self.password_verified = True
+ if self.username == 'pkey2fa':
+ return self.check_auth_interactive(self.username, '')
+ else:
+ print('wrong password: {}'.format(responses[0]))
+ return paramiko.AUTH_FAILED
+ else:
+ if responses[0] == 'passcode':
+ print('totp verified')
+ return paramiko.AUTH_SUCCESSFUL
+ else:
+ print('wrong totp: {}'.format(responses[0]))
+ return paramiko.AUTH_FAILED
+ else:
+ return paramiko.AUTH_FAILED
+
+ def get_allowed_auths(self, username):
+ if username == 'keyonly':
+ return 'publickey'
+ if username == 'pass2fa':
+ return 'keyboard-interactive'
+ if username == 'pkey2fa':
+ if not self.key_verified:
+ return 'publickey'
+ else:
+ return 'keyboard-interactive'
+ return 'password,publickey'
+
+ def check_channel_exec_request(self, channel, command):
+ if command not in self.commands:
+ ret = False
+ else:
+ ret = True
+ self.encoding = self.cmd_to_enc[command]
+ channel.send(self.encoding)
+ channel.shutdown(1)
+ self.exec_event.set()
+ return ret
+
+ def check_channel_shell_request(self, channel):
+ self.shell_event.set()
+ return True
+
+ def check_channel_pty_request(self, channel, term, width, height,
+ pixelwidth, pixelheight, modes):
+ return True
+
+ def check_channel_window_change_request(self, channel, width, height,
+ pixelwidth, pixelheight):
+ channel.send('resized')
+ return True
+
+
+def run_ssh_server(port=2200, running=True, encodings=[]):
+ # now connect
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(('127.0.0.1', port))
+ sock.listen(100)
+
+ while running:
+ client, addr = sock.accept()
+ print('Got a connection!')
+
+ t = paramiko.Transport(client)
+ t.load_server_moduli()
+ t.add_server_key(host_key)
+ server = Server(encodings)
+ try:
+ t.start_server(server=server)
+ except Exception as e:
+ print(e)
+ continue
+
+ # wait for auth
+ chan = t.accept(2)
+ if chan is None:
+ print('*** No channel.')
+ continue
+
+ username = t.get_username()
+ print('{} Authenticated!'.format(username))
+
+ server.shell_event.wait(timeout=event_timeout)
+ if not server.shell_event.is_set():
+ print('*** Client never asked for a shell.')
+ continue
+
+ server.exec_event.wait(timeout=event_timeout)
+ if not server.exec_event.is_set():
+ print('*** Client never asked for a command.')
+ continue
+
+ # chan.send('\r\n\r\nWelcome!\r\n\r\n')
+ print(server.encoding)
+ try:
+ banner_encoded = banner.encode(server.encoding)
+ except (ValueError, LookupError):
+ continue
+
+ chan.send(banner_encoded)
+ if username == 'bar':
+ msg = chan.recv(1024)
+ chan.send(msg)
+ elif username == 'foo':
+ lst = []
+ while True:
+ msg = chan.recv(32 * 1024)
+ lst.append(msg)
+ if msg.endswith(b'\r\n\r\n'):
+ break
+ data = b''.join(lst)
+ while data:
+ s = chan.send(data)
+ data = data[s:]
+ else:
+ chan.close()
+ t.close()
+ client.close()
+
+ try:
+ sock.close()
+ except Exception:
+ pass
+
+
+if __name__ == '__main__':
+ run_ssh_server()
diff --git a/tests/test_app.py b/tests/test_app.py
new file mode 100644
index 0000000..bd31b5f
--- /dev/null
+++ b/tests/test_app.py
@@ -0,0 +1,792 @@
+import json
+import random
+import threading
+import tornado.websocket
+import tornado.gen
+
+from tornado.testing import AsyncHTTPTestCase
+from tornado.httpclient import HTTPError
+from tornado.options import options
+from tests.sshserver import run_ssh_server, banner, Server
+from tests.utils import encode_multipart_formdata, read_file, make_tests_data_path # noqa
+from webssh import handler
+from webssh.main import make_app, make_handlers
+from webssh.settings import (
+ get_app_settings, get_server_settings, max_body_size
+)
+from webssh.utils import to_str
+from webssh.worker import clients
+
+try:
+ from urllib.parse import urlencode
+except ImportError:
+ from urllib import urlencode
+
+
+swallow_http_errors = handler.swallow_http_errors
+server_encodings = {e.strip() for e in Server.encodings}
+
+
+class TestAppBase(AsyncHTTPTestCase):
+
+ def get_httpserver_options(self):
+ return get_server_settings(options)
+
+ def assert_response(self, bstr, response):
+ if swallow_http_errors:
+ self.assertEqual(response.code, 200)
+ self.assertIn(bstr, response.body)
+ else:
+ self.assertEqual(response.code, 400)
+ self.assertIn(b'Bad Request', response.body)
+
+ def assert_status_in(self, status, data):
+ self.assertIsNone(data['encoding'])
+ self.assertIsNone(data['id'])
+ self.assertIn(status, data['status'])
+
+ def assert_status_equal(self, status, data):
+ self.assertIsNone(data['encoding'])
+ self.assertIsNone(data['id'])
+ self.assertEqual(status, data['status'])
+
+ def assert_status_none(self, data):
+ self.assertIsNotNone(data['encoding'])
+ self.assertIsNotNone(data['id'])
+ self.assertIsNone(data['status'])
+
+ def fetch_request(self, url, method='GET', body='', headers={}, sync=True):
+ if not sync and url.startswith('/'):
+ url = self.get_url(url)
+
+ if isinstance(body, dict):
+ body = urlencode(body)
+
+ if not headers:
+ headers = self.headers
+ else:
+ headers.update(self.headers)
+
+ client = self if sync else self.get_http_client()
+ return client.fetch(url, method=method, body=body, headers=headers)
+
+ def sync_post(self, url, body, headers={}):
+ return self.fetch_request(url, 'POST', body, headers)
+
+ def async_post(self, url, body, headers={}):
+ return self.fetch_request(url, 'POST', body, headers, sync=False)
+
+
+class TestAppBasic(TestAppBase):
+
+ running = [True]
+ sshserver_port = 2200
+ body = 'hostname=127.0.0.1&port={}&_xsrf=yummy&username=robey&password=foo'.format(sshserver_port) # noqa
+ headers = {'Cookie': '_xsrf=yummy'}
+
+ def get_app(self):
+ self.body_dict = {
+ 'hostname': '127.0.0.1',
+ 'port': str(self.sshserver_port),
+ 'username': 'robey',
+ 'password': '',
+ '_xsrf': 'yummy'
+ }
+ loop = self.io_loop
+ options.debug = False
+ options.policy = random.choice(['warning', 'autoadd'])
+ options.hostfile = ''
+ options.syshostfile = ''
+ options.tdstream = ''
+ options.delay = 0.1
+ app = make_app(make_handlers(loop, options), get_app_settings(options))
+ return app
+
+ @classmethod
+ def setUpClass(cls):
+ print('='*20)
+ t = threading.Thread(
+ target=run_ssh_server, args=(cls.sshserver_port, cls.running)
+ )
+ t.setDaemon(True)
+ t.start()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.running.pop()
+ print('='*20)
+
+ def test_app_with_invalid_form_for_missing_argument(self):
+ response = self.fetch('/')
+ self.assertEqual(response.code, 200)
+
+ body = 'port=7000&username=admin&password&_xsrf=yummy'
+ response = self.sync_post('/', body)
+ self.assert_response(b'Missing argument hostname', response)
+
+ body = 'hostname=127.0.0.1&port=7000&password&_xsrf=yummy'
+ response = self.sync_post('/', body)
+ self.assert_response(b'Missing argument username', response)
+
+ body = 'hostname=&port=&username=&password&_xsrf=yummy'
+ response = self.sync_post('/', body)
+ self.assert_response(b'Missing value hostname', response)
+
+ body = 'hostname=127.0.0.1&port=7000&username=&password&_xsrf=yummy'
+ response = self.sync_post('/', body)
+ self.assert_response(b'Missing value username', response)
+
+ def test_app_with_invalid_form_for_invalid_value(self):
+ body = 'hostname=127.0.0&port=22&username=&password&_xsrf=yummy'
+ response = self.sync_post('/', body)
+ self.assert_response(b'Invalid hostname', response)
+
+ body = 'hostname=http://www.googe.com&port=22&username=&password&_xsrf=yummy' # noqa
+ response = self.sync_post('/', body)
+ self.assert_response(b'Invalid hostname', response)
+
+ body = 'hostname=127.0.0.1&port=port&username=&password&_xsrf=yummy'
+ response = self.sync_post('/', body)
+ self.assert_response(b'Invalid port', response)
+
+ body = 'hostname=127.0.0.1&port=70000&username=&password&_xsrf=yummy'
+ response = self.sync_post('/', body)
+ self.assert_response(b'Invalid port', response)
+
+ def test_app_with_wrong_hostname_ip(self):
+ body = 'hostname=127.0.0.2&port=2200&username=admin&_xsrf=yummy'
+ response = self.sync_post('/', body)
+ self.assertEqual(response.code, 200)
+ self.assertIn(b'Unable to connect to', response.body)
+
+ def test_app_with_wrong_hostname_domain(self):
+ body = 'hostname=xxxxxxxxxxxx&port=2200&username=admin&_xsrf=yummy'
+ response = self.sync_post('/', body)
+ self.assertEqual(response.code, 200)
+ self.assertIn(b'Unable to connect to', response.body)
+
+ def test_app_with_wrong_port(self):
+ body = 'hostname=127.0.0.1&port=7000&username=admin&_xsrf=yummy'
+ response = self.sync_post('/', body)
+ self.assertEqual(response.code, 200)
+ self.assertIn(b'Unable to connect to', response.body)
+
+ def test_app_with_wrong_credentials(self):
+ response = self.sync_post('/', self.body + 's')
+ self.assert_status_in('Authentication failed.', json.loads(to_str(response.body))) # noqa
+
+ def test_app_with_correct_credentials(self):
+ response = self.sync_post('/', self.body)
+ self.assert_status_none(json.loads(to_str(response.body)))
+
+ def test_app_with_correct_credentials_but_with_no_port(self):
+ default_port = handler.DEFAULT_PORT
+ handler.DEFAULT_PORT = self.sshserver_port
+
+ # with no port value
+ body = self.body.replace(str(self.sshserver_port), '')
+ response = self.sync_post('/', body)
+ self.assert_status_none(json.loads(to_str(response.body)))
+
+ # with no port argument
+ body = body.replace('port=&', '')
+ response = self.sync_post('/', body)
+ self.assert_status_none(json.loads(to_str(response.body)))
+
+ handler.DEFAULT_PORT = default_port
+
+ @tornado.testing.gen_test
+ def test_app_with_correct_credentials_timeout(self):
+ url = self.get_url('/')
+ response = yield self.async_post(url, self.body)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ url = url.replace('http', 'ws')
+ ws_url = url + 'ws?id=' + data['id']
+ yield tornado.gen.sleep(options.delay + 0.1)
+ ws = yield tornado.websocket.websocket_connect(ws_url)
+ msg = yield ws.read_message()
+ self.assertIsNone(msg)
+ self.assertEqual(ws.close_reason, 'Websocket authentication failed.')
+
+ @tornado.testing.gen_test
+ def test_app_with_correct_credentials_but_ip_not_matched(self):
+ url = self.get_url('/')
+ response = yield self.async_post(url, self.body)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ clients = handler.clients
+ handler.clients = {}
+ url = url.replace('http', 'ws')
+ ws_url = url + 'ws?id=' + data['id']
+ ws = yield tornado.websocket.websocket_connect(ws_url)
+ msg = yield ws.read_message()
+ self.assertIsNone(msg)
+ self.assertEqual(ws.close_reason, 'Websocket authentication failed.')
+ handler.clients = clients
+
+ @tornado.testing.gen_test
+ def test_app_with_correct_credentials_user_robey(self):
+ url = self.get_url('/')
+ response = yield self.async_post(url, self.body)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ url = url.replace('http', 'ws')
+ ws_url = url + 'ws?id=' + data['id']
+ ws = yield tornado.websocket.websocket_connect(ws_url)
+ msg = yield ws.read_message()
+ self.assertEqual(to_str(msg, data['encoding']), banner)
+ ws.close()
+
+ @tornado.testing.gen_test
+ def test_app_with_correct_credentials_but_without_id_argument(self):
+ url = self.get_url('/')
+ response = yield self.async_post(url, self.body)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ url = url.replace('http', 'ws')
+ ws_url = url + 'ws'
+ ws = yield tornado.websocket.websocket_connect(ws_url)
+ msg = yield ws.read_message()
+ self.assertIsNone(msg)
+ self.assertIn('Missing argument id', ws.close_reason)
+
+ @tornado.testing.gen_test
+ def test_app_with_correct_credentials_but_empty_id(self):
+ url = self.get_url('/')
+ response = yield self.async_post(url, self.body)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ url = url.replace('http', 'ws')
+ ws_url = url + 'ws?id='
+ ws = yield tornado.websocket.websocket_connect(ws_url)
+ msg = yield ws.read_message()
+ self.assertIsNone(msg)
+ self.assertIn('Missing value id', ws.close_reason)
+
+ @tornado.testing.gen_test
+ def test_app_with_correct_credentials_but_wrong_id(self):
+ url = self.get_url('/')
+ response = yield self.async_post(url, self.body)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ url = url.replace('http', 'ws')
+ ws_url = url + 'ws?id=1' + data['id']
+ ws = yield tornado.websocket.websocket_connect(ws_url)
+ msg = yield ws.read_message()
+ self.assertIsNone(msg)
+ self.assertIn('Websocket authentication failed', ws.close_reason)
+
+ @tornado.testing.gen_test
+ def test_app_with_correct_credentials_user_bar(self):
+ body = self.body.replace('robey', 'bar')
+ url = self.get_url('/')
+ response = yield self.async_post(url, body)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ url = url.replace('http', 'ws')
+ ws_url = url + 'ws?id=' + data['id']
+ ws = yield tornado.websocket.websocket_connect(ws_url)
+ msg = yield ws.read_message()
+ self.assertEqual(to_str(msg, data['encoding']), banner)
+
+ # messages below will be ignored silently
+ yield ws.write_message('hello')
+ yield ws.write_message('"hello"')
+ yield ws.write_message('[hello]')
+ yield ws.write_message(json.dumps({'resize': []}))
+ yield ws.write_message(json.dumps({'resize': {}}))
+ yield ws.write_message(json.dumps({'resize': 'ab'}))
+ yield ws.write_message(json.dumps({'resize': ['a', 'b']}))
+ yield ws.write_message(json.dumps({'resize': {'a': 1, 'b': 2}}))
+ yield ws.write_message(json.dumps({'resize': [100]}))
+ yield ws.write_message(json.dumps({'resize': [100]*10}))
+ yield ws.write_message(json.dumps({'resize': [-1, -1]}))
+ yield ws.write_message(json.dumps({'data': [1]}))
+ yield ws.write_message(json.dumps({'data': (1,)}))
+ yield ws.write_message(json.dumps({'data': {'a': 2}}))
+ yield ws.write_message(json.dumps({'data': 1}))
+ yield ws.write_message(json.dumps({'data': 2.1}))
+ yield ws.write_message(json.dumps({'key-non-existed': 'hello'}))
+ # end - those just for testing webssh websocket stablity
+
+ yield ws.write_message(json.dumps({'resize': [79, 23]}))
+ msg = yield ws.read_message()
+ self.assertEqual(b'resized', msg)
+
+ yield ws.write_message(json.dumps({'data': 'bye'}))
+ msg = yield ws.read_message()
+ self.assertEqual(b'bye', msg)
+ ws.close()
+
+ @tornado.testing.gen_test
+ def test_app_auth_with_valid_pubkey_by_urlencoded_form(self):
+ url = self.get_url('/')
+ privatekey = read_file(make_tests_data_path('user_rsa_key'))
+ self.body_dict.update(privatekey=privatekey)
+ response = yield self.async_post(url, self.body_dict)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ url = url.replace('http', 'ws')
+ ws_url = url + 'ws?id=' + data['id']
+ ws = yield tornado.websocket.websocket_connect(ws_url)
+ msg = yield ws.read_message()
+ self.assertEqual(to_str(msg, data['encoding']), banner)
+ ws.close()
+
+ @tornado.testing.gen_test
+ def test_app_auth_with_valid_pubkey_by_multipart_form(self):
+ url = self.get_url('/')
+ privatekey = read_file(make_tests_data_path('user_rsa_key'))
+ files = [('privatekey', 'user_rsa_key', privatekey)]
+ content_type, body = encode_multipart_formdata(self.body_dict.items(),
+ files)
+ headers = {
+ 'Content-Type': content_type, 'content-length': str(len(body))
+ }
+ response = yield self.async_post(url, body, headers=headers)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ url = url.replace('http', 'ws')
+ ws_url = url + 'ws?id=' + data['id']
+ ws = yield tornado.websocket.websocket_connect(ws_url)
+ msg = yield ws.read_message()
+ self.assertEqual(to_str(msg, data['encoding']), banner)
+ ws.close()
+
+ @tornado.testing.gen_test
+ def test_app_auth_with_invalid_pubkey_for_user_robey(self):
+ url = self.get_url('/')
+ privatekey = 'h' * 1024
+ files = [('privatekey', 'user_rsa_key', privatekey)]
+ content_type, body = encode_multipart_formdata(self.body_dict.items(),
+ files)
+ headers = {
+ 'Content-Type': content_type, 'content-length': str(len(body))
+ }
+
+ if swallow_http_errors:
+ response = yield self.async_post(url, body, headers=headers)
+ self.assertIn(b'Invalid key', response.body)
+ else:
+ with self.assertRaises(HTTPError) as ctx:
+ yield self.async_post(url, body, headers=headers)
+ self.assertIn('Bad Request', ctx.exception.message)
+
+ @tornado.testing.gen_test
+ def test_app_auth_with_pubkey_exceeds_key_max_size(self):
+ url = self.get_url('/')
+ privatekey = 'h' * (handler.PrivateKey.max_length + 1)
+ files = [('privatekey', 'user_rsa_key', privatekey)]
+ content_type, body = encode_multipart_formdata(self.body_dict.items(),
+ files)
+ headers = {
+ 'Content-Type': content_type, 'content-length': str(len(body))
+ }
+ if swallow_http_errors:
+ response = yield self.async_post(url, body, headers=headers)
+ self.assertIn(b'Invalid key', response.body)
+ else:
+ with self.assertRaises(HTTPError) as ctx:
+ yield self.async_post(url, body, headers=headers)
+ self.assertIn('Bad Request', ctx.exception.message)
+
+ @tornado.testing.gen_test
+ def test_app_auth_with_pubkey_cannot_be_decoded_by_multipart_form(self):
+ url = self.get_url('/')
+ privatekey = 'h' * 1024
+ files = [('privatekey', 'user_rsa_key', privatekey)]
+ content_type, body = encode_multipart_formdata(self.body_dict.items(),
+ files)
+ body = body.encode('utf-8')
+ # added some gbk bytes to the privatekey, make it cannot be decoded
+ body = body[:-100] + b'\xb4\xed\xce\xf3' + body[-100:]
+ headers = {
+ 'Content-Type': content_type, 'content-length': str(len(body))
+ }
+ if swallow_http_errors:
+ response = yield self.async_post(url, body, headers=headers)
+ self.assertIn(b'Invalid unicode', response.body)
+ else:
+ with self.assertRaises(HTTPError) as ctx:
+ yield self.async_post(url, body, headers=headers)
+ self.assertIn('Bad Request', ctx.exception.message)
+
+ def test_app_post_form_with_large_body_size_by_multipart_form(self):
+ privatekey = 'h' * (2 * max_body_size)
+ files = [('privatekey', 'user_rsa_key', privatekey)]
+ content_type, body = encode_multipart_formdata(self.body_dict.items(),
+ files)
+ headers = {
+ 'Content-Type': content_type, 'content-length': str(len(body))
+ }
+ response = self.sync_post('/', body, headers=headers)
+ self.assertIn(response.code, [400, 599])
+
+ def test_app_post_form_with_large_body_size_by_urlencoded_form(self):
+ privatekey = 'h' * (2 * max_body_size)
+ body = self.body + '&privatekey=' + privatekey
+ response = self.sync_post('/', body)
+ self.assertIn(response.code, [400, 599])
+
+ @tornado.testing.gen_test
+ def test_app_with_user_keyonly_for_bad_authentication_type(self):
+ self.body_dict.update(username='keyonly', password='foo')
+ response = yield self.async_post('/', self.body_dict)
+ self.assertEqual(response.code, 200)
+ self.assert_status_in('Bad authentication type', json.loads(to_str(response.body))) # noqa
+
+ @tornado.testing.gen_test
+ def test_app_with_user_pass2fa_with_correct_passwords(self):
+ self.body_dict.update(username='pass2fa', password='password',
+ totp='passcode')
+ response = yield self.async_post('/', self.body_dict)
+ self.assertEqual(response.code, 200)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ @tornado.testing.gen_test
+ def test_app_with_user_pass2fa_with_wrong_pkey_correct_passwords(self):
+ url = self.get_url('/')
+ privatekey = read_file(make_tests_data_path('user_rsa_key'))
+ self.body_dict.update(username='pass2fa', password='password',
+ privatekey=privatekey, totp='passcode')
+ response = yield self.async_post(url, self.body_dict)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ @tornado.testing.gen_test
+ def test_app_with_user_pkey2fa_with_correct_passwords(self):
+ url = self.get_url('/')
+ privatekey = read_file(make_tests_data_path('user_rsa_key'))
+ self.body_dict.update(username='pkey2fa', password='password',
+ privatekey=privatekey, totp='passcode')
+ response = yield self.async_post(url, self.body_dict)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ @tornado.testing.gen_test
+ def test_app_with_user_pkey2fa_with_wrong_password(self):
+ url = self.get_url('/')
+ privatekey = read_file(make_tests_data_path('user_rsa_key'))
+ self.body_dict.update(username='pkey2fa', password='wrongpassword',
+ privatekey=privatekey, totp='passcode')
+ response = yield self.async_post(url, self.body_dict)
+ data = json.loads(to_str(response.body))
+ self.assert_status_in('Authentication failed', data)
+
+ @tornado.testing.gen_test
+ def test_app_with_user_pkey2fa_with_wrong_passcode(self):
+ url = self.get_url('/')
+ privatekey = read_file(make_tests_data_path('user_rsa_key'))
+ self.body_dict.update(username='pkey2fa', password='password',
+ privatekey=privatekey, totp='wrongpasscode')
+ response = yield self.async_post(url, self.body_dict)
+ data = json.loads(to_str(response.body))
+ self.assert_status_in('Authentication failed', data)
+
+ @tornado.testing.gen_test
+ def test_app_with_user_pkey2fa_with_empty_passcode(self):
+ url = self.get_url('/')
+ privatekey = read_file(make_tests_data_path('user_rsa_key'))
+ self.body_dict.update(username='pkey2fa', password='password',
+ privatekey=privatekey, totp='')
+ response = yield self.async_post(url, self.body_dict)
+ data = json.loads(to_str(response.body))
+ self.assert_status_in('Need a verification code', data)
+
+
+class OtherTestBase(TestAppBase):
+ sshserver_port = 3300
+ headers = {'Cookie': '_xsrf=yummy'}
+ debug = False
+ policy = None
+ xsrf = True
+ hostfile = ''
+ syshostfile = ''
+ tdstream = ''
+ maxconn = 20
+ origin = 'same'
+ encodings = []
+ body = {
+ 'hostname': '127.0.0.1',
+ 'port': '',
+ 'username': 'robey',
+ 'password': 'foo',
+ '_xsrf': 'yummy'
+ }
+
+ def get_app(self):
+ self.body.update(port=str(self.sshserver_port))
+ loop = self.io_loop
+ options.debug = self.debug
+ options.xsrf = self.xsrf
+ options.policy = self.policy if self.policy else random.choice(['warning', 'autoadd']) # noqa
+ options.hostfile = self.hostfile
+ options.syshostfile = self.syshostfile
+ options.tdstream = self.tdstream
+ options.maxconn = self.maxconn
+ options.origin = self.origin
+ app = make_app(make_handlers(loop, options), get_app_settings(options))
+ return app
+
+ def setUp(self):
+ print('='*20)
+ self.running = True
+ OtherTestBase.sshserver_port += 1
+
+ t = threading.Thread(
+ target=run_ssh_server,
+ args=(self.sshserver_port, self.running, self.encodings)
+ )
+ t.setDaemon(True)
+ t.start()
+ super(OtherTestBase, self).setUp()
+
+ def tearDown(self):
+ self.running = False
+ print('='*20)
+ super(OtherTestBase, self).tearDown()
+
+
+class TestAppInDebugMode(OtherTestBase):
+
+ debug = True
+
+ def assert_response(self, bstr, response):
+ if swallow_http_errors:
+ self.assertEqual(response.code, 200)
+ self.assertIn(bstr, response.body)
+ else:
+ self.assertEqual(response.code, 500)
+ self.assertIn(b'Uncaught exception', response.body)
+
+ def test_server_error_for_post_method(self):
+ body = dict(self.body, error='raise')
+ response = self.sync_post('/', body)
+ self.assert_response(b'"status": "Internal Server Error"', response)
+
+ def test_html(self):
+ response = self.fetch('/', method='GET')
+ self.assertIn(b'novalidate>', response.body)
+
+
+class TestAppWithLargeBuffer(OtherTestBase):
+
+ @tornado.testing.gen_test
+ def test_app_for_sending_message_with_large_size(self):
+ url = self.get_url('/')
+ response = yield self.async_post(url, dict(self.body, username='foo'))
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ url = url.replace('http', 'ws')
+ ws_url = url + 'ws?id=' + data['id']
+ ws = yield tornado.websocket.websocket_connect(ws_url)
+ msg = yield ws.read_message()
+ self.assertEqual(to_str(msg, data['encoding']), banner)
+
+ send = 'h' * (64 * 1024) + '\r\n\r\n'
+ yield ws.write_message(json.dumps({'data': send}))
+ lst = []
+ while True:
+ msg = yield ws.read_message()
+ lst.append(msg)
+ if msg.endswith(b'\r\n\r\n'):
+ break
+ recv = b''.join(lst).decode(data['encoding'])
+ self.assertEqual(send, recv)
+ ws.close()
+
+
+class TestAppWithRejectPolicy(OtherTestBase):
+
+ policy = 'reject'
+ hostfile = make_tests_data_path('known_hosts_example')
+
+ @tornado.testing.gen_test
+ def test_app_with_hostname_not_in_hostkeys(self):
+ response = yield self.async_post('/', self.body)
+ data = json.loads(to_str(response.body))
+ message = 'Connection to {}:{} is not allowed.'.format(self.body['hostname'], self.sshserver_port) # noqa
+ self.assertEqual(message, data['status'])
+
+
+class TestAppWithBadHostKey(OtherTestBase):
+
+ policy = random.choice(['warning', 'autoadd', 'reject'])
+ hostfile = make_tests_data_path('test_known_hosts')
+
+ def setUp(self):
+ self.sshserver_port = 2222
+ super(TestAppWithBadHostKey, self).setUp()
+
+ @tornado.testing.gen_test
+ def test_app_with_bad_host_key(self):
+ response = yield self.async_post('/', self.body)
+ data = json.loads(to_str(response.body))
+ self.assertEqual('Bad host key.', data['status'])
+
+
+class TestAppWithTrustedStream(OtherTestBase):
+ tdstream = '127.0.0.2'
+
+ def test_with_forbidden_get_request(self):
+ response = self.fetch('/', method='GET')
+ self.assertEqual(response.code, 403)
+ self.assertIn('Forbidden', response.error.message)
+
+ def test_with_forbidden_post_request(self):
+ response = self.sync_post('/', self.body)
+ self.assertEqual(response.code, 403)
+ self.assertIn('Forbidden', response.error.message)
+
+ def test_with_forbidden_put_request(self):
+ response = self.fetch_request('/', method='PUT', body=self.body)
+ self.assertEqual(response.code, 403)
+ self.assertIn('Forbidden', response.error.message)
+
+
+class TestAppNotFoundHandler(OtherTestBase):
+
+ custom_headers = handler.MixinHandler.custom_headers
+
+ def test_with_not_found_get_request(self):
+ response = self.fetch('/pathnotfound', method='GET')
+ self.assertEqual(response.code, 404)
+ self.assertEqual(
+ response.headers['Server'], self.custom_headers['Server']
+ )
+ self.assertIn(b'404: Not Found', response.body)
+
+ def test_with_not_found_post_request(self):
+ response = self.sync_post('/pathnotfound', self.body)
+ self.assertEqual(response.code, 404)
+ self.assertEqual(
+ response.headers['Server'], self.custom_headers['Server']
+ )
+ self.assertIn(b'404: Not Found', response.body)
+
+ def test_with_not_found_put_request(self):
+ response = self.fetch_request('/pathnotfound', method='PUT',
+ body=self.body)
+ self.assertEqual(response.code, 404)
+ self.assertEqual(
+ response.headers['Server'], self.custom_headers['Server']
+ )
+ self.assertIn(b'404: Not Found', response.body)
+
+
+class TestAppWithHeadRequest(OtherTestBase):
+
+ def test_with_index_path(self):
+ response = self.fetch('/', method='HEAD')
+ self.assertEqual(response.code, 200)
+
+ def test_with_ws_path(self):
+ response = self.fetch('/ws', method='HEAD')
+ self.assertEqual(response.code, 405)
+
+ def test_with_not_found_path(self):
+ response = self.fetch('/notfound', method='HEAD')
+ self.assertEqual(response.code, 404)
+
+
+class TestAppWithPutRequest(OtherTestBase):
+
+ xsrf = False
+
+ @tornado.testing.gen_test
+ def test_app_with_method_not_supported(self):
+ with self.assertRaises(HTTPError) as ctx:
+ yield self.fetch_request('/', 'PUT', self.body, sync=False)
+ self.assertIn('Method Not Allowed', ctx.exception.message)
+
+
+class TestAppWithTooManyConnections(OtherTestBase):
+
+ maxconn = 1
+
+ def setUp(self):
+ clients.clear()
+ super(TestAppWithTooManyConnections, self).setUp()
+
+ @tornado.testing.gen_test
+ def test_app_with_too_many_connections(self):
+ clients['127.0.0.1'] = {'fake_worker_id': None}
+
+ url = self.get_url('/')
+ response = yield self.async_post(url, self.body)
+ data = json.loads(to_str(response.body))
+ self.assertEqual('Too many live connections.', data['status'])
+
+ clients['127.0.0.1'].clear()
+ response = yield self.async_post(url, self.body)
+ self.assert_status_none(json.loads(to_str(response.body)))
+
+
+class TestAppWithCrossOriginOperation(OtherTestBase):
+
+ origin = 'http://www.example.com'
+
+ @tornado.testing.gen_test
+ def test_app_with_wrong_event_origin(self):
+ body = dict(self.body, _origin='localhost')
+ response = yield self.async_post('/', body)
+ self.assert_status_equal('Cross origin operation is not allowed.', json.loads(to_str(response.body))) # noqa
+
+ @tornado.testing.gen_test
+ def test_app_with_wrong_header_origin(self):
+ headers = dict(Origin='localhost')
+ response = yield self.async_post('/', self.body, headers=headers)
+ self.assert_status_equal('Cross origin operation is not allowed.', json.loads(to_str(response.body)), ) # noqa
+
+ @tornado.testing.gen_test
+ def test_app_with_correct_event_origin(self):
+ body = dict(self.body, _origin=self.origin)
+ response = yield self.async_post('/', body)
+ self.assert_status_none(json.loads(to_str(response.body)))
+ self.assertIsNone(response.headers.get('Access-Control-Allow-Origin'))
+
+ @tornado.testing.gen_test
+ def test_app_with_correct_header_origin(self):
+ headers = dict(Origin=self.origin)
+ response = yield self.async_post('/', self.body, headers=headers)
+ self.assert_status_none(json.loads(to_str(response.body)))
+ self.assertEqual(
+ response.headers.get('Access-Control-Allow-Origin'), self.origin
+ )
+
+
+class TestAppWithBadEncoding(OtherTestBase):
+
+ encodings = [u'\u7f16\u7801']
+
+ @tornado.testing.gen_test
+ def test_app_with_a_bad_encoding(self):
+ response = yield self.async_post('/', self.body)
+ dic = json.loads(to_str(response.body))
+ self.assert_status_none(dic)
+ self.assertIn(dic['encoding'], server_encodings)
+
+
+class TestAppWithUnknownEncoding(OtherTestBase):
+
+ encodings = [u'\u7f16\u7801', u'UnknownEncoding']
+
+ @tornado.testing.gen_test
+ def test_app_with_a_unknown_encoding(self):
+ response = yield self.async_post('/', self.body)
+ self.assert_status_none(json.loads(to_str(response.body)))
+ dic = json.loads(to_str(response.body))
+ self.assert_status_none(dic)
+ self.assertEqual(dic['encoding'], 'utf-8')
diff --git a/tests/test_handler.py b/tests/test_handler.py
new file mode 100644
index 0000000..020847d
--- /dev/null
+++ b/tests/test_handler.py
@@ -0,0 +1,279 @@
+import unittest
+import paramiko
+
+from tornado.httputil import HTTPServerRequest
+from tornado.options import options
+from tests.utils import read_file, make_tests_data_path
+from webssh import handler
+from webssh.handler import (
+ MixinHandler, WsockHandler, PrivateKey, InvalidValueError
+)
+
+try:
+ from unittest.mock import Mock
+except ImportError:
+ from mock import Mock
+
+
+class TestMixinHandler(unittest.TestCase):
+
+ def test_is_forbidden(self):
+ mhandler = MixinHandler()
+ handler.redirecting = True
+ options.fbidhttp = True
+
+ context = Mock(
+ address=('8.8.8.8', 8888),
+ trusted_downstream=['127.0.0.1'],
+ _orig_protocol='http'
+ )
+ hostname = '4.4.4.4'
+ self.assertTrue(mhandler.is_forbidden(context, hostname))
+
+ context = Mock(
+ address=('8.8.8.8', 8888),
+ trusted_downstream=[],
+ _orig_protocol='http'
+ )
+ hostname = 'www.google.com'
+ self.assertEqual(mhandler.is_forbidden(context, hostname), False)
+
+ context = Mock(
+ address=('8.8.8.8', 8888),
+ trusted_downstream=[],
+ _orig_protocol='http'
+ )
+ hostname = '4.4.4.4'
+ self.assertTrue(mhandler.is_forbidden(context, hostname))
+
+ context = Mock(
+ address=('192.168.1.1', 8888),
+ trusted_downstream=[],
+ _orig_protocol='http'
+ )
+ hostname = 'www.google.com'
+ self.assertIsNone(mhandler.is_forbidden(context, hostname))
+
+ options.fbidhttp = False
+ self.assertIsNone(mhandler.is_forbidden(context, hostname))
+
+ hostname = '4.4.4.4'
+ self.assertIsNone(mhandler.is_forbidden(context, hostname))
+
+ handler.redirecting = False
+ self.assertIsNone(mhandler.is_forbidden(context, hostname))
+
+ context._orig_protocol = 'https'
+ self.assertIsNone(mhandler.is_forbidden(context, hostname))
+
+ def test_get_redirect_url(self):
+ mhandler = MixinHandler()
+ hostname = 'www.example.com'
+ uri = '/'
+ port = 443
+
+ self.assertEqual(
+ mhandler.get_redirect_url(hostname, port, uri=uri),
+ 'https://www.example.com/'
+ )
+
+ port = 4433
+ self.assertEqual(
+ mhandler.get_redirect_url(hostname, port, uri),
+ 'https://www.example.com:4433/'
+ )
+
+ def test_get_client_addr(self):
+ mhandler = MixinHandler()
+ client_addr = ('8.8.8.8', 8888)
+ context_addr = ('127.0.0.1', 1234)
+ options.xheaders = True
+
+ mhandler.context = Mock(address=context_addr)
+ mhandler.get_real_client_addr = lambda: None
+ self.assertEqual(mhandler.get_client_addr(), context_addr)
+
+ mhandler.context = Mock(address=context_addr)
+ mhandler.get_real_client_addr = lambda: client_addr
+ self.assertEqual(mhandler.get_client_addr(), client_addr)
+
+ options.xheaders = False
+ mhandler.context = Mock(address=context_addr)
+ mhandler.get_real_client_addr = lambda: client_addr
+ self.assertEqual(mhandler.get_client_addr(), context_addr)
+
+ def test_get_real_client_addr(self):
+ x_forwarded_for = '1.1.1.1'
+ x_forwarded_port = 1111
+ x_real_ip = '2.2.2.2'
+ x_real_port = 2222
+ fake_port = 65535
+
+ mhandler = MixinHandler()
+ mhandler.request = HTTPServerRequest(uri='/')
+ mhandler.request.remote_ip = x_forwarded_for
+
+ self.assertIsNone(mhandler.get_real_client_addr())
+
+ mhandler.request.headers.add('X-Forwarded-For', x_forwarded_for)
+ self.assertEqual(mhandler.get_real_client_addr(),
+ (x_forwarded_for, fake_port))
+
+ mhandler.request.headers.add('X-Forwarded-Port', fake_port + 1)
+ self.assertEqual(mhandler.get_real_client_addr(),
+ (x_forwarded_for, fake_port))
+
+ mhandler.request.headers['X-Forwarded-Port'] = x_forwarded_port
+ self.assertEqual(mhandler.get_real_client_addr(),
+ (x_forwarded_for, x_forwarded_port))
+
+ mhandler.request.remote_ip = x_real_ip
+
+ mhandler.request.headers.add('X-Real-Ip', x_real_ip)
+ self.assertEqual(mhandler.get_real_client_addr(),
+ (x_real_ip, fake_port))
+
+ mhandler.request.headers.add('X-Real-Port', fake_port + 1)
+ self.assertEqual(mhandler.get_real_client_addr(),
+ (x_real_ip, fake_port))
+
+ mhandler.request.headers['X-Real-Port'] = x_real_port
+ self.assertEqual(mhandler.get_real_client_addr(),
+ (x_real_ip, x_real_port))
+
+
+class TestPrivateKey(unittest.TestCase):
+
+ def get_pk_obj(self, fname, password=None):
+ key = read_file(make_tests_data_path(fname))
+ return PrivateKey(key, password=password, filename=fname)
+
+ def _test_with_encrypted_key(self, fname, password, klass):
+ pk = self.get_pk_obj(fname, password='')
+ with self.assertRaises(InvalidValueError) as ctx:
+ pk.get_pkey_obj()
+ self.assertIn('Need a passphrase', str(ctx.exception))
+
+ pk = self.get_pk_obj(fname, password='wrongpass')
+ with self.assertRaises(InvalidValueError) as ctx:
+ pk.get_pkey_obj()
+ self.assertIn('wrong passphrase', str(ctx.exception))
+
+ pk = self.get_pk_obj(fname, password=password)
+ self.assertIsInstance(pk.get_pkey_obj(), klass)
+
+ def test_class_with_invalid_key_length(self):
+ key = u'a' * (PrivateKey.max_length + 1)
+
+ with self.assertRaises(InvalidValueError) as ctx:
+ PrivateKey(key)
+ self.assertIn('Invalid key length', str(ctx.exception))
+
+ def test_get_pkey_obj_with_invalid_key(self):
+ key = u'a b c'
+ fname = 'abc'
+
+ pk = PrivateKey(key, filename=fname)
+ with self.assertRaises(InvalidValueError) as ctx:
+ pk.get_pkey_obj()
+ self.assertIn('Invalid key {}'.format(fname), str(ctx.exception))
+
+ def test_get_pkey_obj_with_plain_rsa_key(self):
+ pk = self.get_pk_obj('test_rsa.key')
+ self.assertIsInstance(pk.get_pkey_obj(), paramiko.RSAKey)
+
+ def test_get_pkey_obj_with_plain_ed25519_key(self):
+ pk = self.get_pk_obj('test_ed25519.key')
+ self.assertIsInstance(pk.get_pkey_obj(), paramiko.Ed25519Key)
+
+ def test_get_pkey_obj_with_encrypted_rsa_key(self):
+ fname = 'test_rsa_password.key'
+ password = 'television'
+ self._test_with_encrypted_key(fname, password, paramiko.RSAKey)
+
+ def test_get_pkey_obj_with_encrypted_ed25519_key(self):
+ fname = 'test_ed25519_password.key'
+ password = 'abc123'
+ self._test_with_encrypted_key(fname, password, paramiko.Ed25519Key)
+
+ def test_get_pkey_obj_with_encrypted_new_rsa_key(self):
+ fname = 'test_new_rsa_password.key'
+ password = '123456'
+ self._test_with_encrypted_key(fname, password, paramiko.RSAKey)
+
+ def test_get_pkey_obj_with_plain_new_dsa_key(self):
+ pk = self.get_pk_obj('test_new_dsa.key')
+ self.assertIsInstance(pk.get_pkey_obj(), paramiko.DSSKey)
+
+ def test_parse_name(self):
+ key = u'-----BEGIN PRIVATE KEY-----'
+ pk = PrivateKey(key)
+ name, _ = pk.parse_name(pk.iostr, pk.tag_to_name)
+ self.assertIsNone(name)
+
+ key = u'-----BEGIN xxx PRIVATE KEY-----'
+ pk = PrivateKey(key)
+ name, _ = pk.parse_name(pk.iostr, pk.tag_to_name)
+ self.assertIsNone(name)
+
+ key = u'-----BEGIN RSA PRIVATE KEY-----'
+ pk = PrivateKey(key)
+ name, _ = pk.parse_name(pk.iostr, pk.tag_to_name)
+ self.assertIsNone(name)
+
+ key = u'-----BEGIN RSA PRIVATE KEY-----'
+ pk = PrivateKey(key)
+ name, _ = pk.parse_name(pk.iostr, pk.tag_to_name)
+ self.assertIsNone(name)
+
+ key = u'-----BEGIN RSA PRIVATE KEY-----'
+ pk = PrivateKey(key)
+ name, _ = pk.parse_name(pk.iostr, pk.tag_to_name)
+ self.assertIsNone(name)
+
+ for tag, to_name in PrivateKey.tag_to_name.items():
+ key = u'-----BEGIN {} PRIVATE KEY----- \r\n'.format(tag)
+ pk = PrivateKey(key)
+ name, length = pk.parse_name(pk.iostr, pk.tag_to_name)
+ self.assertEqual(name, to_name)
+ self.assertEqual(length, len(key))
+
+
+class TestWsockHandler(unittest.TestCase):
+
+ def test_check_origin(self):
+ request = HTTPServerRequest(uri='/')
+ obj = Mock(spec=WsockHandler, request=request)
+
+ obj.origin_policy = 'same'
+ request.headers['Host'] = 'www.example.com:4433'
+ origin = 'https://www.example.com:4433'
+ self.assertTrue(WsockHandler.check_origin(obj, origin))
+
+ origin = 'https://www.example.com'
+ self.assertFalse(WsockHandler.check_origin(obj, origin))
+
+ obj.origin_policy = 'primary'
+ self.assertTrue(WsockHandler.check_origin(obj, origin))
+
+ origin = 'https://blog.example.com'
+ self.assertTrue(WsockHandler.check_origin(obj, origin))
+
+ origin = 'https://blog.example.org'
+ self.assertFalse(WsockHandler.check_origin(obj, origin))
+
+ origin = 'https://blog.example.org'
+ obj.origin_policy = {'https://blog.example.org'}
+ self.assertTrue(WsockHandler.check_origin(obj, origin))
+
+ origin = 'http://blog.example.org'
+ obj.origin_policy = {'http://blog.example.org'}
+ self.assertTrue(WsockHandler.check_origin(obj, origin))
+
+ origin = 'http://blog.example.org'
+ obj.origin_policy = {'https://blog.example.org'}
+ self.assertFalse(WsockHandler.check_origin(obj, origin))
+
+ obj.origin_policy = '*'
+ origin = 'https://blog.example.org'
+ self.assertTrue(WsockHandler.check_origin(obj, origin))
diff --git a/tests/test_main.py b/tests/test_main.py
new file mode 100644
index 0000000..6ed89fc
--- /dev/null
+++ b/tests/test_main.py
@@ -0,0 +1,22 @@
+import unittest
+
+from tornado.web import Application
+from webssh import handler
+from webssh.main import app_listen
+
+
+class TestMain(unittest.TestCase):
+
+ def test_app_listen(self):
+ app = Application()
+ app.listen = lambda x, y, **kwargs: 1
+
+ handler.redirecting = None
+ server_settings = dict()
+ app_listen(app, 80, '127.0.0.1', server_settings)
+ self.assertFalse(handler.redirecting)
+
+ handler.redirecting = None
+ server_settings = dict(ssl_options='enabled')
+ app_listen(app, 80, '127.0.0.1', server_settings)
+ self.assertTrue(handler.redirecting)
diff --git a/tests/test_policy.py b/tests/test_policy.py
new file mode 100644
index 0000000..45b5dd8
--- /dev/null
+++ b/tests/test_policy.py
@@ -0,0 +1,123 @@
+import os
+import unittest
+import paramiko
+
+from shutil import copyfile
+from paramiko.client import RejectPolicy, WarningPolicy
+from tests.utils import make_tests_data_path
+from webssh.policy import (
+ AutoAddPolicy, get_policy_dictionary, load_host_keys,
+ get_policy_class, check_policy_setting
+)
+
+
+class TestPolicy(unittest.TestCase):
+
+ def test_get_policy_dictionary(self):
+ classes = [AutoAddPolicy, RejectPolicy, WarningPolicy]
+ dic = get_policy_dictionary()
+ for cls in classes:
+ val = dic[cls.__name__.lower()]
+ self.assertIs(cls, val)
+
+ def test_load_host_keys(self):
+ path = '/path-not-exists'
+ host_keys = load_host_keys(path)
+ self.assertFalse(host_keys)
+
+ path = '/tmp'
+ host_keys = load_host_keys(path)
+ self.assertFalse(host_keys)
+
+ path = make_tests_data_path('known_hosts_example')
+ host_keys = load_host_keys(path)
+ self.assertEqual(host_keys, paramiko.hostkeys.HostKeys(path))
+
+ def test_get_policy_class(self):
+ keys = ['autoadd', 'reject', 'warning']
+ vals = [AutoAddPolicy, RejectPolicy, WarningPolicy]
+ for key, val in zip(keys, vals):
+ cls = get_policy_class(key)
+ self.assertIs(cls, val)
+
+ key = 'non-exists'
+ with self.assertRaises(ValueError):
+ get_policy_class(key)
+
+ def test_check_policy_setting(self):
+ host_keys_filename = make_tests_data_path('host_keys_test.db')
+ host_keys_settings = dict(
+ host_keys=paramiko.hostkeys.HostKeys(),
+ system_host_keys=paramiko.hostkeys.HostKeys(),
+ host_keys_filename=host_keys_filename
+ )
+
+ with self.assertRaises(ValueError):
+ check_policy_setting(RejectPolicy, host_keys_settings)
+
+ try:
+ os.unlink(host_keys_filename)
+ except OSError:
+ pass
+ check_policy_setting(AutoAddPolicy, host_keys_settings)
+ self.assertEqual(os.path.exists(host_keys_filename), True)
+
+ def test_is_missing_host_key(self):
+ client = paramiko.SSHClient()
+ file1 = make_tests_data_path('known_hosts_example')
+ file2 = make_tests_data_path('known_hosts_example2')
+ client.load_host_keys(file1)
+ client.load_system_host_keys(file2)
+
+ autoadd = AutoAddPolicy()
+ for f in [file1, file2]:
+ entry = paramiko.hostkeys.HostKeys(f)._entries[0]
+ hostname = entry.hostnames[0]
+ key = entry.key
+ self.assertIsNone(
+ autoadd.is_missing_host_key(client, hostname, key)
+ )
+
+ for f in [file1, file2]:
+ entry = paramiko.hostkeys.HostKeys(f)._entries[0]
+ hostname = entry.hostnames[0]
+ key = entry.key
+ key.get_name = lambda: 'unknown'
+ self.assertTrue(
+ autoadd.is_missing_host_key(client, hostname, key)
+ )
+ del key.get_name
+
+ for f in [file1, file2]:
+ entry = paramiko.hostkeys.HostKeys(f)._entries[0]
+ hostname = entry.hostnames[0][1:]
+ key = entry.key
+ self.assertTrue(
+ autoadd.is_missing_host_key(client, hostname, key)
+ )
+
+ file3 = make_tests_data_path('known_hosts_example3')
+ entry = paramiko.hostkeys.HostKeys(file3)._entries[0]
+ hostname = entry.hostnames[0]
+ key = entry.key
+ with self.assertRaises(paramiko.BadHostKeyException):
+ autoadd.is_missing_host_key(client, hostname, key)
+
+ def test_missing_host_key(self):
+ client = paramiko.SSHClient()
+ file1 = make_tests_data_path('known_hosts_example')
+ file2 = make_tests_data_path('known_hosts_example2')
+ filename = make_tests_data_path('known_hosts')
+ copyfile(file1, filename)
+ client.load_host_keys(filename)
+ n1 = len(client._host_keys)
+
+ autoadd = AutoAddPolicy()
+ entry = paramiko.hostkeys.HostKeys(file2)._entries[0]
+ hostname = entry.hostnames[0]
+ key = entry.key
+ autoadd.missing_host_key(client, hostname, key)
+ self.assertEqual(len(client._host_keys), n1 + 1)
+ self.assertEqual(paramiko.hostkeys.HostKeys(filename),
+ client._host_keys)
+ os.unlink(filename)
diff --git a/tests/test_settings.py b/tests/test_settings.py
new file mode 100644
index 0000000..bd0d509
--- /dev/null
+++ b/tests/test_settings.py
@@ -0,0 +1,187 @@
+import io
+import random
+import ssl
+import sys
+import os.path
+import unittest
+import paramiko
+import tornado.options as options
+
+from tests.utils import make_tests_data_path
+from webssh.policy import load_host_keys
+from webssh.settings import (
+ get_host_keys_settings, get_policy_setting, base_dir, get_font_filename,
+ get_ssl_context, get_trusted_downstream, get_origin_setting, print_version,
+ check_encoding_setting
+)
+from webssh.utils import UnicodeType
+from webssh._version import __version__
+
+
+class TestSettings(unittest.TestCase):
+
+ def test_print_version(self):
+ sys_stdout = sys.stdout
+ sys.stdout = io.StringIO() if UnicodeType == str else io.BytesIO()
+
+ self.assertEqual(print_version(False), None)
+ self.assertEqual(sys.stdout.getvalue(), '')
+
+ with self.assertRaises(SystemExit):
+ self.assertEqual(print_version(True), None)
+ self.assertEqual(sys.stdout.getvalue(), __version__ + '\n')
+
+ sys.stdout = sys_stdout
+
+ def test_get_host_keys_settings(self):
+ options.hostfile = ''
+ options.syshostfile = ''
+ dic = get_host_keys_settings(options)
+
+ filename = os.path.join(base_dir, 'known_hosts')
+ self.assertEqual(dic['host_keys'], load_host_keys(filename))
+ self.assertEqual(dic['host_keys_filename'], filename)
+ self.assertEqual(
+ dic['system_host_keys'],
+ load_host_keys(os.path.expanduser('~/.ssh/known_hosts'))
+ )
+
+ options.hostfile = make_tests_data_path('known_hosts_example')
+ options.syshostfile = make_tests_data_path('known_hosts_example2')
+ dic2 = get_host_keys_settings(options)
+ self.assertEqual(dic2['host_keys'], load_host_keys(options.hostfile))
+ self.assertEqual(dic2['host_keys_filename'], options.hostfile)
+ self.assertEqual(dic2['system_host_keys'],
+ load_host_keys(options.syshostfile))
+
+ def test_get_policy_setting(self):
+ options.policy = 'warning'
+ options.hostfile = ''
+ options.syshostfile = ''
+ settings = get_host_keys_settings(options)
+ instance = get_policy_setting(options, settings)
+ self.assertIsInstance(instance, paramiko.client.WarningPolicy)
+
+ options.policy = 'autoadd'
+ options.hostfile = ''
+ options.syshostfile = ''
+ settings = get_host_keys_settings(options)
+ instance = get_policy_setting(options, settings)
+ self.assertIsInstance(instance, paramiko.client.AutoAddPolicy)
+ os.unlink(settings['host_keys_filename'])
+
+ options.policy = 'reject'
+ options.hostfile = ''
+ options.syshostfile = ''
+ settings = get_host_keys_settings(options)
+ try:
+ instance = get_policy_setting(options, settings)
+ except ValueError:
+ self.assertFalse(
+ settings['host_keys'] and settings['system_host_keys']
+ )
+ else:
+ self.assertIsInstance(instance, paramiko.client.RejectPolicy)
+
+ def test_get_ssl_context(self):
+ options.certfile = ''
+ options.keyfile = ''
+ ssl_ctx = get_ssl_context(options)
+ self.assertIsNone(ssl_ctx)
+
+ options.certfile = 'provided'
+ options.keyfile = ''
+ with self.assertRaises(ValueError) as ctx:
+ ssl_ctx = get_ssl_context(options)
+ self.assertEqual('keyfile is not provided', str(ctx.exception))
+
+ options.certfile = ''
+ options.keyfile = 'provided'
+ with self.assertRaises(ValueError) as ctx:
+ ssl_ctx = get_ssl_context(options)
+ self.assertEqual('certfile is not provided', str(ctx.exception))
+
+ options.certfile = 'FileDoesNotExist'
+ options.keyfile = make_tests_data_path('cert.key')
+ with self.assertRaises(ValueError) as ctx:
+ ssl_ctx = get_ssl_context(options)
+ self.assertIn('does not exist', str(ctx.exception))
+
+ options.certfile = make_tests_data_path('cert.key')
+ options.keyfile = 'FileDoesNotExist'
+ with self.assertRaises(ValueError) as ctx:
+ ssl_ctx = get_ssl_context(options)
+ self.assertIn('does not exist', str(ctx.exception))
+
+ options.certfile = make_tests_data_path('cert.key')
+ options.keyfile = make_tests_data_path('cert.key')
+ with self.assertRaises(ssl.SSLError) as ctx:
+ ssl_ctx = get_ssl_context(options)
+
+ options.certfile = make_tests_data_path('cert.crt')
+ options.keyfile = make_tests_data_path('cert.key')
+ ssl_ctx = get_ssl_context(options)
+ self.assertIsNotNone(ssl_ctx)
+
+ def test_get_trusted_downstream(self):
+ tdstream = ''
+ result = set()
+ self.assertEqual(get_trusted_downstream(tdstream), result)
+
+ tdstream = '1.1.1.1, 2.2.2.2'
+ result = set(['1.1.1.1', '2.2.2.2'])
+ self.assertEqual(get_trusted_downstream(tdstream), result)
+
+ tdstream = '1.1.1.1, 2.2.2.2, 2.2.2.2'
+ result = set(['1.1.1.1', '2.2.2.2'])
+ self.assertEqual(get_trusted_downstream(tdstream), result)
+
+ tdstream = '1.1.1.1, 2.2.2.'
+ with self.assertRaises(ValueError):
+ get_trusted_downstream(tdstream)
+
+ def test_get_origin_setting(self):
+ options.debug = False
+ options.origin = '*'
+ with self.assertRaises(ValueError):
+ get_origin_setting(options)
+
+ options.debug = True
+ self.assertEqual(get_origin_setting(options), '*')
+
+ options.origin = random.choice(['Same', 'Primary'])
+ self.assertEqual(get_origin_setting(options), options.origin.lower())
+
+ options.origin = ''
+ with self.assertRaises(ValueError):
+ get_origin_setting(options)
+
+ options.origin = ','
+ with self.assertRaises(ValueError):
+ get_origin_setting(options)
+
+ options.origin = 'www.example.com, https://www.example.org'
+ result = {'http://www.example.com', 'https://www.example.org'}
+ self.assertEqual(get_origin_setting(options), result)
+
+ options.origin = 'www.example.com:80, www.example.org:443'
+ result = {'http://www.example.com', 'https://www.example.org'}
+ self.assertEqual(get_origin_setting(options), result)
+
+ def test_get_font_setting(self):
+ font_dir = os.path.join(base_dir, 'tests', 'data', 'fonts')
+ font = ''
+ self.assertEqual(get_font_filename(font, font_dir), 'fake-font')
+
+ font = 'fake-font'
+ self.assertEqual(get_font_filename(font, font_dir), 'fake-font')
+
+ font = 'wrong-name'
+ with self.assertRaises(ValueError):
+ get_font_filename(font, font_dir)
+
+ def test_check_encoding_setting(self):
+ self.assertIsNone(check_encoding_setting(''))
+ self.assertIsNone(check_encoding_setting('utf-8'))
+ with self.assertRaises(ValueError):
+ check_encoding_setting('unknown-encoding')
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 0000000..5ace48e
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,127 @@
+import unittest
+
+from webssh.utils import (
+ is_valid_ip_address, is_valid_port, is_valid_hostname, to_str, to_bytes,
+ to_int, is_ip_hostname, is_same_primary_domain, parse_origin_from_url
+)
+
+
+class TestUitls(unittest.TestCase):
+
+ def test_to_str(self):
+ b = b'hello'
+ u = u'hello'
+ self.assertEqual(to_str(b), u)
+ self.assertEqual(to_str(u), u)
+
+ def test_to_bytes(self):
+ b = b'hello'
+ u = u'hello'
+ self.assertEqual(to_bytes(b), b)
+ self.assertEqual(to_bytes(u), b)
+
+ def test_to_int(self):
+ self.assertEqual(to_int(''), None)
+ self.assertEqual(to_int(None), None)
+ self.assertEqual(to_int('22'), 22)
+ self.assertEqual(to_int(' 22 '), 22)
+
+ def test_is_valid_ip_address(self):
+ self.assertFalse(is_valid_ip_address('127.0.0'))
+ self.assertFalse(is_valid_ip_address(b'127.0.0'))
+ self.assertTrue(is_valid_ip_address('127.0.0.1'))
+ self.assertTrue(is_valid_ip_address(b'127.0.0.1'))
+ self.assertFalse(is_valid_ip_address('abc'))
+ self.assertFalse(is_valid_ip_address(b'abc'))
+ self.assertTrue(is_valid_ip_address('::1'))
+ self.assertTrue(is_valid_ip_address(b'::1'))
+ self.assertTrue(is_valid_ip_address('fe80::1111:2222:3333:4444'))
+ self.assertTrue(is_valid_ip_address(b'fe80::1111:2222:3333:4444'))
+ self.assertTrue(is_valid_ip_address('fe80::1111:2222:3333:4444%eth0'))
+ self.assertTrue(is_valid_ip_address(b'fe80::1111:2222:3333:4444%eth0'))
+
+ def test_is_valid_port(self):
+ self.assertTrue(is_valid_port(80))
+ self.assertFalse(is_valid_port(0))
+ self.assertFalse(is_valid_port(65536))
+
+ def test_is_valid_hostname(self):
+ self.assertTrue(is_valid_hostname('google.com'))
+ self.assertTrue(is_valid_hostname('google.com.'))
+ self.assertTrue(is_valid_hostname('www.google.com'))
+ self.assertTrue(is_valid_hostname('www.google.com.'))
+ self.assertFalse(is_valid_hostname('.www.google.com'))
+ self.assertFalse(is_valid_hostname('http://www.google.com'))
+ self.assertFalse(is_valid_hostname('https://www.google.com'))
+ self.assertFalse(is_valid_hostname('127.0.0.1'))
+ self.assertFalse(is_valid_hostname('::1'))
+
+ def test_is_ip_hostname(self):
+ self.assertTrue(is_ip_hostname('[::1]'))
+ self.assertTrue(is_ip_hostname('127.0.0.1'))
+ self.assertFalse(is_ip_hostname('localhost'))
+ self.assertFalse(is_ip_hostname('www.google.com'))
+
+ def test_is_same_primary_domain(self):
+ domain1 = 'localhost'
+ domain2 = 'localhost'
+ self.assertTrue(is_same_primary_domain(domain1, domain2))
+
+ domain1 = 'localhost'
+ domain2 = 'test'
+ self.assertFalse(is_same_primary_domain(domain1, domain2))
+
+ domain1 = 'com'
+ domain2 = 'example.com'
+ self.assertFalse(is_same_primary_domain(domain1, domain2))
+
+ domain1 = 'example.com'
+ domain2 = 'example.com'
+ self.assertTrue(is_same_primary_domain(domain1, domain2))
+
+ domain1 = 'www.example.com'
+ domain2 = 'example.com'
+ self.assertTrue(is_same_primary_domain(domain1, domain2))
+
+ domain1 = 'wwwexample.com'
+ domain2 = 'example.com'
+ self.assertFalse(is_same_primary_domain(domain1, domain2))
+
+ domain1 = 'www.example.com'
+ domain2 = 'www2.example.com'
+ self.assertTrue(is_same_primary_domain(domain1, domain2))
+
+ domain1 = 'xxx.www.example.com'
+ domain2 = 'xxx.www2.example.com'
+ self.assertTrue(is_same_primary_domain(domain1, domain2))
+
+ def test_parse_origin_from_url(self):
+ url = ''
+ self.assertIsNone(parse_origin_from_url(url))
+
+ url = 'www.example.com'
+ self.assertEqual(parse_origin_from_url(url), 'http://www.example.com')
+
+ url = 'http://www.example.com'
+ self.assertEqual(parse_origin_from_url(url), 'http://www.example.com')
+
+ url = 'www.example.com:80'
+ self.assertEqual(parse_origin_from_url(url), 'http://www.example.com')
+
+ url = 'http://www.example.com:80'
+ self.assertEqual(parse_origin_from_url(url), 'http://www.example.com')
+
+ url = 'www.example.com:443'
+ self.assertEqual(parse_origin_from_url(url), 'https://www.example.com')
+
+ url = 'https://www.example.com'
+ self.assertEqual(parse_origin_from_url(url), 'https://www.example.com')
+
+ url = 'https://www.example.com:443'
+ self.assertEqual(parse_origin_from_url(url), 'https://www.example.com')
+
+ url = 'https://www.example.com:80'
+ self.assertEqual(parse_origin_from_url(url), url)
+
+ url = 'http://www.example.com:443'
+ self.assertEqual(parse_origin_from_url(url), url)
diff --git a/tests/utils.py b/tests/utils.py
new file mode 100644
index 0000000..4252d0d
--- /dev/null
+++ b/tests/utils.py
@@ -0,0 +1,52 @@
+import mimetypes
+import os.path
+from uuid import uuid4
+from webssh.settings import base_dir
+
+
+def encode_multipart_formdata(fields, files):
+ """
+ fields is a sequence of (name, value) elements for regular form fields.
+ files is a sequence of (name, filename, value) elements for data to be
+ uploaded as files.
+ Return (content_type, body) ready for httplib.HTTP instance
+ """
+ boundary = uuid4().hex
+ CRLF = '\r\n'
+ L = []
+ for (key, value) in fields:
+ L.append('--' + boundary)
+ L.append('Content-Disposition: form-data; name="%s"' % key)
+ L.append('')
+ L.append(value)
+ for (key, filename, value) in files:
+ L.append('--' + boundary)
+ L.append(
+ 'Content-Disposition: form-data; name="%s"; filename="%s"' % (
+ key, filename
+ )
+ )
+ L.append('Content-Type: %s' % get_content_type(filename))
+ L.append('')
+ L.append(value)
+ L.append('--' + boundary + '--')
+ L.append('')
+ body = CRLF.join(L)
+ content_type = 'multipart/form-data; boundary=%s' % boundary
+ return content_type, body
+
+
+def get_content_type(filename):
+ return mimetypes.guess_type(filename)[0] or 'application/octet-stream'
+
+
+def read_file(path, encoding='utf-8'):
+ with open(path, 'rb') as f:
+ data = f.read()
+ if encoding is None:
+ return data
+ return data.decode(encoding)
+
+
+def make_tests_data_path(filename):
+ return os.path.join(base_dir, 'tests', 'data', filename)
diff --git a/webssh/__init__.py b/webssh/__init__.py
new file mode 100644
index 0000000..f165fb4
--- /dev/null
+++ b/webssh/__init__.py
@@ -0,0 +1,10 @@
+import sys
+from webssh._version import __version__, __version_info__
+
+
+__author__ = 'Shengdun Hua <webmaster0115@gmail.com>'
+
+if sys.platform == 'win32' and sys.version_info.major == 3 and \
+ sys.version_info.minor >= 8:
+ import asyncio
+ asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
diff --git a/webssh/_version.py b/webssh/_version.py
new file mode 100644
index 0000000..fbf7441
--- /dev/null
+++ b/webssh/_version.py
@@ -0,0 +1,2 @@
+__version_info__ = (1, 5, 3)
+__version__ = '.'.join(map(str, __version_info__))
diff --git a/webssh/handler.py b/webssh/handler.py
new file mode 100644
index 0000000..9b65d1d
--- /dev/null
+++ b/webssh/handler.py
@@ -0,0 +1,583 @@
+import io
+import json
+import logging
+import socket
+import struct
+import traceback
+import weakref
+import paramiko
+import tornado.web
+
+from concurrent.futures import ThreadPoolExecutor
+from tornado.ioloop import IOLoop
+from tornado.options import options
+from tornado.process import cpu_count
+from webssh.utils import (
+ is_valid_ip_address, is_valid_port, is_valid_hostname, to_bytes, to_str,
+ to_int, to_ip_address, UnicodeType, is_ip_hostname, is_same_primary_domain,
+ is_valid_encoding
+)
+from webssh.worker import Worker, recycle_worker, clients
+
+try:
+ from json.decoder import JSONDecodeError
+except ImportError:
+ JSONDecodeError = ValueError
+
+try:
+ from urllib.parse import urlparse
+except ImportError:
+ from urlparse import urlparse
+
+
+DEFAULT_PORT = 22
+
+swallow_http_errors = True
+redirecting = None
+
+
+class InvalidValueError(Exception):
+ pass
+
+
+class SSHClient(paramiko.SSHClient):
+
+ def handler(self, title, instructions, prompt_list):
+ answers = []
+ for prompt_, _ in prompt_list:
+ prompt = prompt_.strip().lower()
+ if prompt.startswith('password'):
+ answers.append(self.password)
+ elif prompt.startswith('verification'):
+ answers.append(self.totp)
+ else:
+ raise ValueError('Unknown prompt: {}'.format(prompt_))
+ return answers
+
+ def auth_interactive(self, username, handler):
+ if not self.totp:
+ raise ValueError('Need a verification code for 2fa.')
+ self._transport.auth_interactive(username, handler)
+
+ def _auth(self, username, password, pkey, *args):
+ self.password = password
+ saved_exception = None
+ two_factor = False
+ allowed_types = set()
+ two_factor_types = {'keyboard-interactive', 'password'}
+
+ if pkey is not None:
+ logging.info('Trying publickey authentication')
+ try:
+ allowed_types = set(
+ self._transport.auth_publickey(username, pkey)
+ )
+ two_factor = allowed_types & two_factor_types
+ if not two_factor:
+ return
+ except paramiko.SSHException as e:
+ saved_exception = e
+
+ if two_factor:
+ logging.info('Trying publickey 2fa')
+ return self.auth_interactive(username, self.handler)
+
+ if password is not None:
+ logging.info('Trying password authentication')
+ try:
+ self._transport.auth_password(username, password)
+ return
+ except paramiko.SSHException as e:
+ saved_exception = e
+ allowed_types = set(getattr(e, 'allowed_types', []))
+ two_factor = allowed_types & two_factor_types
+
+ if two_factor:
+ logging.info('Trying password 2fa')
+ return self.auth_interactive(username, self.handler)
+
+ assert saved_exception is not None
+ raise saved_exception
+
+
+class PrivateKey(object):
+
+ max_length = 16384 # rough number
+
+ tag_to_name = {
+ 'RSA': 'RSA',
+ 'DSA': 'DSS',
+ 'EC': 'ECDSA',
+ 'OPENSSH': 'Ed25519'
+ }
+
+ def __init__(self, privatekey, password=None, filename=''):
+ self.privatekey = privatekey
+ self.filename = filename
+ self.password = password
+ self.check_length()
+ self.iostr = io.StringIO(privatekey)
+ self.last_exception = None
+
+ def check_length(self):
+ if len(self.privatekey) > self.max_length:
+ raise InvalidValueError('Invalid key length.')
+
+ def parse_name(self, iostr, tag_to_name):
+ name = None
+ for line_ in iostr:
+ line = line_.strip()
+ if line and line.startswith('-----BEGIN ') and \
+ line.endswith(' PRIVATE KEY-----'):
+ lst = line.split(' ')
+ if len(lst) == 4:
+ tag = lst[1]
+ if tag:
+ name = tag_to_name.get(tag)
+ if name:
+ break
+ return name, len(line_)
+
+ def get_specific_pkey(self, name, offset, password):
+ self.iostr.seek(offset)
+ logging.debug('Reset offset to {}.'.format(offset))
+
+ logging.debug('Try parsing it as {} type key'.format(name))
+ pkeycls = getattr(paramiko, name+'Key')
+ pkey = None
+
+ try:
+ pkey = pkeycls.from_private_key(self.iostr, password=password)
+ except paramiko.PasswordRequiredException:
+ raise InvalidValueError('Need a passphrase to decrypt the key.')
+ except (paramiko.SSHException, ValueError) as exc:
+ self.last_exception = exc
+ logging.debug(str(exc))
+
+ return pkey
+
+ def get_pkey_obj(self):
+ logging.info('Parsing private key {!r}'.format(self.filename))
+ name, length = self.parse_name(self.iostr, self.tag_to_name)
+ if not name:
+ raise InvalidValueError('Invalid key {}.'.format(self.filename))
+
+ offset = self.iostr.tell() - length
+ password = to_bytes(self.password) if self.password else None
+ pkey = self.get_specific_pkey(name, offset, password)
+
+ if pkey is None and name == 'Ed25519':
+ for name in ['RSA', 'ECDSA', 'DSS']:
+ pkey = self.get_specific_pkey(name, offset, password)
+ if pkey:
+ break
+
+ if pkey:
+ return pkey
+
+ logging.error(str(self.last_exception))
+ msg = 'Invalid key'
+ if self.password:
+ msg += ' or wrong passphrase "{}" for decrypting it.'.format(
+ self.password)
+ raise InvalidValueError(msg)
+
+
+class MixinHandler(object):
+
+ custom_headers = {
+ 'Server': 'TornadoServer'
+ }
+
+ html = ('<html><head><title>{code} {reason}</title></head><body>{code} '
+ '{reason}</body></html>')
+
+ def initialize(self, loop=None):
+ self.check_request()
+ self.loop = loop
+ self.origin_policy = self.settings.get('origin_policy')
+
+ def check_request(self):
+ context = self.request.connection.context
+ result = self.is_forbidden(context, self.request.host_name)
+ self._transforms = []
+ if result:
+ self.set_status(403)
+ self.finish(
+ self.html.format(code=self._status_code, reason=self._reason)
+ )
+ elif result is False:
+ to_url = self.get_redirect_url(
+ self.request.host_name, options.sslport, self.request.uri
+ )
+ self.redirect(to_url, permanent=True)
+ else:
+ self.context = context
+
+ def check_origin(self, origin):
+ if self.origin_policy == '*':
+ return True
+
+ parsed_origin = urlparse(origin)
+ netloc = parsed_origin.netloc.lower()
+ logging.debug('netloc: {}'.format(netloc))
+
+ host = self.request.headers.get('Host')
+ logging.debug('host: {}'.format(host))
+
+ if netloc == host:
+ return True
+
+ if self.origin_policy == 'same':
+ return False
+ elif self.origin_policy == 'primary':
+ return is_same_primary_domain(netloc.rsplit(':', 1)[0],
+ host.rsplit(':', 1)[0])
+ else:
+ return origin in self.origin_policy
+
+ def is_forbidden(self, context, hostname):
+ ip = context.address[0]
+ lst = context.trusted_downstream
+ ip_address = None
+
+ if lst and ip not in lst:
+ logging.warning(
+ 'IP {!r} not found in trusted downstream {!r}'.format(ip, lst)
+ )
+ return True
+
+ if context._orig_protocol == 'http':
+ if redirecting and not is_ip_hostname(hostname):
+ ip_address = to_ip_address(ip)
+ if not ip_address.is_private:
+ # redirecting
+ return False
+
+ if options.fbidhttp:
+ if ip_address is None:
+ ip_address = to_ip_address(ip)
+ if not ip_address.is_private:
+ logging.warning('Public plain http request is forbidden.')
+ return True
+
+ def get_redirect_url(self, hostname, port, uri):
+ port = '' if port == 443 else ':%s' % port
+ return 'https://{}{}{}'.format(hostname, port, uri)
+
+ def set_default_headers(self):
+ for header in self.custom_headers.items():
+ self.set_header(*header)
+
+ def get_value(self, name):
+ value = self.get_argument(name)
+ if not value:
+ raise InvalidValueError('Missing value {}'.format(name))
+ return value
+
+ def get_context_addr(self):
+ return self.context.address[:2]
+
+ def get_client_addr(self):
+ if options.xheaders:
+ return self.get_real_client_addr() or self.get_context_addr()
+ else:
+ return self.get_context_addr()
+
+ def get_real_client_addr(self):
+ ip = self.request.remote_ip
+
+ if ip == self.request.headers.get('X-Real-Ip'):
+ port = self.request.headers.get('X-Real-Port')
+ elif ip in self.request.headers.get('X-Forwarded-For', ''):
+ port = self.request.headers.get('X-Forwarded-Port')
+ else:
+ # not running behind an nginx server
+ return
+
+ port = to_int(port)
+ if port is None or not is_valid_port(port):
+ # fake port
+ port = 65535
+
+ return (ip, port)
+
+
+class NotFoundHandler(MixinHandler, tornado.web.ErrorHandler):
+
+ def initialize(self):
+ super(NotFoundHandler, self).initialize()
+
+ def prepare(self):
+ raise tornado.web.HTTPError(404)
+
+
+class IndexHandler(MixinHandler, tornado.web.RequestHandler):
+
+ executor = ThreadPoolExecutor(max_workers=cpu_count()*5)
+
+ def initialize(self, loop, policy, host_keys_settings):
+ super(IndexHandler, self).initialize(loop)
+ self.policy = policy
+ self.host_keys_settings = host_keys_settings
+ self.ssh_client = self.get_ssh_client()
+ self.debug = self.settings.get('debug', False)
+ self.font = self.settings.get('font', '')
+ self.result = dict(id=None, status=None, encoding=None)
+
+ def write_error(self, status_code, **kwargs):
+ if swallow_http_errors and self.request.method == 'POST':
+ exc_info = kwargs.get('exc_info')
+ if exc_info:
+ reason = getattr(exc_info[1], 'log_message', None)
+ if reason:
+ self._reason = reason
+ self.result.update(status=self._reason)
+ self.set_status(200)
+ self.finish(self.result)
+ else:
+ super(IndexHandler, self).write_error(status_code, **kwargs)
+
+ def get_ssh_client(self):
+ ssh = SSHClient()
+ ssh._system_host_keys = self.host_keys_settings['system_host_keys']
+ ssh._host_keys = self.host_keys_settings['host_keys']
+ ssh._host_keys_filename = self.host_keys_settings['host_keys_filename']
+ ssh.set_missing_host_key_policy(self.policy)
+ return ssh
+
+ def get_privatekey(self):
+ name = 'privatekey'
+ lst = self.request.files.get(name)
+ if lst:
+ # multipart form
+ filename = lst[0]['filename']
+ data = lst[0]['body']
+ value = self.decode_argument(data, name=name).strip()
+ else:
+ # urlencoded form
+ value = self.get_argument(name, u'')
+ filename = ''
+
+ return value, filename
+
+ def get_hostname(self):
+ value = self.get_value('hostname')
+ if not (is_valid_hostname(value) or is_valid_ip_address(value)):
+ raise InvalidValueError('Invalid hostname: {}'.format(value))
+ return value
+
+ def get_port(self):
+ value = self.get_argument('port', u'')
+ if not value:
+ return DEFAULT_PORT
+
+ port = to_int(value)
+ if port is None or not is_valid_port(port):
+ raise InvalidValueError('Invalid port: {}'.format(value))
+ return port
+
+ def lookup_hostname(self, hostname, port):
+ key = hostname if port == 22 else '[{}]:{}'.format(hostname, port)
+
+ if self.ssh_client._system_host_keys.lookup(key) is None:
+ if self.ssh_client._host_keys.lookup(key) is None:
+ raise tornado.web.HTTPError(
+ 403, 'Connection to {}:{} is not allowed.'.format(
+ hostname, port)
+ )
+
+ def get_args(self):
+ hostname = self.get_hostname()
+ port = self.get_port()
+ username = self.get_value('username')
+ password = self.get_argument('password', u'')
+ privatekey, filename = self.get_privatekey()
+ passphrase = self.get_argument('passphrase', u'')
+ totp = self.get_argument('totp', u'')
+
+ if isinstance(self.policy, paramiko.RejectPolicy):
+ self.lookup_hostname(hostname, port)
+
+ if privatekey:
+ pkey = PrivateKey(privatekey, passphrase, filename).get_pkey_obj()
+ else:
+ pkey = None
+
+ self.ssh_client.totp = totp
+ args = (hostname, port, username, password, pkey)
+ logging.debug(args)
+
+ return args
+
+ def parse_encoding(self, data):
+ try:
+ encoding = to_str(data.strip(), 'ascii')
+ except UnicodeDecodeError:
+ return
+
+ if is_valid_encoding(encoding):
+ return encoding
+
+ def get_default_encoding(self, ssh):
+ commands = [
+ '$SHELL -ilc "locale charmap"',
+ '$SHELL -ic "locale charmap"'
+ ]
+
+ for command in commands:
+ try:
+ _, stdout, _ = ssh.exec_command(command, get_pty=True)
+ except paramiko.SSHException as exc:
+ logging.info(str(exc))
+ else:
+ data = stdout.read()
+ logging.debug('{!r} => {!r}'.format(command, data))
+ result = self.parse_encoding(data)
+ if result:
+ return result
+
+ logging.warning('Could not detect the default encoding.')
+ return 'utf-8'
+
+ def ssh_connect(self, args):
+ ssh = self.ssh_client
+ dst_addr = args[:2]
+ logging.info('Connecting to {}:{}'.format(*dst_addr))
+
+ try:
+ ssh.connect(*args, timeout=options.timeout)
+ except socket.error:
+ raise ValueError('Unable to connect to {}:{}'.format(*dst_addr))
+ except paramiko.BadAuthenticationType:
+ raise ValueError('Bad authentication type.')
+ except paramiko.AuthenticationException:
+ raise ValueError('Authentication failed.')
+ except paramiko.BadHostKeyException:
+ raise ValueError('Bad host key.')
+
+ term = self.get_argument('term', u'') or u'xterm'
+ chan = ssh.invoke_shell(term=term)
+ chan.setblocking(0)
+ worker = Worker(self.loop, ssh, chan, dst_addr)
+ worker.encoding = options.encoding if options.encoding else \
+ self.get_default_encoding(ssh)
+ return worker
+
+ def check_origin(self):
+ event_origin = self.get_argument('_origin', u'')
+ header_origin = self.request.headers.get('Origin')
+ origin = event_origin or header_origin
+
+ if origin:
+ if not super(IndexHandler, self).check_origin(origin):
+ raise tornado.web.HTTPError(
+ 403, 'Cross origin operation is not allowed.'
+ )
+
+ if not event_origin and self.origin_policy != 'same':
+ self.set_header('Access-Control-Allow-Origin', origin)
+
+ def head(self):
+ pass
+
+ def get(self):
+ self.render('index.html', debug=self.debug, font=self.font)
+
+ @tornado.gen.coroutine
+ def post(self):
+ if self.debug and self.get_argument('error', u''):
+ # for testing purpose only
+ raise ValueError('Uncaught exception')
+
+ ip, port = self.get_client_addr()
+ workers = clients.get(ip, {})
+ if workers and len(workers) >= options.maxconn:
+ raise tornado.web.HTTPError(403, 'Too many live connections.')
+
+ self.check_origin()
+
+ try:
+ args = self.get_args()
+ except InvalidValueError as exc:
+ raise tornado.web.HTTPError(400, str(exc))
+
+ future = self.executor.submit(self.ssh_connect, args)
+
+ try:
+ worker = yield future
+ except (ValueError, paramiko.SSHException) as exc:
+ logging.error(traceback.format_exc())
+ self.result.update(status=str(exc))
+ else:
+ if not workers:
+ clients[ip] = workers
+ worker.src_addr = (ip, port)
+ workers[worker.id] = worker
+ self.loop.call_later(options.delay, recycle_worker, worker)
+ self.result.update(id=worker.id, encoding=worker.encoding)
+
+ self.write(self.result)
+
+
+class WsockHandler(MixinHandler, tornado.websocket.WebSocketHandler):
+
+ def initialize(self, loop):
+ super(WsockHandler, self).initialize(loop)
+ self.worker_ref = None
+
+ def open(self):
+ self.src_addr = self.get_client_addr()
+ logging.info('Connected from {}:{}'.format(*self.src_addr))
+
+ workers = clients.get(self.src_addr[0])
+ if not workers:
+ self.close(reason='Websocket authentication failed.')
+ return
+
+ try:
+ worker_id = self.get_value('id')
+ except (tornado.web.MissingArgumentError, InvalidValueError) as exc:
+ self.close(reason=str(exc))
+ else:
+ worker = workers.get(worker_id)
+ if worker:
+ workers[worker_id] = None
+ self.set_nodelay(True)
+ worker.set_handler(self)
+ self.worker_ref = weakref.ref(worker)
+ self.loop.add_handler(worker.fd, worker, IOLoop.READ)
+ else:
+ self.close(reason='Websocket authentication failed.')
+
+ def on_message(self, message):
+ logging.debug('{!r} from {}:{}'.format(message, *self.src_addr))
+ worker = self.worker_ref()
+ try:
+ msg = json.loads(message)
+ except JSONDecodeError:
+ return
+
+ if not isinstance(msg, dict):
+ return
+
+ resize = msg.get('resize')
+ if resize and len(resize) == 2:
+ try:
+ worker.chan.resize_pty(*resize)
+ except (TypeError, struct.error, paramiko.SSHException):
+ pass
+
+ data = msg.get('data')
+ if data and isinstance(data, UnicodeType):
+ worker.data_to_dst.append(data)
+ worker.on_write()
+
+ def on_close(self):
+ logging.info('Disconnected from {}:{}'.format(*self.src_addr))
+ if not self.close_reason:
+ self.close_reason = 'client disconnected'
+
+ worker = self.worker_ref() if self.worker_ref else None
+ if worker:
+ worker.close(reason=self.close_reason)
diff --git a/webssh/main.py b/webssh/main.py
new file mode 100644
index 0000000..5faad10
--- /dev/null
+++ b/webssh/main.py
@@ -0,0 +1,58 @@
+import logging
+import tornado.web
+import tornado.ioloop
+
+from tornado.options import options
+from webssh import handler
+from webssh.handler import IndexHandler, WsockHandler, NotFoundHandler
+from webssh.settings import (
+ get_app_settings, get_host_keys_settings, get_policy_setting,
+ get_ssl_context, get_server_settings, check_encoding_setting
+)
+
+
+def make_handlers(loop, options):
+ host_keys_settings = get_host_keys_settings(options)
+ policy = get_policy_setting(options, host_keys_settings)
+
+ handlers = [
+ (r'/', IndexHandler, dict(loop=loop, policy=policy,
+ host_keys_settings=host_keys_settings)),
+ (r'/ws', WsockHandler, dict(loop=loop))
+ ]
+ return handlers
+
+
+def make_app(handlers, settings):
+ settings.update(default_handler_class=NotFoundHandler)
+ return tornado.web.Application(handlers, **settings)
+
+
+def app_listen(app, port, address, server_settings):
+ app.listen(port, address, **server_settings)
+ if not server_settings.get('ssl_options'):
+ server_type = 'http'
+ else:
+ server_type = 'https'
+ handler.redirecting = True if options.redirect else False
+ logging.info(
+ 'Listening on {}:{} ({})'.format(address, port, server_type)
+ )
+
+
+def main():
+ options.parse_command_line()
+ check_encoding_setting(options.encoding)
+ loop = tornado.ioloop.IOLoop.current()
+ app = make_app(make_handlers(loop, options), get_app_settings(options))
+ ssl_ctx = get_ssl_context(options)
+ server_settings = get_server_settings(options)
+ app_listen(app, options.port, options.address, server_settings)
+ if ssl_ctx:
+ server_settings.update(ssl_options=ssl_ctx)
+ app_listen(app, options.sslport, options.ssladdress, server_settings)
+ loop.start()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/webssh/policy.py b/webssh/policy.py
new file mode 100644
index 0000000..30d818f
--- /dev/null
+++ b/webssh/policy.py
@@ -0,0 +1,86 @@
+import logging
+import os.path
+import threading
+import paramiko
+
+
+def load_host_keys(path):
+ if os.path.exists(path) and os.path.isfile(path):
+ return paramiko.hostkeys.HostKeys(filename=path)
+ return paramiko.hostkeys.HostKeys()
+
+
+def get_policy_dictionary():
+ dic = {
+ k.lower(): v for k, v in vars(paramiko.client).items() if type(v)
+ is type and issubclass(v, paramiko.client.MissingHostKeyPolicy)
+ and v is not paramiko.client.MissingHostKeyPolicy
+ }
+ return dic
+
+
+def get_policy_class(policy):
+ origin_policy = policy
+ policy = policy.lower()
+ if not policy.endswith('policy'):
+ policy += 'policy'
+
+ dic = get_policy_dictionary()
+ logging.debug(dic)
+
+ try:
+ cls = dic[policy]
+ except KeyError:
+ raise ValueError('Unknown policy {!r}'.format(origin_policy))
+ return cls
+
+
+def check_policy_setting(policy_class, host_keys_settings):
+ host_keys = host_keys_settings['host_keys']
+ host_keys_filename = host_keys_settings['host_keys_filename']
+ system_host_keys = host_keys_settings['system_host_keys']
+
+ if policy_class is paramiko.client.AutoAddPolicy:
+ host_keys.save(host_keys_filename) # for permission test
+ elif policy_class is paramiko.client.RejectPolicy:
+ if not host_keys and not system_host_keys:
+ raise ValueError(
+ 'Reject policy could not be used without host keys.'
+ )
+
+
+class AutoAddPolicy(paramiko.client.MissingHostKeyPolicy):
+ """
+ thread-safe AutoAddPolicy
+ """
+ lock = threading.Lock()
+
+ def is_missing_host_key(self, client, hostname, key):
+ k = client._system_host_keys.lookup(hostname) or \
+ client._host_keys.lookup(hostname)
+ if k is None:
+ return True
+ host_key = k.get(key.get_name(), None)
+ if host_key is None:
+ return True
+ if host_key != key:
+ raise paramiko.BadHostKeyException(hostname, key, host_key)
+
+ def missing_host_key(self, client, hostname, key):
+ with self.lock:
+ if self.is_missing_host_key(client, hostname, key):
+ keytype = key.get_name()
+ logging.info(
+ 'Adding {} host key for {}'.format(keytype, hostname)
+ )
+ client._host_keys._entries.append(
+ paramiko.hostkeys.HostKeyEntry([hostname], key)
+ )
+
+ with open(client._host_keys_filename, 'a') as f:
+ f.write('{} {} {}\n'.format(
+ hostname, keytype, key.get_base64()
+ ))
+
+
+paramiko.client.AutoAddPolicy = AutoAddPolicy
diff --git a/webssh/settings.py b/webssh/settings.py
new file mode 100644
index 0000000..c9dbbbe
--- /dev/null
+++ b/webssh/settings.py
@@ -0,0 +1,198 @@
+import logging
+import os.path
+import ssl
+import sys
+
+from tornado.options import define
+from webssh.policy import (
+ load_host_keys, get_policy_class, check_policy_setting
+)
+from webssh.utils import (
+ to_ip_address, parse_origin_from_url, is_valid_encoding
+)
+from webssh._version import __version__
+
+
+def print_version(flag):
+ if flag:
+ print(__version__)
+ sys.exit(0)
+
+
+define('address', default='', help='Listen address')
+define('port', type=int, default=8888, help='Listen port')
+define('ssladdress', default='', help='SSL listen address')
+define('sslport', type=int, default=4433, help='SSL listen port')
+define('certfile', default='', help='SSL certificate file')
+define('keyfile', default='', help='SSL private key file')
+define('debug', type=bool, default=False, help='Debug mode')
+define('policy', default='warning',
+ help='Missing host key policy, reject|autoadd|warning')
+define('hostfile', default='', help='User defined host keys file')
+define('syshostfile', default='', help='System wide host keys file')
+define('tdstream', default='', help='Trusted downstream, separated by comma')
+define('redirect', type=bool, default=True, help='Redirecting http to https')
+define('fbidhttp', type=bool, default=True,
+ help='Forbid public plain http incoming requests')
+define('xheaders', type=bool, default=True, help='Support xheaders')
+define('xsrf', type=bool, default=True, help='CSRF protection')
+define('origin', default='same', help='''Origin policy,
+'same': same origin policy, matches host name and port number;
+'primary': primary domain policy, matches primary domain only;
+'<domains>': custom domains policy, matches any domain in the <domains> list
+separated by comma;
+'*': wildcard policy, matches any domain, allowed in debug mode only.''')
+define('wpintvl', type=float, default=0, help='Websocket ping interval')
+define('timeout', type=float, default=3, help='SSH connection timeout')
+define('delay', type=float, default=3, help='The delay to call recycle_worker')
+define('maxconn', type=int, default=20,
+ help='Maximum live connections (ssh sessions) per client')
+define('font', default='', help='custom font filename')
+define('encoding', default='',
+ help='''The default character encoding of ssh servers.
+Example: --encoding='utf-8' to solve the problem with some switches&routers''')
+define('version', type=bool, help='Show version information',
+ callback=print_version)
+
+
+base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+font_dirs = ['webssh', 'static', 'css', 'fonts']
+max_body_size = 1 * 1024 * 1024
+
+
+class Font(object):
+
+ def __init__(self, filename, dirs):
+ self.family = self.get_family(filename)
+ self.url = self.get_url(filename, dirs)
+
+ def get_family(self, filename):
+ return filename.split('.')[0]
+
+ def get_url(self, filename, dirs):
+ return os.path.join(*(dirs + [filename]))
+
+
+def get_app_settings(options):
+ settings = dict(
+ template_path=os.path.join(base_dir, 'webssh', 'templates'),
+ static_path=os.path.join(base_dir, 'webssh', 'static'),
+ websocket_ping_interval=options.wpintvl,
+ debug=options.debug,
+ xsrf_cookies=options.xsrf,
+ font=Font(
+ get_font_filename(options.font,
+ os.path.join(base_dir, *font_dirs)),
+ font_dirs[1:]
+ ),
+ origin_policy=get_origin_setting(options)
+ )
+ return settings
+
+
+def get_server_settings(options):
+ settings = dict(
+ xheaders=options.xheaders,
+ max_body_size=max_body_size,
+ trusted_downstream=get_trusted_downstream(options.tdstream)
+ )
+ return settings
+
+
+def get_host_keys_settings(options):
+ if not options.hostfile:
+ host_keys_filename = os.path.join(base_dir, 'known_hosts')
+ else:
+ host_keys_filename = options.hostfile
+ host_keys = load_host_keys(host_keys_filename)
+
+ if not options.syshostfile:
+ filename = os.path.expanduser('~/.ssh/known_hosts')
+ else:
+ filename = options.syshostfile
+ system_host_keys = load_host_keys(filename)
+
+ settings = dict(
+ host_keys=host_keys,
+ system_host_keys=system_host_keys,
+ host_keys_filename=host_keys_filename
+ )
+ return settings
+
+
+def get_policy_setting(options, host_keys_settings):
+ policy_class = get_policy_class(options.policy)
+ logging.info(policy_class.__name__)
+ check_policy_setting(policy_class, host_keys_settings)
+ return policy_class()
+
+
+def get_ssl_context(options):
+ if not options.certfile and not options.keyfile:
+ return None
+ elif not options.certfile:
+ raise ValueError('certfile is not provided')
+ elif not options.keyfile:
+ raise ValueError('keyfile is not provided')
+ elif not os.path.isfile(options.certfile):
+ raise ValueError('File {!r} does not exist'.format(options.certfile))
+ elif not os.path.isfile(options.keyfile):
+ raise ValueError('File {!r} does not exist'.format(options.keyfile))
+ else:
+ ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
+ ssl_ctx.load_cert_chain(options.certfile, options.keyfile)
+ return ssl_ctx
+
+
+def get_trusted_downstream(tdstream):
+ result = set()
+ for ip in tdstream.split(','):
+ ip = ip.strip()
+ if ip:
+ to_ip_address(ip)
+ result.add(ip)
+ return result
+
+
+def get_origin_setting(options):
+ if options.origin == '*':
+ if not options.debug:
+ raise ValueError(
+ 'Wildcard origin policy is only allowed in debug mode.'
+ )
+ else:
+ return '*'
+
+ origin = options.origin.lower()
+ if origin in ['same', 'primary']:
+ return origin
+
+ origins = set()
+ for url in origin.split(','):
+ orig = parse_origin_from_url(url)
+ if orig:
+ origins.add(orig)
+
+ if not origins:
+ raise ValueError('Empty origin list')
+
+ return origins
+
+
+def get_font_filename(font, font_dir):
+ filenames = {f for f in os.listdir(font_dir) if not f.startswith('.')
+ and os.path.isfile(os.path.join(font_dir, f))}
+ if font:
+ if font not in filenames:
+ raise ValueError(
+ 'Font file {!r} not found'.format(os.path.join(font_dir, font))
+ )
+ elif filenames:
+ font = filenames.pop()
+
+ return font
+
+
+def check_encoding_setting(encoding):
+ if encoding and not is_valid_encoding(encoding):
+ raise ValueError('Unknown character encoding {!r}.'.format(encoding))
diff --git a/webssh/static/css/fonts/.gitignore b/webssh/static/css/fonts/.gitignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/webssh/static/css/fonts/.gitignore
diff --git a/webssh/static/img/favicon.png b/webssh/static/img/favicon.png
new file mode 100644
index 0000000..0b281be
--- /dev/null
+++ b/webssh/static/img/favicon.png
Binary files differ
diff --git a/webssh/static/js/main.js b/webssh/static/js/main.js
new file mode 100644
index 0000000..480447e
--- /dev/null
+++ b/webssh/static/js/main.js
@@ -0,0 +1,858 @@
+/*jslint browser:true */
+
+var jQuery;
+var wssh = {};
+
+
+(function() {
+ // For FormData without getter and setter
+ var proto = FormData.prototype,
+ data = {};
+
+ if (!proto.get) {
+ proto.get = function (name) {
+ if (data[name] === undefined) {
+ var input = document.querySelector('input[name="' + name + '"]'),
+ value;
+ if (input) {
+ if (input.type === 'file') {
+ value = input.files[0];
+ } else {
+ value = input.value;
+ }
+ data[name] = value;
+ }
+ }
+ return data[name];
+ };
+ }
+
+ if (!proto.set) {
+ proto.set = function (name, value) {
+ data[name] = value;
+ };
+ }
+}());
+
+
+jQuery(function($){
+ var status = $('#status'),
+ button = $('.btn-primary'),
+ form_container = $('.form-container'),
+ waiter = $('#waiter'),
+ term_type = $('#term'),
+ style = {},
+ default_title = 'WebSSH',
+ title_element = document.querySelector('title'),
+ form_id = '#connect',
+ debug = document.querySelector(form_id).noValidate,
+ custom_font = document.fonts ? document.fonts.values().next().value : undefined,
+ default_fonts,
+ DISCONNECTED = 0,
+ CONNECTING = 1,
+ CONNECTED = 2,
+ state = DISCONNECTED,
+ messages = {1: 'This client is connecting ...', 2: 'This client is already connnected.'},
+ key_max_size = 16384,
+ fields = ['hostname', 'port', 'username'],
+ form_keys = fields.concat(['password', 'totp']),
+ opts_keys = ['bgcolor', 'title', 'encoding', 'command', 'term', 'fontsize', 'fontcolor'],
+ url_form_data = {},
+ url_opts_data = {},
+ validated_form_data,
+ event_origin,
+ hostname_tester = /((^\s*((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))\s*$)|(^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$))|(^\s*((?=.{1,255}$)(?=.*[A-Za-z].*)[0-9A-Za-z](?:(?:[0-9A-Za-z]|\b-){0,61}[0-9A-Za-z])?(?:\.[0-9A-Za-z](?:(?:[0-9A-Za-z]|\b-){0,61}[0-9A-Za-z])?)*)\s*$)/;
+
+
+ function store_items(names, data) {
+ var i, name, value;
+
+ for (i = 0; i < names.length; i++) {
+ name = names[i];
+ value = data.get(name);
+ if (value){
+ window.localStorage.setItem(name, value);
+ }
+ }
+ }
+
+
+ function restore_items(names) {
+ var i, name, value;
+
+ for (i=0; i < names.length; i++) {
+ name = names[i];
+ value = window.localStorage.getItem(name);
+ if (value) {
+ $('#'+name).val(value);
+ }
+ }
+ }
+
+
+ function populate_form(data) {
+ var names = form_keys.concat(['passphrase']),
+ i, name;
+
+ for (i=0; i < names.length; i++) {
+ name = names[i];
+ $('#'+name).val(data.get(name));
+ }
+ }
+
+
+ function get_object_length(object) {
+ return Object.keys(object).length;
+ }
+
+
+ function decode_uri(uri) {
+ try {
+ return decodeURI(uri);
+ } catch(e) {
+ console.error(e);
+ }
+ return '';
+ }
+
+
+ function decode_password(encoded) {
+ try {
+ return window.atob(encoded);
+ } catch (e) {
+ console.error(e);
+ }
+ return null;
+ }
+
+
+ function parse_url_data(string, form_keys, opts_keys, form_map, opts_map) {
+ var i, pair, key, val,
+ arr = string.split('&');
+
+ for (i = 0; i < arr.length; i++) {
+ pair = arr[i].split('=');
+ key = pair[0].trim().toLowerCase();
+ val = pair.slice(1).join('=').trim();
+
+ if (form_keys.indexOf(key) >= 0) {
+ form_map[key] = val;
+ } else if (opts_keys.indexOf(key) >=0) {
+ opts_map[key] = val;
+ }
+ }
+
+ if (form_map.password) {
+ form_map.password = decode_password(form_map.password);
+ }
+ }
+
+
+ function parse_xterm_style() {
+ var text = $('.xterm-helpers style').text();
+ var arr = text.split('xterm-normal-char{width:');
+ style.width = parseFloat(arr[1]);
+ arr = text.split('div{height:');
+ style.height = parseFloat(arr[1]);
+ }
+
+
+ function get_cell_size(term) {
+ style.width = term._core._renderService._renderer.dimensions.actualCellWidth;
+ style.height = term._core._renderService._renderer.dimensions.actualCellHeight;
+ }
+
+
+ function toggle_fullscreen(term) {
+ $('#terminal .terminal').toggleClass('fullscreen');
+ term.fitAddon.fit();
+ }
+
+
+ function current_geometry(term) {
+ if (!style.width || !style.height) {
+ try {
+ get_cell_size(term);
+ } catch (TypeError) {
+ parse_xterm_style();
+ }
+ }
+
+ var cols = parseInt(window.innerWidth / style.width, 10) - 1;
+ var rows = parseInt(window.innerHeight / style.height, 10);
+ return {'cols': cols, 'rows': rows};
+ }
+
+
+ function resize_terminal(term) {
+ var geometry = current_geometry(term);
+ term.on_resize(geometry.cols, geometry.rows);
+ }
+
+
+ function set_backgound_color(term, color) {
+ term.setOption('theme', {
+ background: color
+ });
+ }
+
+ function set_font_color(term, color) {
+ term.setOption('theme', {
+ foreground: color
+ });
+ }
+
+ function custom_font_is_loaded() {
+ if (!custom_font) {
+ console.log('No custom font specified.');
+ } else {
+ console.log('Status of custom font ' + custom_font.family + ': ' + custom_font.status);
+ if (custom_font.status === 'loaded') {
+ return true;
+ }
+ if (custom_font.status === 'unloaded') {
+ return false;
+ }
+ }
+ }
+
+ function update_font_family(term) {
+ if (term.font_family_updated) {
+ console.log('Already using custom font family');
+ return;
+ }
+
+ if (!default_fonts) {
+ default_fonts = term.getOption('fontFamily');
+ }
+
+ if (custom_font_is_loaded()) {
+ var new_fonts = custom_font.family + ', ' + default_fonts;
+ term.setOption('fontFamily', new_fonts);
+ term.font_family_updated = true;
+ console.log('Using custom font family ' + new_fonts);
+ }
+ }
+
+
+ function reset_font_family(term) {
+ if (!term.font_family_updated) {
+ console.log('Already using default font family');
+ return;
+ }
+
+ if (default_fonts) {
+ term.setOption('fontFamily', default_fonts);
+ term.font_family_updated = false;
+ console.log('Using default font family ' + default_fonts);
+ }
+ }
+
+
+ function format_geometry(cols, rows) {
+ return JSON.stringify({'cols': cols, 'rows': rows});
+ }
+
+
+ function read_as_text_with_decoder(file, callback, decoder) {
+ var reader = new window.FileReader();
+
+ if (decoder === undefined) {
+ decoder = new window.TextDecoder('utf-8', {'fatal': true});
+ }
+
+ reader.onload = function() {
+ var text;
+ try {
+ text = decoder.decode(reader.result);
+ } catch (TypeError) {
+ console.log('Decoding error happened.');
+ } finally {
+ if (callback) {
+ callback(text);
+ }
+ }
+ };
+
+ reader.onerror = function (e) {
+ console.error(e);
+ };
+
+ reader.readAsArrayBuffer(file);
+ }
+
+
+ function read_as_text_with_encoding(file, callback, encoding) {
+ var reader = new window.FileReader();
+
+ if (encoding === undefined) {
+ encoding = 'utf-8';
+ }
+
+ reader.onload = function() {
+ if (callback) {
+ callback(reader.result);
+ }
+ };
+
+ reader.onerror = function (e) {
+ console.error(e);
+ };
+
+ reader.readAsText(file, encoding);
+ }
+
+
+ function read_file_as_text(file, callback, decoder) {
+ if (!window.TextDecoder) {
+ read_as_text_with_encoding(file, callback, decoder);
+ } else {
+ read_as_text_with_decoder(file, callback, decoder);
+ }
+ }
+
+
+ function reset_wssh() {
+ var name;
+
+ for (name in wssh) {
+ if (wssh.hasOwnProperty(name) && name !== 'connect') {
+ delete wssh[name];
+ }
+ }
+ }
+
+
+ function log_status(text, to_populate) {
+ console.log(text);
+ status.html(text.split('\n').join('<br/>'));
+
+ if (to_populate && validated_form_data) {
+ populate_form(validated_form_data);
+ validated_form_data = undefined;
+ }
+
+ if (waiter.css('display') !== 'none') {
+ waiter.hide();
+ }
+
+ if (form_container.css('display') === 'none') {
+ form_container.show();
+ }
+ }
+
+
+ function ajax_complete_callback(resp) {
+ button.prop('disabled', false);
+
+ if (resp.status !== 200) {
+ log_status(resp.status + ': ' + resp.statusText, true);
+ state = DISCONNECTED;
+ return;
+ }
+
+ var msg = resp.responseJSON;
+ if (!msg.id) {
+ log_status(msg.status, true);
+ state = DISCONNECTED;
+ return;
+ }
+
+ var ws_url = window.location.href.split(/\?|#/, 1)[0].replace('http', 'ws'),
+ join = (ws_url[ws_url.length-1] === '/' ? '' : '/'),
+ url = ws_url + join + 'ws?id=' + msg.id,
+ sock = new window.WebSocket(url),
+ encoding = 'utf-8',
+ decoder = window.TextDecoder ? new window.TextDecoder(encoding) : encoding,
+ terminal = document.getElementById('terminal'),
+ termOptions = {
+ cursorBlink: true,
+ theme: {
+ background: url_opts_data.bgcolor || 'black',
+ foreground: url_opts_data.fontcolor || 'white'
+ }
+ };
+
+ if (url_opts_data.fontsize) {
+ var fontsize = window.parseInt(url_opts_data.fontsize);
+ if (fontsize && fontsize > 0) {
+ termOptions.fontSize = fontsize;
+ }
+ }
+
+ var term = new window.Terminal(termOptions);
+
+ term.fitAddon = new window.FitAddon.FitAddon();
+ term.loadAddon(term.fitAddon);
+
+ console.log(url);
+ if (!msg.encoding) {
+ console.log('Unable to detect the default encoding of your server');
+ msg.encoding = encoding;
+ } else {
+ console.log('The deault encoding of your server is ' + msg.encoding);
+ }
+
+ function term_write(text) {
+ if (term) {
+ term.write(text);
+ if (!term.resized) {
+ resize_terminal(term);
+ term.resized = true;
+ }
+ }
+ }
+
+ function set_encoding(new_encoding) {
+ // for console use
+ if (!new_encoding) {
+ console.log('An encoding is required');
+ return;
+ }
+
+ if (!window.TextDecoder) {
+ decoder = new_encoding;
+ encoding = decoder;
+ console.log('Set encoding to ' + encoding);
+ } else {
+ try {
+ decoder = new window.TextDecoder(new_encoding);
+ encoding = decoder.encoding;
+ console.log('Set encoding to ' + encoding);
+ } catch (RangeError) {
+ console.log('Unknown encoding ' + new_encoding);
+ return false;
+ }
+ }
+ }
+
+ wssh.set_encoding = set_encoding;
+
+ if (url_opts_data.encoding) {
+ if (set_encoding(url_opts_data.encoding) === false) {
+ set_encoding(msg.encoding);
+ }
+ } else {
+ set_encoding(msg.encoding);
+ }
+
+
+ wssh.geometry = function() {
+ // for console use
+ var geometry = current_geometry(term);
+ console.log('Current window geometry: ' + JSON.stringify(geometry));
+ };
+
+ wssh.send = function(data) {
+ // for console use
+ if (!sock) {
+ console.log('Websocket was already closed');
+ return;
+ }
+
+ if (typeof data !== 'string') {
+ console.log('Only string is allowed');
+ return;
+ }
+
+ try {
+ JSON.parse(data);
+ sock.send(data);
+ } catch (SyntaxError) {
+ data = data.trim() + '\r';
+ sock.send(JSON.stringify({'data': data}));
+ }
+ };
+
+ wssh.reset_encoding = function() {
+ // for console use
+ if (encoding === msg.encoding) {
+ console.log('Already reset to ' + msg.encoding);
+ } else {
+ set_encoding(msg.encoding);
+ }
+ };
+
+ wssh.resize = function(cols, rows) {
+ // for console use
+ if (term === undefined) {
+ console.log('Terminal was already destroryed');
+ return;
+ }
+
+ var valid_args = false;
+
+ if (cols > 0 && rows > 0) {
+ var geometry = current_geometry(term);
+ if (cols <= geometry.cols && rows <= geometry.rows) {
+ valid_args = true;
+ }
+ }
+
+ if (!valid_args) {
+ console.log('Unable to resize terminal to geometry: ' + format_geometry(cols, rows));
+ } else {
+ term.on_resize(cols, rows);
+ }
+ };
+
+ wssh.set_bgcolor = function(color) {
+ set_backgound_color(term, color);
+ };
+
+ wssh.set_fontcolor = function(color) {
+ set_font_color(term, color);
+ };
+
+ wssh.custom_font = function() {
+ update_font_family(term);
+ };
+
+ wssh.default_font = function() {
+ reset_font_family(term);
+ };
+
+ term.on_resize = function(cols, rows) {
+ if (cols !== this.cols || rows !== this.rows) {
+ console.log('Resizing terminal to geometry: ' + format_geometry(cols, rows));
+ this.resize(cols, rows);
+ sock.send(JSON.stringify({'resize': [cols, rows]}));
+ }
+ };
+
+ term.onData(function(data) {
+ // console.log(data);
+ sock.send(JSON.stringify({'data': data}));
+ });
+
+ sock.onopen = function() {
+ term.open(terminal);
+ toggle_fullscreen(term);
+ update_font_family(term);
+ term.focus();
+ state = CONNECTED;
+ title_element.text = url_opts_data.title || default_title;
+ if (url_opts_data.command) {
+ setTimeout(function () {
+ sock.send(JSON.stringify({'data': url_opts_data.command+'\r'}));
+ }, 500);
+ }
+ };
+
+ sock.onmessage = function(msg) {
+ read_file_as_text(msg.data, term_write, decoder);
+ };
+
+ sock.onerror = function(e) {
+ console.error(e);
+ };
+
+ sock.onclose = function(e) {
+ term.dispose();
+ term = undefined;
+ sock = undefined;
+ reset_wssh();
+ log_status(e.reason, true);
+ state = DISCONNECTED;
+ default_title = 'WebSSH';
+ title_element.text = default_title;
+ };
+
+ $(window).resize(function(){
+ if (term) {
+ resize_terminal(term);
+ }
+ });
+ }
+
+
+ function wrap_object(opts) {
+ var obj = {};
+
+ obj.get = function(attr) {
+ return opts[attr] || '';
+ };
+
+ obj.set = function(attr, val) {
+ opts[attr] = val;
+ };
+
+ return obj;
+ }
+
+
+ function clean_data(data) {
+ var i, attr, val;
+ var attrs = form_keys.concat(['privatekey', 'passphrase']);
+
+ for (i = 0; i < attrs.length; i++) {
+ attr = attrs[i];
+ val = data.get(attr);
+ if (typeof val === 'string') {
+ data.set(attr, val.trim());
+ }
+ }
+ }
+
+
+ function validate_form_data(data) {
+ clean_data(data);
+
+ var hostname = data.get('hostname'),
+ port = data.get('port'),
+ username = data.get('username'),
+ pk = data.get('privatekey'),
+ result = {
+ valid: false,
+ data: data,
+ title: ''
+ },
+ errors = [], size;
+
+ if (!hostname) {
+ errors.push('Value of hostname is required.');
+ } else {
+ if (!hostname_tester.test(hostname)) {
+ errors.push('Invalid hostname: ' + hostname);
+ }
+ }
+
+ if (!port) {
+ port = 22;
+ } else {
+ if (!(port > 0 && port < 65535)) {
+ errors.push('Invalid port: ' + port);
+ }
+ }
+
+ if (!username) {
+ errors.push('Value of username is required.');
+ }
+
+ if (pk) {
+ size = pk.size || pk.length;
+ if (size > key_max_size) {
+ errors.push('Invalid private key: ' + pk.name || '');
+ }
+ }
+
+ if (!errors.length || debug) {
+ result.valid = true;
+ result.title = username + '@' + hostname + ':' + port;
+ }
+ result.errors = errors;
+
+ return result;
+ }
+
+ // Fix empty input file ajax submission error for safari 11.x
+ function disable_file_inputs(inputs) {
+ var i, input;
+
+ for (i = 0; i < inputs.length; i++) {
+ input = inputs[i];
+ if (input.files.length === 0) {
+ input.setAttribute('disabled', '');
+ }
+ }
+ }
+
+
+ function enable_file_inputs(inputs) {
+ var i;
+
+ for (i = 0; i < inputs.length; i++) {
+ inputs[i].removeAttribute('disabled');
+ }
+ }
+
+
+ function connect_without_options() {
+ // use data from the form
+ var form = document.querySelector(form_id),
+ inputs = form.querySelectorAll('input[type="file"]'),
+ url = form.action,
+ data, pk;
+
+ disable_file_inputs(inputs);
+ data = new FormData(form);
+ pk = data.get('privatekey');
+ enable_file_inputs(inputs);
+
+ function ajax_post() {
+ status.text('');
+ button.prop('disabled', true);
+
+ $.ajax({
+ url: url,
+ type: 'post',
+ data: data,
+ complete: ajax_complete_callback,
+ cache: false,
+ contentType: false,
+ processData: false
+ });
+ }
+
+ var result = validate_form_data(data);
+ if (!result.valid) {
+ log_status(result.errors.join('\n'));
+ return;
+ }
+
+ if (pk && pk.size && !debug) {
+ read_file_as_text(pk, function(text) {
+ if (text === undefined) {
+ log_status('Invalid private key: ' + pk.name);
+ } else {
+ ajax_post();
+ }
+ });
+ } else {
+ ajax_post();
+ }
+
+ return result;
+ }
+
+
+ function connect_with_options(data) {
+ // use data from the arguments
+ var form = document.querySelector(form_id),
+ url = data.url || form.action,
+ _xsrf = form.querySelector('input[name="_xsrf"]');
+
+ var result = validate_form_data(wrap_object(data));
+ if (!result.valid) {
+ log_status(result.errors.join('\n'));
+ return;
+ }
+
+ data.term = term_type.val();
+ data._xsrf = _xsrf.value;
+ if (event_origin) {
+ data._origin = event_origin;
+ }
+
+ status.text('');
+ button.prop('disabled', true);
+
+ $.ajax({
+ url: url,
+ type: 'post',
+ data: data,
+ complete: ajax_complete_callback
+ });
+
+ return result;
+ }
+
+
+ function connect(hostname, port, username, password, privatekey, passphrase, totp) {
+ // for console use
+ var result, opts;
+
+ if (state !== DISCONNECTED) {
+ console.log(messages[state]);
+ return;
+ }
+
+ if (hostname === undefined) {
+ result = connect_without_options();
+ } else {
+ if (typeof hostname === 'string') {
+ opts = {
+ hostname: hostname,
+ port: port,
+ username: username,
+ password: password,
+ privatekey: privatekey,
+ passphrase: passphrase,
+ totp: totp
+ };
+ } else {
+ opts = hostname;
+ }
+
+ result = connect_with_options(opts);
+ }
+
+ if (result) {
+ state = CONNECTING;
+ default_title = result.title;
+ if (hostname) {
+ validated_form_data = result.data;
+ }
+ store_items(fields, result.data);
+ }
+ }
+
+ wssh.connect = connect;
+
+ $(form_id).submit(function(event){
+ event.preventDefault();
+ connect();
+ });
+
+
+ function cross_origin_connect(event)
+ {
+ console.log(event.origin);
+ var prop = 'connect',
+ args;
+
+ try {
+ args = JSON.parse(event.data);
+ } catch (SyntaxError) {
+ args = event.data.split('|');
+ }
+
+ if (!Array.isArray(args)) {
+ args = [args];
+ }
+
+ try {
+ event_origin = event.origin;
+ wssh[prop].apply(wssh, args);
+ } finally {
+ event_origin = undefined;
+ }
+ }
+
+ window.addEventListener('message', cross_origin_connect, false);
+
+ if (document.fonts) {
+ document.fonts.ready.then(
+ function () {
+ if (custom_font_is_loaded() === false) {
+ document.body.style.fontFamily = custom_font.family;
+ }
+ }
+ );
+ }
+
+
+ parse_url_data(
+ decode_uri(window.location.search.substring(1)) + '&' + decode_uri(window.location.hash.substring(1)),
+ form_keys, opts_keys, url_form_data, url_opts_data
+ );
+ // console.log(url_form_data);
+ // console.log(url_opts_data);
+
+ if (url_opts_data.term) {
+ term_type.val(url_opts_data.term);
+ }
+
+ if (url_form_data.password === null) {
+ log_status('Password via url must be encoded in base64.');
+ } else {
+ if (get_object_length(url_form_data)) {
+ waiter.show();
+ connect(url_form_data);
+ } else {
+ restore_items(fields);
+ form_container.show();
+ }
+ }
+
+});
diff --git a/webssh/templates/index.html b/webssh/templates/index.html
new file mode 100644
index 0000000..3268c38
--- /dev/null
+++ b/webssh/templates/index.html
@@ -0,0 +1,101 @@
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <meta charset="UTF-8">
+ <title> WebSSH </title>
+ <link href="static/img/favicon.png" rel="icon" type="image/png">
+ <link href="static/css/bootstrap.min.css" rel="stylesheet" type="text/css"/>
+ <link href="static/css/xterm.min.css" rel="stylesheet" type="text/css"/>
+ <link href="static/css/fullscreen.min.css" rel="stylesheet" type="text/css"/>
+ <style>
+ .row {
+ margin-top: 15px;
+ margin-bottom: 10px;
+ }
+
+ .container {
+ margin-top: 20px;
+ }
+
+ .btn {
+ margin-top: 15px;
+ }
+
+ .btn-danger {
+ margin-left: 5px;
+ }
+ {% if font.family %}
+ @font-face {
+ font-family: '{{ font.family }}';
+ src: url('{{ font.url }}');
+ }
+
+ body {
+ font-family: '{{ font.family }}';
+ }
+ {% end %}
+ </style>
+ </head>
+ <body>
+ <div id="waiter" style="display: none"> Connecting ... </div>
+
+ <div class="container form-container" style="display: none">
+ <form id="connect" action="" method="post" enctype="multipart/form-data"{% if debug %} novalidate{% end %}>
+ <div class="row">
+ <div class="col">
+ <label for="Hostname">Hostname</label>
+ <input class="form-control" type="text" id="hostname" name="hostname" value="" required>
+ </div>
+ <div class="col">
+ <label for="Port">Port</label>
+ <input class="form-control" type="number" id="port" name="port" placeholder="22" value="" min=1 max=65535>
+ </div>
+ </div>
+ <div class="row">
+ <div class="col">
+ <label for="Username">Username</label>
+ <input class="form-control" type="text" id="username" name="username" value="" required>
+ </div>
+ <div class="col">
+ <label for="Password">Password</label>
+ <input class="form-control" type="password" id="password" name="password" value="">
+ </div>
+ </div>
+ <div class="row">
+ <div class="col">
+ <label for="Username">Private Key</label>
+ <input class="form-control" type="file" id="privatekey" name="privatekey" value="">
+ </div>
+ <div class="col">
+ <label for="Passphrase">Passphrase</label>
+ <input class="form-control" type="password" id="passphrase" name="passphrase" value="">
+ </div>
+ </div>
+ <div class="row">
+ <div class="col">
+ <label for="totp">Totp (time-based one-time password)</label>
+ <input class="form-control" type="password" id="totp" name="totp" value="">
+ </div>
+ <div class="col">
+ </div>
+ </div>
+ <input type="hidden" id="term" name="term" value="xterm-256color">
+ {% module xsrf_form_html() %}
+ <button type="submit" class="btn btn-primary">Connect</button>
+ <button type="reset" class="btn btn-danger">Reset</button>
+ </form>
+ </div>
+
+ <div class="container">
+ <div id="status" style="color: red;"></div>
+ <div id="terminal"></div>
+ </div>
+
+ <script src="static/js/jquery.min.js"></script>
+ <script src="static/js/popper.min.js"></script>
+ <script src="static/js/bootstrap.min.js"></script>
+ <script src="static/js/xterm.min.js"></script>
+ <script src="static/js/xterm-addon-fit.min.js"></script>
+ <script src="static/js/main.js"></script>
+ </body>
+</html>
diff --git a/webssh/utils.py b/webssh/utils.py
new file mode 100644
index 0000000..845ca56
--- /dev/null
+++ b/webssh/utils.py
@@ -0,0 +1,145 @@
+import ipaddress
+import re
+
+try:
+ from types import UnicodeType
+except ImportError:
+ UnicodeType = str
+
+try:
+ from urllib.parse import urlparse
+except ImportError:
+ from urlparse import urlparse
+
+
+numeric = re.compile(r'[0-9]+$')
+allowed = re.compile(r'(?!-)[a-z0-9-]{1,63}(?<!-)$', re.IGNORECASE)
+
+
+def to_str(bstr, encoding='utf-8'):
+ if isinstance(bstr, bytes):
+ return bstr.decode(encoding)
+ return bstr
+
+
+def to_bytes(ustr, encoding='utf-8'):
+ if isinstance(ustr, UnicodeType):
+ return ustr.encode(encoding)
+ return ustr
+
+
+def to_int(string):
+ try:
+ return int(string)
+ except (TypeError, ValueError):
+ pass
+
+
+def to_ip_address(ipstr):
+ ip = to_str(ipstr)
+ if ip.startswith('fe80::'):
+ ip = ip.split('%')[0]
+ return ipaddress.ip_address(ip)
+
+
+def is_valid_ip_address(ipstr):
+ try:
+ to_ip_address(ipstr)
+ except ValueError:
+ return False
+ return True
+
+
+def is_valid_port(port):
+ return 0 < port < 65536
+
+
+def is_valid_encoding(encoding):
+ try:
+ u'test'.encode(encoding)
+ except LookupError:
+ return False
+ return True
+
+
+def is_ip_hostname(hostname):
+ it = iter(hostname)
+ if next(it) == '[':
+ return True
+ for ch in it:
+ if ch != '.' and not ch.isdigit():
+ return False
+ return True
+
+
+def is_valid_hostname(hostname):
+ if hostname[-1] == '.':
+ # strip exactly one dot from the right, if present
+ hostname = hostname[:-1]
+ if len(hostname) > 253:
+ return False
+
+ labels = hostname.split('.')
+
+ # the TLD must be not all-numeric
+ if numeric.match(labels[-1]):
+ return False
+
+ return all(allowed.match(label) for label in labels)
+
+
+def is_same_primary_domain(domain1, domain2):
+ i = -1
+ dots = 0
+ l1 = len(domain1)
+ l2 = len(domain2)
+ m = min(l1, l2)
+
+ while i >= -m:
+ c1 = domain1[i]
+ c2 = domain2[i]
+
+ if c1 == c2:
+ if c1 == '.':
+ dots += 1
+ if dots == 2:
+ return True
+ else:
+ return False
+
+ i -= 1
+
+ if l1 == l2:
+ return True
+
+ if dots == 0:
+ return False
+
+ c = domain1[i] if l1 > m else domain2[i]
+ return c == '.'
+
+
+def parse_origin_from_url(url):
+ url = url.strip()
+ if not url:
+ return
+
+ if not (url.startswith('http://') or url.startswith('https://') or
+ url.startswith('//')):
+ url = '//' + url
+
+ parsed = urlparse(url)
+ port = parsed.port
+ scheme = parsed.scheme
+
+ if scheme == '':
+ scheme = 'https' if port == 443 else 'http'
+
+ if port == 443 and scheme == 'https':
+ netloc = parsed.netloc.replace(':443', '')
+ elif port == 80 and scheme == 'http':
+ netloc = parsed.netloc.replace(':80', '')
+ else:
+ netloc = parsed.netloc
+
+ return '{}://{}'.format(scheme, netloc)
diff --git a/webssh/worker.py b/webssh/worker.py
new file mode 100644
index 0000000..098538e
--- /dev/null
+++ b/webssh/worker.py
@@ -0,0 +1,125 @@
+import logging
+import tornado.websocket
+
+from tornado.ioloop import IOLoop
+from tornado.iostream import _ERRNO_CONNRESET
+from tornado.util import errno_from_exception
+
+
+BUF_SIZE = 32 * 1024
+clients = {} # {ip: {id: worker}}
+
+
+def clear_worker(worker, clients):
+ ip = worker.src_addr[0]
+ workers = clients.get(ip)
+ assert worker.id in workers
+ workers.pop(worker.id)
+
+ if not workers:
+ clients.pop(ip)
+ if not clients:
+ clients.clear()
+
+
+def recycle_worker(worker):
+ if worker.handler:
+ return
+ logging.warning('Recycling worker {}'.format(worker.id))
+ worker.close(reason='worker recycled')
+
+
+class Worker(object):
+ def __init__(self, loop, ssh, chan, dst_addr):
+ self.loop = loop
+ self.ssh = ssh
+ self.chan = chan
+ self.dst_addr = dst_addr
+ self.fd = chan.fileno()
+ self.id = str(id(self))
+ self.data_to_dst = []
+ self.handler = None
+ self.mode = IOLoop.READ
+ self.closed = False
+
+ def __call__(self, fd, events):
+ if events & IOLoop.READ:
+ self.on_read()
+ if events & IOLoop.WRITE:
+ self.on_write()
+ if events & IOLoop.ERROR:
+ self.close(reason='error event occurred')
+
+ def set_handler(self, handler):
+ if not self.handler:
+ self.handler = handler
+
+ def update_handler(self, mode):
+ if self.mode != mode:
+ self.loop.update_handler(self.fd, mode)
+ self.mode = mode
+ if mode == IOLoop.WRITE:
+ self.loop.call_later(0.1, self, self.fd, IOLoop.WRITE)
+
+ def on_read(self):
+ logging.debug('worker {} on read'.format(self.id))
+ try:
+ data = self.chan.recv(BUF_SIZE)
+ except (OSError, IOError) as e:
+ logging.error(e)
+ if self.chan.closed or errno_from_exception(e) in _ERRNO_CONNRESET:
+ self.close(reason='chan error on reading')
+ else:
+ logging.debug('{!r} from {}:{}'.format(data, *self.dst_addr))
+ if not data:
+ self.close(reason='chan closed')
+ return
+
+ logging.debug('{!r} to {}:{}'.format(data, *self.handler.src_addr))
+ try:
+ self.handler.write_message(data, binary=True)
+ except tornado.websocket.WebSocketClosedError:
+ self.close(reason='websocket closed')
+
+ def on_write(self):
+ logging.debug('worker {} on write'.format(self.id))
+ if not self.data_to_dst:
+ return
+
+ data = ''.join(self.data_to_dst)
+ logging.debug('{!r} to {}:{}'.format(data, *self.dst_addr))
+
+ try:
+ sent = self.chan.send(data)
+ except (OSError, IOError) as e:
+ logging.error(e)
+ if self.chan.closed or errno_from_exception(e) in _ERRNO_CONNRESET:
+ self.close(reason='chan error on writing')
+ else:
+ self.update_handler(IOLoop.WRITE)
+ else:
+ self.data_to_dst = []
+ data = data[sent:]
+ if data:
+ self.data_to_dst.append(data)
+ self.update_handler(IOLoop.WRITE)
+ else:
+ self.update_handler(IOLoop.READ)
+
+ def close(self, reason=None):
+ if self.closed:
+ return
+ self.closed = True
+
+ logging.info(
+ 'Closing worker {} with reason: {}'.format(self.id, reason)
+ )
+ if self.handler:
+ self.loop.remove_handler(self.fd)
+ self.handler.close(reason=reason)
+ self.chan.close()
+ self.ssh.close()
+ logging.info('Connection to {}:{} lost'.format(*self.dst_addr))
+
+ clear_worker(self, clients)
+ logging.debug(clients)