<html><head><meta name="color-scheme" content="light dark"></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;"># Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import json
from pathlib import Path
from types import SimpleNamespace
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Pattern,
    Set,
    Union,
    cast,
)

from playwright._impl._api_structures import (
    Cookie,
    Geolocation,
    SetCookieParam,
    StorageState,
)
from playwright._impl._api_types import Error
from playwright._impl._artifact import Artifact
from playwright._impl._cdp_session import CDPSession
from playwright._impl._connection import (
    ChannelOwner,
    from_channel,
    from_nullable_channel,
)
from playwright._impl._event_context_manager import EventContextManagerImpl
from playwright._impl._fetch import APIRequestContext
from playwright._impl._frame import Frame
from playwright._impl._har_router import HarRouter
from playwright._impl._helper import (
    HarRecordingMetadata,
    RouteFromHarNotFoundPolicy,
    RouteHandler,
    RouteHandlerCallback,
    TimeoutSettings,
    URLMatch,
    URLMatcher,
    async_readfile,
    async_writefile,
    is_safe_close_error,
    locals_to_params,
    prepare_record_har_options,
    to_impl,
)
from playwright._impl._network import Request, Response, Route, serialize_headers
from playwright._impl._page import BindingCall, Page, Worker
from playwright._impl._tracing import Tracing
from playwright._impl._wait_helper import WaitHelper

if TYPE_CHECKING:  # pragma: no cover
    from playwright._impl._browser import Browser
    from playwright._impl._browser_type import BrowserType


class BrowserContext(ChannelOwner):

    Events = SimpleNamespace(
        BackgroundPage="backgroundpage",
        Close="close",
        Page="page",
        ServiceWorker="serviceworker",
        Request="request",
        Response="response",
        RequestFailed="requestfailed",
        RequestFinished="requestfinished",
    )

    def __init__(
        self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
    ) -&gt; None:
        super().__init__(parent, type, guid, initializer)
        self._pages: List[Page] = []
        self._routes: List[RouteHandler] = []
        self._bindings: Dict[str, Any] = {}
        self._timeout_settings = TimeoutSettings(None)
        self._browser: Optional["Browser"] = None
        self._owner_page: Optional[Page] = None
        self._options: Dict[str, Any] = {}
        self._background_pages: Set[Page] = set()
        self._service_workers: Set[Worker] = set()
        self._tracing = cast(Tracing, from_channel(initializer["tracing"]))
        self._har_recorders: Dict[str, HarRecordingMetadata] = {}
        self._request: APIRequestContext = from_channel(initializer["requestContext"])
        self._channel.on(
            "bindingCall",
            lambda params: self._on_binding(from_channel(params["binding"])),
        )
        self._channel.on("close", lambda _: self._on_close())
        self._channel.on(
            "page", lambda params: self._on_page(from_channel(params["page"]))
        )
        self._channel.on(
            "route",
            lambda params: asyncio.create_task(
                self._on_route(
                    from_channel(params.get("route")),
                )
            ),
        )

        self._channel.on(
            "backgroundPage",
            lambda params: self._on_background_page(from_channel(params["page"])),
        )

        self._channel.on(
            "serviceWorker",
            lambda params: self._on_service_worker(from_channel(params["worker"])),
        )
        self._channel.on(
            "request",
            lambda params: self._on_request(
                from_channel(params["request"]),
                from_nullable_channel(params.get("page")),
            ),
        )
        self._channel.on(
            "response",
            lambda params: self._on_response(
                from_channel(params["response"]),
                from_nullable_channel(params.get("page")),
            ),
        )
        self._channel.on(
            "requestFailed",
            lambda params: self._on_request_failed(
                from_channel(params["request"]),
                params["responseEndTiming"],
                params.get("failureText"),
                from_nullable_channel(params.get("page")),
            ),
        )
        self._channel.on(
            "requestFinished",
            lambda params: self._on_request_finished(
                from_channel(params["request"]),
                from_nullable_channel(params.get("response")),
                params["responseEndTiming"],
                from_nullable_channel(params.get("page")),
            ),
        )
        self._closed_future: asyncio.Future = asyncio.Future()
        self.once(
            self.Events.Close, lambda context: self._closed_future.set_result(True)
        )

    def __repr__(self) -&gt; str:
        return f"&lt;BrowserContext browser={self.browser}&gt;"

    def _on_page(self, page: Page) -&gt; None:
        self._pages.append(page)
        self.emit(BrowserContext.Events.Page, page)
        if page._opener and not page._opener.is_closed():
            page._opener.emit(Page.Events.Popup, page)

    async def _on_route(self, route: Route) -&gt; None:
        route_handlers = self._routes.copy()
        for route_handler in route_handlers:
            if not route_handler.matches(route.request.url):
                continue
            if route_handler.will_expire:
                self._routes.remove(route_handler)
            try:
                handled = await route_handler.handle(route)
            finally:
                if len(self._routes) == 0:
                    asyncio.create_task(self._disable_interception())
            if handled:
                return
        await route._internal_continue(is_internal=True)

    def _on_binding(self, binding_call: BindingCall) -&gt; None:
        func = self._bindings.get(binding_call._initializer["name"])
        if func is None:
            return
        asyncio.create_task(binding_call.call(func))

    def set_default_navigation_timeout(self, timeout: float) -&gt; None:
        self._timeout_settings.set_navigation_timeout(timeout)
        self._channel.send_no_reply(
            "setDefaultNavigationTimeoutNoReply", dict(timeout=timeout)
        )

    def set_default_timeout(self, timeout: float) -&gt; None:
        self._timeout_settings.set_timeout(timeout)
        self._channel.send_no_reply("setDefaultTimeoutNoReply", dict(timeout=timeout))

    @property
    def pages(self) -&gt; List[Page]:
        return self._pages.copy()

    @property
    def browser(self) -&gt; Optional["Browser"]:
        return self._browser

    def _set_browser_type(self, browser_type: "BrowserType") -&gt; None:
        self._browser_type = browser_type
        if self._options.get("recordHar"):
            self._har_recorders[""] = {
                "path": self._options["recordHar"]["path"],
                "content": self._options["recordHar"].get("content"),
            }

    async def new_page(self) -&gt; Page:
        if self._owner_page:
            raise Error("Please use browser.new_context()")
        return from_channel(await self._channel.send("newPage"))

    async def cookies(self, urls: Union[str, List[str]] = None) -&gt; List[Cookie]:
        if urls is None:
            urls = []
        if not isinstance(urls, list):
            urls = [urls]
        return await self._channel.send("cookies", dict(urls=urls))

    async def add_cookies(self, cookies: List[SetCookieParam]) -&gt; None:
        await self._channel.send("addCookies", dict(cookies=cookies))

    async def clear_cookies(self) -&gt; None:
        await self._channel.send("clearCookies")

    async def grant_permissions(
        self, permissions: List[str], origin: str = None
    ) -&gt; None:
        await self._channel.send("grantPermissions", locals_to_params(locals()))

    async def clear_permissions(self) -&gt; None:
        await self._channel.send("clearPermissions")

    async def set_geolocation(self, geolocation: Geolocation = None) -&gt; None:
        await self._channel.send("setGeolocation", locals_to_params(locals()))

    async def set_extra_http_headers(self, headers: Dict[str, str]) -&gt; None:
        await self._channel.send(
            "setExtraHTTPHeaders", dict(headers=serialize_headers(headers))
        )

    async def set_offline(self, offline: bool) -&gt; None:
        await self._channel.send("setOffline", dict(offline=offline))

    async def add_init_script(
        self, script: str = None, path: Union[str, Path] = None
    ) -&gt; None:
        if path:
            script = (await async_readfile(path)).decode()
        if not isinstance(script, str):
            raise Error("Either path or script parameter must be specified")
        await self._channel.send("addInitScript", dict(source=script))

    async def expose_binding(
        self, name: str, callback: Callable, handle: bool = None
    ) -&gt; None:
        for page in self._pages:
            if name in page._bindings:
                raise Error(
                    f'Function "{name}" has been already registered in one of the pages'
                )
        if name in self._bindings:
            raise Error(f'Function "{name}" has been already registered')
        self._bindings[name] = callback
        await self._channel.send(
            "exposeBinding", dict(name=name, needsHandle=handle or False)
        )

    async def expose_function(self, name: str, callback: Callable) -&gt; None:
        await self.expose_binding(name, lambda source, *args: callback(*args))

    async def route(
        self, url: URLMatch, handler: RouteHandlerCallback, times: int = None
    ) -&gt; None:
        self._routes.insert(
            0,
            RouteHandler(
                URLMatcher(self._options.get("baseURL"), url),
                handler,
                True if self._dispatcher_fiber else False,
                times,
            ),
        )
        if len(self._routes) == 1:
            await self._channel.send(
                "setNetworkInterceptionEnabled", dict(enabled=True)
            )

    async def unroute(
        self, url: URLMatch, handler: Optional[RouteHandlerCallback] = None
    ) -&gt; None:
        self._routes = list(
            filter(
                lambda r: r.matcher.match != url or (handler and r.handler != handler),
                self._routes,
            )
        )
        if len(self._routes) == 0:
            await self._disable_interception()

    async def _record_into_har(
        self,
        har: Union[Path, str],
        page: Optional[Page] = None,
        url: Union[Pattern[str], str] = None,
    ) -&gt; None:
        params = {
            "options": prepare_record_har_options(
                {
                    "recordHarPath": har,
                    "recordHarContent": "attach",
                    "recordHarMode": "minimal",
                    "recordHarUrlFilter": url,
                }
            )
        }
        if page:
            params["page"] = page._channel
        har_id = await self._channel.send("harStart", params)
        self._har_recorders[har_id] = {"path": str(har), "content": "attach"}

    async def route_from_har(
        self,
        har: Union[Path, str],
        url: Union[Pattern[str], str] = None,
        not_found: RouteFromHarNotFoundPolicy = None,
        update: bool = None,
    ) -&gt; None:
        if update:
            await self._record_into_har(har=har, page=None, url=url)
            return
        router = await HarRouter.create(
            local_utils=self._connection.local_utils,
            file=str(har),
            not_found_action=not_found or "abort",
            url_matcher=url,
        )
        await router.add_context_route(self)

    async def _disable_interception(self) -&gt; None:
        await self._channel.send("setNetworkInterceptionEnabled", dict(enabled=False))

    def expect_event(
        self,
        event: str,
        predicate: Callable = None,
        timeout: float = None,
    ) -&gt; EventContextManagerImpl:
        if timeout is None:
            timeout = self._timeout_settings.timeout()
        wait_helper = WaitHelper(self, f"browser_context.expect_event({event})")
        wait_helper.reject_on_timeout(
            timeout, f'Timeout {timeout}ms exceeded while waiting for event "{event}"'
        )
        if event != BrowserContext.Events.Close:
            wait_helper.reject_on_event(
                self, BrowserContext.Events.Close, Error("Context closed")
            )
        wait_helper.wait_for_event(self, event, predicate)
        return EventContextManagerImpl(wait_helper.result())

    def _on_close(self) -&gt; None:
        if self._browser:
            self._browser._contexts.remove(self)

        self.emit(BrowserContext.Events.Close, self)

    async def close(self) -&gt; None:
        try:
            for har_id, params in self._har_recorders.items():
                har = cast(
                    Artifact,
                    from_channel(
                        await self._channel.send("harExport", {"harId": har_id})
                    ),
                )
                # Server side will compress artifact if content is attach or if file is .zip.
                is_compressed = params.get("content") == "attach" or params[
                    "path"
                ].endswith(".zip")
                need_compressed = params["path"].endswith(".zip")
                if is_compressed and not need_compressed:
                    tmp_path = params["path"] + ".tmp"
                    await har.save_as(tmp_path)
                    await self._connection.local_utils.har_unzip(
                        zipFile=tmp_path, harFile=params["path"]
                    )
                else:
                    await har.save_as(params["path"])
                await har.delete()
            await self._channel.send("close")
            await self._closed_future
        except Exception as e:
            if not is_safe_close_error(e):
                raise e

    async def _pause(self) -&gt; None:
        await self._channel.send("pause")

    async def storage_state(self, path: Union[str, Path] = None) -&gt; StorageState:
        result = await self._channel.send_return_as_dict("storageState")
        if path:
            await async_writefile(path, json.dumps(result))
        return result

    async def wait_for_event(
        self, event: str, predicate: Callable = None, timeout: float = None
    ) -&gt; Any:
        async with self.expect_event(event, predicate, timeout) as event_info:
            pass
        return await event_info

    def expect_page(
        self,
        predicate: Callable[[Page], bool] = None,
        timeout: float = None,
    ) -&gt; EventContextManagerImpl[Page]:
        return self.expect_event(BrowserContext.Events.Page, predicate, timeout)

    def _on_background_page(self, page: Page) -&gt; None:
        self._background_pages.add(page)
        self.emit(BrowserContext.Events.BackgroundPage, page)

    def _on_service_worker(self, worker: Worker) -&gt; None:
        worker._context = self
        self._service_workers.add(worker)
        self.emit(BrowserContext.Events.ServiceWorker, worker)

    def _on_request_failed(
        self,
        request: Request,
        response_end_timing: float,
        failure_text: Optional[str],
        page: Optional[Page],
    ) -&gt; None:
        request._failure_text = failure_text
        request._set_response_end_timing(response_end_timing)
        self.emit(BrowserContext.Events.RequestFailed, request)
        if page:
            page.emit(Page.Events.RequestFailed, request)

    def _on_request_finished(
        self,
        request: Request,
        response: Optional[Response],
        response_end_timing: float,
        page: Optional[Page],
    ) -&gt; None:
        request._set_response_end_timing(response_end_timing)
        self.emit(BrowserContext.Events.RequestFinished, request)
        if page:
            page.emit(Page.Events.RequestFinished, request)
        if response:
            response._finished_future.set_result(True)

    def _on_request(self, request: Request, page: Optional[Page]) -&gt; None:
        self.emit(BrowserContext.Events.Request, request)
        if page:
            page.emit(Page.Events.Request, request)

    def _on_response(self, response: Response, page: Optional[Page]) -&gt; None:
        self.emit(BrowserContext.Events.Response, response)
        if page:
            page.emit(Page.Events.Response, response)

    @property
    def background_pages(self) -&gt; List[Page]:
        return list(self._background_pages)

    @property
    def service_workers(self) -&gt; List[Worker]:
        return list(self._service_workers)

    async def new_cdp_session(self, page: Union[Page, Frame]) -&gt; CDPSession:
        page = to_impl(page)
        params = {}
        if isinstance(page, Page):
            params["page"] = page._channel
        elif isinstance(page, Frame):
            params["frame"] = page._channel
        else:
            raise Error("page: expected Page or Frame")
        return from_channel(await self._channel.send("newCDPSession", params))

    @property
    def tracing(self) -&gt; Tracing:
        return self._tracing

    @property
    def request(self) -&gt; "APIRequestContext":
        return self._request
</pre></body></html>