summaryrefslogtreecommitdiffstats
path: root/.github/scripts/netdata-pkgcloud-cleanup.py
blob: f6311e47cd627276400bcb3808d3acb8c112a90a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#!/bin/env python3

import argparse
import os
import sys
from datetime import date, datetime, timedelta, timezone
from pprint import pprint

import requests
from dateutil import parser
from requests.auth import HTTPBasicAuth


class PackageCloud:
    """Minimal packagecloud.io API client used to prune old netdata packages.

    Retention policy:
      * 'stable' repos keep the newest NUM_PACKAGE_MINOR_TO_KEEP minor
        versions per (distro, arch, package name);
      * 'devel' and 'edge' repos keep only packages newer than
        NUM_RETENTION_DAYS days.
    """

    # Number of newest minor versions to keep per package (stable repos).
    NUM_PACKAGE_MINOR_TO_KEEP = 5
    # Age threshold in days for package retention (devel/edge repos).
    NUM_RETENTION_DAYS = 30
    # number of pages to process. Use '0' to process all
    MAX_PAGES = 0

    def __init__(self, repo_type, dry_run=True, auth_token=None):
        """Configure the API endpoint and credentials.

        Args:
            repo_type: one of 'stable', 'devel', 'edge'; any other value
                prints an error and exits with status 1.
            dry_run: when True, deletions are only logged, never performed.
            auth_token: packagecloud API token, sent as the HTTP basic-auth
                username with an empty password (packagecloud convention).
        """
        self.headers = {
            "Accept" : "application/json",
            "Content-Type" : "application/json",
        }
        self.dry_run = dry_run
        self.repo_type = repo_type
        if repo_type == "stable":
            repo = "netdata/netdata"
        elif repo_type == "devel":
            repo = "netdata/netdata-devel"
        elif repo_type == "edge":
            repo = "netdata/netdata-edge"
        else:
            print(f"ERROR: unknown repo type '{repo_type}'!\nAccepted values are: stable,devel,edge")
            sys.exit(1)
        self.base_url = f"https://packagecloud.io/api/v1/repos/{repo}"
        self.auth = HTTPBasicAuth(username=auth_token, password='') if auth_token else None

    def get_all_packages(self):
        """Fetch every package in the repo, page by page.

        Honors MAX_PAGES (0 = no limit) and filters out the 'netdata-repo'
        and 'netdata-repo-edge' meta-packages.

        Returns:
            List of package dicts as returned by the packagecloud API.
        """
        page = 1
        all_pkg_list = []
        while self.MAX_PAGES == 0 or page <= self.MAX_PAGES:
            url = f"{self.base_url}/packages.json?page={page}"
            pkg_list = requests.get(url, auth=self.auth, headers=self.headers).json()
            if not pkg_list:
                # An empty page means we have walked past the last one.
                break
            print(f"Processing page: {page}")
            # NOTE: the original code also called is_pkg_older_than_days()
            # here and discarded the result — dead code, removed.
            for element in pkg_list:
                # Skip the repo-configuration meta-packages.
                if element['name'] not in ('netdata-repo', 'netdata-repo-edge'):
                    all_pkg_list.append(element)
            page += 1
        return all_pkg_list

    def delete_package(self, destroy_url):
        """Delete one package via its destroy URL, honoring dry-run mode."""
        if self.dry_run:
            print(f"         - DRY_RUN mode. Not deleting package '{destroy_url}'.")
        else:
            print(f"         - Deleting package: {destroy_url}")
            url = f"https://packagecloud.io{destroy_url}"
            # BUG FIX: the original overwrote the parsed response with None
            # immediately after the request, so it unconditionally reported
            # success. Check the HTTP status of the DELETE call instead.
            response = requests.delete(url, auth=self.auth, headers=self.headers)
            if response.ok:
                print(f"           Package deleted successfully.")
            else:
                print(f"           Failed deleting package!")

    def get_destroy_url(self, pkg_url):
        """Resolve a package detail URL to its 'destroy_url' via the API."""
        url = f"https://packagecloud.io{pkg_url}"
        response = requests.get(url, auth=self.auth, headers=self.headers)
        response.raise_for_status()
        return response.json()['destroy_url']

    def get_packages_for_distro(self, distro, all_pkg_list):
        """Return the packages whose 'distro_version' equals *distro*."""
        return [pkg for pkg in all_pkg_list if pkg['distro_version'] == distro]

    def get_packages_for_arch(self, arch, all_pkg_list):
        """Return the packages whose architecture path segment equals *arch*.

        Path segment 11 of 'package_url' is assumed to be the architecture —
        TODO(review): confirm against the packagecloud URL layout.
        """
        return [pkg for pkg in all_pkg_list if pkg['package_url'].split('/')[11] == arch]

    def get_arches(self, pkg_list):
        """Return the distinct architecture segments found in *pkg_list*."""
        return list({pkg['package_url'].split('/')[11] for pkg in pkg_list})

    def get_pkg_list(self, pkg_name, pkg_list):
        """Return the packages in *pkg_list* named exactly *pkg_name*."""
        return [pkg for pkg in pkg_list if pkg['name'] == pkg_name]

    @staticmethod
    def _version_sort_key(version):
        """Sort key ordering dotted versions numerically where possible.

        Numeric components compare as integers; non-numeric ones fall back
        to string comparison and sort after all numeric ones.
        """
        key = []
        for part in version.split('.'):
            try:
                key.append((0, int(part), ''))
            except ValueError:
                key.append((1, 0, part))
        return key

    def get_minor_versions(self, all_versions):
        """Return the distinct minor versions (version minus last component).

        BUG FIX: the original used a plain lexicographic sort, which orders
        '1.10' before '1.9'; since callers keep the LAST N entries as the
        "newest", that dropped the wrong versions. Sort numerically instead.
        """
        minor_versions = {'.'.join(version.split('.')[:-1]) for version in all_versions}
        return sorted(minor_versions, key=self._version_sort_key)

    def is_pkg_older_than_days(self, pkg, num_days):
        """Return True when the package was created more than *num_days* ago.

        BUG FIX: 'created_at' carries a 'Z' (UTC) suffix, but the original
        compared it against naive local datetime.now(); compare in UTC.
        """
        pkg_create_date = datetime.strptime(pkg['created_at'], '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
        time_difference = datetime.now(timezone.utc) - pkg_create_date
        return time_difference.days > num_days

    def cleanup_repo(self):
        """Dispatch to the retention policy matching this repo's type.

        'devel' and 'edge' intentionally share the age-based policy.
        """
        if self.repo_type == 'stable':
            self.cleanup_stable_repo()
        else:
            self.cleanup_edge_repo()

    def cleanup_edge_repo(self):
        """Age-based cleanup: delete packages older than NUM_RETENTION_DAYS."""
        all_pkg_list = self.get_all_packages()
        pkgs_to_delete = []
        pkgs_to_keep = []
        for package in all_pkg_list:
            if self.is_pkg_older_than_days(package, self.NUM_RETENTION_DAYS):
                pkgs_to_delete.append(package)
            else:
                pkgs_to_keep.append(package)
        print(f"Keeping the following packages (newer than {self.NUM_RETENTION_DAYS} days):")
        for pkg in pkgs_to_keep:
            print(f"    > pkg: {pkg['package_html_url']} / created_at: {pkg['created_at']}")
        print(f"Deleting the following packages (older than {self.NUM_RETENTION_DAYS} days):")
        for pkg in pkgs_to_delete:
            print(f"    > pkg: {pkg['package_html_url']} / created_at: {pkg['created_at']}")
            self.delete_package(pkg['destroy_url'])

    def cleanup_stable_repo(self):
        """Version-based cleanup: per (distro, arch, package name), keep only
        the newest NUM_PACKAGE_MINOR_TO_KEEP minor versions."""
        all_pkg_list = self.get_all_packages()
        all_distros = sorted({pkg['distro_version'] for pkg in all_pkg_list})
        print(f"<> Distributions list: {all_distros}")

        for distro in all_distros:
            print(f">> Processing distro: {distro}")
            pkg_list_distro = self.get_packages_for_distro(distro, all_pkg_list)
            arches = self.get_arches(pkg_list_distro)
            print(f"   <> Arch list: {arches}")
            for arch in arches:
                print(f"   >> Processing arch: {distro} -> {arch}")
                pkg_list_arch = self.get_packages_for_arch(arch, pkg_list_distro)
                pkg_names = list({pkg['name'] for pkg in pkg_list_arch})
                print(f"     <> Package names: {pkg_names}")
                for pkg_name in pkg_names:
                    print(f"     >> Processing package: {distro} -> {arch} -> {pkg_name}")
                    pkg_list = self.get_pkg_list(pkg_name, pkg_list_arch)
                    pkg_versions = [pkg['version'] for pkg in pkg_list]
                    pkg_minor_versions = self.get_minor_versions(pkg_versions)
                    # get_minor_versions() sorts oldest -> newest, so the
                    # last N entries are the ones to keep.
                    pkg_minor_to_keep = pkg_minor_versions[-self.NUM_PACKAGE_MINOR_TO_KEEP:]
                    print(f"       <> Minor Package Versions to Keep: {pkg_minor_to_keep}")
                    pkg_minor_to_delete = list(set(pkg_minor_versions) - set(pkg_minor_to_keep))
                    print(f"       <> Minor Package Versions to Delete: {pkg_minor_to_delete}")
                    urls_to_delete = [pkg['package_url'] for pkg in pkg_list if '.'.join(pkg['version'].split('.')[:-1]) in pkg_minor_to_delete]
                    for pkg_url in urls_to_delete:
                        destroy_url = self.get_destroy_url(pkg_url)
                        self.delete_package(destroy_url)


def configure():
    """Parse CLI arguments and read the packagecloud API token.

    Returns:
        dict with keys 'repo_type', 'dry_run' and 'token'.

    Exits with status 1 when the PKGCLOUD_TOKEN environment variable is
    not set.
    """
    # Renamed from 'parser' to avoid shadowing the module-level
    # 'from dateutil import parser' import.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('--repo-type', '-r', required=True,
                            help='Repository type against to perform cleanup')
    arg_parser.add_argument('--dry-run', '-d', action='store_true',
                            help='Dry-run Mode')
    args = arg_parser.parse_args()
    # Fail fast on a missing token instead of catching a broad Exception.
    token = os.environ.get('PKGCLOUD_TOKEN')
    if token is None:
        print("FATAL: 'PKGCLOUD_TOKEN' environment variable is not set!", file=sys.stderr)
        sys.exit(1)
    return {
        'repo_type': args.repo_type,
        'dry_run': args.dry_run,
        'token': token,
    }


def main():
    """Entry point: build a client from CLI/env configuration and clean up."""
    conf = configure()
    client = PackageCloud(conf['repo_type'], conf['dry_run'], conf['token'])
    client.cleanup_repo()


# Run the cleanup only when executed as a script, not when imported.
if __name__ == "__main__":
    main()