# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import json
import os
import sys
from argparse import ArgumentParser

try:
    import urlparse
except ImportError:
    import urllib.parse as urlparse

import mozpack.path as mozpath
from mozpack.chrome.manifest import parse_manifest
from six import viewitems

from .manifest_handler import ChromeManifestHandler


class LcovRecord(object):
    """Parsed data for a single lcov record (one SF: ... end_of_record section)."""

    __slots__ = (
        "test_name",
        "source_file",
        "functions",
        "function_exec_counts",
        "function_count",
        "covered_function_count",
        "branches",
        "branch_count",
        "covered_branch_count",
        "lines",
        "line_count",
        "covered_line_count",
    )

    def __init__(self):
        # functions: start line number -> function name (FN lines)
        self.functions = {}
        # function_exec_counts: function name -> execution count (FNDA lines)
        self.function_exec_counts = {}
        # branches: (line, block, branch) -> taken count (BRDA lines)
        self.branches = {}
        # lines: line number -> (exec count, checksum or None) (DA lines)
        self.lines = {}

    def __iadd__(self, other):
        """Merge another record for the same source file into this one."""
        # These shouldn't differ.
        self.source_file = other.source_file
        if hasattr(other, "test_name"):
            self.test_name = other.test_name
        self.functions.update(other.functions)

        for name, count in viewitems(other.function_exec_counts):
            self.function_exec_counts[name] = count + self.function_exec_counts.get(
                name, 0
            )

        for key, taken in viewitems(other.branches):
            self.branches[key] = taken + self.branches.get(key, 0)

        for line, (exec_count, checksum) in viewitems(other.lines):
            new_exec_count = exec_count
            if line in self.lines:
                old_exec_count, _ = self.lines[line]
                new_exec_count += old_exec_count
            self.lines[line] = new_exec_count, checksum

        self.resummarize()
        return self

    def resummarize(self):
        # Re-calculate summaries after generating or splitting a record.
        self.function_count = len(self.functions.keys())
        # Function records may have moved between files, so filter here.
        self.function_exec_counts = {
            fn_name: count
            for fn_name, count in viewitems(self.function_exec_counts)
            if fn_name in self.functions.values()
        }
        self.covered_function_count = len(
            [c for c in self.function_exec_counts.values() if c]
        )
        self.line_count = len(self.lines)
        self.covered_line_count = len([c for c, _ in self.lines.values() if c])
        self.branch_count = len(self.branches)
        self.covered_branch_count = len([c for c in self.branches.values() if c])


class RecordRewriter(object):
    # Helper class for rewriting/spliting individual lcov records according
    # to what the preprocessor did.

    def __init__(self):
        self._ranges = None

    def _get_range(self, line):
        # Return the (start, end) preprocessed-line range containing `line`,
        # or None if the line falls outside every range. `self._ranges` is
        # sorted, so we can bail out early once `line` precedes a range.
        for start, end in self._ranges:
            if line < start:
                return None
            if line < end:
                return start, end
        return None

    def _get_mapped_line(self, line, r):
        # Map a line in the preprocessed output back to its line in the
        # original (included) source.
        inc_source, inc_start = self._current_pp_info[r]
        start, end = r
        offs = line - start
        return inc_start + offs

    def _get_record(self, inc_source):
        # Get (or lazily create) the split-off record for an included file.
        if inc_source in self._additions:
            gen_rec = self._additions[inc_source]
        else:
            gen_rec = LcovRecord()
            gen_rec.source_file = inc_source
            self._additions[inc_source] = gen_rec
        return gen_rec

    def _rewrite_lines(self, record):
        rewritten_lines = {}
        for ln, line_info in viewitems(record.lines):
            r = self._get_range(ln)
            if r is None:
                rewritten_lines[ln] = line_info
                continue
            new_ln = self._get_mapped_line(ln, r)
            inc_source, _ = self._current_pp_info[r]

            if inc_source != record.source_file:
                # The line came from an #included file; move it to that
                # file's generated record.
                gen_rec = self._get_record(inc_source)
                gen_rec.lines[new_ln] = line_info
                continue

            # Move exec_count to the new lineno.
            rewritten_lines[new_ln] = line_info

        record.lines = rewritten_lines

    def _rewrite_functions(self, record):
        rewritten_fns = {}

        # Sometimes we get multiple entries for a named function ("top-level", for
        # instance). It's not clear the records that result are well-formed, but
        # we act as though if a function has multiple FN's, the corresponding
        # FNDA's are all the same.
        for ln, fn_name in viewitems(record.functions):
            r = self._get_range(ln)
            if r is None:
                rewritten_fns[ln] = fn_name
                continue
            new_ln = self._get_mapped_line(ln, r)
            inc_source, _ = self._current_pp_info[r]
            if inc_source != record.source_file:
                gen_rec = self._get_record(inc_source)
                gen_rec.functions[new_ln] = fn_name
                if fn_name in record.function_exec_counts:
                    gen_rec.function_exec_counts[fn_name] = record.function_exec_counts[
                        fn_name
                    ]
                continue
            rewritten_fns[new_ln] = fn_name
        record.functions = rewritten_fns

    def _rewrite_branches(self, record):
        rewritten_branches = {}
        for (ln, block_number, branch_number), taken in viewitems(record.branches):
            r = self._get_range(ln)
            if r is None:
                rewritten_branches[ln, block_number, branch_number] = taken
                continue
            new_ln = self._get_mapped_line(ln, r)
            inc_source, _ = self._current_pp_info[r]
            if inc_source != record.source_file:
                gen_rec = self._get_record(inc_source)
                gen_rec.branches[(new_ln, block_number, branch_number)] = taken
                continue
            rewritten_branches[(new_ln, block_number, branch_number)] = taken

        record.branches = rewritten_branches

    def rewrite_record(self, record, pp_info):
        # Rewrite the lines in the given record according to preprocessor info
        # and split to additional records when pp_info has included file info.
        # pp_info keys are "start,end" line-range strings; values are
        # (included source path, start line in that source).
        self._current_pp_info = dict(
            [(tuple([int(ln) for ln in k.split(",")]), v) for k, v in pp_info.items()]
        )
        self._ranges = sorted(self._current_pp_info.keys())
        self._additions = {}
        self._rewrite_lines(record)
        self._rewrite_functions(record)
        self._rewrite_branches(record)

        record.resummarize()

        generated_records = self._additions.values()
        for r in generated_records:
            r.resummarize()
        return generated_records


class LcovFile(object):
    # Simple parser/pretty-printer for lcov format.
    # lcov parsing based on http://ltp.sourceforge.net/coverage/lcov/geninfo.1.php

    # TN:<test name>
    # SF:<absolute path to the source file>
    # FN:<line number of function start>,<function name>
    # FNDA:<execution count>,<function name>
    # FNF:<number of functions found>
    # FNH:<number of functions hit>
    # BRDA:<line number>,<block number>,<branch number>,<taken>
    # BRF:<number of branches found>
    # BRH:<number of branches hit>
    # DA:<line number>,<execution count>[,<checksum>]
    # LF:<number of instrumented lines>
    # LH:<number of lines with a non-zero execution count>
    # end_of_record

    # Maps a line prefix to the maximum number of commas to split its
    # arguments on (i.e. maxsplit for str.split).
    PREFIX_TYPES = {
        "TN": 0,
        "SF": 0,
        "FN": 1,
        "FNDA": 1,
        "FNF": 0,
        "FNH": 0,
        "BRDA": 3,
        "BRF": 0,
        "BRH": 0,
        "DA": 2,
        "LH": 0,
        "LF": 0,
    }

    def __init__(self, lcov_paths):
        self.lcov_paths = lcov_paths

    def iterate_records(self, rewrite_source=None):
        """Iterate over the records of all lcov files.

        Yields (source_file, pp_info, record_lines) tuples. Records whose
        SF: url could not be rewritten (rewrite_source returned None) are
        skipped.
        """
        current_source_file = None
        current_pp_info = None
        current_lines = []
        for lcov_path in self.lcov_paths:
            with open(lcov_path, "r", encoding="utf-8") as lcov_fh:
                # Keep a reference for error reporting in parse_record, which
                # runs while this generator is suspended inside the `with`.
                self.lcov_fh = lcov_fh
                for line in lcov_fh:
                    line = line.rstrip()
                    if not line:
                        continue

                    if line == "end_of_record":
                        # We skip records that we couldn't rewrite, that is records for which
                        # rewrite_url returns None.
                        if current_source_file is not None:
                            yield (current_source_file, current_pp_info, current_lines)
                        current_source_file = None
                        current_pp_info = None
                        current_lines = []
                        continue

                    colon = line.find(":")
                    prefix = line[:colon]

                    if prefix == "SF":
                        sf = line[(colon + 1) :]
                        res = (
                            rewrite_source(sf)
                            if rewrite_source is not None
                            else (sf, None)
                        )
                        if res is None:
                            current_lines.append(line)
                        else:
                            current_source_file, current_pp_info = res
                            current_lines.append("SF:" + current_source_file)
                    else:
                        current_lines.append(line)

    def parse_record(self, record_content):
        """Parse the lines of a single record into an LcovRecord."""
        self.current_record = LcovRecord()

        for line in record_content:
            colon = line.find(":")

            prefix = line[:colon]

            # We occasionally end up with multi-line scripts in data:
            # uris that will trip up the parser, just skip them for now.
            if colon < 0 or prefix not in self.PREFIX_TYPES:
                continue

            args = line[(colon + 1) :].split(",", self.PREFIX_TYPES[prefix])

            def try_convert(a):
                try:
                    return int(a)
                except ValueError:
                    return a

            args = [try_convert(a) for a in args]

            try:
                # Dispatch to the parse_<PREFIX> method for this line type.
                LcovFile.__dict__["parse_" + prefix](self, *args)
            except ValueError:
                print("Encountered an error in %s:\n%s" % (self.lcov_fh.name, line))
                raise
            except KeyError:
                print("Invalid lcov line start in %s:\n%s" % (self.lcov_fh.name, line))
                raise
            except TypeError:
                print("Invalid lcov line start in %s:\n%s" % (self.lcov_fh.name, line))
                raise

        ret = self.current_record
        self.current_record = LcovRecord()
        return ret

    def print_file(self, fh, rewrite_source, rewrite_record):
        """Rewrite all records and write them to the open file object `fh`."""
        for source_file, pp_info, record_content in self.iterate_records(
            rewrite_source
        ):
            if pp_info is not None:
                # The file was preprocessed: rewrite the record in place and
                # emit any records split off for #included files.
                record = self.parse_record(record_content)
                for r in rewrite_record(record, pp_info):
                    fh.write(self.format_record(r))
                fh.write(self.format_record(record))
            else:
                fh.write("\n".join(record_content) + "\nend_of_record\n")

    def format_record(self, record):
        out_lines = []
        # __slots__ order matches the lcov field order, so iterating it
        # yields a well-ordered record.
        for name in LcovRecord.__slots__:
            if hasattr(record, name):
                out_lines.append(LcovFile.__dict__["format_" + name](self, record))
        return "\n".join(out_lines) + "\nend_of_record\n"

    def format_test_name(self, record):
        return "TN:%s" % record.test_name

    def format_source_file(self, record):
        return "SF:%s" % record.source_file

    def format_functions(self, record):
        # Sorting results gives deterministic output (and is a lot faster than
        # using OrderedDict).
        fns = []
        for start_lineno, fn_name in sorted(viewitems(record.functions)):
            fns.append("FN:%s,%s" % (start_lineno, fn_name))
        return "\n".join(fns)

    def format_function_exec_counts(self, record):
        fndas = []
        for name, exec_count in sorted(viewitems(record.function_exec_counts)):
            fndas.append("FNDA:%s,%s" % (exec_count, name))
        return "\n".join(fndas)

    def format_function_count(self, record):
        return "FNF:%s" % record.function_count

    def format_covered_function_count(self, record):
        return "FNH:%s" % record.covered_function_count

    def format_branches(self, record):
        brdas = []
        for key in sorted(record.branches):
            taken = record.branches[key]
            # lcov uses "-" for a branch that was never evaluated.
            taken = "-" if taken == 0 else taken
            brdas.append("BRDA:%s" % ",".join(map(str, list(key) + [taken])))
        return "\n".join(brdas)

    def format_branch_count(self, record):
        return "BRF:%s" % record.branch_count

    def format_covered_branch_count(self, record):
        return "BRH:%s" % record.covered_branch_count

    def format_lines(self, record):
        das = []
        for line_no, (exec_count, checksum) in sorted(viewitems(record.lines)):
            s = "DA:%s,%s" % (line_no, exec_count)
            if checksum:
                s += ",%s" % checksum
            das.append(s)
        return "\n".join(das)

    def format_line_count(self, record):
        return "LF:%s" % record.line_count

    def format_covered_line_count(self, record):
        return "LH:%s" % record.covered_line_count

    def parse_TN(self, test_name):
        self.current_record.test_name = test_name

    def parse_SF(self, source_file):
        self.current_record.source_file = source_file

    def parse_FN(self, start_lineno, fn_name):
        self.current_record.functions[start_lineno] = fn_name

    def parse_FNDA(self, exec_count, fn_name):
        self.current_record.function_exec_counts[fn_name] = exec_count

    def parse_FNF(self, function_count):
        self.current_record.function_count = function_count

    def parse_FNH(self, covered_function_count):
        self.current_record.covered_function_count = covered_function_count

    def parse_BRDA(self, line_number, block_number, branch_number, taken):
        # "-" means the branch was never evaluated; normalize to 0.
        taken = 0 if taken == "-" else taken
        self.current_record.branches[(line_number, block_number, branch_number)] = taken

    def parse_BRF(self, branch_count):
        self.current_record.branch_count = branch_count

    def parse_BRH(self, covered_branch_count):
        self.current_record.covered_branch_count = covered_branch_count

    def parse_DA(self, line_number, execution_count, checksum=None):
        self.current_record.lines[line_number] = (execution_count, checksum)

    def parse_LH(self, covered_line_count):
        self.current_record.covered_line_count = covered_line_count

    def parse_LF(self, line_count):
        self.current_record.line_count = line_count


class UrlFinderError(Exception):
    pass


class UrlFinder(object):
    # Given a "chrome://" or "resource://" url, uses data from the UrlMapBackend
    # and install manifests to find a path to the source file and the corresponding
    # (potentially pre-processed) file in the objdir.

    def __init__(self, chrome_map_path, appdir, gredir, extra_chrome_manifests):
        # Cached entries
        self._final_mapping = {}

        try:
            with open(chrome_map_path, "r", encoding="utf-8") as fh:
                url_prefixes, overrides, install_info, buildconfig = json.load(fh)
        except OSError:
            print(
                "Error reading %s. Run |./mach build-backend -b ChromeMap| to "
                "populate the ChromeMap backend." % chrome_map_path
            )
            raise

        self.topobjdir = buildconfig["topobjdir"]
        self.MOZ_APP_NAME = buildconfig["MOZ_APP_NAME"]
        self.OMNIJAR_NAME = buildconfig["OMNIJAR_NAME"]

        # These are added dynamically in nsIResProtocolHandler, we might
        # need to get them at run time.
        if "resource:///" not in url_prefixes:
            url_prefixes["resource:///"] = [appdir]
        if "resource://gre/" not in url_prefixes:
            url_prefixes["resource://gre/"] = [gredir]

        self._url_prefixes = url_prefixes
        self._url_overrides = overrides

        self._respath = None

        mac_bundle_name = buildconfig["MOZ_MACBUNDLE_NAME"]
        if mac_bundle_name:
            self._respath = mozpath.join(
                "dist", mac_bundle_name, "Contents", "Resources"
            )

        if not extra_chrome_manifests:
            extra_path = os.path.join(self.topobjdir, "_tests", "extra.manifest")
            if os.path.isfile(extra_path):
                extra_chrome_manifests = [extra_path]

        if extra_chrome_manifests:
            self._populate_chrome(extra_chrome_manifests)

        self._install_mapping = install_info

    def _populate_chrome(self, manifests):
        # Merge chrome registrations from extra manifests into the url
        # override/prefix maps.
        handler = ChromeManifestHandler()
        for m in manifests:
            path = os.path.abspath(m)
            for e in parse_manifest(None, path):
                handler.handle_manifest_entry(e)
        self._url_overrides.update(handler.overrides)
        self._url_prefixes.update(handler.chrome_mapping)

    def _find_install_prefix(self, objdir_path):
        # Search for a prefix of objdir_path in the install mapping (such
        # prefixes are mapped to wildcard patterns), and cache a concrete
        # entry for objdir_path if one is found.
        def _prefix(s):
            for p in mozpath.split(s):
                if "*" not in p:
                    yield p + "/"

        offset = 0
        for leaf in reversed(mozpath.split(objdir_path)):
            offset += len(leaf)
            if objdir_path[:-offset] in self._install_mapping:
                pattern_prefix, is_pp = self._install_mapping[objdir_path[:-offset]]
                full_leaf = objdir_path[len(objdir_path) - offset :]
                src_prefix = "".join(_prefix(pattern_prefix))
                self._install_mapping[objdir_path] = (
                    mozpath.join(src_prefix, full_leaf),
                    is_pp,
                )
                break
            offset += 1

    def _install_info(self, objdir_path):
        if objdir_path not in self._install_mapping:
            # If our path is missing, some prefix of it may be in the install
            # mapping mapped to a wildcard.
            self._find_install_prefix(objdir_path)
        if objdir_path not in self._install_mapping:
            raise UrlFinderError("Couldn't find entry in manifest for %s" % objdir_path)
        return self._install_mapping[objdir_path]

    def _abs_objdir_install_info(self, term):
        obj_relpath = term[len(self.topobjdir) + 1 :]
        res = self._install_info(obj_relpath)

        # Some urls on osx will refer to paths in the mac bundle, so we
        # re-interpret them as being their original location in dist/bin.
        if not res and self._respath and obj_relpath.startswith(self._respath):
            obj_relpath = obj_relpath.replace(self._respath, "dist/bin")
            res = self._install_info(obj_relpath)

        if not res:
            raise UrlFinderError("Couldn't find entry in manifest for %s" % obj_relpath)
        return res

    def find_files(self, url):
        # Returns a tuple of (source file, pp_info)
        # for the given "resource:", "chrome:", or "file:" uri.
        term = url
        if term in self._url_overrides:
            term = self._url_overrides[term]

        if os.path.isabs(term) and term.startswith(self.topobjdir):
            source_path, pp_info = self._abs_objdir_install_info(term)
            return source_path, pp_info

        for prefix, dests in viewitems(self._url_prefixes):
            if term.startswith(prefix):
                for dest in dests:
                    if not dest.endswith("/"):
                        dest += "/"
                    objdir_path = term.replace(prefix, dest)

                    while objdir_path.startswith("//"):
                        # The mochitest harness produces some wonky file:// uris
                        # that need to be fixed.
                        objdir_path = objdir_path[1:]

                    try:
                        if os.path.isabs(objdir_path) and objdir_path.startswith(
                            self.topobjdir
                        ):
                            return self._abs_objdir_install_info(objdir_path)
                        else:
                            src_path, pp_info = self._install_info(objdir_path)
                            return mozpath.normpath(src_path), pp_info
                    except UrlFinderError:
                        pass

                    if dest.startswith("resource://") or dest.startswith("chrome://"):
                        # The prefix mapped to another url scheme; resolve
                        # that recursively.
                        result = self.find_files(term.replace(prefix, dest))
                        if result:
                            return result

        raise UrlFinderError("No objdir path for %s" % term)

    def rewrite_url(self, url):
        # This applies one-off rules and returns None for urls that we aren't
        # going to be able to resolve to a source file ("about:" urls, for
        # instance).
        if url in self._final_mapping:
            return self._final_mapping[url]
        if url.endswith("> eval"):
            return None
        if url.endswith("> Function"):
            return None
        if " -> " in url:
            url = url.split(" -> ")[1].rstrip()
        if "?" in url:
            url = url.split("?")[0]

        url_obj = urlparse.urlparse(url)
        if url_obj.scheme == "jar":
            app_name = self.MOZ_APP_NAME
            omnijar_name = self.OMNIJAR_NAME

            if app_name in url:
                if omnijar_name in url:
                    # e.g. file:///home/worker/workspace/build/application/firefox/omni.ja!/components/MainProcessSingleton.js # noqa
                    parts = url_obj.path.split(omnijar_name + "!", 1)
                elif ".xpi!" in url:
                    # e.g. file:///home/worker/workspace/build/application/firefox/browser/features/e10srollout@mozilla.org.xpi!/bootstrap.js # noqa
                    parts = url_obj.path.split(".xpi!", 1)
                else:
                    # We don't know how to handle this jar: path, so return it to the
                    # caller to make it print a warning.
                    return url_obj.path, None

                dir_parts = parts[0].rsplit(app_name + "/", 1)
                url = mozpath.normpath(
                    mozpath.join(
                        self.topobjdir,
                        "dist",
                        "bin",
                        dir_parts[1].lstrip("/"),
                        parts[1].lstrip("/"),
                    )
                )
            elif ".xpi!" in url:
                # This matching mechanism is quite brittle and based on examples seen in the wild.
                # There's no rule to match the XPI name to the path in dist/xpi-stage.
                parts = url_obj.path.split(".xpi!", 1)
                addon_name = os.path.basename(parts[0])
                if "-test@mozilla.org" in addon_name:
                    addon_name = addon_name[: -len("-test@mozilla.org")]
                elif addon_name.endswith("@mozilla.org"):
                    addon_name = addon_name[: -len("@mozilla.org")]
                url = mozpath.normpath(
                    mozpath.join(
                        self.topobjdir,
                        "dist",
                        "xpi-stage",
                        addon_name,
                        parts[1].lstrip("/"),
                    )
                )
        elif url_obj.scheme == "file" and os.path.isabs(url_obj.path):
            path = url_obj.path
            if not os.path.isfile(path):
                # This may have been in a profile directory that no
                # longer exists.
                return None
            if not path.startswith(self.topobjdir):
                return path, None
            url = url_obj.path
        elif url_obj.scheme in ("http", "https", "javascript", "data", "about"):
            return None

        result = self.find_files(url)
        self._final_mapping[url] = result
        return result


class LcovFileRewriter(object):
    # Class for partial parses of LCOV format and rewriting to resolve urls
    # and preprocessed file lines.

    def __init__(
        self,
        chrome_map_path,
        appdir="dist/bin/browser/",
        gredir="dist/bin/",
        extra_chrome_manifests=[],
    ):
        self.url_finder = UrlFinder(
            chrome_map_path, appdir, gredir, extra_chrome_manifests
        )
        self.pp_rewriter = RecordRewriter()

    def rewrite_files(self, in_paths, output_file, output_suffix):
        """Rewrite the given lcov files.

        If output_file is non-empty, all inputs are merged into it;
        otherwise each input is rewritten to <input> + output_suffix.
        """
        unknowns = set()
        found_valid = [False]

        def rewrite_source(url):
            try:
                res = self.url_finder.rewrite_url(url)
                if res is None:
                    return None
            except Exception as e:
                if url not in unknowns:
                    # The exception can contain random filename used by
                    # test cases, and there can be character that cannot be
                    # encoded with the stdout encoding.
                    sys.stdout.buffer.write(
                        (
                            "Error: %s.\nCouldn't find source info for %s, removing record\n"
                            % (e, url)
                        ).encode(sys.stdout.encoding, errors="replace")
                    )
                unknowns.add(url)
                return None

            source_file, pp_info = res
            # We can't assert that the file exists here, because we don't have the source
            # checkout available on test machines. We can bring back this assertion when
            # bug 1432287 is fixed.
            # assert os.path.isfile(source_file), "Couldn't find mapped source file %s at %s!" % (
            #     url, source_file)

            found_valid[0] = True

            return res

        in_paths = [os.path.abspath(in_path) for in_path in in_paths]

        if output_file:
            lcov_file = LcovFile(in_paths)
            with open(output_file, "w+", encoding="utf-8") as out_fh:
                lcov_file.print_file(
                    out_fh, rewrite_source, self.pp_rewriter.rewrite_record
                )
        else:
            for in_path in in_paths:
                lcov_file = LcovFile([in_path])
                with open(in_path + output_suffix, "w+", encoding="utf-8") as out_fh:
                    lcov_file.print_file(
                        out_fh, rewrite_source, self.pp_rewriter.rewrite_record
                    )

        if not found_valid[0]:
            print("WARNING: No valid records found in %s" % in_paths)
            return


def main():
    parser = ArgumentParser(
        description="Given a set of gcov .info files produced "
        "by spidermonkey's code coverage, re-maps file urls "
        "back to source files and lines in preprocessed files "
        "back to their original locations."
    )
    parser.add_argument(
        "--chrome-map-path",
        default="chrome-map.json",
        help="Path to the chrome-map.json file.",
    )
    parser.add_argument(
        "--app-dir",
        default="dist/bin/browser/",
        help="Prefix of the appdir in use. This is used to map "
        "urls starting with resource:///. It may differ by "
        "app, but defaults to the valid value for firefox.",
    )
    parser.add_argument(
        "--gre-dir",
        default="dist/bin/",
        help="Prefix of the gre dir in use. This is used to map "
        "urls starting with resource://gre. It may differ by "
        "app, but defaults to the valid value for firefox.",
    )
    parser.add_argument(
        "--output-suffix", default=".out", help="The suffix to append to output files."
    )
    parser.add_argument(
        "--extra-chrome-manifests",
        nargs="+",
        help="Paths to files containing extra chrome registration.",
    )
    parser.add_argument(
        "--output-file",
        default="",
        help="The output file where the results are merged. "
        "Leave empty to make the rewriter not "
        "merge files.",
    )
    parser.add_argument("files", nargs="+", help="The set of files to process.")

    args = parser.parse_args()

    rewriter = LcovFileRewriter(
        args.chrome_map_path, args.app_dir, args.gre_dir, args.extra_chrome_manifests
    )

    files = []
    for f in args.files:
        if os.path.isdir(f):
            # Expand directories to the files they contain.
            files += [os.path.join(f, e) for e in os.listdir(f)]
        else:
            files.append(f)

    rewriter.rewrite_files(files, args.output_file, args.output_suffix)


if __name__ == "__main__":
    main()