<html><head><meta name="color-scheme" content="light dark"></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;">"""Base class for filesystem wrappers.
"""

from __future__ import unicode_literals

import typing

import six

from . import errors
from .base import FS
from .copy import copy_dir, copy_file
from .error_tools import unwrap_errors
from .info import Info
from .path import abspath, join, normpath

if typing.TYPE_CHECKING:
    from typing import (
        IO,
        Any,
        AnyStr,
        BinaryIO,
        Callable,
        Collection,
        Iterable,
        Iterator,
        List,
        Mapping,
        Optional,
        Text,
        Tuple,
        Union,
    )

    from datetime import datetime
    from threading import RLock

    from .enums import ResourceType
    from .info import RawInfo
    from .permissions import Permissions
    from .subfs import SubFS
    from .walk import BoundWalker

    _T = typing.TypeVar("_T", bound="FS")
    _OpendirFactory = Callable[[_T, Text], SubFS[_T]]


_F = typing.TypeVar("_F", bound="FS", covariant=True)
_W = typing.TypeVar("_W", bound="WrapFS[FS]")


@six.python_2_unicode_compatible
class WrapFS(FS, typing.Generic[_F]):
    """A proxy for a filesystem object.

    This class exposes an filesystem interface, where the data is
    stored on another filesystem(s), and is the basis for
    `~fs.subfs.SubFS` and other *virtual* filesystems.

    """

    wrap_name = None  # type: Optional[Text]

    def __init__(self, wrap_fs):  # noqa: D107
        # type: (_F) -&gt; None
        self._wrap_fs = wrap_fs
        super(WrapFS, self).__init__()

    def __repr__(self):
        # type: () -&gt; Text
        return "{}({!r})".format(self.__class__.__name__, self._wrap_fs)

    def __str__(self):
        # type: () -&gt; Text
        wraps = []
        _fs = self  # type: Union[FS, WrapFS[FS]]
        while hasattr(_fs, "_wrap_fs"):
            wrap_name = getattr(_fs, "wrap_name", None)
            if wrap_name is not None:
                wraps.append(wrap_name)
            _fs = _fs._wrap_fs  # type: ignore
        if wraps:
            _str = "{}({})".format(_fs, ", ".join(wraps[::-1]))
        else:
            _str = "{}".format(_fs)
        return _str

    def delegate_path(self, path):
        # type: (Text) -&gt; Tuple[_F, Text]
        """Encode a path for proxied filesystem.

        Arguments:
            path (str): A path on the filesystem.

        Returns:
            (FS, str): a tuple of ``(&lt;filesystem&gt;, &lt;new_path&gt;)``

        """
        return self._wrap_fs, path

    def delegate_fs(self):
        # type: () -&gt; _F
        """Get the proxied filesystem.

        This method should return a filesystem for methods not
        associated with a path, e.g. `~fs.base.FS.getmeta`.

        """
        return self._wrap_fs

    def appendbytes(self, path, data):
        # type: (Text, bytes) -&gt; None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            return _fs.appendbytes(_path, data)

    def appendtext(
        self,
        path,  # type: Text
        text,  # type: Text
        encoding="utf-8",  # type: Text
        errors=None,  # type: Optional[Text]
        newline="",  # type: Text
    ):
        # type: (...) -&gt; None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            return _fs.appendtext(
                _path, text, encoding=encoding, errors=errors, newline=newline
            )

    def getinfo(self, path, namespaces=None):
        # type: (Text, Optional[Collection[Text]]) -&gt; Info
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            raw_info = _fs.getinfo(_path, namespaces=namespaces).raw
        if abspath(normpath(path)) == "/":
            raw_info = dict(raw_info)
            raw_info["basic"]["name"] = ""  # type: ignore
        return Info(raw_info)

    def listdir(self, path):
        # type: (Text) -&gt; List[Text]
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            dir_list = _fs.listdir(_path)
        return dir_list

    def lock(self):
        # type: () -&gt; RLock
        self.check()
        _fs = self.delegate_fs()
        return _fs.lock()

    def makedir(
        self,
        path,  # type: Text
        permissions=None,  # type: Optional[Permissions]
        recreate=False,  # type: bool
    ):
        # type: (...) -&gt; SubFS[FS]
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            return _fs.makedir(_path, permissions=permissions, recreate=recreate)

    def move(self, src_path, dst_path, overwrite=False, preserve_time=False):
        # type: (Text, Text, bool, bool) -&gt; None
        _fs, _src_path = self.delegate_path(src_path)
        _, _dst_path = self.delegate_path(dst_path)
        with unwrap_errors({_src_path: src_path, _dst_path: dst_path}):
            _fs.move(
                _src_path, _dst_path, overwrite=overwrite, preserve_time=preserve_time
            )

    def movedir(self, src_path, dst_path, create=False, preserve_time=False):
        # type: (Text, Text, bool, bool) -&gt; None
        _fs, _src_path = self.delegate_path(src_path)
        _, _dst_path = self.delegate_path(dst_path)
        with unwrap_errors({_src_path: src_path, _dst_path: dst_path}):
            _fs.movedir(
                _src_path, _dst_path, create=create, preserve_time=preserve_time
            )

    def openbin(self, path, mode="r", buffering=-1, **options):
        # type: (Text, Text, int, **Any) -&gt; BinaryIO
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            bin_file = _fs.openbin(_path, mode=mode, buffering=-1, **options)
        return bin_file

    def remove(self, path):
        # type: (Text) -&gt; None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.remove(_path)

    def removedir(self, path):
        # type: (Text) -&gt; None
        self.check()
        _path = abspath(normpath(path))
        if _path == "/":
            raise errors.RemoveRootError()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.removedir(_path)

    def removetree(self, dir_path):
        # type: (Text) -&gt; None
        self.check()
        _path = abspath(normpath(dir_path))
        _delegate_fs, _delegate_path = self.delegate_path(dir_path)
        with unwrap_errors(dir_path):
            if _path == "/":
                # with root path, we must remove the contents but
                # not the directory itself, so we can't just directly
                # delegate
                for info in _delegate_fs.scandir(_delegate_path):
                    info_path = join(_delegate_path, info.name)
                    if info.is_dir:
                        _delegate_fs.removetree(info_path)
                    else:
                        _delegate_fs.remove(info_path)
            else:
                _delegate_fs.removetree(_delegate_path)

    def scandir(
        self,
        path,  # type: Text
        namespaces=None,  # type: Optional[Collection[Text]]
        page=None,  # type: Optional[Tuple[int, int]]
    ):
        # type: (...) -&gt; Iterator[Info]
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            for info in _fs.scandir(_path, namespaces=namespaces, page=page):
                yield info

    def setinfo(self, path, info):
        # type: (Text, RawInfo) -&gt; None
        self.check()
        _fs, _path = self.delegate_path(path)
        return _fs.setinfo(_path, info)

    def settimes(self, path, accessed=None, modified=None):
        # type: (Text, Optional[datetime], Optional[datetime]) -&gt; None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.settimes(_path, accessed=accessed, modified=modified)

    def touch(self, path):
        # type: (Text) -&gt; None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.touch(_path)

    def copy(self, src_path, dst_path, overwrite=False, preserve_time=False):
        # type: (Text, Text, bool, bool) -&gt; None
        src_fs, _src_path = self.delegate_path(src_path)
        dst_fs, _dst_path = self.delegate_path(dst_path)
        with unwrap_errors({_src_path: src_path, _dst_path: dst_path}):
            if not overwrite and dst_fs.exists(_dst_path):
                raise errors.DestinationExists(_dst_path)
            copy_file(src_fs, _src_path, dst_fs, _dst_path, preserve_time=preserve_time)

    def copydir(self, src_path, dst_path, create=False, preserve_time=False):
        # type: (Text, Text, bool, bool) -&gt; None
        src_fs, _src_path = self.delegate_path(src_path)
        dst_fs, _dst_path = self.delegate_path(dst_path)
        with unwrap_errors({_src_path: src_path, _dst_path: dst_path}):
            if not create and not dst_fs.exists(_dst_path):
                raise errors.ResourceNotFound(dst_path)
            if not src_fs.getinfo(_src_path).is_dir:
                raise errors.DirectoryExpected(src_path)
            copy_dir(src_fs, _src_path, dst_fs, _dst_path, preserve_time=preserve_time)

    def create(self, path, wipe=False):
        # type: (Text, bool) -&gt; bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            return _fs.create(_path, wipe=wipe)

    def desc(self, path):
        # type: (Text) -&gt; Text
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            desc = _fs.desc(_path)
        return desc

    def download(self, path, file, chunk_size=None, **options):
        # type: (Text, BinaryIO, Optional[int], **Any) -&gt; None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.download(_path, file, chunk_size=chunk_size, **options)

    def exists(self, path):
        # type: (Text) -&gt; bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            exists = _fs.exists(_path)
        return exists

    def filterdir(
        self,
        path,  # type: Text
        files=None,  # type: Optional[Iterable[Text]]
        dirs=None,  # type: Optional[Iterable[Text]]
        exclude_dirs=None,  # type: Optional[Iterable[Text]]
        exclude_files=None,  # type: Optional[Iterable[Text]]
        namespaces=None,  # type: Optional[Collection[Text]]
        page=None,  # type: Optional[Tuple[int, int]]
    ):
        # type: (...) -&gt; Iterator[Info]
        self.check()
        _fs, _path = self.delegate_path(path)
        iter_files = iter(
            _fs.filterdir(
                _path,
                exclude_dirs=exclude_dirs,
                exclude_files=exclude_files,
                files=files,
                dirs=dirs,
                namespaces=namespaces,
                page=page,
            )
        )
        with unwrap_errors(path):
            for info in iter_files:
                yield info

    def readbytes(self, path):
        # type: (Text) -&gt; bytes
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _bytes = _fs.readbytes(_path)
        return _bytes

    def readtext(
        self,
        path,  # type: Text
        encoding=None,  # type: Optional[Text]
        errors=None,  # type: Optional[Text]
        newline="",  # type: Text
    ):
        # type: (...) -&gt; Text
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _text = _fs.readtext(
                _path, encoding=encoding, errors=errors, newline=newline
            )
        return _text

    def getmeta(self, namespace="standard"):
        # type: (Text) -&gt; Mapping[Text, object]
        self.check()
        meta = self.delegate_fs().getmeta(namespace=namespace)
        return meta

    def getsize(self, path):
        # type: (Text) -&gt; int
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            size = _fs.getsize(_path)
        return size

    def getsyspath(self, path):
        # type: (Text) -&gt; Text
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            sys_path = _fs.getsyspath(_path)
        return sys_path

    def gettype(self, path):
        # type: (Text) -&gt; ResourceType
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _type = _fs.gettype(_path)
        return _type

    def geturl(self, path, purpose="download"):
        # type: (Text, Text) -&gt; Text
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            return _fs.geturl(_path, purpose=purpose)

    def hassyspath(self, path):
        # type: (Text) -&gt; bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            has_sys_path = _fs.hassyspath(_path)
        return has_sys_path

    def hasurl(self, path, purpose="download"):
        # type: (Text, Text) -&gt; bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            has_url = _fs.hasurl(_path, purpose=purpose)
        return has_url

    def isdir(self, path):
        # type: (Text) -&gt; bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _isdir = _fs.isdir(_path)
        return _isdir

    def isfile(self, path):
        # type: (Text) -&gt; bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _isfile = _fs.isfile(_path)
        return _isfile

    def islink(self, path):
        # type: (Text) -&gt; bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _islink = _fs.islink(_path)
        return _islink

    def makedirs(
        self,
        path,  # type: Text
        permissions=None,  # type: Optional[Permissions]
        recreate=False,  # type: bool
    ):
        # type: (...) -&gt; SubFS[FS]
        self.check()
        _fs, _path = self.delegate_path(path)
        return _fs.makedirs(_path, permissions=permissions, recreate=recreate)

    # FIXME(@althonos): line_buffering is not a FS.open declared argument
    def open(
        self,
        path,  # type: Text
        mode="r",  # type: Text
        buffering=-1,  # type: int
        encoding=None,  # type: Optional[Text]
        errors=None,  # type: Optional[Text]
        newline="",  # type: Text
        line_buffering=False,  # type: bool
        **options  # type: Any
    ):
        # type: (...) -&gt; IO[AnyStr]
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            open_file = _fs.open(
                _path,
                mode=mode,
                buffering=buffering,
                encoding=encoding,
                errors=errors,
                newline=newline,
                line_buffering=line_buffering,
                **options
            )
        return open_file

    def opendir(
        self,  # type: _W
        path,  # type: Text
        factory=None,  # type: Optional[_OpendirFactory]
    ):
        # type: (...) -&gt; SubFS[_W]
        from .subfs import SubFS

        factory = factory or SubFS
        if not self.getinfo(path).is_dir:
            raise errors.DirectoryExpected(path=path)
        with unwrap_errors(path):
            return factory(self, path)

    def writebytes(self, path, contents):
        # type: (Text, bytes) -&gt; None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.writebytes(_path, contents)

    def upload(self, path, file, chunk_size=None, **options):
        # type: (Text, BinaryIO, Optional[int], **Any) -&gt; None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.upload(_path, file, chunk_size=chunk_size, **options)

    def writefile(
        self,
        path,  # type: Text
        file,  # type: IO[AnyStr]
        encoding=None,  # type: Optional[Text]
        errors=None,  # type: Optional[Text]
        newline="",  # type: Text
    ):
        # type: (...) -&gt; None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.writefile(
                _path, file, encoding=encoding, errors=errors, newline=newline
            )

    def validatepath(self, path):
        # type: (Text) -&gt; Text
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.validatepath(_path)
        path = abspath(normpath(path))
        return path

    def hash(self, path, name):
        # type: (Text, Text) -&gt; Text
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            return _fs.hash(_path, name)

    @property
    def walk(self):
        # type: () -&gt; BoundWalker
        return self._wrap_fs.walker_class.bind(self)
</pre></body></html>