diff options
Diffstat (limited to 'security/nss/automation/taskcluster/graph/src/queue.js')
-rw-r--r-- | security/nss/automation/taskcluster/graph/src/queue.js | 304 |
1 files changed, 304 insertions, 0 deletions
diff --git a/security/nss/automation/taskcluster/graph/src/queue.js b/security/nss/automation/taskcluster/graph/src/queue.js new file mode 100644 index 0000000000..1baa604174 --- /dev/null +++ b/security/nss/automation/taskcluster/graph/src/queue.js @@ -0,0 +1,304 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +import {clone} from "merge"; +import merge from "./merge"; +import slugid from "slugid"; +import taskcluster from "taskcluster-client"; +import * as image_builder from "./image_builder"; + +let maps = []; +let filters = []; + +let tasks = new Map(); +let tags = new Map(); +let image_tasks = new Map(); +let parameters = {}; + +let queue = new taskcluster.Queue({ + rootUrl: process.env.TASKCLUSTER_PROXY_URL, +}); + +function fromNow(hours) { + let d = new Date(); + d.setHours(d.getHours() + (hours|0)); + return d.toJSON(); +} + +function parseRoutes(routes) { + let rv = [ + `tc-treeherder.v2.${process.env.TC_PROJECT}.${process.env.NSS_HEAD_REVISION}.${process.env.NSS_PUSHLOG_ID}`, + ...routes + ]; + + // Notify about failures (except on try). + // Turned off, too noisy. + /*if (process.env.TC_PROJECT != "nss-try") { + rv.push(`notify.email.${process.env.TC_OWNER}.on-failed`, + `notify.email.${process.env.TC_OWNER}.on-exception`); + }*/ + + return rv; +} + +function parseFeatures(list) { + return list.reduce((map, feature) => { + map[feature] = true; + return map; + }, {}); +} + +function parseArtifacts(artifacts) { + let copy = clone(artifacts); + Object.keys(copy).forEach(key => { + copy[key].expires = fromNow(copy[key].expires); + }); + return copy; +} + +function parseCollection(name) { + let collection = {}; + collection[name] = true; + return collection; +} + +function parseTreeherder(def) { + let treeherder = { + build: { + platform: def.platform + }, + machine: { + platform: def.platform + }, + symbol: def.symbol, + jobKind: def.kind + }; + + if (def.group) { + treeherder.groupSymbol = def.group; + } + + if (def.collection) { + treeherder.collection = parseCollection(def.collection); + } + + if (def.tier) { + treeherder.tier = def.tier; + } + + return treeherder; +} + +function convertTask(def) { + let scopes = []; + let dependencies = []; + + let env = merge({ + NSS_HEAD_REPOSITORY: process.env.NSS_HEAD_REPOSITORY, + NSS_HEAD_REVISION: process.env.NSS_HEAD_REVISION, + NSS_MAX_MP_PBE_ITERATION_COUNT: "100", + }, def.env || {}); + + if (def.parent) { + dependencies.push(def.parent); + env.TC_PARENT_TASK_ID = def.parent; + } + if (def.parents) { + dependencies = dependencies.concat(def.parents); + } + + if (def.tests) { + env.NSS_TESTS = def.tests; + } + + if (def.cycle) { + env.NSS_CYCLES = def.cycle; + } + if (def.kind === "build") { + // Disable leak checking during builds (bug 1579290). + if (env.ASAN_OPTIONS) { + env.ASAN_OPTIONS += ":detect_leaks=0"; + } else { + env.ASAN_OPTIONS = "detect_leaks=0"; + } + } + + let payload = { + env, + command: def.command, + maxRunTime: def.maxRunTime || 3600 + }; + + if (def.image) { + payload.image = def.image; + } + + if (def.artifacts) { + payload.artifacts = parseArtifacts(def.artifacts); + } + + if (def.features) { + payload.features = parseFeatures(def.features); + + if (payload.features.allowPtrace) { + scopes.push("docker-worker:feature:allowPtrace"); + } + } + + if (def.scopes) { + // Need to add existing scopes in the task definition + scopes.push.apply(scopes, def.scopes) + } + + let extra = Object.assign({ + treeherder: parseTreeherder(def) + }, parameters); + + return { + provisionerId: def.provisioner || `nss-${process.env.MOZ_SCM_LEVEL}`, + workerType: def.workerType || "linux-gcp", + schedulerId: process.env.TC_SCHEDULER_ID, + taskGroupId: process.env.TASK_ID, + + scopes, + created: fromNow(0), + deadline: fromNow(24), + + dependencies, + requires: def.requires || "all-completed", + routes: parseRoutes(def.routes || []), + + metadata: { + name: def.name, + description: def.name, + owner: process.env.TC_OWNER, + source: process.env.TC_SOURCE + }, + + payload, + extra, + }; +} + +export function map(fun) { + maps.push(fun); +} + +export function filter(fun) { + filters.push(fun); +} + +export function addParameters(params) { + parameters = Object.assign(parameters, params); +} + +export function clearFilters(fun) { + filters = []; +} + +export function taggedTasks(tag) { + return tags[tag]; +} + +export function scheduleTask(def) { + let taskId = slugid.v4(); + tasks.set(taskId, merge({}, def)); + return taskId; +} + +export async function submit() { + let promises = new Map(); + + for (let [taskId, task] of tasks) { + // Allow filtering tasks before we schedule them. + if (!filters.every(filter => filter(task))) { + continue; + } + + // Allow changing tasks before we schedule them. + maps.forEach(map => { task = map(merge({}, task)) }); + + let log_id = `${task.name} @ ${task.platform}[${task.collection || "opt"}]`; + if (task.group) { + log_id = `${task.group}::${log_id}`; + } + console.log(`+ Submitting ${log_id}.`); + + // Index that task for each tag specified + if(task.tags) { + task.tags.map(tag => { + if(!tags[tag]) { + tags[tag] = []; + } + tags[tag].push(taskId); + }); + } + + let parent = task.parent; + + // Convert the task definition. + task = await convertTask(task); + + // Convert the docker image definition. + let image_def = task.payload.image; + if (image_def && image_def.hasOwnProperty("path")) { + let key = `${image_def.name}:${image_def.path}`; + let data = {}; + + // Check the cache first. + if (image_tasks.has(key)) { + data = image_tasks.get(key); + } else { + data.taskId = await image_builder.findTask(image_def); + data.isPending = !data.taskId; + + // No task found. + if (data.isPending) { + let image_task = await image_builder.buildTask(image_def); + + // Schedule a new image builder task immediately. + data.taskId = slugid.v4(); + + try { + await queue.createTask(data.taskId, convertTask(image_task)); + } catch (e) { + console.error("! FAIL: Scheduling image builder task failed."); + continue; /* Skip this task on failure. */ + } + } + + // Store in cache. + image_tasks.set(key, data); + } + + if (data.isPending) { + task.dependencies.push(data.taskId); + } + + task.payload.image = { + path: "public/image.tar.zst", + taskId: data.taskId, + type: "task-image" + }; + } + + // Wait for the parent task to be created before scheduling dependants. + let predecessor = parent ? promises.get(parent) : Promise.resolve(); + + promises.set(taskId, predecessor.then(() => { + // Schedule the task. + return queue.createTask(taskId, task).catch(err => { + console.error(`! FAIL: Scheduling ${log_id} failed.`, err); + }); + })); + } + + // Wait for all requests to finish. + if (promises.length) { + await Promise.all([...promises.values()]); + console.log("=== Total:", promises.length, "tasks. ==="); + } + + tasks.clear(); +} |