1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import print_function
import datetime
import json
import logging
import os
import sys
import requests
import voluptuous
import voluptuous.humanize
from mozbuild.telemetry import (
schema as build_telemetry_schema,
verify_statedir,
)
BUILD_TELEMETRY_URL = "https://incoming.telemetry.mozilla.org/{endpoint}"
SUBMIT_ENDPOINT = "submit/eng-workflow/build/1/{ping_uuid}"
STATUS_ENDPOINT = "status"
def delete_expired_files(directory, days=30):
    """Discard files in ``directory`` that are older than ``days`` days.

    A file's age is the difference between the current local time and the
    timestamp reported by ``os.path.getctime``. Only direct children of
    ``directory`` are examined; subdirectories are left untouched.

    Args:
        directory: Path of the directory to clean up.
        days: Maximum age, in days, a file may reach before it is removed.

    Returns:
        None.
    """
    now = datetime.datetime.now()
    max_age = datetime.timedelta(days=days)

    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)

        # os.remove() cannot delete a directory and would raise, aborting
        # the whole cleanup; skip directories the same way the submission
        # loop in this file skips them.
        if os.path.isdir(filepath):
            continue

        ctime = os.path.getctime(filepath)
        then = datetime.datetime.fromtimestamp(ctime)

        if (now - then) > max_age:
            os.remove(filepath)
def check_edge_server_status(session):
    """Tell whether the Telemetry Edge Server is ready to accept data.

    Issues a GET against the status endpoint through ``session`` and
    returns True only when the response status code is 200.
    """
    probe = session.get(BUILD_TELEMETRY_URL.format(endpoint=STATUS_ENDPOINT))
    return probe.status_code == 200
def send_telemetry_ping(session, data, ping_uuid):
    """POST one build telemetry ping to the edge server.

    ``data`` is sent as the JSON body to the submit endpoint built from
    ``ping_uuid``; the response object from ``session.post`` is returned
    unchanged.
    """
    endpoint = SUBMIT_ENDPOINT.format(ping_uuid=str(ping_uuid))
    return session.post(BUILD_TELEMETRY_URL.format(endpoint=endpoint), json=data)
def submit_telemetry_data(outgoing, submitted):
    """Send information about `./mach build` invocations to the
    Telemetry pipeline.

    Every ``*.json`` file in ``outgoing`` is loaded, validated against the
    build telemetry schema and posted to the edge server. The file name
    without its ``.json`` extension is used as the ping UUID.

    Per-file outcomes:
      * posted with a 200 response: the file is moved to ``submitted``;
      * posted with any other response: the error is logged and the file
        stays in ``outgoing``;
      * undecodable JSON or schema violation: the error is logged and the
        file is deleted;
      * any other exception: the error is logged and the remaining files
        are not processed in this run.

    Afterwards, expired files are purged from ``submitted``.

    Args:
        outgoing: Directory holding pings waiting to be sent.
        submitted: Directory receiving pings that were sent successfully.

    Returns:
        1 if the edge server status check fails (nothing is sent),
        otherwise 0.
    """
    with requests.Session() as session:
        # Confirm the server is OK
        if not check_edge_server_status(session):
            logging.error('Error posting to telemetry: server status is not "200 OK"')
            return 1

        for filename in os.listdir(outgoing):
            path = os.path.join(outgoing, filename)

            if os.path.isdir(path) or not path.endswith(".json"):
                logging.info("skipping item %s", path)
                continue

            ping_uuid = os.path.splitext(filename)[0]  # strip ".json" to get ping UUID

            try:
                with open(path, "r") as f:
                    data = json.load(f)

                # Verify the data matches the schema
                voluptuous.humanize.validate_with_humanized_errors(
                    data, build_telemetry_schema
                )

                response = send_telemetry_ping(session, data, ping_uuid)
                if response.status_code != 200:
                    msg = "response code {code} sending {uuid} to telemetry: {body}".format(
                        body=response.content,
                        code=response.status_code,
                        uuid=ping_uuid,
                    )
                    logging.error(msg)
                    continue

                # Move from "outgoing" to "submitted"
                os.rename(path, os.path.join(submitted, filename))

                logging.info("successfully posted %s to telemetry", ping_uuid)

            except ValueError as ve:
                # ValueError is thrown if JSON cannot be decoded
                logging.exception("exception parsing JSON at %s: %s", path, ve)
                os.remove(path)

            except voluptuous.Error as e:
                # Raised when the data does not fit the schema. Format the
                # exception itself: exceptions have no `.message` attribute
                # on Python 3, so reading it here would raise AttributeError
                # from inside the handler and abort the whole run.
                logging.exception("invalid data found at %s: %s", path, e)
                os.remove(path)

            except Exception as e:
                # Unexpected failure (e.g. a network error): stop sending,
                # but still fall through to the cleanup below.
                logging.error("exception posting to telemetry server: %s", e)
                break

    delete_expired_files(submitted)

    return 0
if __name__ == "__main__":
    # Script entry point: expects exactly one argument, the state directory.
    if len(sys.argv) != 2:
        print("usage: python submit_telemetry_data.py <statedir>")
        sys.exit(1)

    statedir = sys.argv[1]

    try:
        outgoing, submitted, telemetry_log = verify_statedir(statedir)

        # Configure logging
        logging.basicConfig(
            filename=telemetry_log,
            format="%(asctime)s %(message)s",
            level=logging.DEBUG,
        )

        # sys.exit() raises SystemExit, which is not an Exception subclass,
        # so the handler below does not intercept the normal exit path.
        sys.exit(submit_telemetry_data(outgoing, submitted))

    except Exception as e:
        # Handle and print messages from `statedir` verification.
        # Use str(e) rather than e.message: the attribute does not exist on
        # Python 3, and on Python 2 it is empty for multi-argument
        # exceptions such as OSError.
        print(str(e))
        sys.exit(1)
|