~mvo/python-apt/debian-sid-mirror

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/env python
import threading, Queue, time, re, os, tempfile
import aptsources.sourceslist
import aptsources.distro
from timeit import Timer
import urllib
import socket
import random
socket.setdefaulttimeout(2)

class MirrorTest:
    """Rank package mirrors by ping round-trip time and download speed.

    Constructing an instance runs the whole benchmark: every mirror in
    *hosts* is pinged by a pool of worker threads, the ten mirrors with the
    lowest RTT plus two randomly chosen mirrors are then speed-tested by
    downloading the first 50 KiB of *test_file*, and the ranking is printed.

    The ping progress counters (``completed_pings``, ``todo``) and their
    lock are stored on the class itself, so only one benchmark should run
    at a time.
    """

    class PingWorker(threading.Thread):
        """Thread that pings mirrors taken from a shared job queue."""

        def __init__(self, jobs, results, id):
            """Store the shared queue/result list and the worker number.

            jobs    -- queue of mirror objects exposing a ``hostname``
            results -- shared list receiving ``[rtt, host, mirror]`` entries
            id      -- worker number, only used in progress output
            """
            self.id = id
            self.jobs = jobs
            self.results = results
            # Captures the second "/"-separated value of ping's "rtt ... ="
            # summary line (the average in min/avg/max/mdev output).
            self.match_result = re.compile(r"^rtt .* = [\.\d]+/([\.\d]+)/.*")
            threading.Thread.__init__(self)

        def _ping(self, host):
            """Return the RTT that ``ping`` reports for *host*, or None."""
            # NOTE(review): host is interpolated into a shell command line;
            # this is only safe while the mirror list is trusted.
            commando = os.popen("ping -q -c 4 -W 2 -i 0.3 %s" % host, "r")
            try:
                rtt = None
                # Read everything so the child process is drained.
                for line in commando:
                    found = self.match_result.match(line)
                    if found:
                        rtt = float(found.group(1))
                return rtt
            finally:
                commando.close()

        def run(self):
            """Ping queued mirrors until every job has been accounted for."""
            while MirrorTest.completed_pings < MirrorTest.todo:
                try:
                    mirror = self.jobs.get(True, 1)
                except Queue.Empty:
                    # Other workers may still be busy; re-check the counter.
                    continue
                # Reset per job so a failed ping can never reuse the
                # measurement of the previously pinged host.
                host = None
                rtt = None
                try:
                    host = mirror.hostname
                    print("Pinging (Worker %s) %s (%s) ..." % (
                        self.id, host, MirrorTest.completed_pings))
                    rtt = self._ping(host)
                except Exception:
                    # Best effort: a mirror that cannot be pinged is simply
                    # left out of the ranking.
                    rtt = None
                # Every dequeued job must be counted, otherwise the workers
                # would wait forever for the counter to reach ``todo``.
                MirrorTest.completed_pings_lock.acquire()
                try:
                    MirrorTest.completed_pings += 1
                    if rtt is not None:
                        self.results.append([rtt, host, mirror])
                finally:
                    MirrorTest.completed_pings_lock.release()

    def speed_test(self, mirror):
        """Return the download rate from *mirror* in KByte/s.

        Fetches at most 51200 bytes of ``self.test_file`` from the first
        repository URL of *mirror*.  The rate is based on the number of
        bytes actually received.  Returns 0 if the download fails.
        """
        url = "%s/%s" % (mirror.get_repo_urls()[0],
                         self.test_file)
        print("Downloading %s ..." % url)
        start = time.time()
        try:
            remote = urllib.urlopen(url)
            try:
                data = remote.read(51200)
            finally:
                remote.close()
        except Exception:
            # Deliberately broad (but no longer catching KeyboardInterrupt):
            # any failure just means this mirror scores zero.
            return 0
        elapsed = time.time() - start
        if elapsed <= 0:
            # Clock resolution too coarse to compute a rate.
            return 0
        return len(data) / 1024.0 / elapsed

    def __init__(self, hosts, test_file):
        """Run the benchmark and print the results.

        hosts     -- iterable of mirror objects (``hostname`` attribute and
                     ``get_repo_urls()`` method are used)
        test_file -- path, relative to a repository URL, of the file to
                     download for the speed test
        """
        self.test_file = test_file
        # Copy so that any iterable (not just an indexable list) works.
        hosts = list(hosts)
        jobs = Queue.Queue()
        for h in hosts:
            jobs.put(h)
        results = []
        self.threads = []
        MirrorTest.completed_pings = 0
        MirrorTest.completed_pings_lock = threading.Lock()
        MirrorTest.todo = len(hosts)

        for i in range(10):
            t = MirrorTest.PingWorker(jobs, results, i)
            self.threads.append(t)
            t.start()

        for t in self.threads:
            t.join()

        # Lowest RTT first; the host name breaks ties.
        results.sort(key=lambda entry: entry[:2])
        print("\n\nTop ten RTTs:")
        for r in results[:10]:
            print("%s: %s" % (r[1], r[0]))
        print("\n\n")

        # Speed-test two random mirrors in addition to the ten with the
        # lowest RTT.  random.choice() covers the whole list and cannot
        # index past its end.
        candidates = []
        if hosts:
            candidates = [[0, "rand", random.choice(hosts)]
                          for _ in range(2)]
        candidates.extend(results[:10])

        final = [(self.speed_test(c[2]), c[2]) for c in candidates]
        final.sort(key=lambda pair: pair[0], reverse=True)
        print("\n\nBest mirrors:")
        for f in final:
            print("%s: %s KByte/s" % (f[1].hostname, int(f[0])))

if __name__ == "__main__":
    # Load the distribution description together with the current
    # sources.list so that the mirror set of the template is available.
    distro = aptsources.distro.get_distro()
    distro.get_sources(aptsources.sourceslist.SourcesList())
    # Ask dpkg for the architecture; close the pipe once it has been read.
    pipe = os.popen("dpkg --print-architecture")
    try:
        arch = pipe.read().strip()
    finally:
        pipe.close()
    # Packages index of the template's first component, used as the
    # download payload for the speed test.
    test_file = "dists/%s/%s/binary-%s/Packages.gz" % \
                (distro.source_template.name,
                 distro.source_template.components[0].name,
                 arch)
    # Constructing MirrorTest runs the benchmark and prints the ranking.
    app = MirrorTest(distro.source_template.mirror_set.values(),
                     test_file)