Currently, many tasks fetch content from the Internets. A problem with that is fetching from the Internets is unreliable: servers may have outages or be slow; content may disappear or change out from under us.

The unreliability of 3rd party services poses a risk to Firefox CI. If services aren't available, we could potentially not run some CI tasks. In the worst case, we might not be able to release Firefox. That would be bad. In fact, as I write this, gmplib.org has been unavailable for ~24 hours and Firefox CI is unable to retrieve the GMP source code. As a result, building GCC toolchains is failing.

A solution to this is to make tasks more hermetic by depending on fewer network services (which by definition aren't reliable over time and therefore introduce instability).

This commit attempts to mitigate some external service dependencies by introducing the *fetch* task kind.

The primary goal of the *fetch* kind is to obtain remote content and re-expose it as a task artifact. By making external content available as a cached task artifact, we allow dependent tasks to consume this content without touching the service originally providing that content, thus eliminating a run-time dependency and making tasks more hermetic and reproducible over time.

We introduce a single "fetch-url" "using" flavor to define tasks that fetch single URLs and then re-expose the fetched content as an artifact. Powering this is a new, minimal "fetch" Docker image that contains a "fetch-content" Python script that does the work for us.

We have added tasks to fetch source archives used to build the GCC toolchains.

Fetching remote content and re-exposing it as an artifact is not very useful by itself: the value is in having tasks use those artifacts.

We introduce a taskgraph transform that allows tasks to define an array of "fetches." Each entry names a task of the "fetch" kind. When present, the corresponding "fetch" task is added as a dependency.
And the task ID and artifact path from that "fetch" task are added to the MOZ_FETCHES environment variable of the task depending on it. Our "fetch-content" script has a "task-artifacts" sub-command that tasks can execute to perform retrieval of all artifacts listed in MOZ_FETCHES.

To prove all of this works, the code for fetching dependencies when building GCC toolchains has been updated to use `fetch-content`. The now-unused legacy code has been deleted.

This commit improves the reliability and efficiency of GCC toolchain tasks. Dependencies now all come from task artifacts and should always be available in the common case. In addition, `fetch-content` downloads and extracts files concurrently. This makes it faster than the serial implementation we were previously using.

There are some things I don't like about this commit.

First, a new Docker image and Python script for downloading URLs feels a bit heavyweight. The Docker image is definitely overkill as things stand. I can eventually justify it because I want to implement support for fetching and repackaging VCS repositories and for caching Debian packages. These will require more packages than what I'm comfortable installing on the base Debian image, therefore justifying a dedicated image.

The `fetch-content static-url` sub-command could definitely be implemented as a shell script. But Python is readily available and is more pleasant to maintain than shell, so I wrote it in Python.

`fetch-content task-artifacts` is more advanced and writing it in Python is more justified, IMO. FWIW, the script is Python 3 only, which conveniently gives us access to `concurrent.futures`, which facilitates concurrent download.

`fetch-content` also duplicates functionality found elsewhere. generic-worker's task payload supports a "mounts" feature which facilitates downloading remote content, including from a task artifact. However, this feature doesn't exist on docker-worker.
So we have to implement downloading inside the task rather than at the worker level. I concede that if all workers had generic-worker's "mounts" feature and supported concurrent download, `fetch-content` wouldn't need to exist.

`fetch-content` also duplicates functionality of `mach artifact toolchain`. I probably could have used `mach artifact toolchain` instead of writing `fetch-content task-artifacts`. However, I didn't want to introduce the requirement of a VCS checkout. `mach artifact toolchain` has its origins in providing a feature to the build system. And "fetching artifacts from tasks" is a more generic feature than that. I think it should be implemented as a generic feature and not something that is "toolchain" specific.

I think the best place for a generic "fetch content" feature is in the worker, where content can be defined in the task payload. But as explained above, that feature isn't universally available. The next best place is probably run-task. run-task already performs generic, very-early task preparation steps, such as performing a VCS checkout. I would like to fold `fetch-content` into run-task and make it all driven by environment variables. But run-task is currently Python 2 and achieving concurrency would involve a bit of programming (or adding package dependencies). I may very well port run-task to Python 3 and then fold fetch-content into it. Or maybe we leave `fetch-content` as a standalone script.

MozReview-Commit-ID: AGuTcwNcNJR
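
For illustration, the `task-artifacts` flow described in the commit message might be sketched as follows. This is only a rough sketch under stated assumptions: the commit message does not spell out how MOZ_FETCHES is encoded, so a whitespace-separated list of `artifact@task_id` words is assumed here (that word form is taken from the script's own `task-artifacts` help text). The helper names `artifact_urls` and `fetch_all`, and the sample task IDs, are hypothetical.

```python
import concurrent.futures

# URL template as it appears in the fetch-content script.
ARTIFACT_URL = ('https://queue.taskcluster.net/v1/task/{task}/artifacts/'
                '{artifact}')


def artifact_urls(fetches):
    """Turn 'artifact@task_id' words into artifact URLs.

    ASSUMPTION: ``fetches`` is a whitespace-separated string; the real
    encoding of MOZ_FETCHES is not stated in the commit message.
    """
    urls = []
    for word in fetches.split():
        # Same split the script uses: artifact path first, task ID second.
        artifact, task_id = word.split('@', 1)
        urls.append(ARTIFACT_URL.format(task=task_id, artifact=artifact))
    return urls


def fetch_all(urls, fetch_one, workers=4):
    """Run ``fetch_one`` over ``urls`` concurrently.

    Results come back in submission order; the first failure is re-raised
    when its future's result is requested.
    """
    with concurrent.futures.ThreadPoolExecutor(workers) as executor:
        futures = [executor.submit(fetch_one, url) for url in urls]
        return [f.result() for f in futures]


if __name__ == '__main__':
    # Hypothetical artifact paths and task IDs, for illustration only.
    example = 'public/gmp.tar.xz@TASK1 public/mpfr.tar.xz@TASK2'
    for url in artifact_urls(example):
        print(url)
```

Passing the per-URL worker in as `fetch_one` keeps the sketch free of network access; in the real script that role is played by `fetch_and_extract`.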
269 lines
8.2 KiB
Python
Executable File
#!/usr/bin/python3 -u
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import argparse
import concurrent.futures
import hashlib
import multiprocessing
import os
import pathlib
import re
import subprocess
import sys
import tempfile
import time
import urllib.request


ARTIFACT_URL = ('https://queue.taskcluster.net/v1/task/{task}/artifacts/'
                '{artifact}')

CONCURRENCY = multiprocessing.cpu_count()


class IntegrityError(Exception):
    """Represents an integrity error when downloading a URL."""


def stream_download(url, sha256=None, size=None):
    """Download a URL to a generator, optionally with content verification.

    If ``sha256`` or ``size`` are defined, the downloaded URL will be
    validated against those requirements and ``IntegrityError`` will be
    raised if expectations do not match.

    Because verification cannot occur until the file is completely downloaded
    it is recommended for consumers to not do anything meaningful with the
    data if content verification is being used. To securely handle retrieved
    content, it should be streamed to a file or memory and only operated
    on after the generator is exhausted without raising.
    """
    print('downloading %s' % url)

    h = hashlib.sha256()
    length = 0

    t0 = time.time()
    with urllib.request.urlopen(url) as fh:
        while True:
            chunk = fh.read(65536)
            if not chunk:
                break

            h.update(chunk)
            length += len(chunk)

            yield chunk

    duration = time.time() - t0
    digest = h.hexdigest()

    print('%s resolved to %d bytes with sha256 %s in %.3fs' % (
        url, length, digest, duration))

    if size:
        if size == length:
            print('verified size of %s' % url)
        else:
            raise IntegrityError('size mismatch on %s: wanted %d; got %d' % (
                url, size, length))

    if sha256:
        if digest == sha256:
            print('verified sha256 integrity of %s' % url)
        else:
            raise IntegrityError('sha256 mismatch on %s: wanted %s; got %s' % (
                url, sha256, digest))


def download_to_path(url, path, sha256=None, size=None):
    """Download a URL to a filesystem path, possibly with verification."""

    # We download to a temporary file and rename at the end so there's
    # no chance of the final file being partially written or containing
    # bad data.
    try:
        path.unlink()
    except FileNotFoundError:
        pass

    tmp = path.with_name('%s.tmp' % path.name)

    print('downloading %s to %s' % (url, tmp))

    try:
        with tmp.open('wb') as fh:
            for chunk in stream_download(url, sha256=sha256, size=size):
                fh.write(chunk)

        print('renaming to %s' % path)
        tmp.rename(path)
    except IntegrityError:
        tmp.unlink()
        raise


def gpg_verify_path(path: pathlib.Path, public_key_data: bytes,
                    signature_data: bytes):
    """Verify that a filesystem path verifies using GPG.

    Takes a Path defining a file to verify. ``public_key_data`` contains
    bytes with GPG public key data. ``signature_data`` contains a signed
    GPG document to use with ``gpg --verify``.
    """
    print('validating GPG signature of %s' % path)
    print('GPG key data:\n%s' % public_key_data.decode('ascii'))

    with tempfile.TemporaryDirectory() as td:
        try:
            # --batch since we're running unattended.
            gpg_args = ['gpg', '--homedir', td, '--batch']

            print('importing GPG key...')
            subprocess.run(gpg_args + ['--import'],
                           input=public_key_data,
                           check=True)

            print('verifying GPG signature...')
            subprocess.run(gpg_args + ['--verify', '-', '%s' % path],
                           input=signature_data,
                           check=True)

            print('GPG signature verified!')
        finally:
            # There is a race between the agent self-terminating and
            # shutil.rmtree() from the temporary directory cleanup that can
            # lead to exceptions. Kill the agent before cleanup to prevent this.
            env = dict(os.environ)
            env['GNUPGHOME'] = td
            subprocess.run(['gpgconf', '--kill', 'gpg-agent'], env=env)


def extract_archive(path, dest_dir):
    """Extract an archive to a destination directory."""

    # Raw string so the regex escapes are not treated as (invalid) string
    # escapes.
    if re.search(r'\.tar\..*$', path.name):
        args = ['tar', 'xaf', str(path)]
    elif path.name.endswith('.zip'):
        args = ['unzip', str(path)]
    else:
        print('%s is unknown archive format; ignoring' % path)
        return False

    print('extracting %s to %s using %r' % (path, dest_dir, args))
    t0 = time.time()
    subprocess.check_call(args, cwd=str(dest_dir), bufsize=0)
    print('%s extracted in %.3fs' % (path, time.time() - t0))
    return True


def fetch_and_extract(url, dest_dir, sha256=None, size=None):
    """Fetch a URL and extract it to a destination path.

    If the downloaded URL is an archive, it is extracted automatically
    and the archive is deleted. Otherwise the file remains in place in
    the destination directory.
    """

    basename = url.split('/')[-1]
    dest_path = dest_dir / basename

    download_to_path(url, dest_path, sha256=sha256, size=size)

    if extract_archive(dest_path, dest_dir):
        print('removing %s' % dest_path)
        dest_path.unlink()


def fetch_urls(urls, dest):
    """Fetch URLs concurrently into a pathlib.Path directory."""
    dest.mkdir(parents=True, exist_ok=True)

    with concurrent.futures.ThreadPoolExecutor(CONCURRENCY) as e:
        fs = []

        for url in urls:
            fs.append(e.submit(fetch_and_extract, url, dest))

        # Wait for every download; result() re-raises any worker exception.
        for f in fs:
            f.result()


def command_static_url(args):
    gpg_sig_url = args.gpg_sig_url
    gpg_env_key = args.gpg_key_env

    if bool(gpg_sig_url) != bool(gpg_env_key):
        print('--gpg-sig-url and --gpg-key-env must both be defined')
        return 1

    if gpg_sig_url:
        gpg_signature = b''.join(stream_download(gpg_sig_url))
        gpg_key = os.environb[gpg_env_key.encode('ascii')]

    dest = pathlib.Path(args.dest)
    dest.parent.mkdir(parents=True, exist_ok=True)

    try:
        download_to_path(args.url, dest, sha256=args.sha256, size=args.size)

        if gpg_sig_url:
            gpg_verify_path(dest, gpg_key, gpg_signature)

    except Exception:
        try:
            dest.unlink()
        except FileNotFoundError:
            pass

        raise


def command_task_artifacts(args):
    urls = []

    for word in args.artifacts:
        # Format is name@task
        artifact, task_id = word.split('@', 1)

        urls.append(ARTIFACT_URL.format(task=task_id,
                                        artifact=artifact))

    fetch_urls(urls, pathlib.Path(args.dest))


def main():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(title='sub commands')

    url = subparsers.add_parser('static-url', help='Download a static URL')
    url.set_defaults(func=command_static_url)
    url.add_argument('--sha256', required=True,
                     help='SHA-256 of downloaded content')
    url.add_argument('--size', required=True, type=int,
                     help='Size of downloaded content, in bytes')
    url.add_argument('--gpg-sig-url',
                     help='URL containing signed GPG document validating '
                          'URL to fetch')
    url.add_argument('--gpg-key-env',
                     help='Environment variable containing GPG key to validate')
    url.add_argument('url', help='URL to fetch')
    url.add_argument('dest', help='Destination path')

    artifacts = subparsers.add_parser('task-artifacts',
                                      help='Fetch task artifacts')
    artifacts.set_defaults(func=command_task_artifacts)
    # Required: command_task_artifacts() passes this straight to pathlib.Path.
    artifacts.add_argument('-d', '--dest', required=True,
                           help='Destination directory')
    artifacts.add_argument('artifacts', nargs='+',
                           help='Artifacts to fetch. Of form path@task_id')

    args = parser.parse_args()

    # With no sub-command, argparse leaves ``func`` unset on Python 3.
    if not hasattr(args, 'func'):
        parser.print_help()
        return 1

    return args.func(args)


if __name__ == '__main__':
    sys.exit(main())