Skip to content

igraph

IGraphMemoryBackend

Bases: PermissionGraphBackend

IGraph based PermissionGraphBackend implementation.

Source code in src/permission_graph/backends/igraph.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class IGraphMemoryBackend(PermissionGraphBackend):
    """IGraph based PermissionGraphBackend implementation.

    The permission graph is kept in memory as a directed ``igraph.Graph``.
    Every vertex is stored under its id (igraph's ``name`` attribute) with a
    ``vtype`` attribute, and every edge carries an ``etype`` attribute holding
    the ``EdgeType`` value.
    """

    def __init__(self):
        self._g = igraph.Graph(directed=True)

    def add_vertex(self, vertex: Vertex, **kwargs) -> None:
        """Add a vertex to the graph.

        Any extra keyword arguments are stored as attributes on the igraph
        vertex.

        Raises:
            ValueError: If a vertex with the same id already exists.
        """
        if self.vertex_exists(vertex):
            raise ValueError(f"Vertex already exists: {vertex}")
        self._g.add_vertex(f"{vertex.id}", vtype=vertex.vtype, **kwargs)

    def remove_vertex(self, vertex: Vertex) -> None:
        """Remove a vertex from the graph.

        Raises:
            ValueError: If the vertex lookup fails.
        """
        igraph_vertex = self._get_igraph_vertex(vertex.id)
        self._g.delete_vertices(igraph_vertex.index)

    def update_vertex_attributes(self, vertex: Vertex, **kwargs: Any) -> None:
        """Set each keyword argument as an attribute on the stored vertex.

        Raises:
            ValueError: If the vertex lookup fails.
        """
        igraph_vertex = self._get_igraph_vertex(vertex.id)
        for key, value in kwargs.items():
            igraph_vertex[key] = value

    def get_vertices_to(self, vertex: Vertex) -> list[Vertex]:
        """Return the source vertex of every edge that targets ``vertex``."""
        v = self._get_igraph_vertex(vertex.id)
        return [self.vertex_factory(edge.source_vertex["name"]) for edge in self._g.es.select(_target=v)]

    def get_vertices_from(self, vertex: Vertex) -> list[Vertex]:
        """Return the target vertex of every edge that starts at ``vertex``."""
        v = self._get_igraph_vertex(vertex.id)
        return [self.vertex_factory(edge.target_vertex["name"]) for edge in self._g.es.select(_source=v)]

    def _get_igraph_vertex(self, vertex_id: str) -> igraph.Vertex:
        """Get an igraph vertex given a vertex id.

        Callers in this class rely on a ``ValueError`` being raised when the
        lookup fails.
        """
        return self._g.vs.find(vertex_id)

    def vertex_exists(self, vertex: Vertex) -> bool:
        """Return True if a vertex with that id already exists."""
        try:
            self._get_igraph_vertex(vertex.id)
        except ValueError:
            return False
        return True

    def add_edge(self, etype: EdgeType, source: Vertex, target: Vertex, **kwargs) -> None:
        """Add a directed edge of type ``etype`` from ``source`` to ``target``.

        Any extra keyword arguments are stored as attributes on the new edge.

        Raises:
            ValueError: If either vertex lookup fails, or if an edge from
                ``source`` to ``target`` already exists.
        """
        # Resolve both endpoints first so a missing vertex is reported as a
        # lookup failure rather than being mistaken for "no edge yet".
        v1 = self._get_igraph_vertex(source.id)
        v2 = self._get_igraph_vertex(target.id)
        if self.edge_exists(source, target):
            raise ValueError(f"There is already an edge between vertices '{v1.index}' and '{v2.index}'")
        # add_edges() takes per-edge attribute lists, hence the one-item lists.
        extra_attrs = {attr: [val] for attr, val in kwargs.items()}
        self._g.add_edges([(v1, v2)], attributes=dict(etype=[etype.value], **extra_attrs))

    def _get_igraph_edge(self, source: Vertex, target: Vertex) -> igraph.Edge:
        """Return an IGraph edge given edge definition.

        Callers in this class rely on a ``ValueError`` being raised when the
        lookup of either vertex or of the edge fails.
        """
        v1 = self._get_igraph_vertex(source.id)
        v2 = self._get_igraph_vertex(target.id)
        return self._g.es.find(_source=v1.index, _target=v2.index)

    def edge_exists(self, source: Vertex, target: Vertex) -> bool:
        """Return True if there is an edge between source and target."""
        try:
            self._get_igraph_edge(source, target)
        except ValueError:
            return False
        return True

    def remove_edge(self, source: Vertex, target: Vertex) -> None:
        """Remove an edge from the permission graph.

        Raises:
            ValueError: If the edge lookup fails (the lookup raises instead
                of returning ``None``, so a missing edge is never silently
                ignored).
        """
        edge = self._get_igraph_edge(source, target)
        self._g.delete_edges(edge.index)

    def shortest_paths(self, source: Vertex, target: Vertex) -> list[list[Vertex]]:
        """Return all shortest paths from source to target.

        Each path is a list of ``Vertex`` objects built with
        ``vertex_factory`` from the vertex indices igraph returns.
        """
        v1 = self._get_igraph_vertex(source.id)
        v2 = self._get_igraph_vertex(target.id)
        paths = self._g.get_all_shortest_paths(v1, v2)
        return [[self.vertex_factory(self._g.vs[index]["name"]) for index in path] for path in paths]

    def get_edge_type(self, source: Vertex, target: Vertex) -> EdgeType:
        """Get the type of edge from source to target.

        Raises:
            ValueError: If the edge lookup fails.
        """
        try:
            edge = self._get_igraph_edge(source, target)
        except ValueError:
            # The lookup raises rather than returning None; translate its
            # generic error into one that names the requested endpoints.
            raise ValueError(f"There is no edge from {source} to {target}.") from None
        return EdgeType(edge["etype"])

    def vertex_factory(self, vertex_id: str) -> Vertex:
        """Return a vertex from a vertex id.

        The vertex class is selected by the text before the first ``:`` in
        ``vertex_id``. Stored attributes other than ``vtype`` and ``name``
        whose value is not ``None`` are forwarded to that class's ``from_id``.

        Raises:
            ValueError: If the vertex lookup fails.
            KeyError: If the id prefix is not a known vertex type.
        """
        vtype_map = {
            "actor": Actor,
            "resource": Resource,
            "action": Action,
            "group": Group,
            "resource_type": ResourceType,
        }
        vtype = vertex_id.split(":")[0]
        igraph_vertex = self._get_igraph_vertex(vertex_id)
        # None values are skipped; presumably these are attributes that were
        # only ever set on other vertices -- TODO confirm against igraph.
        attributes = {
            key: value
            for key, value in igraph_vertex.attributes().items()
            if key not in ("vtype", "name") and value is not None
        }
        return vtype_map[vtype].from_id(vertex_id, **attributes)

edge_exists(source, target)

Return True if there is an edge between source and target.

Source code in src/permission_graph/backends/igraph.py
79
80
81
82
83
84
85
def edge_exists(self, source: Vertex, target: Vertex) -> bool:
    """Report whether an edge from ``source`` to ``target`` is present.

    A ``ValueError`` raised by the edge lookup is interpreted as "no edge".
    """
    try:
        self._get_igraph_edge(source, target)
    except ValueError:
        found = False
    else:
        found = True
    return found

get_edge_type(source, target)

Get the type of edge from source to target.

Source code in src/permission_graph/backends/igraph.py
107
108
109
110
111
112
def get_edge_type(self, source: Vertex, target: Vertex) -> EdgeType:
    """Get the type of edge from source to target.

    Raises:
        ValueError: If the edge lookup fails.
    """
    try:
        edge = self._get_igraph_edge(source, target)
    except ValueError:
        # The lookup raises rather than returning None; translate its
        # generic error into one that names the requested endpoints.
        raise ValueError(f"There is no edge from {source} to {target}.") from None
    return EdgeType(edge["etype"])

remove_edge(source, target)

Remove an edge from the permission graph.

Source code in src/permission_graph/backends/igraph.py
87
88
89
90
91
def remove_edge(self, source: Vertex, target: Vertex) -> None:
    """Remove an edge from the permission graph.

    Raises:
        ValueError: If the edge lookup fails (the lookup raises instead of
            returning ``None``, so a missing edge is never silently ignored).
    """
    edge = self._get_igraph_edge(source, target)
    self._g.delete_edges(edge.index)

shortest_paths(source, target)

Return all shortest paths from source to target.

Source code in src/permission_graph/backends/igraph.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def shortest_paths(self, source: Vertex, target: Vertex) -> list[list[Vertex]]:
    """Collect every shortest path leading from ``source`` to ``target``.

    The graph yields each path as a sequence of vertex indices; every index
    is mapped to its stored ``name`` and rebuilt through ``vertex_factory``.
    """
    start = self._get_igraph_vertex(source.id)
    end = self._get_igraph_vertex(target.id)
    index_paths = self._g.get_all_shortest_paths(start, end)
    return [
        [self.vertex_factory(self._g.vs[idx]["name"]) for idx in index_path]
        for index_path in index_paths
    ]

vertex_exists(vertex)

Return True if a vertex with that id already exists.

Source code in src/permission_graph/backends/igraph.py
54
55
56
57
58
59
60
def vertex_exists(self, vertex: Vertex) -> bool:
    """Report whether the graph already holds a vertex with this vertex's id.

    A ``ValueError`` raised by the vertex lookup is interpreted as "not found".
    """
    try:
        self._get_igraph_vertex(vertex.id)
    except ValueError:
        present = False
    else:
        present = True
    return present

vertex_factory(vertex_id)

Return a vertex from a vertex id.

Source code in src/permission_graph/backends/igraph.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def vertex_factory(self, vertex_id: str) -> Vertex:
    """Return a vertex from a vertex id.

    The vertex class is selected by the text before the first ``:`` in
    ``vertex_id``. Stored attributes other than ``vtype`` and ``name`` whose
    value is not ``None`` are forwarded to that class's ``from_id``.

    Raises:
        KeyError: If the id prefix is not a known vertex type.
    """
    vtype_map = {
        "actor": Actor,
        "resource": Resource,
        "action": Action,
        "group": Group,
        "resource_type": ResourceType,
    }
    vtype = vertex_id.split(":")[0]
    igraph_vertex = self._get_igraph_vertex(vertex_id)
    # None values are skipped; presumably these are attributes that were only
    # ever set on other vertices -- TODO confirm against igraph.
    attributes = {
        key: value
        for key, value in igraph_vertex.attributes().items()
        if key not in ("vtype", "name") and value is not None
    }
    return vtype_map[vtype].from_id(vertex_id, **attributes)