diff options
Diffstat (limited to 'third_party/python/taskcluster/taskcluster/aio/asyncutils.py')
-rw-r--r-- | third_party/python/taskcluster/taskcluster/aio/asyncutils.py | 147 |
1 files changed, 147 insertions, 0 deletions
diff --git a/third_party/python/taskcluster/taskcluster/aio/asyncutils.py b/third_party/python/taskcluster/taskcluster/aio/asyncutils.py new file mode 100644 index 0000000000..ce2b9f6945 --- /dev/null +++ b/third_party/python/taskcluster/taskcluster/aio/asyncutils.py @@ -0,0 +1,147 @@ +from __future__ import absolute_import, division, print_function +import aiohttp +import aiohttp.hdrs +import asyncio +import async_timeout +import functools +import logging +import os +import six + +import taskcluster.utils as utils +import taskcluster.exceptions as exceptions + +log = logging.getLogger(__name__) + + +def createSession(*args, **kwargs): + return aiohttp.ClientSession(*args, **kwargs) + + +# Useful information: https://www.blog.pythonlibrary.org/2016/07/26/python-3-an-intro-to-asyncio/ +async def makeHttpRequest(method, url, payload, headers, retries=utils.MAX_RETRIES, session=None): + """ Make an HTTP request and retry it until success, return request """ + retry = -1 + response = None + implicit = False + if session is None: + implicit = True + session = aiohttp.ClientSession() + + def cleanup(): + if implicit: + loop = asyncio.get_event_loop() + loop.run_until_complete(session.close()) + + try: + while True: + retry += 1 + # if this isn't the first retry then we sleep + if retry > 0: + snooze = float(retry * retry) / 10.0 + log.info('Sleeping %0.2f seconds for exponential backoff', snooze) + await asyncio.sleep(snooze) + + # Seek payload to start, if it is a file + if hasattr(payload, 'seek'): + payload.seek(0) + + log.debug('Making attempt %d', retry) + try: + with async_timeout.timeout(60): + response = await makeSingleHttpRequest(method, url, payload, headers, session) + except aiohttp.ClientError as rerr: + if retry < retries: + log.warn('Retrying because of: %s' % rerr) + continue + # raise a connection exception + raise rerr + except ValueError as rerr: + log.warn('ValueError from aiohttp: redirect to non-http or https') + raise rerr + except RuntimeError as rerr: + log.warn('RuntimeError from aiohttp: session closed') + raise rerr + # Handle non 2xx status code and retry if possible + status = response.status + if 500 <= status and status < 600 and retry < retries: + if retry < retries: + log.warn('Retrying because of: %d status' % status) + continue + else: + raise exceptions.TaskclusterRestFailure("Unknown Server Error", superExc=None) + return response + finally: + cleanup() + # This code-path should be unreachable + assert False, "Error from last retry should have been raised!" + + +async def makeSingleHttpRequest(method, url, payload, headers, session=None): + method = method.upper() + log.debug('Making a %s request to %s', method, url) + log.debug('HTTP Headers: %s' % str(headers)) + log.debug('HTTP Payload: %s (limit 100 char)' % str(payload)[:100]) + implicit = False + if session is None: + implicit = True + session = aiohttp.ClientSession() + + skip_auto_headers = [aiohttp.hdrs.CONTENT_TYPE] + + try: + # https://docs.aiohttp.org/en/stable/client_quickstart.html#passing-parameters-in-urls + # we must avoid aiohttp's helpful "requoting" functionality, as it breaks Hawk signatures + url = aiohttp.client.URL(url, encoded=True) + async with session.request( + method, url, data=payload, headers=headers, + skip_auto_headers=skip_auto_headers, compress=False + ) as resp: + response_text = await resp.text() + log.debug('Received HTTP Status: %s' % resp.status) + log.debug('Received HTTP Headers: %s' % str(resp.headers)) + log.debug('Received HTTP Payload: %s (limit 1024 char)' % + six.text_type(response_text)[:1024]) + return resp + finally: + if implicit: + await session.close() + + +async def putFile(filename, url, contentType, session=None): + with open(filename, 'rb') as f: + contentLength = os.fstat(f.fileno()).st_size + return await makeHttpRequest('put', url, f, headers={ + 'Content-Length': str(contentLength), + 'Content-Type': contentType, + }, session=session) + + +def runAsync(coro): + """ + Replacement of asyncio.run, as it doesn't exist in python<3.7. + """ + asyncio.set_event_loop(asyncio.new_event_loop()) + loop = asyncio.get_event_loop() + result = loop.run_until_complete(coro) + loop.close() + return result + + +def ensureCoro(func): + """ + If func is a regular function, execute in a thread and return an + async version of it. If func is already an async function, return + it without change. + """ + if asyncio.iscoroutinefunction(func): + return func + + @functools.wraps(func) + async def coro(*args, **kwargs): + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + functools.partial(func, *args, **kwargs) + ) + return coro |