Leveraged pyupgrade to remove usage of six as well as modernize some code that no longer needs to be compatible with Python 2. Differential Revision: https://phabricator.services.mozilla.com/D230612
210 lines
6.6 KiB
Python
210 lines
6.6 KiB
Python
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
import time
|
|
from copy import deepcopy
|
|
from cProfile import Profile
|
|
from pathlib import Path
|
|
|
|
from .base import MachError
|
|
|
|
INVALID_COMMAND_CONTEXT = r"""
|
|
It looks like you tried to run a mach command from an invalid context. The %s
|
|
command failed to meet the following conditions: %s
|
|
|
|
Run |mach help| to show a list of all commands available to the current context.
|
|
""".lstrip()
|
|
|
|
|
|
class MachRegistrar:
|
|
"""Container for mach command and config providers."""
|
|
|
|
def __init__(self):
|
|
self.command_handlers = {}
|
|
self.commands_by_category = {}
|
|
self.settings_providers = set()
|
|
self.categories = {}
|
|
self.require_conditions = False
|
|
self.command_depth = 0
|
|
|
|
def register_command_handler(self, handler):
|
|
name = handler.name
|
|
|
|
if not handler.category:
|
|
raise MachError(
|
|
"Cannot register a mach command without a " "category: %s" % name
|
|
)
|
|
|
|
if handler.category not in self.categories:
|
|
raise MachError(
|
|
"Cannot register a command to an undefined "
|
|
"category: %s -> %s" % (name, handler.category)
|
|
)
|
|
|
|
self.command_handlers[name] = handler
|
|
self.commands_by_category[handler.category].add(name)
|
|
|
|
def register_settings_provider(self, cls):
|
|
self.settings_providers.add(cls)
|
|
|
|
def register_category(self, name, title, description, priority=50):
|
|
self.categories[name] = (title, description, priority)
|
|
self.commands_by_category[name] = set()
|
|
|
|
@classmethod
|
|
def _condition_failed_message(cls, name, conditions):
|
|
msg = ["\n"]
|
|
for c in conditions:
|
|
part = [" %s" % getattr(c, "__name__", c)]
|
|
if c.__doc__ is not None:
|
|
part.append(c.__doc__)
|
|
msg.append(" - ".join(part))
|
|
return INVALID_COMMAND_CONTEXT % (name, "\n".join(msg))
|
|
|
|
@classmethod
|
|
def _instance(_, handler, context, **kwargs):
|
|
if context is None:
|
|
raise ValueError("Expected a non-None context.")
|
|
|
|
prerun = getattr(context, "pre_dispatch_handler", None)
|
|
if prerun:
|
|
prerun(context, handler, args=kwargs)
|
|
|
|
context.handler = handler
|
|
return handler.create_instance(context, handler.virtualenv_name)
|
|
|
|
@classmethod
|
|
def _fail_conditions(_, handler, instance):
|
|
fail_conditions = []
|
|
if handler.conditions:
|
|
for c in handler.conditions:
|
|
if not c(instance):
|
|
fail_conditions.append(c)
|
|
|
|
return fail_conditions
|
|
|
|
def _run_command_handler(
|
|
self,
|
|
handler,
|
|
context,
|
|
command_site_manager=None,
|
|
debug_command=False,
|
|
profile_command=False,
|
|
**kwargs,
|
|
):
|
|
instance = MachRegistrar._instance(handler, context, **kwargs)
|
|
fail_conditions = MachRegistrar._fail_conditions(handler, instance)
|
|
if fail_conditions:
|
|
print(
|
|
MachRegistrar._condition_failed_message(handler.name, fail_conditions)
|
|
)
|
|
return 1
|
|
|
|
self.command_depth += 1
|
|
fn = handler.func
|
|
if handler.virtualenv_name:
|
|
if command_site_manager:
|
|
instance.virtualenv_manager = command_site_manager
|
|
else:
|
|
instance.activate_virtualenv()
|
|
|
|
profile = None
|
|
if profile_command:
|
|
profile = Profile()
|
|
profile.enable()
|
|
|
|
start_time = time.monotonic()
|
|
|
|
if debug_command:
|
|
import pdb
|
|
|
|
result = pdb.runcall(fn, instance, **kwargs)
|
|
else:
|
|
result = fn(instance, **kwargs)
|
|
|
|
end_time = time.monotonic()
|
|
|
|
if profile_command:
|
|
profile.disable()
|
|
profile_file = (
|
|
Path(context.topdir) / f"mach_profile_{handler.name}.cProfile"
|
|
)
|
|
profile.dump_stats(profile_file)
|
|
print(
|
|
f'Mach command profile created at "{profile_file}". To visualize, use '
|
|
f"snakeviz:"
|
|
)
|
|
print("python3 -m pip install snakeviz")
|
|
print(f"python3 -m snakeviz {profile_file.name}")
|
|
|
|
result = result or 0
|
|
assert isinstance(result, int)
|
|
|
|
if not debug_command:
|
|
postrun = getattr(context, "post_dispatch_handler", None)
|
|
if postrun:
|
|
postrun(
|
|
context,
|
|
handler,
|
|
instance,
|
|
not result,
|
|
start_time,
|
|
end_time,
|
|
self.command_depth,
|
|
args=kwargs,
|
|
)
|
|
self.command_depth -= 1
|
|
|
|
return result
|
|
|
|
def dispatch(self, name, context, argv=None, subcommand=None, **kwargs):
|
|
"""Dispatch/run a command.
|
|
|
|
Commands can use this to call other commands.
|
|
"""
|
|
from mach.command_util import load_command_module_from_command_name
|
|
|
|
handler = self.command_handlers.get(name)
|
|
|
|
if not handler:
|
|
load_command_module_from_command_name(name, context.topdir)
|
|
handler = self.command_handlers.get(name)
|
|
if not handler:
|
|
raise MachError(
|
|
f"Mach was not able to load the module for the '{name}' command."
|
|
)
|
|
|
|
if subcommand:
|
|
handler = handler.subcommand_handlers[subcommand]
|
|
|
|
if handler.parser:
|
|
parser = handler.parser
|
|
|
|
# save and restore existing defaults and actions so **kwargs don't
|
|
# persist across subsequent invocations of Registrar.dispatch()
|
|
old_defaults = deepcopy(parser._defaults)
|
|
old_actions = deepcopy(parser._actions)
|
|
|
|
try:
|
|
parser.set_defaults(**kwargs)
|
|
kwargs, unknown = parser.parse_known_args(argv or [])
|
|
kwargs = vars(kwargs)
|
|
finally:
|
|
parser._defaults = old_defaults
|
|
parser._actions = old_actions
|
|
|
|
if unknown:
|
|
if subcommand:
|
|
name = "{} {}".format(name, subcommand)
|
|
parser.error(
|
|
"unrecognized arguments for {}: {}".format(
|
|
name, ", ".join(["'{}'".format(arg) for arg in unknown])
|
|
)
|
|
)
|
|
|
|
return self._run_command_handler(handler, context, **kwargs)
|
|
|
|
|
|
Registrar = MachRegistrar()
|