# -*- coding: utf-8 -*- # Description: # Author: Pawel Krupa (paulfantom) # Author: Ilya Mashchenko (ilyam8) # SPDX-License-Identifier: GPL-3.0-or-later import urllib3 from bases.FrameworkServices.SimpleService import SimpleService try: urllib3.disable_warnings() except AttributeError: pass class UrlService(SimpleService): def __init__(self, configuration=None, name=None): SimpleService.__init__(self, configuration=configuration, name=name) self.url = self.configuration.get('url') self.user = self.configuration.get('user') self.password = self.configuration.get('pass') self.proxy_user = self.configuration.get('proxy_user') self.proxy_password = self.configuration.get('proxy_pass') self.proxy_url = self.configuration.get('proxy_url') self.method = self.configuration.get('method', 'GET') self.header = self.configuration.get('header') self.request_timeout = self.configuration.get('timeout', 1) self.respect_retry_after_header = self.configuration.get('respect_retry_after_header') self.tls_verify = self.configuration.get('tls_verify') self.tls_ca_file = self.configuration.get('tls_ca_file') self.tls_key_file = self.configuration.get('tls_key_file') self.tls_cert_file = self.configuration.get('tls_cert_file') self._manager = None def __make_headers(self, **header_kw): user = header_kw.get('user') or self.user password = header_kw.get('pass') or self.password proxy_user = header_kw.get('proxy_user') or self.proxy_user proxy_password = header_kw.get('proxy_pass') or self.proxy_password custom_header = header_kw.get('header') or self.header header_params = dict(keep_alive=True) proxy_header_params = dict() if user and password: header_params['basic_auth'] = '{user}:{password}'.format(user=user, password=password) if proxy_user and proxy_password: proxy_header_params['proxy_basic_auth'] = '{user}:{password}'.format(user=proxy_user, password=proxy_password) try: header, proxy_header = urllib3.make_headers(**header_params), urllib3.make_headers(**proxy_header_params) except TypeError as error: self.error('build_header() error: {error}'.format(error=error)) return None, None else: header.update(custom_header or dict()) return header, proxy_header def _build_manager(self, **header_kw): header, proxy_header = self.__make_headers(**header_kw) if header is None or proxy_header is None: return None proxy_url = header_kw.get('proxy_url') or self.proxy_url if proxy_url: manager = urllib3.ProxyManager params = dict(proxy_url=proxy_url, headers=header, proxy_headers=proxy_header) else: manager = urllib3.PoolManager params = dict(headers=header) tls_cert_file = self.tls_cert_file if tls_cert_file: params['cert_file'] = tls_cert_file # NOTE: key_file is useless without cert_file, but # cert_file may include the key as well. tls_key_file = self.tls_key_file if tls_key_file: params['key_file'] = tls_key_file tls_ca_file = self.tls_ca_file if tls_ca_file: params['ca_certs'] = tls_ca_file try: url = header_kw.get('url') or self.url if url.startswith('https') and not self.tls_verify and not tls_ca_file: params['ca_certs'] = None return manager(assert_hostname=False, cert_reqs='CERT_NONE', **params) return manager(**params) except (urllib3.exceptions.ProxySchemeUnknown, TypeError) as error: self.error('build_manager() error:', str(error)) return None def _get_raw_data(self, url=None, manager=None, **kwargs): """ Get raw data from http request :return: str """ try: status, data = self._get_raw_data_with_status(url, manager, **kwargs) except Exception as error: self.error('Url: {url}. Error: {error}'.format(url=url or self.url, error=error)) return None if status == 200: return data else: self.debug('Url: {url}. Http response status code: {code}'.format(url=url or self.url, code=status)) return None def _get_raw_data_with_status(self, url=None, manager=None, retries=1, redirect=True, **kwargs): """ Get status and response body content from http request. Does not catch exceptions :return: int, str """ url = url or self.url manager = manager or self._manager retry = urllib3.Retry(retries) if hasattr(retry, 'respect_retry_after_header'): retry.respect_retry_after_header = bool(self.respect_retry_after_header) response = manager.request( method=self.method, url=url, timeout=self.request_timeout, retries=retry, headers=manager.headers, redirect=redirect, **kwargs ) if isinstance(response.data, str): return response.status, response.data return response.status, response.data.decode() def check(self): """ Format configuration data and try to connect to server :return: boolean """ if not (self.url and isinstance(self.url, str)): self.error('URL is not defined or type is not ') return False self._manager = self._build_manager() if not self._manager: return False try: data = self._get_data() except Exception as error: self.error('_get_data() failed. Url: {url}. Error: {error}'.format(url=self.url, error=error)) return False if isinstance(data, dict) and data: return True self.error('_get_data() returned no data or type is not ') return False