# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import asyncio import contextlib import time from base64 import b64decode from io import BytesIO from urllib.parse import quote import pytest import webdriver from PIL import Image from webdriver.bidi.error import InvalidArgumentException, NoSuchFrameException from webdriver.bidi.modules.script import ContextTarget class Client: def __init__(self, request, session, event_loop): self.request = request self.session = session self.event_loop = event_loop self.subscriptions = {} self.content_blocker_loaded = False @property def current_url(self): return self.session.url @property def alert(self): return self.session.alert @property def context(self): return self.session.send_session_command("GET", "moz/context") @context.setter def context(self, context): self.session.send_session_command("POST", "moz/context", {"context": context}) @contextlib.contextmanager def using_context(self, context): orig_context = self.context needs_change = context != orig_context if needs_change: self.context = context try: yield finally: if needs_change: self.context = orig_context async def apz_click(self, element, no_up=False): pt = self.execute_script( """ const e = arguments[0]; const b = e.getClientRects()[0]; const leftChrome = window.mozInnerScreenX; const topChrome = window.mozInnerScreenY; const x = window.scrollX + leftChrome + b.x; const y = window.scrollY + topChrome + b.y; return [x, y]; """, element, ) with self.using_context("chrome"): return self.execute_async_script( """ const [pt, no_up, done] = arguments; const utils = window.windowUtils; const scale = window.devicePixelRatio; const res = utils.getResolution(); const ele = window.document.documentElement; window.windowUtils.sendNativeMouseEvent( pt[0] * scale * res, pt[1] * scale * res, utils.NATIVE_MOUSE_MESSAGE_BUTTON_DOWN, 0, 0, ele, () => { if (no_up) { return done(); } utils.sendNativeMouseEvent( pt[0] * scale * res, pt[1] * scale * res, utils.NATIVE_MOUSE_MESSAGE_BUTTON_UP, 0, 0, ele, done ); } ); """, pt, no_up, ) def wait_for_content_blocker(self): if not self.content_blocker_loaded: with self.using_context("chrome"): self.session.execute_async_script( """ const done = arguments[0], signal = "safebrowsing-update-finished"; function finish() { Services.obs.removeObserver(finish, signal); done(); } Services.obs.addObserver(finish, signal); """ ) self.content_blocker_loaded = True @property def keyboard(self): return self.session.actions.sequence("key", "keyboard_id") @property def mouse(self): return self.session.actions.sequence( "pointer", "pointer_id", {"pointerType": "mouse"} ) @property def pen(self): return self.session.actions.sequence( "pointer", "pointer_id", {"pointerType": "pen"} ) @property def touch(self): return self.session.actions.sequence( "pointer", "pointer_id", {"pointerType": "touch"} ) @property def wheel(self): return self.session.actions.sequence("wheel", "wheel_id") @property def modifier_key(self): if self.session.capabilities["platformName"] == "mac": return "\ue03d" # meta (command) else: return "\ue009" # control def inline(self, doc): return "data:text/html;charset=utf-8,{}".format(quote(doc)) async def top_context(self): contexts = await self.session.bidi_session.browsing_context.get_tree() return contexts[0] async def subscribe(self, events): if type(events) is not list: events = [events] must_sub = [] for event in events: if not event in self.subscriptions: must_sub.append(event) self.subscriptions[event] = 1 else: self.subscriptions[event] += 1 if len(must_sub): await self.session.bidi_session.session.subscribe(events=must_sub) async def unsubscribe(self, events): if type(events) is not list: events = [events] must_unsub = [] for event in events: self.subscriptions[event] -= 1 if not self.subscriptions[event]: must_unsub.append(event) if len(must_unsub): try: await self.session.bidi_session.session.unsubscribe(events=must_unsub) except (InvalidArgumentException, NoSuchFrameException): pass async def wait_for_events(self, events, checkFn=None, timeout=None): if type(events) is not list: events = [events] if timeout is None: timeout = 10 future = self.event_loop.create_future() remove_listeners = [] async def on_event(event, data): val = data if checkFn: val = await checkFn(event, data) if val is None: return for remover in remove_listeners: remover() await self.unsubscribe(events) future.set_result(val) for event in events: remove_listeners.append( self.session.bidi_session.add_event_listener(event, on_event) ) await self.subscribe(events) return await asyncio.wait_for(future, timeout=timeout) async def get_iframe_by_url(self, url): def check_children(children): for child in children: if "url" in child and url in child["url"]: return child for child in children: if "children" in child: frame = check_children(child["children"]) if frame: return frame return None tree = await self.session.bidi_session.browsing_context.get_tree() for top in tree: frame = check_children(top["children"]) if frame is not None: return frame return None async def is_iframe(self, context): def check_children(children): for child in children: if "context" in child and child["context"] == context: return True if "children" in child: return check_children(child["children"]) return False for top in await self.session.bidi_session.browsing_context.get_tree(): if check_children(top["children"]): return True return False async def wait_for_iframe_loaded(self, url, timeout=None): async def wait_for_url(_, data): if url in data["url"] and await self.is_iframe(data["context"]): return data["context"] return None return self.wait_for_events("browsingContext.load", wait_for_url) async def run_script_in_context(self, context, script): return await self.session.bidi_session.script.evaluate( expression=script, target=ContextTarget(context), await_promise=False, ) async def await_script_in_context(self, context, script): return await self.session.bidi_session.script.evaluate( expression=script, target=ContextTarget(context), await_promise=True, ) async def navigate(self, url, timeout=90, no_skip=False, **kwargs): try: return await asyncio.wait_for( asyncio.ensure_future(self._navigate(url, **kwargs)), timeout=timeout ) except asyncio.exceptions.TimeoutError as t: if no_skip: raise t return pytest.skip( "%s: Timed out navigating to site after %s seconds. Please try again later." % (self.request.fspath.basename, timeout) ) except webdriver.bidi.error.UnknownErrorException as e: if no_skip: raise e return s = str(e) if "Address rejected" in s: pytest.skip( "%s: Site not responding. Please try again later." % self.request.fspath.basename ) return elif "NS_ERROR_REDIRECT_LOOP" in s: pytest.skip( "%s: Site is stuck in a redirect loop. Please try again later." % self.request.fspath.basename ) return raise e async def _navigate(self, url, wait="complete", await_console_message=None): if self.session.test_config.get("use_pbm") or self.session.test_config.get( "use_strict_etp" ): print("waiting for content blocker...") self.wait_for_content_blocker() if await_console_message is not None: console_message = await self.promise_console_message_listener( await_console_message ) if wait == "load": page_load = await self.promise_readystate_listener("load", url=url) try: await self.session.bidi_session.browsing_context.navigate( context=(await self.top_context())["context"], url=url, wait=wait if wait != "load" else None, ) except webdriver.bidi.error.UnknownErrorException as u: m = str(u) if ( "NS_BINDING_ABORTED" not in m and "NS_ERROR_ABORT" not in m and "NS_ERROR_WONT_HANDLE_CONTENT" not in m ): raise u if wait == "load": await page_load if await_console_message is not None: await console_message async def promise_event_listener(self, events, check_fn=None, timeout=20): if type(events) is not list: events = [events] await self.session.bidi_session.session.subscribe(events=events) future = self.event_loop.create_future() listener_removers = [] def remove_listeners(): for listener_remover in listener_removers: try: listener_remover() except Exception: pass async def on_event(method, data): print("on_event", method, data) val = None if check_fn is not None: val = check_fn(method, data) if val is None: return future.set_result(val) for event in events: r = self.session.bidi_session.add_event_listener(event, on_event) listener_removers.append(r) async def task(): try: return await asyncio.wait_for(future, timeout=timeout) finally: remove_listeners() try: await asyncio.wait_for( self.session.bidi_session.session.unsubscribe(events=events), timeout=4, ) except asyncio.exceptions.TimeoutError: print("Unexpectedly timed out unsubscribing", events) pass return asyncio.create_task(task()) async def promise_console_message_listener(self, msg, **kwargs): def check(method, data): if "text" in data: if msg in data["text"]: return data if "args" in data and len(data["args"]): for arg in data["args"]: if "value" in arg and msg in arg["value"]: return data return await self.promise_event_listener("log.entryAdded", check, **kwargs) async def is_console_message(self, message): try: await (await self.promise_console_message_listener(message, timeout=2)) return True except asyncio.exceptions.TimeoutError: return False async def promise_readystate_listener(self, state, url=None, **kwargs): event = f"browsingContext.{state}" def check(method, data): if url is None or url in data["url"]: return data return await self.promise_event_listener(event, check, **kwargs) async def promise_frame_listener(self, url, state="domContentLoaded", **kwargs): event = f"browsingContext.{state}" def check(method, data): if url is None or url in data["url"]: return Client.Context(self, data["context"]) return await self.promise_event_listener(event, check, **kwargs) async def find_frame_context_by_url(self, url): def find_in(arr, url): for context in arr: if url in context["url"]: return context for context in arr: found = find_in(context["children"], url) if found: return found return find_in([await self.top_context()], url) class Context: def __init__(self, client, id): self.client = client self.target = ContextTarget(id) async def find_css(self, selector, all=False): all = "All" if all else "" return await self.client.session.bidi_session.script.evaluate( expression=f"document.querySelector{all}('{selector}')", target=self.target, await_promise=False, ) def timed_js(self, timeout, poll, fn, is_displayed=False): return f"""() => new Promise((_good, _bad) => {{ {self.is_displayed_js()} var _poll = {poll} * 1000; var _time = {timeout} * 1000; var _done = false; var resolve = val => {{ if ({is_displayed}) {{ if (val.length) {{ val = val.filter(v = is_displayed(v)); }} else {{ val = is_displayed(val) && val; }} if (!val.length && !val.matches) {{ return; }} }} _done = true; clearInterval(_int); _good(val); }}; var reject = str => {{ _done = true; clearInterval(_int); _bad(val); }}; var _int = setInterval(() => {{ {fn}; if (!_done) {{ _time -= _poll; if (_time <= 0) {{ reject(); }} }} }}, poll); }})""" def is_displayed_js(self): return """ function is_displayed(e) { const s = window.getComputedStyle(e), v = s.visibility === "visible", o = Math.abs(parseFloat(s.opacity)); return e.getClientRects().length > 0 && v && (isNaN(o) || o === 1.0); } """ async def await_css( self, selector, all=False, timeout=10, poll=0.25, condition=False, is_displayed=False, ): all = "All" if all else "" condition = ( f"var elem=arguments[0]; if ({condition})" if condition else False ) return await self.client.session.bidi_session.script.evaluate( expression=self.timed_js( timeout, poll, f""" var ele = document.querySelector{all}('{selector}')"; if (ele && (!"length" in ele || ele.length > 0)) {{ '{condition}' resolve(ele); }} """, ), target=self.target, await_promise=True, ) async def await_text(self, text, **kwargs): xpath = f"//*[text()[contains(.,'{text}')]]" return await self.await_xpath(self, xpath, **kwargs) async def await_xpath( self, xpath, all=False, timeout=10, poll=0.25, is_displayed=False ): all = "true" if all else "false" return await self.client.session.bidi_session.script.evaluate( expression=self.timed_js( timeout, poll, """ var ret = []; var r, res = document.evaluate(`{xpath}`, document, null, 4); while (r = res.iterateNext()) { ret.push(r); } resolve({all} ? ret : ret[0]); """, ), target=self.target, await_promise=True, ) def wrap_script_args(self, args): if args is None: return args out = [] for arg in args: if arg is None: out.append({"type": "undefined"}) continue t = type(arg) if t is int or t is float: out.append({"type": "number", "value": arg}) elif t is bool: out.append({"type": "boolean", "value": arg}) elif t is str: out.append({"type": "string", "value": arg}) else: if "type" in arg: out.push(arg) continue raise ValueError(f"Unhandled argument type: {t}") return out class PreloadScript: def __init__(self, client, script, target): self.client = client self.script = script if type(target) is list: self.target = target[0] else: self.target = target def stop(self): return self.client.session.bidi_session.script.remove_preload_script( script=self.script ) async def run(self, fn, *args, await_promise=False): val = await self.client.session.bidi_session.script.call_function( arguments=self.client.wrap_script_args(args), await_promise=await_promise, function_declaration=fn, target=self.target, ) if val and "value" in val: return val["value"] return val async def make_preload_script(self, text, sandbox=None, args=None, context=None): if not context: context = (await self.top_context())["context"] target = ContextTarget(context, sandbox) if args is None: text = f"() => {{ {text} }}" script = await self.session.bidi_session.script.add_preload_script( function_declaration=text, arguments=self.wrap_script_args(args), sandbox=sandbox, ) return Client.PreloadScript(self, script, target) async def disable_window_alert(self): return await self.make_preload_script("window.alert = () => {}") async def await_alert(self, text): if not hasattr(self, "alert_preload_script"): self.alert_preload_script = await self.make_preload_script( """ window.__alerts = []; window.wrappedJSObject.alert = function(text) { window.__alerts.push(text); } """, "alert_detector", ) return self.alert_preload_script.run( """(msg) => new Promise(done => { const to = setInterval(() => { if (window.__alerts.includes(msg)) { clearInterval(to); done(); } }, 200); }) """, text, await_promise=True, ) async def await_popup(self, url=None): if not hasattr(self, "popup_preload_script"): self.popup_preload_script = await self.make_preload_script( """ window.__popups = []; window.wrappedJSObject.open = function(url) { window.__popups.push(url); } """, "popup_detector", ) return self.popup_preload_script.run( """(url) => new Promise(done => { const to = setInterval(() => { if (url === undefined && window.__popups.length) { clearInterval(to); return done(window.__popups[0]); } const found = window.__popups.find(u => u.includes(url)); if (found !== undefined) { clearInterval(to); done(found); } }, 1000); }) """, url, await_promise=True, ) async def track_listener(self, type, selector): if not hasattr(self, "listener_preload_script"): self.listener_preload_script = await self.make_preload_script( """ window.__listeners = {}; var proto = EventTarget.wrappedJSObject.prototype; var def = Object.getOwnPropertyDescriptor(proto, "addEventListener"); var old = def.value; def.value = function(type, fn, opts) { if ("matches" in this) { if (!window.__listeners[type]) { window.__listeners[type] = new Set(); } window.__listeners[type].add(this); } return old.call(this, type, fn, opts) }; Object.defineProperty(proto, "addEventListener", def); """, "listener_detector", ) return Client.ListenerTracker(self.listener_preload_script, type, selector) @contextlib.asynccontextmanager async def preload_script(self, text, *args): script = await self.make_preload_script(text, "preload", args=args) yield script await script.stop() def back(self): self.session.back() def switch_to_frame(self, frame=None): if not frame: return self.session.transport.send( "POST", "session/{session_id}/frame/parent".format(**vars(self.session)) ) return self.session.transport.send( "POST", "session/{session_id}/frame".format(**vars(self.session)), {"id": frame}, encoder=webdriver.protocol.Encoder, decoder=webdriver.protocol.Decoder, session=self.session, ) def switch_frame(self, frame): self.session.switch_frame(frame) async def load_page_and_wait_for_iframe( self, url, finder, loads=1, timeout=None, **kwargs ): while loads > 0: await self.navigate(url, **kwargs) frame = self.await_element(finder, timeout=timeout) loads -= 1 self.switch_frame(frame) return frame def execute_script(self, script, *args): return self.session.execute_script(script, args=args) def execute_async_script(self, script, *args, **kwargs): return self.session.execute_async_script(script, args, **kwargs) def clear_all_cookies(self): self.session.transport.send( "DELETE", "session/%s/cookie" % self.session.session_id ) def send_element_command(self, element, method, uri, body=None): url = "element/%s/%s" % (element.id, uri) return self.session.send_session_command(method, url, body) def get_element_attribute(self, element, name): return self.send_element_command(element, "GET", "attribute/%s" % name) def _do_is_displayed_check(self, ele, is_displayed): if ele is None: return None if type(ele) in [list, tuple]: return [x for x in ele if self._do_is_displayed_check(x, is_displayed)] if is_displayed is False and ele and self.is_displayed(ele): return None if is_displayed is True and ele and not self.is_displayed(ele): return None return ele def find_css(self, *args, all=False, is_displayed=None, **kwargs): try: ele = self.session.find.css(*args, all=all, **kwargs) return self._do_is_displayed_check(ele, is_displayed) except webdriver.error.NoSuchElementException: return None def find_xpath(self, xpath, all=False, is_displayed=None): route = "elements" if all else "element" body = {"using": "xpath", "value": xpath} try: ele = self.session.send_session_command("POST", route, body) return self._do_is_displayed_check(ele, is_displayed) except webdriver.error.NoSuchElementException: return None def find_text(self, text, is_displayed=None, **kwargs): try: e = self.find_xpath(f"//*[text()[contains(.,'{text}')]]", **kwargs) return self._do_is_displayed_check(e, is_displayed) except webdriver.error.NoSuchElementException: return None def find_element(self, finder, is_displayed=None, **kwargs): ele = finder.find(self, **kwargs) return self._do_is_displayed_check(ele, is_displayed) def await_css(self, selector, **kwargs): return self.await_element(self.css(selector), **kwargs) def await_xpath(self, selector, **kwargs): return self.await_element(self.xpath(selector), **kwargs) def await_text(self, selector, *args, **kwargs): return self.await_element(self.text(selector), **kwargs) def await_element(self, finder, **kwargs): return self.await_first_element_of([finder], **kwargs)[0] class css: def __init__(self, selector): self.selector = selector def find(self, client, **kwargs): return client.find_css(self.selector, **kwargs) class xpath: def __init__(self, selector): self.selector = selector def find(self, client, **kwargs): return client.find_xpath(self.selector, **kwargs) class text: def __init__(self, selector): self.selector = selector def find(self, client, **kwargs): return client.find_text(self.selector, **kwargs) def await_first_element_of( self, finders, timeout=None, delay=0.25, condition=False, **kwargs ): t0 = time.time() condition = f"var elem=arguments[0]; return {condition}" if condition else False if timeout is None: timeout = 10 found = [None for finder in finders] exc = None while time.time() < t0 + timeout: for i, finder in enumerate(finders): try: result = finder.find(self, **kwargs) if result and ( not condition or self.session.execute_script(condition, [result]) ): found[i] = result return found except webdriver.error.NoSuchElementException as e: exc = e time.sleep(delay) raise exc if exc is not None else webdriver.error.NoSuchElementException return found async def dom_ready(self, timeout=None): if timeout is None: timeout = 20 async def wait(): return self.session.execute_async_script( """ const cb = arguments[0]; setInterval(() => { if (document.readyState === "complete") { cb(); } }, 500); """ ) task = asyncio.create_task(wait()) return await asyncio.wait_for(task, timeout) def is_float_cleared(self, elem1, elem2): return self.session.execute_script( """return (function(a, b) { // Ensure that a is placed under b (and not to its right) return a?.offsetTop >= b?.offsetTop + b?.offsetHeight && a?.offsetLeft < b?.offsetLeft + b?.offsetWidth; }(arguments[0], arguments[1]));""", elem1, elem2, ) def is_content_wider_than_screen(self): return self.execute_script("return window.innerWidth > window.outerWidth") @contextlib.contextmanager def assert_getUserMedia_called(self): self.execute_script( """ navigator.mediaDevices.getUserMedia = navigator.mozGetUserMedia = navigator.getUserMedia = () => { window.__gumCalled = true; }; """ ) yield assert self.execute_script("return window.__gumCalled === true;") def await_element_hidden(self, finder, timeout=None, delay=0.25): t0 = time.time() if timeout is None: timeout = 20 elem = finder.find(self) while time.time() < t0 + timeout: try: if not self.is_displayed(elem): return time.sleep(delay) except webdriver.error.StaleElementReferenceException: return def try_closing_popup(self, close_button_finder, timeout=None): try: self.soft_click( self.await_element( close_button_finder, is_displayed=True, timeout=timeout ) ) self.await_element_hidden(close_button_finder) return True except webdriver.error.NoSuchElementException: return False def try_closing_popups(self, popup_close_button_finders, timeout=None): left_to_try = list(popup_close_button_finders) closed_one = False num_intercepted = 0 while len(left_to_try): finder = left_to_try.pop(0) try: if self.try_closing_popup(finder, timeout=timeout): closed_one = True num_intercepted = 0 except webdriver.error.ElementClickInterceptedException as e: # If more than one popup is visible at the same time, we will # get this exception for all but the topmost one. So we re-try # removing the others again after the topmost one is dismissed, # until we've removed them all. num_intercepted += 1 if num_intercepted == len(left_to_try): raise e left_to_try.append(finder) return closed_one def click(self, element, force=False, popups=None, popups_timeout=None): tries = 0 while True: self.scroll_into_view(element) try: element.click() return except webdriver.error.ElementClickInterceptedException as c: if force: self.clear_covering_elements(element) elif not popups or not self.try_closing_popups( popups, timeout=popups_timeout ): raise c except webdriver.error.WebDriverException as e: if not "could not be scrolled into view" in str(e): raise e tries += 1 if tries == 5: raise e time.sleep(0.5) def soft_click(self, element, popups=None, popups_timeout=None): while True: try: self.execute_script("arguments[0].click()", element) return except webdriver.error.ElementClickInterceptedException as e: if not popups or not self.try_closing_popups( popups, timeout=popups_timeout ): raise e def remove_element(self, element): self.execute_script("arguments[0].remove()", element) def clear_covering_elements(self, element): self.execute_script( """ const getInViewCentrePoint = function(rect, win) { const { floor, max, min } = Math; let visible = { left: max(0, min(rect.x, rect.x + rect.width)), right: min(win.innerWidth, max(rect.x, rect.x + rect.width)), top: max(0, min(rect.y, rect.y + rect.height)), bottom: min(win.innerHeight, max(rect.y, rect.y + rect.height)), }; let x = (visible.left + visible.right) / 2.0; let y = (visible.top + visible.bottom) / 2.0; x = floor(x); y = floor(y); return { x, y }; }; const el = arguments[0]; if (el.isConnected) { const rect = el.getClientRects()[0]; if (rect) { const c = getInViewCentrePoint(rect, window); const efp = el.getRootNode().elementsFromPoint(c.x, c.y); for (const cover of efp) { if (cover == el) { break; } cover.style.visibility = "hidden"; } } } """, element, ) def scroll_into_view(self, element): self.execute_script( "arguments[0].scrollIntoView({block:'center', inline:'center', behavior: 'instant'})", element, ) @contextlib.asynccontextmanager async def ensure_fastclick_activates(self): fastclick_preload_script = await self.make_preload_script( """ var _ = document.createElement("webcompat_test"); _.style = "position:absolute;right:-1px;width:1px;height:1px"; document.documentElement.appendChild(_); """, "fastclick_forcer", ) yield fastclick_preload_script.stop() async def test_entrata_banner_hidden(self, url, iframe_css=None): # some sites take a while to load, but they always have the browser # warning popup, it just isn't shown until the page finishes loading. await self.navigate(url, wait="none") if iframe_css: frame = self.await_css(iframe_css) self.switch_frame(frame) self.await_css("#browser-warning-popup", timeout=120) try: self.await_css("#browser-warning-popup", is_displayed=True, timeout=2) return False except webdriver.error.NoSuchElementException: return True def test_for_fastclick(self, element): # FastClick cancels touchend, breaking default actions on Fenix. # It instead fires a mousedown or click, which we can detect. self.execute_script( """ const sel = arguments[0]; window.fastclicked = false; const evt = sel.nodeName === "SELECT" ? "mousedown" : "click"; document.addEventListener(evt, e => { if (e.target === sel && !e.isTrusted) { window.fastclicked = true; } }, true); sel.style.position = "absolute"; sel.style.zIndex = 2147483647; """, element, ) self.scroll_into_view(element) self.clear_covering_elements(element) # tap a few times in case the site's other code interferes self.touch.click(element=element).perform() self.touch.click(element=element).perform() self.touch.click(element=element).perform() return self.execute_script("return window.fastclicked") def is_displayed(self, element): if element is None: return False return self.session.execute_script( """ const e = arguments[0], s = window.getComputedStyle(e), v = s.visibility === "visible", o = Math.abs(parseFloat(s.opacity)); return e.getClientRects().length > 0 && v && (isNaN(o) || o === 1.0); """, args=[element], ) def is_one_solid_color(self, element, max_fuzz=8): # max_fuzz is needed as screenshots can have slight color bleeding/fringing shotb64 = element.screenshot() shot = Image.open(BytesIO(b64decode(shotb64))).convert("RGB") for min, max in shot.getextrema(): if max - min > max_fuzz: return False return True