summaryrefslogtreecommitdiffstats
path: root/third_party/python/taskcluster/taskcluster/aio/reader_writer.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/taskcluster/taskcluster/aio/reader_writer.py')
-rw-r--r--third_party/python/taskcluster/taskcluster/aio/reader_writer.py81
1 files changed, 81 insertions, 0 deletions
diff --git a/third_party/python/taskcluster/taskcluster/aio/reader_writer.py b/third_party/python/taskcluster/taskcluster/aio/reader_writer.py
new file mode 100644
index 0000000000..2d9880b3a0
--- /dev/null
+++ b/third_party/python/taskcluster/taskcluster/aio/reader_writer.py
@@ -0,0 +1,81 @@
+"""
+Utilities supporting the "reader" and "writer" definitions used in uploads and downloads.
+"""
+import asyncio
+import io
+
+
+class BufferWriter:
+ """A writer that writes to an in-memory buffer"""
+ def __init__(self):
+ self.buf = io.BytesIO()
+
+ async def write(self, chunk):
+ self.buf.write(chunk)
+
+ def getbuffer(self):
+ """Get the content of the in-memory buffer"""
+ return self.buf.getbuffer()
+
+
+class BufferReader:
+ """A reader that reads from an in-memory buffer"""
+ def __init__(self, data):
+ self.buf = io.BytesIO(data)
+
+ async def read(self, max_size):
+ return self.buf.read(max_size)
+
+
+class FileWriter:
+ """A writer that writes to a (sync) file. The file should be opened in binary mode
+ and empty."""
+ def __init__(self, file):
+ self.file = file
+
+ async def write(self, chunk):
+ self.file.write(chunk)
+
+
+class FileReader:
+ """A reader that reads from a (sync) file. The file should be opened in binary mode,
+ and positioned at its beginning."""
+ def __init__(self, file):
+ self.file = file
+
+ async def read(self, max_size):
+ return self.file.read(max_size)
+
+
+async def streamingCopy(reader, writer):
+ "Copy data from a reader to a writer, as those are defined in upload.py and download.py"
+ # we will read and write concurrently, but with limited buffering -- just enough
+ # that read and write operations are not forced to alternate
+ chunk_size = 64 * 1024
+ q = asyncio.Queue(maxsize=1)
+
+ async def read_loop():
+ while True:
+ chunk = await reader.read(chunk_size)
+ await q.put(chunk)
+ if not chunk:
+ break
+
+ async def write_loop():
+ while True:
+ chunk = await q.get()
+ if not chunk:
+ q.task_done()
+ break
+ await writer.write(chunk)
+ q.task_done()
+
+ read_task = asyncio.ensure_future(read_loop())
+ write_task = asyncio.ensure_future(write_loop())
+
+ try:
+ await asyncio.gather(read_task, write_task)
+ finally:
+ # cancel any still-running tasks, as in case of an exception
+ read_task.cancel()
+ write_task.cancel()