Coverage for volexport/lvm2.py: 64%
260 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 12:48 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 12:48 +0000
1import shlex
2import datetime
3import shutil
4import string
5import uuid
6import json
7from subprocess import CalledProcessError
8from abc import abstractmethod
9from .util import runcmd
10from .config import config
11from .exceptions import InvalidArgument
12from logging import getLogger
13from typing import override
15_log = getLogger(__name__)
18def runparse_report(mode: str, filter: str | None = None) -> list[dict]:
19 """Run LVM command and parse the output"""
20 cmd = [
21 mode + "s",
22 "-o",
23 f"{mode}_all",
24 "--reportformat",
25 "json",
26 "--unit",
27 "b",
28 "--nosuffix",
29 ]
30 if filter:
31 cmd.extend(["-S", filter])
32 if config.LVM_BIN: 32 ↛ 33line 32 didn't jump to line 33 because the condition on line 32 was never true
33 cmd[0:0] = shlex.split(config.LVM_BIN)
34 res0 = runcmd(cmd, root=True)
35 res = []
36 for i in json.loads(res0.stdout).get("report", []):
37 res.extend(i.get(mode, []))
38 return res
41class Base:
42 ACCEPT_CHARS = string.ascii_letters + string.digits + "-_"
43 mode: str = "DUMMY"
45 def __init__(self, name: str | None = None):
46 if name is not None and any(x not in self.ACCEPT_CHARS for x in name):
47 raise ValueError(f"invalid name: {name}")
48 self.name = name
50 def get(self) -> dict | None:
51 """Get a single entry by name"""
52 if self.name is None: 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true
53 return None
54 res = runparse_report(mode=self.mode, filter=f'{self.mode}_name="{self.name}"')
55 if len(res) == 0:
56 return None
57 return res[0]
59 def getlist(self) -> list[dict]:
60 """Get a list of entries"""
61 return runparse_report(mode=self.mode)
63 def find_by(self, data: list[dict], keyname: str, value: str):
64 """Find an entry in a list of dictionaries by key and value"""
65 for i in data:
66 if i.get(keyname) == value:
67 return i
68 return None
70 @abstractmethod
71 def create(self) -> dict:
72 """Create a new entry"""
73 raise NotImplementedError("create")
75 @abstractmethod
76 def delete(self) -> None:
77 """Delete an entry"""
78 raise NotImplementedError("delete")
80 @abstractmethod
81 def scan(self) -> list[dict]:
82 """Scan for entries"""
83 raise NotImplementedError("scan")
86class PV(Base):
87 """Class to manage physical volumes in LVM"""
89 mode = "pv"
91 @override
92 def create(self) -> dict:
93 assert self.name is not None
94 runcmd(["pvcreate", self.name], True)
95 res = self.get()
96 assert res is not None
97 return res
99 @override
100 def delete(self) -> None:
101 assert self.name is not None
102 runcmd(["pvremove", self.name, "--yes"], True)
104 @override
105 def scan(self) -> list[dict]:
106 runcmd(["pvscan"], True)
107 return self.getlist()
110class VG(Base):
111 """Class to manage volume groups in LVM"""
113 mode = "vg"
115 @override
116 def create(self, pvs: list[PV]) -> dict:
117 assert self.name is not None
118 runcmd(["vgcreate", self.name, *[x.name for x in pvs if x.name is not None]], True)
119 res = self.get()
120 assert res is not None
121 return res
123 @override
124 def delete(self) -> None:
125 assert self.name is not None
126 runcmd(["vgremove", self.name, "--yes"], True)
128 @override
129 def scan(self) -> list[dict]:
130 runcmd(["vgscan"], True)
131 return self.getlist()
133 def addpv(self, pv: PV):
134 """Add a physical volume to the volume group"""
135 assert self.name is not None
136 assert pv.name is not None
137 runcmd(["vgextend", self.name, pv.name], True)
139 def delpv(self, pv: PV):
140 """Remove a physical volume from the volume group"""
141 assert self.name is not None
142 assert pv.name is not None
143 runcmd(["vgreduce", self.name, pv.name], True)
146class LV(Base):
147 """Class to manage logical volumes in LVM"""
149 mode = "lv"
150 nametag_prefix = "volname."
152 def __init__(self, vgname: str, name: str | None = None):
153 super().__init__(name)
154 self.vgname = vgname
156 @property
157 def tagname(self):
158 assert self.name is not None
159 return self.nametag_prefix + self.name
161 @property
162 def volname(self):
163 assert self.name is not None
164 info = self.get()
165 if info is None: 165 ↛ 166line 165 didn't jump to line 166 because the condition on line 165 was never true
166 raise FileNotFoundError(f"volume does not exists: {self.name}")
167 return info["lv_full_name"]
169 @override
170 def get(self) -> dict | None:
171 if self.name is None: 171 ↛ 172line 171 didn't jump to line 172 because the condition on line 171 was never true
172 return None
173 res = runparse_report(mode=self.mode, filter=f"tags={self.tagname}")
174 if len(res) == 1:
175 return res[0]
176 return None
178 def getbydev(self, devname) -> dict | None:
179 res = runparse_report(mode=self.mode, filter=f"lv_path={devname}")
180 if len(res) == 1: 180 ↛ 182line 180 didn't jump to line 182 because the condition on line 180 was always true
181 return res[0]
182 return None
184 @override
185 def getlist(self, volname: str | None = None) -> list[dict]:
186 if volname: 186 ↛ 187line 186 didn't jump to line 187 because the condition on line 186 was never true
187 try:
188 return runparse_report(mode=self.mode, filter=f"tags={self.tagname}")
189 except CalledProcessError as e:
190 if "Failed to find logical volume" in e.stderr:
191 raise FileNotFoundError(f"volume does not exists: {volname}")
192 return runparse_report(mode=self.mode)
194 @override
195 def create(self, size: int) -> dict:
196 assert self.name is not None
197 name = str(uuid.uuid4())
198 try:
199 runcmd(
200 [
201 "lvcreate",
202 "--size",
203 f"{size}b",
204 self.vgname,
205 "--name",
206 name,
207 "--addtag",
208 self.tagname,
209 ]
210 )
211 res = self.volume_read()
212 assert res is not None
213 return res
214 except CalledProcessError as e:
215 if e.returncode == 3 and "Size is not a multiple" in e.stderr:
216 raise InvalidArgument(f"invalid size: {size}")
217 raise
219 def create_snapshot(self, size: int, parent: str) -> dict | None:
220 """Create a snapshot of a logical volume"""
221 assert self.name is not None
222 name = str(uuid.uuid4())
223 runcmd(
224 [
225 "lvcreate",
226 "--snapshot",
227 "--size",
228 f"{size}b",
229 "--name",
230 name,
231 "--addtag",
232 self.tagname,
233 f"/dev/{self.vgname}/{parent}",
234 ]
235 )
236 return self.volume_read()
238 def create_thinpool(self, size: int) -> dict:
239 """Create a thin pool logical volume"""
240 assert self.name is not None
241 runcmd(["lvcreate", "--thinpool", self.name, "--size", f"{size}b", self.vgname])
242 return dict(name=self.name, size=size, device=self.volume_vol2path())
244 def create_thin(self, size: int, thinpool: str) -> dict | None:
245 """Create a thin logical volume in a thin pool"""
246 assert self.name is not None
247 name = str(uuid.uuid4())
248 runcmd(
249 [
250 "lvcreate",
251 "--thin",
252 "--virtualsize",
253 f"{size}b",
254 "--name",
255 name,
256 "--addtag",
257 self.tagname,
258 f"{self.vgname}/{thinpool}",
259 ]
260 )
261 return self.volume_read()
263 def create_thinsnap(self, parent: str) -> dict | None:
264 """Create a snapshot volume in a thin pool"""
265 assert self.name is not None
266 name = str(uuid.uuid4())
267 runcmd(
268 [
269 "lvcreate",
270 "--snapshot",
271 "--name",
272 name,
273 "--addtag",
274 self.tagname,
275 f"{self.vgname}/{parent}",
276 ]
277 )
278 runcmd(["lvchange", "--activate", "y", f"/dev/{self.vgname}/{self.name}", "--ignoreactivationskip"])
279 return self.volume_read()
281 def rollback_snapshot(self) -> dict | None:
282 assert self.name is not None
283 parent = self.get_parent()
284 runcmd(["lvconvert", "--merge", self.volname])
285 return LV(self.vgname, parent).volume_read()
287 def get_parent(self):
288 vol = self.get()
289 if vol is None:
290 return None
291 res = vol["lv_parent"]
292 if not res:
293 return None
294 return res
296 @override
297 def delete(self) -> None:
298 try:
299 runcmd(["lvremove", self.volname, "--yes"])
300 except CalledProcessError as e:
301 if e.returncode == 5 and "Failed to find" in e.stderr:
302 pass
303 else:
304 raise
306 @override
307 def scan(self) -> list[dict]:
308 runcmd(["lvscan"], True)
309 return self.getlist()
311 def vol2dict(self, vol: dict):
312 created = datetime.datetime.strptime(vol["lv_time"], "%Y-%m-%d %H:%M:%S %z")
313 if not vol["lv_path"]: 313 ↛ 315line 313 didn't jump to line 315 because the condition on line 313 was never true
314 # thin pool? no device
315 _log.debug("no device: %s", vol["lv_name"])
316 return None
317 if vol.get("lv_active") not in ("active",): 317 ↛ 319line 317 didn't jump to line 319 because the condition on line 317 was never true
318 # not available
319 _log.debug("not active: %s", vol["lv_name"])
320 return None
321 size = int(vol["lv_size"])
322 readonly = vol["lv_permissions"] != "writeable"
323 parent = vol["origin"]
324 thin = bool(vol["pool_lv"])
325 used = bool(vol["lv_device_open"])
326 tags = vol["lv_tags"]
327 for tag in tags.split(","): 327 ↛ 332line 327 didn't jump to line 332 because the loop on line 327 didn't complete
328 if tag.startswith(self.nametag_prefix): 328 ↛ 327line 328 didn't jump to line 327 because the condition on line 328 was always true
329 name = tag.removeprefix(self.nametag_prefix)
330 break
331 else:
332 name = vol["lv_name"]
333 return dict(
334 name=name,
335 created=created.isoformat(),
336 size=size,
337 used=used,
338 readonly=readonly,
339 thin=thin,
340 parent=parent,
341 lvm_name=vol["lv_name"],
342 lvm_id=vol["lv_uuid"],
343 )
345 def volume_list(self):
346 """List all logical volumes in the volume group"""
347 vols = self.getlist()
348 res = []
349 for vol in vols:
350 ent = self.vol2dict(vol)
351 if ent: 351 ↛ 349line 351 didn't jump to line 349 because the condition on line 351 was always true
352 res.append(ent)
353 return res
355 def volume_read(self):
356 """Read details of a specific logical volume"""
357 vol = self.get()
358 if vol is None:
359 return None
360 return self.vol2dict(vol)
362 def volume_vol2path(self):
363 """Convert volume name to device path"""
364 return f"/dev/{self.volname}"
366 def volume_path2vol(self, name: str):
367 """Convert device path to volume name"""
368 if not name.startswith(f"/dev/{self.vgname}/"): 368 ↛ 369line 368 didn't jump to line 369 because the condition on line 368 was never true
369 raise Exception(f"invalid format: {name}, vg={self.vgname}")
370 vol = self.getbydev(name)
371 if vol is None: 371 ↛ 372line 371 didn't jump to line 372 because the condition on line 371 was never true
372 raise FileNotFoundError(f"volume does not exists: {name}")
373 tags = vol["lv_tags"]
374 for tag in tags.split(","): 374 ↛ 377line 374 didn't jump to line 377 because the loop on line 374 didn't complete
375 if tag.startswith(self.nametag_prefix): 375 ↛ 374line 375 didn't jump to line 374 because the condition on line 375 was always true
376 return tag.removeprefix(self.nametag_prefix)
377 return None
379 def read_only(self, readonly: bool):
380 """Set the logical volume to read-only or read-write"""
381 if readonly: 381 ↛ 384line 381 didn't jump to line 384 because the condition on line 381 was always true
382 runcmd(["lvchange", "--permission", "r", self.volname])
383 else:
384 runcmd(["lvchange", "--permission", "rw", self.volname])
386 def resize(self, newsize: int):
387 """Resize the logical volume to a new size in bytes"""
388 assert self.name is not None
389 runcmd(["lvresize", "--size", f"{newsize}b", self.volname, "--yes"])
391 def format_volume(self, filesystem: str, label: str | None):
392 """Format the logical volume to make filesystem"""
393 if shutil.which(f"mkfs.{filesystem}") is None:
394 _log.error("command does not found: mkfs.%s", filesystem)
395 raise NotImplementedError("not supported")
397 volpath = self.volume_vol2path()
398 if filesystem in ("ext4", "xfs", "exfat", "btrfs", "ntfs", "nilfs2"):
399 lbl = ["-L", label or self.name]
400 runcmd([f"mkfs.{filesystem}", *lbl, volpath])
401 elif filesystem in ("vfat",): 401 ↛ 405line 401 didn't jump to line 405 because the condition on line 401 was always true
402 lbl = ["-n", label or self.name]
403 runcmd([f"mkfs.{filesystem}", *lbl, volpath])
404 else:
405 _log.error("no such filesystem: %s", filesystem)
406 raise NotImplementedError("not supported")