Coverage for volexport/lvm2.py: 66%
194 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-20 14:19 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-20 14:19 +0000
1import shlex
2import datetime
3from abc import abstractmethod
4from .util import runcmd
5from .config import config
6from logging import getLogger
7from typing import Sequence, override
9_log = getLogger(__name__)
def parse(input: Sequence[str], indent: int, width: int) -> list[dict]:
    """Parse fixed-width ``name value`` lines into a list of dicts.

    Each usable line consists of ``indent`` leading spaces, a name column of
    ``width`` characters, one space, and then the value.  A line whose content
    (after the indent) starts with ``---`` closes the current record.  Lines
    that do not fit this layout are logged at debug level and ignored.

    Args:
        input: the lines to parse, without line terminators.
        indent: number of leading spaces expected on every line.
        width: width of the name column that follows the indent.

    Returns:
        One dict per non-empty record, mapping stripped names to stripped
        values, in input order.
    """
    prefix = " " * indent
    split_at = indent + width  # column of the blank between name and value
    records: list[dict] = []
    current: dict = {}
    for line in input:
        if len(line) < indent:
            _log.debug("short line: %s", line)
        elif not line.startswith(prefix):
            _log.debug("no indent: %s", line)
        elif line[indent : indent + 3] == "---":
            _log.debug("separator: %s", line)
            if current:
                _log.debug("append: %s", line)
                records.append(current)
                current = {}
        elif len(line) <= split_at:
            # Nothing after the name column, so there is no value to store.
            _log.debug("short width: %s", line)
        elif line[split_at] != " ":
            _log.debug("not split: %s / %s", repr(line[split_at]), line)
        else:
            key = line[indent:split_at].strip()
            current[key] = line[split_at + 1 :].strip()
    # Flush the last record when the input does not end with a separator.
    if current:
        records.append(current)
    return records
def runparse(cmd: list[str], indent: int, width: int) -> list[dict]:
    """Run ``cmd`` through ``runcmd`` and parse its stdout with ``parse``.

    When ``config.LVM_BIN`` is set, its shell-split words are placed in front
    of ``cmd``.  The command is executed with ``root=True``.

    Args:
        cmd: the command and its arguments; not modified by this function.
        indent: leading-space count forwarded to ``parse``.
        width: name-column width forwarded to ``parse``.

    Returns:
        The records ``parse`` extracts from the command's standard output.
    """
    if config.LVM_BIN:
        # Build a new list instead of splicing into the argument, so a caller
        # that reuses its list does not accumulate the prefix on every call.
        cmd = [*shlex.split(config.LVM_BIN), *cmd]
    res = runcmd(cmd, root=True)
    return parse(res.stdout.splitlines(keepends=False), indent, width)
50class Base:
51 def __init__(self, name: str | None = None):
52 self.name = name
54 def find_by(self, data: list[dict], keyname: str, value: str):
55 for i in data:
56 if i.get(keyname) == value:
57 return i
58 return None
60 @abstractmethod
61 def get(self) -> dict | None:
62 raise NotImplementedError("get")
64 @abstractmethod
65 def getlist(self) -> list[dict]:
66 raise NotImplementedError("list")
68 @abstractmethod
69 def create(self) -> dict:
70 raise NotImplementedError("create")
72 @abstractmethod
73 def delete(self) -> None:
74 raise NotImplementedError("delete")
76 @abstractmethod
77 def scan(self) -> list[dict]:
78 raise NotImplementedError("scan")
class PV(Base):
    """Wrapper around the ``pvdisplay``/``pvcreate``/``pvremove``/``pvscan`` commands.

    ``name`` is the value matched against the "PV Name" field and passed to
    ``pvcreate``/``pvremove``.
    """

    @override
    def get(self) -> dict | None:
        """Return the ``pvdisplay`` entry whose "PV Name" equals ``name``, or ``None``."""
        if self.name is not None:
            return self.find_by(self.getlist(), "PV Name", self.name)
        return None

    @override
    def getlist(self) -> list[dict]:
        """Return the parsed output of ``pvdisplay --unit b``."""
        return runparse(["pvdisplay", "--unit", "b"], indent=2, width=21)

    @override
    def create(self) -> dict:
        """Run ``pvcreate`` for ``name`` and return the resulting ``pvdisplay`` entry."""
        assert self.name is not None
        runcmd(["pvcreate", self.name], True)
        created = self.get()
        # The volume was just created, so it is expected to be listed.
        assert created is not None
        return created

    @override
    def delete(self) -> None:
        """Run ``pvremove -y`` for ``name``."""
        assert self.name is not None
        runcmd(["pvremove", self.name, "-y"], True)

    @override
    def scan(self) -> list[dict]:
        """Run ``pvscan`` and return the refreshed ``pvdisplay`` entries."""
        runcmd(["pvscan"], True)
        return self.getlist()
class VG(Base):
    """Wrapper around the ``vgdisplay``/``vgcreate``/``vgremove``/``vgscan`` commands.

    Also adds and removes physical volumes with ``vgextend``/``vgreduce``.
    """

    @override
    def get(self) -> dict | None:
        """Return the first ``vgdisplay`` entry for ``name``, or ``None`` if there is none."""
        if self.name is None:
            return None
        entries = runparse(["vgdisplay", "--unit", "b", self.name], indent=2, width=21)
        return entries[0] if entries else None

    @override
    def getlist(self) -> list[dict]:
        """Return the parsed output of ``vgdisplay --unit b``."""
        return runparse(["vgdisplay", "--unit", "b"], indent=2, width=21)

    @override
    def create(self, pvs: list[PV]) -> dict:
        """Run ``vgcreate`` for ``name`` on the given physical volumes.

        Physical volumes whose ``name`` is ``None`` are left out of the
        command.  Returns the ``vgdisplay`` entry of the new group.
        """
        assert self.name is not None
        pv_names = [pv.name for pv in pvs if pv.name is not None]
        runcmd(["vgcreate", self.name, *pv_names], True)
        created = self.get()
        # The group was just created, so it is expected to be listed.
        assert created is not None
        return created

    @override
    def delete(self) -> None:
        """Run ``vgremove -y`` for ``name``."""
        assert self.name is not None
        runcmd(["vgremove", self.name, "-y"], True)

    @override
    def scan(self) -> list[dict]:
        """Run ``vgscan`` and return the refreshed ``vgdisplay`` entries."""
        runcmd(["vgscan"], True)
        return self.getlist()

    def addpv(self, pv: PV):
        """Add ``pv`` to this volume group with ``vgextend``."""
        assert self.name is not None
        assert pv.name is not None
        runcmd(["vgextend", self.name, pv.name], True)

    def delpv(self, pv: PV):
        """Remove ``pv`` from this volume group with ``vgreduce``."""
        assert self.name is not None
        assert pv.name is not None
        runcmd(["vgreduce", self.name, pv.name], True)
class LV(Base):
    """Wrapper around the ``lvdisplay``/``lvcreate``/``lvremove``/``lvchange``/``lvresize`` commands.

    An instance is bound to one volume group (``vgname``) and optionally to one
    logical volume (``name``) inside it.  Sizes are handed to LVM with the
    ``b`` unit suffix and read back with ``--unit b``.
    """

    def __init__(self, vgname: str, name: str | None = None):
        super().__init__(name)
        self.vgname = vgname

    @property
    def volname(self):
        """The ``<vgname>/<name>`` form of this volume; requires ``name``."""
        assert self.name is not None
        return self.vgname + "/" + self.name

    @staticmethod
    def _volume_info(vol: dict) -> dict:
        """Convert one parsed ``lvdisplay`` entry to the dict used by ``volume_list``/``volume_read``."""
        # Presumably the field holds "<host>, <timestamp>"; only the text after
        # the first comma is parsed as the creation time.
        timestamp = vol["LV Creation host, time"].split(",", 1)[-1].strip()
        created = datetime.datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S %z")
        size = int(vol["LV Size"].removesuffix(" B"))
        return dict(name=vol["LV Name"], created=created.isoformat(), size=size, used=int(vol["# open"]))

    def _created(self, size: int) -> dict:
        """Build the dict returned by the ``create*`` methods (not read back from LVM)."""
        return dict(name=self.name, size=size, device=f"/dev/{self.vgname}/{self.name}")

    @override
    def get(self) -> dict | None:
        """Return the ``lvdisplay`` entry whose "LV Name" equals ``name``, or ``None``."""
        if self.name is None:
            return None
        return self.find_by(self.getlist(), "LV Name", self.name)

    @override
    def getlist(self) -> list[dict]:
        """Return the parsed output of ``lvdisplay --unit b <vgname>``."""
        return runparse(["lvdisplay", "--unit", "b", self.vgname], 2, 22)

    @override
    def create(self, size: int) -> dict:
        """Create volume ``name`` in ``vgname`` with ``lvcreate --size <size>b``.

        Returns a dict with ``name``, ``size`` and the ``/dev/<vg>/<name>`` device path.
        """
        assert self.name is not None
        runcmd(["lvcreate", "--size", f"{size}b", self.vgname, "--name", self.name])
        return self._created(size)

    def create_snapshot(self, size: int, parent: str) -> dict:
        """Create ``name`` as a snapshot of ``/dev/<vgname>/<parent>`` with ``--size <size>b``.

        Returns the same kind of dict as ``create``.
        """
        assert self.name is not None
        runcmd(["lvcreate", "--snapshot", "--size", f"{size}b", "--name", self.name, f"/dev/{self.vgname}/{parent}"])
        return self._created(size)

    def create_thinpool(self, size: int) -> dict:
        """Create ``name`` as a thin pool in ``vgname`` with ``-L <size>b``.

        Returns the same kind of dict as ``create``.
        """
        assert self.name is not None
        runcmd(["lvcreate", "--thinpool", self.name, "-L", f"{size}b", self.vgname])
        return self._created(size)

    def create_thin(self, size: int, thinpool: str) -> dict:
        """Create ``name`` as a thin volume in ``<vgname>/<thinpool>`` with ``-V <size>b``.

        Returns the same kind of dict as ``create``.
        """
        assert self.name is not None
        runcmd(["lvcreate", "--thin", "-V", f"{size}b", "-n", self.name, f"{self.vgname}/{thinpool}"])
        return self._created(size)

    @override
    def delete(self) -> None:
        """Run ``lvremove`` on ``<vgname>/<name>``."""
        runcmd(["lvremove", self.volname])

    @override
    def scan(self) -> list[dict]:
        """Run ``lvscan`` and return the refreshed ``lvdisplay`` entries."""
        runcmd(["lvscan"], True)
        return self.getlist()

    def volume_list(self):
        """Return name, creation time (ISO 8601), size and open count of every volume in ``vgname``."""
        return [self._volume_info(vol) for vol in self.getlist()]

    def volume_read(self):
        """Return the ``volume_list``-style dict for the volume called ``name``, or ``None``."""
        for vol in self.getlist():
            if vol["LV Name"] == self.name:
                return self._volume_info(vol)
        return None

    def volume_vol2path(self):
        """Return the ``/dev/<vgname>/<name>`` path of this volume."""
        return f"/dev/{self.vgname}/{self.name}"

    def volume_path2vol(self, name: str):
        """Strip the ``/dev/<vgname>/`` prefix from ``name``.

        Raises:
            Exception: if ``name`` does not start with that prefix.
        """
        if not name.startswith(f"/dev/{self.vgname}/"):
            raise Exception(f"invalid format: {name}, vg={self.vgname}")
        return name.removeprefix(f"/dev/{self.vgname}/")

    def read_only(self, readonly: bool):
        """Set the volume permission to ``r`` when ``readonly`` is true, else ``rw``."""
        permission = "r" if readonly else "rw"
        runcmd(["lvchange", "--permission", permission, self.volname])

    def resize(self, newsize: int):
        """Resize the volume to ``newsize`` with ``lvresize --size <newsize>b``."""
        assert self.name is not None
        # Pass the size with the explicit "b" suffix like the create methods do:
        # a bare number would be read by lvresize in its default unit instead.
        runcmd(["lvresize", "--size", f"{newsize}b", self.volname])