Решение на In-memory файлова система от Теодор Климентов

Обратно към всички решения

Към профила на Теодор Климентов

Резултати

  • 9 точки от тестове
  • 0 бонус точки
  • 9 точки общо
  • 14 успешни тест(а)
  • 4 неуспешни тест(а)

Код

from enum import Enum
from copy import copy
class Path:
    """Thin wrapper around a '/'-separated path string.

    An empty string means "no path"; the helpers below hand back such an
    empty ``Path`` whenever there is nothing left to return.
    """

    def __init__(self, path):
        self._path = path

    def __str__(self):
        return self._path

    @property
    def exists(self):
        """True unless the wrapped string is empty."""
        return self._path != ''

    @property
    def is_flat(self):
        """True when the wrapped string contains no '/' at all."""
        return '/' not in self._path

    def name(self):
        """Return the text after the last '/', or everything if there is none."""
        return self._path.rpartition('/')[2]

    def top(self):
        """Return the text before the first '/', or everything if there is none."""
        return self._path.partition('/')[0]

    def parrent(self):
        """Return the path up to the last '/'.

        A flat path has no parent and yields an empty ``Path``; when nothing
        precedes the last '/', the result is ``Path('/')``.
        """
        head, separator, _ = self._path.rpartition('/')
        if not separator:
            return Path('')
        return Path(head or '/')

    def subpath(self):
        """Return the path after the first '/', empty for a flat path."""
        return Path(self._path.partition('/')[2])
class NodeType(Enum):
    """The two node kinds the file system tells apart."""

    # Plain directory nodes.
    directory = 'd'
    # Regular files (hard links are built on the file class as well).
    file = 'f'
class Node:
    """Common base of everything that can be stored in the tree.

    A node remembers the file system it was created for and its node type.
    """

    def __init__(self, fs, type):
        # ``type`` shadows the builtin, but the name is part of the signature.
        self._type = type
        self._fs = fs

    @property
    def file_system(self):
        """The file system object passed to the constructor."""
        return self._fs

    @property
    def type(self):
        """The node type passed to the constructor."""
        return self._type

    @property
    def is_directory(self):
        """True when the stored type is ``NodeType.directory``."""
        return self._type == NodeType.directory

    def finalize(self):
        """Give the single byte a bare node occupies back to its file system."""
        self._fs.free_space(1)
class Directory(Node):
    """Node that maps child names to child nodes."""

    def __init__(self, fs):
        super().__init__(fs, NodeType.directory)
        self._nodes = {}

    def __contains__(self, item):
        """Return True when a child is stored under the name ``item``."""
        return item in self._nodes

    def get(self, name):
        """Return the child stored under ``name``.

        Raises:
            KeyError: no child has that name.
        """
        return self._nodes[name]

    def set(self, name, node):
        """Store ``node`` under ``name``, replacing any previous child."""
        self._nodes[name] = node

    def remove(self, name):
        """Drop the child stored under ``name``.

        Raises:
            KeyError: no child has that name.
        """
        del self._nodes[name]

    @property
    def directories(self):
        """Children whose type is ``NodeType.directory``, ordered by name."""
        return self._filter_nodes(NodeType.directory)

    @property
    def files(self):
        """Children whose type is ``NodeType.file``, ordered by name."""
        return self._filter_nodes(NodeType.file)

    @property
    def nodes(self):
        """All children: directories first, then files, each ordered by name."""
        return self.directories + self.files

    def _filter_nodes(self, node_type):
        """Return the children of the given type, ordered by name."""
        children = self._nodes
        return [
            children[name]
            for name in sorted(children)
            if children[name].type == node_type
        ]

    def finalize(self):
        """Release the space of every child and of the directory itself.

        Creating a directory costs one byte, so after the children are
        finalized that byte is handed back through ``Node.finalize``.
        """
        # Snapshot the values so a child touching the mapping cannot break
        # the iteration.
        for child in list(self._nodes.values()):
            child.finalize()
        super().finalize()
class LinkPathError(Exception):
    """Raised by a symbolic link when it cannot reach its target."""
class SymbolicLink(Node):
    """Node that forwards unknown attributes to the node at a stored path.

    The node type is copied from the target when the link is created, so
    the target has to exist at that moment.
    """

    def __init__(self, fs, link_path):
        super().__init__(fs, fs.get_node(link_path).type)
        self._path = link_path

    def __getattr__(self, attr):
        """Look ``attr`` up on the link's target.

        Python only calls this for attributes the link does not have itself.

        Raises:
            LinkPathError: the target path can no longer be resolved.
            AttributeError: the target exists but has no such attribute.
        """
        # If the link's own state is missing (an instance built without
        # __init__, e.g. while being copied), reading it below would re-enter
        # this method over and over; fail fast instead.
        if attr in ('_fs', '_path'):
            raise AttributeError(attr)
        try:
            node = self._fs.get_node(self._path)
        except Exception as error:
            # Any failure to resolve the path is reported as a broken link.
            raise LinkPathError(
                "'{}' does not exist".format(self._path)
            ) from error
        # Kept outside the try block so that a missing attribute on a live
        # target is not misreported as a missing path.
        return getattr(node, attr)

    def finalize(self):
        """Release the one byte the link itself occupies."""
        super().finalize()
class FileContent:
    """A piece of content together with a reference counter.

    The counter starts at zero and is changed only through
    ``increment_refs`` / ``decrement_refs``.
    """

    def __init__(self, content):
        self._refs = 0
        self._content = content

    @property
    def refs(self):
        """Current value of the reference counter."""
        return self._refs

    def increment_refs(self):
        """Add one to the reference counter."""
        self._refs = self._refs + 1

    def decrement_refs(self):
        """Subtract one from the reference counter."""
        self._refs = self._refs - 1

    @property
    def content(self):
        """The stored content."""
        return self._content

    @content.setter
    def content(self, value):
        self._content = value
class File(Node):
    """File node whose text lives in a reference-counted ``FileContent``."""

    def __init__(self, fs, content):
        super().__init__(fs, NodeType.file)
        payload = FileContent(content)
        payload.increment_refs()
        self._content = payload

    @property
    def content(self):
        """The text currently stored in the file."""
        return self._content.content

    @property
    def size(self):
        """Length of the content plus one byte for the node itself."""
        return 1 + len(self.content)

    def append(self, text):
        """Reserve room for ``text`` and add it to the end of the content."""
        # Reserving first means a full file system leaves the content as is.
        self._fs.obtain_space(len(text))
        self._content.content = self.content + text

    def truncate(self, text):
        """Replace the whole content with ``text``."""
        old_size = self.size
        growth = len(text) + 1 - old_size
        # Check the extra room up front so nothing changes on failure.
        self._fs.validate_free_space(growth if growth > 0 else 0)

        self._fs.free_space(old_size)
        self._content.content = text
        self._fs.obtain_space(self.size)

    def finalize(self):
        """Release this node's space and drop one reference to the content.

        The full size is released when this is the only reference; otherwise
        only the single byte of the node is.
        """
        last_reference = self._content.refs == 1
        self._fs.free_space(self.size if last_reference else 1)
        self._content.decrement_refs()
class HardLink(File):
    """File node that shares the ``FileContent`` of an existing file."""

    def __init__(self, fs, file):
        # Initialise as an ordinary empty file, then switch to the shared
        # content object of ``file``.
        super().__init__(fs, '')
        self._content = file._content
        # Register this link on the shared content. Without this the counter
        # stays at one and File.finalize treats every link as the last one,
        # releasing the full content size each time.
        self._content.increment_refs()
class NodeDoesNotExistError(Exception):
    """A path does not lead to a node."""


class DestinationNodeDoesNotExistError(NodeDoesNotExistError):
    """The destination of an operation does not exist."""


class SourceNodeDoesNotExistError(NodeDoesNotExistError):
    """The source of an operation does not exist."""


class DestinationNodeExistsError(Exception):
    """The destination of an operation is already taken."""


class NotEnoughSpaceError(Exception):
    """The file system has less free space than was requested."""


class NonExplicitDirectoryDeletionError(Exception):
    """A directory was removed without asking for directory removal."""


class NonEmptyDirectoryDeletionError(Exception):
    """A directory with children was removed without force."""


class DestinationNotADirectoryError(Exception):
    """A node that has to be a directory is not one."""


class FileSystemMountError(Exception):
    """Base class of the mount and unmount errors."""


class MountPointDoesNotExistError(FileSystemMountError):
    """The mount point path does not exist."""


class MountPointNotADirectoryError(FileSystemMountError):
    """The mount point is not a directory."""


class MountPointNotEmptyError(FileSystemMountError):
    """The mount point directory has children."""


class NotAMountpointError(FileSystemMountError):
    """The path given to unmount is not a mount point."""


class DirectoryHardLinkError(Exception):
    """A hard link to a directory was requested."""
class FileSystem:
    """In-memory tree of nodes with a fixed space budget.

    Every node costs one byte and a file additionally costs one byte per
    character of content. The root directory is charged on construction.
    """

    def __init__(self, size):
        self._size = size
        # The root directory occupies one byte of the budget.
        self._available_size = size - 1
        self._root = Directory(self)

    @property
    def size(self):
        """Total size the file system was created with."""
        return self._size

    @property
    def available_size(self):
        """Space that is currently free."""
        return self._available_size

    def validate_free_space(self, amount):
        """Raise NotEnoughSpaceError unless ``amount`` bytes are free."""
        if self._available_size < amount:
            message_format = 'Available space is {}, but {} is needed'
            raise NotEnoughSpaceError(
                message_format.format(self._available_size, amount)
            )

    def obtain_space(self, amount):
        """Reserve ``amount`` bytes; raises NotEnoughSpaceError if too few."""
        self.validate_free_space(amount)
        self._available_size -= amount

    def free_space(self, amount):
        """Hand ``amount`` bytes back to the budget."""
        self._available_size += amount

    def _get_node_on_path(self, path):
        """Walk from the root along ``path`` (a ``Path``) and return the node.

        Raises:
            NodeDoesNotExistError: the path is empty, a component is missing,
                or a component in the middle cannot hold children.
        """
        message = "'{}' does not exist".format(path)
        if not path.exists:
            raise NodeDoesNotExistError(message)

        node = self._root
        remaining = path.subpath()
        while remaining.exists:
            try:
                node = node.get(remaining.top())
            except (KeyError, AttributeError) as error:
                # KeyError: no child with that name. AttributeError: the node
                # reached so far has no ``get`` (it is not a directory).
                raise NodeDoesNotExistError(message) from error
            remaining = remaining.subpath()
        return node

    def get_node(self, path):
        """Return the node at the string ``path``.

        Raises:
            NodeDoesNotExistError: nothing lives at that path.
        """
        return self._get_node_on_path(Path(path))

    def validate_not_existing(self, path):
        """Raise DestinationNodeExistsError if ``path`` (a ``Path``) is taken."""
        try:
            self._get_node_on_path(path)
        except NodeDoesNotExistError:
            return
        raise DestinationNodeExistsError("'{}' already exists".format(path))

    def create(self, path, directory=False, content=''):
        """Create a directory or a file with ``content`` at ``path``.

        Raises:
            DestinationNodeExistsError: ``path`` is already taken.
            DestinationNodeDoesNotExistError: the parent does not exist.
            NotEnoughSpaceError: the new node does not fit.
        """
        path = Path(path)
        self.validate_not_existing(path)
        parrent_dir = self._get_destination_node(path.parrent())

        needed_space = 1 if directory else len(content) + 1
        self.obtain_space(needed_space)

        node = Directory(self) if directory else File(self, content)
        parrent_dir.set(path.name(), node)

    def remove(self, path, directory=False, force=True):
        """Remove the node at ``path`` and release the space it held.

        Raises:
            NodeDoesNotExistError: nothing lives at ``path``.
            NonExplicitDirectoryDeletionError: the node is a directory but
                ``directory`` is false.
            NonEmptyDirectoryDeletionError: the directory has children and
                ``force`` is false.
        """
        path = Path(path)
        node = self._get_node_on_path(path)
        parrent_dir = self._get_node_on_path(path.parrent())

        if node.is_directory:
            if not directory:
                raise NonExplicitDirectoryDeletionError(
                    "'{}' is a directory".format(path)
                )
            # ``force`` is tested first so a forced removal never has to
            # look at the children.
            if not force and node.nodes:
                raise NonEmptyDirectoryDeletionError(
                    "Directory '{}' is not empty".format(path)
                )

        # Detach first: if detaching fails no space has been released yet.
        parrent_dir.remove(path.name())
        self._release(node, parrent_dir.file_system)

    @staticmethod
    def _release(node, owner):
        """Return the space held by a detached ``node`` and its children.

        ``owner`` is the file system of the directory the node was attached
        to; each node's space otherwise goes back to the node's own file
        system.
        """
        home = node.file_system
        if home is not owner and node is home.get_node('/'):
            # Root of another file system mounted here: its content stays
            # with that file system, only the byte of the directory it
            # replaced is given back.
            owner.free_space(1)
        elif isinstance(node, Directory):
            for child in node.nodes:
                FileSystem._release(child, home)
            home.free_space(1)
        elif isinstance(node, File):
            # Files and hard links settle shared content themselves.
            node.finalize()
        else:
            # Anything else (a symbolic link) was charged a single byte.
            home.free_space(1)

    def _get_source_node(self, path):
        """Like ``_get_node_on_path`` but raises SourceNodeDoesNotExistError."""
        try:
            return self._get_node_on_path(path)
        except NodeDoesNotExistError as error:
            raise SourceNodeDoesNotExistError(str(error)) from error

    def _get_destination_node(self, path):
        """Like ``_get_node_on_path`` but raises DestinationNodeDoesNotExistError."""
        try:
            return self._get_node_on_path(path)
        except NodeDoesNotExistError as error:
            raise DestinationNodeDoesNotExistError(str(error)) from error

    def _validate_node_is_not_dir(self, path):
        """Raise DestinationNotADirectoryError unless ``path`` is a directory.

        Despite its name, this check passes only for directories.
        """
        if not self._get_node_on_path(path).is_directory:
            raise DestinationNotADirectoryError(
                "'{}' is not a directory".format(path)
            )

    def _validate_node_not_contains(self, path, name):
        """Raise DestinationNodeExistsError if ``path`` has a child ``name``."""
        if name in self._get_node_on_path(path):
            raise DestinationNodeExistsError(
                "Directory '{}' already contains '{}'".format(path, name)
            )

    def move(self, source, destination):
        """Move the node at ``source`` into the directory ``destination``.

        Raises:
            SourceNodeDoesNotExistError: ``source`` or its parent is missing.
            DestinationNodeDoesNotExistError: ``destination`` is missing.
            DestinationNotADirectoryError: ``destination`` is not a directory.
            DestinationNodeExistsError: ``destination`` already has a child
                with the source's name.
        """
        source = Path(source)
        destination = Path(destination)
        source_parrent = self._get_source_node(source.parrent())
        destination_dir = self._get_destination_node(destination)

        self._validate_node_is_not_dir(destination)
        self._validate_node_not_contains(destination, source.name())

        # Every check has passed before the tree is touched.
        node = self._get_source_node(source)
        source_parrent.remove(source.name())
        destination_dir.set(source.name(), node)

    def _validate_node_is_in_fs(self, path):
        """Raise NodeDoesNotExistError if the node belongs to another fs."""
        if self._get_node_on_path(path).file_system != self:
            raise NodeDoesNotExistError(
                "'{}' is on another file system".format(path)
            )

    def _validate_not_creating_hard_link_to_dir(self, path):
        """Raise DirectoryHardLinkError if ``path`` is a directory."""
        if self._get_node_on_path(path).is_directory:
            raise DirectoryHardLinkError(
                "Cannot create hard link to directory '{}'".format(path)
            )

    def link(self, source, destination, symbolic=True):
        """Create a symbolic or hard link at ``destination`` to ``source``.

        A link costs one byte.

        Raises:
            NotEnoughSpaceError: no byte is free.
            NodeDoesNotExistError: the destination's parent is missing, or
                (symbolic) the source is missing or on another file system.
            SourceNodeDoesNotExistError: (hard) the source is missing.
            DirectoryHardLinkError: (hard) the source is a directory.
        """
        source = Path(source)
        destination = Path(destination)
        self.validate_free_space(1)
        parrent_dir = self._get_node_on_path(destination.parrent())

        if symbolic:
            self._validate_node_is_in_fs(source)
            link_node = SymbolicLink(self, str(source))
        else:
            source_node = self._get_source_node(source)
            self._validate_not_creating_hard_link_to_dir(source)
            link_node = HardLink(self, source_node)

        # NOTE(review): an existing node at ``destination`` is replaced
        # silently; confirm whether that should be an error.
        parrent_dir.set(destination.name(), link_node)
        self._available_size -= 1

    def _validate_mounting_point_exists(self, path):
        """Raise MountPointDoesNotExistError if nothing lives at ``path``."""
        try:
            self._get_node_on_path(path)
        except NodeDoesNotExistError as error:
            raise MountPointDoesNotExistError(
                "Mounting point '{}' does not exist".format(path)
            ) from error

    def _validate_mounting_point_is_directory(self, path):
        """Raise MountPointNotADirectoryError unless ``path`` is a directory."""
        if not self._get_node_on_path(path).is_directory:
            message_format = "Cannot mount on non-directory node '{}'"
            raise MountPointNotADirectoryError(message_format.format(path))

    def _validate_mounting_point_is_empty(self, path):
        """Raise MountPointNotEmptyError if the directory has children."""
        if self._get_node_on_path(path).nodes:
            message_format = "Mounting point '{}' is not empty"
            raise MountPointNotEmptyError(message_format.format(path))

    def mount(self, file_system, path):
        """Put the root of ``file_system`` in place of the directory ``path``.

        Raises:
            MountPointDoesNotExistError: ``path`` does not exist.
            MountPointNotADirectoryError: ``path`` is not a directory.
            MountPointNotEmptyError: the directory at ``path`` has children.
        """
        path = Path(path)
        self._validate_mounting_point_exists(path)
        self._validate_mounting_point_is_directory(path)
        self._validate_mounting_point_is_empty(path)

        parrent_dir = self._get_node_on_path(path.parrent())
        parrent_dir.set(path.name(), file_system.get_node('/'))

    def _validate_node_is_mounting_point(self, path):
        """Raise NotAMountpointError if the node belongs to this fs."""
        if self._get_node_on_path(path).file_system == self:
            error_message = "'{}' is not mounting point".format(path)
            raise NotAMountpointError(error_message)

    def unmount(self, path):
        """Replace the mounted root at ``path`` with a new empty directory.

        Raises:
            NodeDoesNotExistError: ``path`` or its parent does not exist.
            NotAMountpointError: the node at ``path`` belongs to this fs.
        """
        path = Path(path)
        parrent_dir = self._get_node_on_path(path.parrent())

        self._validate_node_is_mounting_point(path)
        parrent_dir.set(path.name(), Directory(self))

Лог от изпълнението

........E....E.EE.
======================================================================
ERROR: test_mounting (test.TestFileSystem)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "lib/language/python/runner.py", line 65, in thread
    raise TimeoutError
TimeoutError

======================================================================
ERROR: test_remove_empty_directory (test.TestFileSystem)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "lib/language/python/runner.py", line 65, in thread
    raise TimeoutError
TimeoutError

======================================================================
ERROR: test_remove_nonempty_directory (test.TestFileSystem)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "lib/language/python/runner.py", line 65, in thread
    raise TimeoutError
TimeoutError

======================================================================
ERROR: test_symlink_to_missing_file (test.TestFileSystem)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "lib/language/python/runner.py", line 65, in thread
    raise TimeoutError
TimeoutError

----------------------------------------------------------------------
Ran 18 tests in 8.184s

FAILED (errors=4)

История (1 версия и 0 коментара)

Теодор обнови решението на 30.04.2015 00:13 (преди почти 9 години)

+from enum import Enum
+from copy import copy
+
+
+class Path:
+
+ def __init__(self, path):
+ self._path = path
+
+ @property
+ def exists(self):
+ return self._path != ''
+
+ @property
+ def is_flat(self):
+ return self._path.count('/') == 0
+
+ def name(self):
+ if self.is_flat:
+ return self._path
+ return self._path.rsplit('/', 1)[1]
+
+ def top(self):
+ if self.is_flat:
+ return self._path
+ return self._path.split('/', 1)[0]
+
+ def parrent(self):
+ if self.is_flat:
+ return Path('')
+ return Path(self._path.rsplit('/', 1)[0] or '/')
+
+ def subpath(self):
+ if self.is_flat:
+ return Path('')
+ return Path(self._path.split('/', 1)[1])
+
+ def __str__(self):
+ return self._path
+
+
+class NodeType(Enum):
+ directory = 'd'
+ file = 'f'
+
+
+class Node:
+
+ def __init__(self, fs, type):
+ self._fs = fs
+ self._type = type
+
+ @property
+ def is_directory(self):
+ return self._type == NodeType.directory
+
+ @property
+ def type(self):
+ return self._type
+
+ @property
+ def file_system(self):
+ return self._fs
+
+ def finalize(self):
+ self._fs.free_space(1)
+
+
+class Directory(Node):
+
+ def __init__(self, fs):
+ Node.__init__(self, fs, NodeType.directory)
+ self._nodes = {}
+
+ def get(self, name):
+ return self._nodes[name]
+
+ def set(self, name, node):
+ self._nodes[name] = node
+
+ def remove(self, name):
+ del self._nodes[name]
+
+ def __contains__(self, item):
+ return item in self._nodes
+
+ @property
+ def nodes(self):
+ return self.directories + self.files
+
+ @property
+ def directories(self):
+ return self._filter_nodes(NodeType.directory)
+
+ @property
+ def files(self):
+ return self._filter_nodes(NodeType.file)
+
+ def _filter_nodes(self, type):
+ result = sorted(
+ [
+ item
+ for item
+ in self._nodes.items()
+ if item[1].type == type
+ ],
+ key=lambda x: x[0]
+ )
+ return [x[1] for x in result]
+
+ def finalize(self):
+ for node in self.nodes:
+ node.finalize()
+
+
+class LinkPathError(Exception):
+ pass
+
+
+class SymbolicLink(Node):
+
+ def __init__(self, fs, link_path):
+ Node.__init__(self, fs, fs.get_node(link_path).type)
+ self._path = link_path
+
+ def __getattr__(self, attr):
+ try:
+ node = self._fs.get_node(self._path)
+ return getattr(node, attr)
+ except Exception:
+ raise LinkPathError("'{}' does not exist".format(self._path))
+
+ def finalize(self):
+ super.finalize()
+
+
+class FileContent:
+
+ def __init__(self, content):
+ self._content = content
+ self._refs = 0
+
+ @property
+ def content(self):
+ return self._content
+
+ @content.setter
+ def content(self, value):
+ self._content = value
+
+ def increment_refs(self):
+ self._refs += 1
+
+ def decrement_refs(self):
+ self._refs -= 1
+
+ @property
+ def refs(self):
+ return self._refs
+
+
+class File(Node):
+
+ def __init__(self, fs, content):
+ Node.__init__(self, fs, NodeType.file)
+ self._content = FileContent(content)
+ self._content.increment_refs()
+
+ @property
+ def content(self):
+ return self._content.content
+
+ def append(self, text):
+ self._fs.obtain_space(len(text))
+ self._content.content = self.content + text
+
+ def truncate(self, text):
+ additional_space = max(0, len(text) + 1 - self.size)
+ self._fs.validate_free_space(additional_space)
+
+ self._fs.free_space(self.size)
+ self._content.content = text
+ self._fs.obtain_space(self.size)
+
+ @property
+ def size(self):
+ return len(self.content) + 1
+
+ def finalize(self):
+ if self._content.refs == 1:
+ self._fs.free_space(self.size)
+ else:
+ self._fs.free_space(1)
+ self._content.decrement_refs()
+
+
+class HardLink(File):
+
+ def __init__(self, fs, file):
+ File.__init__(self, fs, '')
+ self._content = file._content
+
+
+class NodeDoesNotExistError(Exception):
+ pass
+
+
+class DestinationNodeDoesNotExistError(NodeDoesNotExistError):
+ pass
+
+
+class SourceNodeDoesNotExistError(NodeDoesNotExistError):
+ pass
+
+
+class DestinationNodeExistsError(Exception):
+ pass
+
+
+class NotEnoughSpaceError(Exception):
+ pass
+
+
+class NonExplicitDirectoryDeletionError(Exception):
+ pass
+
+
+class NonEmptyDirectoryDeletionError(Exception):
+ pass
+
+
+class DestinationNotADirectoryError(Exception):
+ pass
+
+
+class FileSystemMountError(Exception):
+ pass
+
+
+class MountPointDoesNotExistError(FileSystemMountError):
+ pass
+
+
+class MountPointNotADirectoryError(FileSystemMountError):
+ pass
+
+
+class MountPointNotEmptyError(FileSystemMountError):
+ pass
+
+
+class NotAMountpointError(FileSystemMountError):
+ pass
+
+
+class DirectoryHardLinkError(Exception):
+ pass
+
+
+class FileSystem:
+
+ def __init__(self, size):
+ self._size = size
+ self._available_size = size - 1
+ self._root = Directory(self)
+
+ @property
+ def size(self):
+ return self._size
+
+ @property
+ def available_size(self):
+ return self._available_size
+
+ def validate_free_space(self, amount):
+ if self._available_size < amount:
+ message_format = 'Available space is {}, but {} is needed'
+ raise NotEnoughSpaceError(
+ message_format.format(self._available_size, amount)
+ )
+
+ def obtain_space(self, amount):
+ self.validate_free_space(amount)
+ self._available_size -= amount
+
+ def free_space(self, amount):
+ self._available_size += amount
+
+ def _get_node_on_path(self, path):
+ try:
+ if not path.exists:
+ raise KeyError()
+ current_path = path.subpath()
+ node = self._root
+ while current_path.exists:
+ node = node.get(current_path.top())
+ current_path = current_path.subpath()
+ return node
+ except KeyError:
+ message_format = "'{}' does not exist"
+ raise NodeDoesNotExistError(message_format.format(path))
+
+ def get_node(self, path):
+ return self._get_node_on_path(Path(path))
+
+ def validate_not_existing(self, path):
+ try:
+ self._get_node_on_path(path)
+ message_format = "'{}' already exists"
+ raise DestinationNodeExistsError(message_format.format(path))
+ except NodeDoesNotExistError:
+ return
+
+ def create(self, path, directory=False, content=''):
+ path = Path(path)
+ self.validate_not_existing(path)
+ parrent_dir = self._get_destination_node(path.parrent())
+
+ needed_space = directory and 1 or len(content) + 1
+ self.validate_free_space(needed_space)
+
+ self._available_size -= needed_space
+ if directory:
+ parrent_dir.set(path.name(), Directory(self))
+ else:
+ parrent_dir.set(path.name(), File(self, content))
+
+ def remove(self, path, directory=False, force=True):
+ path = Path(path)
+ node = self._get_node_on_path(path)
+ parrent_dir = self._get_node_on_path(path.parrent())
+
+ if not node.is_directory:
+ parrent_dir.remove(path.name())
+ return
+
+ if not directory:
+ raise NonExplicitDirectoryDeletionError(
+ "'{}' is a directory".format(path)
+ )
+
+ if len(node.nodes) > 0 and not force:
+ raise NonEmptyDirectoryDeletionError(
+ "Directory '{}' is not empty".format(path)
+ )
+
+ parrent_dir.remove(path.name())
+
+ def _get_source_node(self, path):
+ try:
+ return self._get_node_on_path(path)
+ except NodeDoesNotExistError:
+ raise SourceNodeDoesNotExistError()
+
+ def _get_destination_node(self, path):
+ try:
+ return self._get_node_on_path(path)
+ except NodeDoesNotExistError as e:
+ raise DestinationNodeDoesNotExistError(e)
+
+ def _validate_node_is_not_dir(self, path):
+ if not self._get_node_on_path(path).is_directory:
+ raise DestinationNotADirectoryError(
+ "'{}' is not a directory".format(path)
+ )
+
+ def _validate_node_not_contains(self, path, name):
+ if name in self._get_node_on_path(path):
+ raise DestinationNodeExistsError(
+ "Directory '{}' already contains '{}'".format(path, name)
+ )
+
+ def move(self, source, destination):
+ source = Path(source)
+ destination = Path(destination)
+ source_parrent = self._get_source_node(source.parrent())
+ destination_dir = self._get_destination_node(destination)
+
+ self._validate_node_is_not_dir(destination)
+ self._validate_node_not_contains(destination, source.name())
+
+ node = self._get_source_node(source)
+ source_parrent.remove(source.name())
+ destination_dir.set(source.name(), node)
+
+ def _validate_node_is_in_fs(self, path):
+ if self._get_node_on_path(path).file_system != self:
+ raise NodeDoesNotExistError(
+ "'{}' is on another file system".format(path)
+ )
+
+ def _validate_not_creating_hard_link_to_dir(self, path):
+ try:
+ self._validate_node_is_not_dir(path)
+ raise DirectoryHardLinkError(
+ "Cannot create hard link to directory '{}'".format(path)
+ )
+ except DestinationNotADirectoryError:
+ return
+
+ def link(self, source, destination, symbolic=True):
+ source = Path(source)
+ self.validate_free_space(1)
+ destination = Path(destination)
+ parrent_dir = self._get_node_on_path(destination.parrent())
+
+ if symbolic:
+ self._validate_node_is_in_fs(source)
+ parrent_dir.set(destination.name(), SymbolicLink(self, str(source)))
+ self._available_size -= 1
+ return
+
+ source_node = self._get_source_node(source)
+ self._validate_not_creating_hard_link_to_dir(source)
+ parrent_dir.set(destination.name(), HardLink(self, source_node))
+ self._available_size -= 1
+
+ def _validate_mounting_point_exists(self, path):
+ try:
+ self._get_node_on_path(path)
+ except NodeDoesNotExistError:
+ raise MountPointDoesNotExistError()
+
+ def _validate_mounting_point_is_directory(self, path):
+ if not self._get_node_on_path(path).is_directory:
+ message_format = "Cannot mount on non-directory node '{}'"
+ raise MountPointNotADirectoryError(message_format.format(path))
+
+ def _validate_mounting_point_is_empty(self, path):
+ if self._get_node_on_path(path).nodes:
+ message_format = "Mounting point '{}' is not empty"
+ raise MountPointNotEmptyError(message_format.format(path))
+
+ def mount(self, file_system, path):
+ path = Path(path)
+ self._validate_mounting_point_exists(path)
+ self._validate_mounting_point_is_directory(path)
+ self._validate_mounting_point_is_empty(path)
+
+ parrent_dir = self._get_node_on_path(path.parrent())
+ parrent_dir.set(path.name(), file_system.get_node('/'))
+
+ def _validate_node_is_mounting_point(self, path):
+ if self._get_node_on_path(path).file_system == self:
+ error_message = "'{}' is not mounting point".format(path)
+ raise NotAMountpointError(error_message)
+
+ def unmount(self, path):
+ path = Path(path)
+ parrent_dir = self._get_node_on_path(path.parrent())
+ node = self._get_node_on_path(path)
+
+ self._validate_node_is_mounting_point(path)
+ parrent_dir.set(path.name(), Directory(self))