# Source code for zyklop.search

# Copyright (C) 2011-2012, Roman Joost <roman@bromeco.de>
#
# This library is free software: you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.  If not, see
# <http://www.gnu.org/licenses/>.
import collections
import logging
import os
import posixpath
import re


class SearchResult(object):
    """A single match together with the search state at the time of the match.

    :param path: the value that matched.
    :param level: the search level at which the match was found.
    :param children: container of candidates that have not been examined
        yet. A new, empty :class:`collections.deque` is created when
        omitted.
    :param visited: container of candidates that were already examined.
        A new, empty list is created when omitted.

    Containers passed in by the caller are stored as-is, even when they
    are empty, so the caller and the result share the same objects.
    """

    def __init__(self, path, level, children=None, visited=None):
        self.path = path
        self.level = level
        # Compare against None explicitly: an empty container is falsy and
        # must not be mistaken for "argument not given".
        self.children = collections.deque() if children is None else children
        self.visited = [] if visited is None else visited

    def __repr__(self):
        return '<{0} object {1}>'.format(
            self.__class__.__name__, self.path)

def absolute_path(node):
    """ Helper function which walks from the given :obj:`TreeNode` up to
        the top-most node and returns the joined path.

        The names of all nodes on the way are joined with
        :func:`os.path.join`, starting with the top-most node and ending
        with ``node`` itself.
    """
    # The helper collects the nodes bottom-up (node first, top-most last).
    bottom_up = absolute_path_helper(node)
    names = [segment.name for segment in reversed(bottom_up)]
    return os.path.join(*names)
def absolute_path_helper(node, segments=None):
    """ Collects ``node`` and all of its ancestors in reverse path order,
        i.e. ``node`` first and the top-most node last.

        If ``segments`` is given, only the ancestors of ``node`` are
        appended to it (``node`` itself is not) and the very same list is
        returned.
    """
    if segments is None:
        segments = [node]
    # Follow the parent links until the node without a parent is reached.
    ancestor = node.parent
    while ancestor is not None:
        segments.append(ancestor)
        ancestor = ancestor.parent
    return segments
class TreeNode(object):
    """ A node in an abstract directory tree.

        The tree is neither balanced nor weighted and may degrade to a
        linked list. Nodes are inserted by name; a name which already
        exists below a node is not inserted a second time. The top-most
        node is labeled with :attr:`/`.

        Children can be looked up by name via ``__getitem__`` and tested
        for existence by name via ``__contains__``.
    """

    def __init__(self, name="/", parent=None):
        # An empty name is stored as "/".
        self.name = "/" if name == "" else name
        self.parent = parent
        self.children = []

    def traverse(self, segms):
        """ Walks down the tree along the given segments and adds a
            :obj:`TreeNode` for every segment which does not exist yet.
            Existing nodes are reused.

            The path has to be split into segments already, eg.
            ``/foo/bar`` has to be passed as: ``['foo', 'bar']``

            Always returns ``None``.
        """
        current = self
        for segment in collections.deque(segms):
            candidate = TreeNode(segment, current)
            try:
                # Reuse the child if one with that name is present.
                current = current[candidate.name]
            except KeyError:
                current.children.append(candidate)
                current = candidate

    def traverse_path(self, path):
        """ Splits the given path at ``/`` and adds the segments to the
            tree as :obj:`TreeNodes`.

            This is a wrapper method for :meth:`traverse`.
        """
        segments = path.split('/')
        # An absolute path starts with an empty segment, which is skipped.
        first = 1 if os.path.isabs(path) else 0
        return self.traverse(segments[first:])

    def __repr__(self):
        template = '<{0} object {1}@{2}>'
        return template.format(
            self.__class__.__name__, self.name, len(self.children))

    def __getitem__(self, key):
        # Return the first child carrying the requested name.
        for child in self.children:
            if child.name == key:
                return child
        raise KeyError

    def __contains__(self, item):
        names = [child.name for child in self.children]
        return item in names
class Search(object):
    """ Searches the children delivered by a child node provider for
        entries matching a regular expression.

        :param top: the path handed to the provider to obtain the first
            set of children.
        :param regexp: regular expression (pattern string) every child is
            matched against with ``search``.
        :param childnodeprovider: object with a ``get_children(path)``
            method returning the children of ``path``.
    """

    # find() gives up once its level counter reaches this value.
    maxdepth = 30

    def __init__(self, top, regexp, childnodeprovider):
        self.top = top
        self.regexp = re.compile(regexp)
        self.childnodeprovider = childnodeprovider
        self.logger = logging.getLogger('zyklop')

    def find(self, children=None, visited=None, level=0):
        """ Performs a BFS and matches every child with the provided
            regular expression to find the goal node.

            :param children: deque of candidates still to examine. If it
                is empty or omitted, the children of :attr:`top` are used.
            :param visited: list of candidates examined so far; they are
                skipped when seen again. It is modified in place.
            :param level: current level, compared against
                :attr:`maxdepth`.
            :returns: a :class:`SearchResult` for the first match, or
                ``None`` if :attr:`maxdepth` is reached or there are no
                children to examine.
        """
        if not children:
            children = collections.deque(
                self.childnodeprovider.get_children(self.top))
        if level == self.maxdepth or not children:
            return None
        if visited is None:
            visited = []
        while children:
            child = children.pop()
            self.logger.debug("Searching %s.", child)
            if child in visited:
                continue
            if self.regexp.search(child):
                visited.append(child)
                # Hand the remaining queue and the visited list back so
                # the search can be resumed after this match.
                return SearchResult(child, level, children, visited)
            visited.append(child)
        # Nothing matched on this level: queue the children of everything
        # visited so far and descend one level.
        for c in visited:
            children.extendleft(
                self.childnodeprovider.get_children(c))
        level += 1
        return self.find(children, visited, level=level)

    def findall(self):
        """ Generator which yields a :class:`SearchResult` for every
            match.

            Each call to :meth:`find` is resumed with the ``children`` and
            ``visited`` state of the previous result; the generator stops
            as soon as :meth:`find` returns ``None``.
        """
        children = None
        visited = None
        while True:
            result = self.find(children=children, visited=visited)
            if result is None:
                break
            children = result.children
            visited = result.visited
            yield result


class DirectoryChildNodeProvider(object):
    """ Base class for providers which list the children of an absolute
        path. Subclasses implement :meth:`_get_children_helper`.
    """

    def get_children(self, abspath):
        """ Returns the children of ``abspath``.

            :raises ValueError: if ``abspath`` does not start with ``/``.
        """
        if not abspath.startswith('/'):
            raise ValueError(
                "abspath parameter needs to be an absolute path: {0}".format(
                    abspath))
        return self._get_children_helper(abspath)

    def _get_children_helper(self, abspath):
        """ Helper function which returns a list of children.
        """
        raise NotImplementedError("Must be implemented in subclasses.")


class ParamikoChildNodeProvider(DirectoryChildNodeProvider):
    """ Provider which lists children through an SFTP client object
        offering a ``listdir(path)`` method.
    """

    def __init__(self, sftpclient):
        self.sftpclient = sftpclient

    def _get_children_helper(self, abspath):
        """ Returns the entries of ``abspath`` as paths below ``abspath``.
            An empty list is returned if listing raises :exc:`IOError`.
        """
        try:
            entries = self.sftpclient.listdir(abspath)
        except IOError:
            # Presumably abspath is not a directory or cannot be read;
            # either way it has no children to offer.
            return []
        # Remote paths always use '/', independent of the local platform,
        # hence posixpath instead of os.path.
        return [posixpath.join(abspath, entry) for entry in entries]