1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
|
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2023, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import os
import pathlib
import shutil
import distutils
import tempfile
import weakref
from qiime2.core.util import set_permissions, USER_GROUP_RWX
_ConcretePath = type(pathlib.Path())
def _party_parrot(self, *args):
raise TypeError("Cannot mutate %r." % self)
class OwnedPath(_ConcretePath):
    """Path that remembers whether its target is owned by the user.

    The `_user_owned` flag selects how `_move_or_copy` relocates the data:
    user-owned data is always copied (the original is left untouched), while
    data that is not user-owned is moved.
    """

    def __new__(cls, *args, **kwargs):
        self = super().__new__(cls, *args, **kwargs)
        # Default to the non-destructive behaviour; other code may flip this.
        self._user_owned = True
        return self

    def _copy_dir_or_file(self, other):
        """Copy this file or directory tree to `other`.

        Directories are copied recursively and may be merged into an
        already-existing destination directory.

        Returns
        -------
        The destination path as returned by `shutil.copytree` (directories)
        or `shutil.copy` (files).
        """
        if self.is_dir():
            # shutil replaces the deprecated distutils.dir_util.copy_tree;
            # dirs_exist_ok keeps the "merge into existing dir" semantics.
            return shutil.copytree(str(self), str(other),
                                   dirs_exist_ok=True)
        return shutil.copy(str(self), str(other))

    def _destruct(self):
        """Delete the file or directory tree this path points at."""
        if self.is_dir():
            shutil.rmtree(str(self))
        else:
            self.unlink()

    def _move_or_copy(self, other):
        """Relocate this path's data to `other`.

        User-owned data is copied. Anything else is renamed into place; if
        the rename fails because the destination already exists or lives on
        another device, the data is copied and the source is then removed.

        Returns
        -------
        The result of the copy (destination path) or of `Path.rename`.

        Raises
        ------
        OSError
            If the rename fails for any other reason.
        """
        import errno

        if self._user_owned:
            return self._copy_dir_or_file(other)

        # Certain networked filesystems will experience a race
        # condition on `rename`, so fall back to copying.
        try:
            return _ConcretePath.rename(self, other)
        except OSError as e:
            # EXDEV (cross-device link) and FileExistsError (a subclass of
            # OSError) can both be solved by copying. Any other OSError
            # should still explode.
            recoverable = (e.errno == errno.EXDEV
                           or isinstance(e, FileExistsError))
            if not recoverable:
                raise
            copied = self._copy_dir_or_file(other)
            self._destruct()
            return copied
class InPath(OwnedPath):
    """Read-only view of an existing path.

    Every mutating `pathlib` method is replaced by `_party_parrot`, which
    raises `TypeError`, and `open` refuses any mode that permits writing.
    """

    def __new__(cls, path):
        self = super().__new__(cls, path)
        # Presumably held so the object we were built from (and any
        # finalizer attached to it) stays alive as long as this view does.
        self.__backing_path = path
        # Inherit the ownership flag when the source path carries one.
        if hasattr(path, '_user_owned'):
            self._user_owned = path._user_owned
        return self

    chmod = lchmod = rename = replace = rmdir = symlink_to = touch = unlink = \
        write_bytes = write_text = _party_parrot

    def open(self, mode='r', buffering=-1, encoding=None, errors=None,
             newline=None):
        """Open the file for reading only.

        Parameters mirror `pathlib.Path.open`.

        Raises
        ------
        TypeError
            If `mode` allows writing: it contains ``'w'``, ``'a'`` or ``'+'``
            (the latter covers update modes such as ``'r+'``).
        """
        if any(flag in mode for flag in ('w', '+', 'a')):
            _party_parrot(self)
        return super().open(mode=mode, buffering=buffering, encoding=encoding,
                            errors=errors, newline=newline)
class OutPath(OwnedPath):
    """Path to a freshly created temporary file or directory.

    The target is removed when the object is garbage collected or when it is
    used as a context manager and the `with` block exits.
    """

    @classmethod
    def _destruct(cls, path):
        """Remove `path` (file or directory tree) if it still exists.

        This is the finalizer callback; it receives the path as a plain
        string because the instance is already gone when it runs.
        """
        # NOTE(review): this classmethod shadows `OwnedPath._destruct(self)`,
        # which `OwnedPath._move_or_copy` calls with no arguments -- confirm
        # that code path is never reached for OutPath instances.
        if not os.path.exists(path):
            return
        if os.path.isdir(path):
            shutil.rmtree(path)
        else:
            os.unlink(path)

    def __new__(cls, dir=False, **kwargs):
        """
        Create a tempfile, return pathlib.Path reference to it.

        Parameters
        ----------
        dir : bool
            Create a temporary directory instead of a temporary file.
        **kwargs
            Forwarded to `tempfile.mkdtemp` / `tempfile.mkstemp`.
        """
        if dir:
            name = tempfile.mkdtemp(**kwargs)
        else:
            fd, name = tempfile.mkstemp(**kwargs)
            # fd is now assigned to our process table, but we don't need to do
            # anything with the file. We will call `open` on the `name` later
            # producing a different file descriptor, so close this one to
            # prevent a resource leak.
            os.close(fd)
        obj = super().__new__(cls, name)
        # The finalizer gets the string path, not `obj`, so it does not keep
        # the instance alive.
        obj._destructor = weakref.finalize(obj, cls._destruct, str(obj))
        return obj

    def __enter__(self):
        # Defined explicitly rather than inherited: pathlib.Path no longer
        # provides context-manager support on newer Python versions.
        return self

    def __exit__(self, t, v, tb):
        # Run the finalizer now; weakref.finalize makes later calls no-ops.
        self._destructor()
class InternalDirectory(_ConcretePath):
    """Temporary directory that deletes itself when no longer referenced.

    Calling the class with no arguments creates a new temporary directory
    whose name starts with `DEFAULT_PREFIX`. Paths derived from an instance
    (via ``/``) are plain paths and do not trigger any cleanup.
    """

    DEFAULT_PREFIX = 'qiime2-'

    @classmethod
    def _destruct(cls, path):
        """DO NOT USE DIRECTLY, use `_destructor()` instead"""
        if os.path.exists(path):
            # Presumably loosens permissions so that rmtree can delete
            # everything underneath -- confirm against set_permissions.
            set_permissions(path, None, USER_GROUP_RWX)
            shutil.rmtree(path)

    @classmethod
    def __new(cls, *args):
        """Build the instance and attach its self-destruct finalizer."""
        self = super().__new__(cls, *args)
        # The finalizer is given the string path so it holds no reference to
        # the instance itself.
        self._destructor = weakref.finalize(self, self._destruct, str(self))
        return self

    def __new__(cls, *args, prefix=None):
        """Create a new temporary directory, or wrap an existing path.

        Parameters
        ----------
        *args
            Path segments of an existing directory to wrap.
        prefix : str, optional
            Prefix for the new temporary directory's name. `DEFAULT_PREFIX`
            is prepended unless `prefix` already starts with it.

        Raises
        ------
        TypeError
            If both path segments and `prefix` are given.
        """
        if args and prefix is not None:
            raise TypeError("Cannot pass a path and a prefix at the same time")
        elif args:
            # This happens when the base-class's __reduce__ method is invoked
            # for pickling.
            return cls.__new(*args)
        else:
            if prefix is None:
                prefix = cls.DEFAULT_PREFIX
            elif not prefix.startswith(cls.DEFAULT_PREFIX):
                prefix = cls.DEFAULT_PREFIX + prefix
            # TODO: normalize when temp-directories are configurable
            path = tempfile.mkdtemp(prefix=prefix)
            return cls.__new(path)

    def __truediv__(self, path):
        # We don't want to create self-destructing paths when using the join
        # operator
        return _ConcretePath(str(self), path)

    def __rtruediv__(self, path):
        # Same reasoning as truediv
        return _ConcretePath(path, str(self))

    def rename(self, path):
        """Rename the directory to `path`; returns None."""
        # We don't want to rename and lose the target right away.
        # This overrides post python3.8 behavior.
        _ConcretePath(str(self)).rename(path)

    def replace(self, path):
        """Replace `path` with this directory; returns None."""
        # Same reasoning as rename
        _ConcretePath(str(self)).replace(path)
class ArchivePath(InternalDirectory):
    """`InternalDirectory` whose temporary directories are named with the
    ``qiime2-archive-`` prefix."""

    DEFAULT_PREFIX = 'qiime2-archive-'
class ProvenancePath(InternalDirectory):
    """`InternalDirectory` whose temporary directories are named with the
    ``qiime2-provenance-`` prefix."""

    DEFAULT_PREFIX = 'qiime2-provenance-'
|