#!/usr/bin/env python3 import os import queue import random import re import socket import threading import time import urllib.error import urllib.parse import urllib.request import aptsources.distro import aptsources.sourceslist socket.setdefaulttimeout(2) class MirrorTest: class PingWorker(threading.Thread): def __init__(self, jobs, results, id): self.id = id self.jobs = jobs self.results = results self.match_result = re.compile(r"^rtt .* = [\.\d]+/([\.\d]+)/.*") threading.Thread.__init__(self) def run(self): result = None while MirrorTest.completed_pings < MirrorTest.todo: try: mirror = self.jobs.get(True, 1) host = mirror.hostname except: continue print( "Pinging (Worker %s) %s (%s) ..." % (self.id, host, MirrorTest.completed_pings) ) commando = os.popen("ping -q -c 4 -W 2 -i 0.3 %s" % host, "r") while True: line = commando.readline() if not line: break result = re.findall(self.match_result, line) MirrorTest.completed_pings_lock.acquire() MirrorTest.completed_pings += 1 if result: self.results.append([float(result[0]), host, mirror]) MirrorTest.completed_pings_lock.release() def speed_test(self, mirror): url = f"{mirror.get_repo_urls()[0]}/{self.test_file}" print("Downloading %s ..." % url) start = time.time() try: urllib.request.urlopen(url).read(51200) return 50 / (time.time() - start) except: return 0 def __init__(self, hosts, test_file): self.test_file = test_file jobs = queue.Queue() results = [] for h in hosts: jobs.put(h) self.threads = [] MirrorTest.completed_pings = 0 MirrorTest.completed_pings_lock = threading.Lock() MirrorTest.todo = len(hosts) for i in range(10): t = MirrorTest.PingWorker(jobs, results, i) self.threads.append(t) t.start() for t in self.threads: t.join() results.sort() print("\n\nTop ten RTTs:") for r in results[0:10]: print(f"{r[1]}: {r[0]}") print("\n\n") results.insert(0, [0, "rand", hosts[random.randint(1, len(hosts))]]) results.insert(0, [0, "rand", hosts[random.randint(1, len(hosts))]]) final = [(self.speed_test(r[2]), r[2]) for r in results[0:12]] final.sort() final.reverse() print("\n\nBest mirrors:") for f in final: print(f"{f[1].hostname}: {int(f[0])} KByte/s") if __name__ == "__main__": distro = aptsources.distro.get_distro() distro.get_sources(aptsources.sourceslist.SourcesList()) pipe = os.popen("dpkg --print-architecture") arch = pipe.read().strip() test_file = "dists/{}/{}/binary-{}/Packages.gz".format( distro.source_template.name, distro.source_template.components[0].name, arch, ) app = MirrorTest(distro.source_template.mirror_set.values(), test_file)