Coverage for volexport/lvm2.py: 64%

260 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-28 12:48 +0000

1import shlex 

2import datetime 

3import shutil 

4import string 

5import uuid 

6import json 

7from subprocess import CalledProcessError 

8from abc import abstractmethod 

9from .util import runcmd 

10from .config import config 

11from .exceptions import InvalidArgument 

12from logging import getLogger 

13from typing import override 

14 

15_log = getLogger(__name__) 

16 

17 

def runparse_report(mode: str, filter: str | None = None) -> list[dict]:
    """Run an LVM report command and return its parsed rows

    The command is ``<mode>s`` (``mode`` plus a trailing ``s``), asked for
    every ``<mode>_all`` column as JSON, with sizes in bytes and no unit
    suffix. It is executed through ``runcmd`` with ``root=True``.

    Args:
        mode: report kind; used both to build the command name and as the key
            under which rows are collected from each report section
        filter: optional selection expression passed with ``-S``

    Returns:
        The ``mode`` rows of every entry under the top-level ``"report"`` key
        of the JSON output, concatenated into one list.
    """
    argv = [
        f"{mode}s",
        "-o",
        f"{mode}_all",
        "--reportformat",
        "json",
        "--unit",
        "b",
        "--nosuffix",
    ]
    if filter:
        argv += ["-S", filter]
    if config.LVM_BIN:
        # config.LVM_BIN is split shell-style and placed in front of the command
        argv = shlex.split(config.LVM_BIN) + argv
    completed = runcmd(argv, root=True)
    sections = json.loads(completed.stdout).get("report", [])
    return [row for section in sections for row in section.get(mode, [])]

39 

40 

41class Base: 

42 ACCEPT_CHARS = string.ascii_letters + string.digits + "-_" 

43 mode: str = "DUMMY" 

44 

45 def __init__(self, name: str | None = None): 

46 if name is not None and any(x not in self.ACCEPT_CHARS for x in name): 

47 raise ValueError(f"invalid name: {name}") 

48 self.name = name 

49 

50 def get(self) -> dict | None: 

51 """Get a single entry by name""" 

52 if self.name is None: 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true

53 return None 

54 res = runparse_report(mode=self.mode, filter=f'{self.mode}_name="{self.name}"') 

55 if len(res) == 0: 

56 return None 

57 return res[0] 

58 

59 def getlist(self) -> list[dict]: 

60 """Get a list of entries""" 

61 return runparse_report(mode=self.mode) 

62 

63 def find_by(self, data: list[dict], keyname: str, value: str): 

64 """Find an entry in a list of dictionaries by key and value""" 

65 for i in data: 

66 if i.get(keyname) == value: 

67 return i 

68 return None 

69 

70 @abstractmethod 

71 def create(self) -> dict: 

72 """Create a new entry""" 

73 raise NotImplementedError("create") 

74 

75 @abstractmethod 

76 def delete(self) -> None: 

77 """Delete an entry""" 

78 raise NotImplementedError("delete") 

79 

80 @abstractmethod 

81 def scan(self) -> list[dict]: 

82 """Scan for entries""" 

83 raise NotImplementedError("scan") 

84 

85 

class PV(Base):
    """Wrapper around LVM physical volumes (``pvcreate`` / ``pvremove`` / ``pvscan``)

    NOTE(review): names go through Base.__init__, whose ACCEPT_CHARS has no
    "/", so a device path such as /dev/sdX would be rejected - confirm that
    this is intended.
    """

    mode = "pv"

    @override
    def create(self) -> dict:
        """Run ``pvcreate`` for this name and return its report row"""
        assert self.name is not None
        argv = ["pvcreate", self.name]
        runcmd(argv, True)
        info = self.get()
        assert info is not None
        return info

    @override
    def delete(self) -> None:
        """Run ``pvremove --yes`` for this name"""
        assert self.name is not None
        argv = ["pvremove", self.name, "--yes"]
        runcmd(argv, True)

    @override
    def scan(self) -> list[dict]:
        """Run ``pvscan`` and return all physical-volume report rows"""
        runcmd(["pvscan"], True)
        return self.getlist()

108 

109 

class VG(Base):
    """Wrapper around LVM volume groups (``vgcreate`` / ``vgremove`` / ``vgscan`` / ...)"""

    mode = "vg"

    @override
    def create(self, pvs: list[PV]) -> dict:
        """Run ``vgcreate`` with the given physical volumes and return the group's report row

        Physical volumes whose ``name`` is ``None`` are left out of the command.
        """
        assert self.name is not None
        members = [pv.name for pv in pvs if pv.name is not None]
        runcmd(["vgcreate", self.name, *members], True)
        info = self.get()
        assert info is not None
        return info

    @override
    def delete(self) -> None:
        """Run ``vgremove --yes`` for this group"""
        assert self.name is not None
        argv = ["vgremove", self.name, "--yes"]
        runcmd(argv, True)

    @override
    def scan(self) -> list[dict]:
        """Run ``vgscan`` and return all volume-group report rows"""
        runcmd(["vgscan"], True)
        return self.getlist()

    def addpv(self, pv: PV):
        """Add a physical volume to the group with ``vgextend``"""
        assert self.name is not None
        assert pv.name is not None
        argv = ["vgextend", self.name, pv.name]
        runcmd(argv, True)

    def delpv(self, pv: PV):
        """Remove a physical volume from the group with ``vgreduce``"""
        assert self.name is not None
        assert pv.name is not None
        argv = ["vgreduce", self.name, pv.name]
        runcmd(argv, True)

144 

145 

class LV(Base):
    """Class to manage logical volumes in LVM

    Volumes made by ``create``, ``create_snapshot``, ``create_thin`` and
    ``create_thinsnap`` get a random UUID as their LVM name; the user-facing
    name is attached as the tag ``volname.<name>``. Lookups by name therefore
    filter on that tag instead of on ``lv_name``.
    """

    mode = "lv"
    nametag_prefix = "volname."
    # mkfs option that sets the filesystem label, per supported filesystem
    _LABEL_OPTIONS = {
        "ext4": "-L",
        "xfs": "-L",
        "exfat": "-L",
        "btrfs": "-L",
        "ntfs": "-L",
        "nilfs2": "-L",
        "vfat": "-n",
    }

    def __init__(self, vgname: str, name: str | None = None):
        """
        Args:
            vgname: volume group the volumes belong to
            name: user-facing volume name, validated by ``Base.__init__``
        """
        super().__init__(name)
        self.vgname = vgname

    @property
    def tagname(self) -> str:
        """LVM tag that carries the user-facing name"""
        assert self.name is not None
        return self.nametag_prefix + self.name

    @property
    def volname(self):
        """``lv_full_name`` of the volume tagged with this name

        Raises:
            FileNotFoundError: ``get()`` did not return a volume
        """
        assert self.name is not None
        info = self.get()
        if info is None:
            raise FileNotFoundError(f"volume does not exists: {self.name}")
        return info["lv_full_name"]

    def _tagged_name(self, tags: str) -> str | None:
        """Extract the user-facing name from a comma-separated ``lv_tags`` value"""
        for tag in tags.split(","):
            if tag.startswith(self.nametag_prefix):
                return tag.removeprefix(self.nametag_prefix)
        return None

    @override
    def get(self) -> dict | None:
        """Return the report row tagged with this name

        Returns ``None`` when no name is set or when the tag does not match
        exactly one volume.
        """
        if self.name is None:
            return None
        res = runparse_report(mode=self.mode, filter=f"tags={self.tagname}")
        if len(res) == 1:
            return res[0]
        return None

    def getbydev(self, devname) -> dict | None:
        """Return the report row whose ``lv_path`` equals ``devname``, if exactly one matches"""
        res = runparse_report(mode=self.mode, filter=f"lv_path={devname}")
        if len(res) == 1:
            return res[0]
        return None

    @override
    def getlist(self, volname: str | None = None) -> list[dict]:
        """Return report rows, optionally restricted to one user-facing name

        Args:
            volname: when given, only volumes tagged with this name are
                returned; otherwise every logical volume is listed

        Raises:
            FileNotFoundError: the report command failed with
                "Failed to find logical volume"
            CalledProcessError: the report command failed for another reason
        """
        if not volname:
            return runparse_report(mode=self.mode)
        # build the tag from the argument itself; self.name may be unset or different
        tag = self.nametag_prefix + volname
        try:
            return runparse_report(mode=self.mode, filter=f"tags={tag}")
        except CalledProcessError as e:
            if "Failed to find logical volume" in e.stderr:
                raise FileNotFoundError(f"volume does not exists: {volname}") from e
            # unrelated failures must not silently turn into an unfiltered listing
            raise

    @override
    def create(self, size: int) -> dict:
        """Create a logical volume of ``size`` bytes and return its description

        Raises:
            InvalidArgument: ``lvcreate`` exited with status 3 and reported
                "Size is not a multiple"
            CalledProcessError: any other ``lvcreate`` failure
        """
        assert self.name is not None
        name = str(uuid.uuid4())
        try:
            runcmd(
                [
                    "lvcreate",
                    "--size",
                    f"{size}b",
                    self.vgname,
                    "--name",
                    name,
                    "--addtag",
                    self.tagname,
                ]
            )
        except CalledProcessError as e:
            if e.returncode == 3 and "Size is not a multiple" in e.stderr:
                raise InvalidArgument(f"invalid size: {size}") from e
            raise
        res = self.volume_read()
        assert res is not None
        return res

    def create_snapshot(self, size: int, parent: str) -> dict | None:
        """Create a snapshot of a logical volume

        Args:
            size: snapshot size in bytes
            parent: name used to build the origin path ``/dev/<vgname>/<parent>``
        """
        assert self.name is not None
        name = str(uuid.uuid4())
        runcmd(
            [
                "lvcreate",
                "--snapshot",
                "--size",
                f"{size}b",
                "--name",
                name,
                "--addtag",
                self.tagname,
                f"/dev/{self.vgname}/{parent}",
            ]
        )
        return self.volume_read()

    def create_thinpool(self, size: int) -> dict:
        """Create a thin pool logical volume

        Unlike the other ``create_*`` methods the pool is created under
        ``self.name`` itself and without the name tag.
        """
        assert self.name is not None
        runcmd(["lvcreate", "--thinpool", self.name, "--size", f"{size}b", self.vgname])
        # NOTE(review): volume_vol2path() looks the volume up by tag, which the
        # pool was not given above - confirm this lookup can succeed.
        return dict(name=self.name, size=size, device=self.volume_vol2path())

    def create_thin(self, size: int, thinpool: str) -> dict | None:
        """Create a thin logical volume in a thin pool

        Args:
            size: virtual size in bytes
            thinpool: pool name, addressed as ``<vgname>/<thinpool>``
        """
        assert self.name is not None
        name = str(uuid.uuid4())
        runcmd(
            [
                "lvcreate",
                "--thin",
                "--virtualsize",
                f"{size}b",
                "--name",
                name,
                "--addtag",
                self.tagname,
                f"{self.vgname}/{thinpool}",
            ]
        )
        return self.volume_read()

    def create_thinsnap(self, parent: str) -> dict | None:
        """Create a snapshot volume in a thin pool and activate it

        Args:
            parent: origin name, addressed as ``<vgname>/<parent>``
        """
        assert self.name is not None
        name = str(uuid.uuid4())
        runcmd(
            [
                "lvcreate",
                "--snapshot",
                "--name",
                name,
                "--addtag",
                self.tagname,
                f"{self.vgname}/{parent}",
            ]
        )
        # the snapshot exists under the generated LVM name; self.name is only its tag
        runcmd(["lvchange", "--activate", "y", f"/dev/{self.vgname}/{name}", "--ignoreactivationskip"])
        return self.volume_read()

    def rollback_snapshot(self) -> dict | None:
        """Merge this snapshot with ``lvconvert --merge`` and read back its parent

        The parent is looked up before the merge and read afterwards through a
        new ``LV`` built from ``get_parent()``'s result.
        """
        assert self.name is not None
        parent = self.get_parent()
        runcmd(["lvconvert", "--merge", self.volname])
        return LV(self.vgname, parent).volume_read()

    def get_parent(self):
        """Return the ``lv_parent`` field of this volume, or ``None`` if missing or empty"""
        vol = self.get()
        if vol is None:
            return None
        return vol["lv_parent"] or None

    @override
    def delete(self) -> None:
        """Remove the volume with ``lvremove --yes``

        An ``lvremove`` failure with exit status 5 and "Failed to find" in its
        stderr is ignored; any other failure propagates. The ``volname``
        lookup itself raises FileNotFoundError when the volume is not found.
        """
        try:
            runcmd(["lvremove", self.volname, "--yes"])
        except CalledProcessError as e:
            if not (e.returncode == 5 and "Failed to find" in e.stderr):
                raise

    @override
    def scan(self) -> list[dict]:
        """Run ``lvscan`` and return all logical-volume report rows"""
        runcmd(["lvscan"], True)
        return self.getlist()

    def vol2dict(self, vol: dict):
        """Convert a report row into the volume description dict

        Returns ``None`` for rows that have no ``lv_path`` or whose
        ``lv_active`` is not ``"active"``. The ``name`` entry is taken from
        the name tag, falling back to ``lv_name`` when no such tag exists.
        """
        created = datetime.datetime.strptime(vol["lv_time"], "%Y-%m-%d %H:%M:%S %z")
        if not vol["lv_path"]:
            # thin pool? no device
            _log.debug("no device: %s", vol["lv_name"])
            return None
        if vol.get("lv_active") not in ("active",):
            # not available
            _log.debug("not active: %s", vol["lv_name"])
            return None
        size = int(vol["lv_size"])
        readonly = vol["lv_permissions"] != "writeable"
        parent = vol["origin"]
        thin = bool(vol["pool_lv"])
        used = bool(vol["lv_device_open"])
        name = self._tagged_name(vol["lv_tags"])
        if name is None:
            name = vol["lv_name"]
        return dict(
            name=name,
            created=created.isoformat(),
            size=size,
            used=used,
            readonly=readonly,
            thin=thin,
            parent=parent,
            lvm_name=vol["lv_name"],
            lvm_id=vol["lv_uuid"],
        )

    def volume_list(self):
        """List all logical volumes that ``vol2dict`` can describe"""
        converted = (self.vol2dict(vol) for vol in self.getlist())
        return [ent for ent in converted if ent]

    def volume_read(self):
        """Read details of the volume tagged with this name, or ``None`` if not found"""
        vol = self.get()
        if vol is None:
            return None
        return self.vol2dict(vol)

    def volume_vol2path(self):
        """Convert volume name to device path (``/dev/<lv_full_name>``)"""
        return f"/dev/{self.volname}"

    def volume_path2vol(self, name: str):
        """Convert device path to the user-facing volume name

        Returns ``None`` when the volume carries no name tag.

        Raises:
            Exception: ``name`` does not start with ``/dev/<vgname>/``
            FileNotFoundError: no volume has this device path
        """
        if not name.startswith(f"/dev/{self.vgname}/"):
            raise Exception(f"invalid format: {name}, vg={self.vgname}")
        vol = self.getbydev(name)
        if vol is None:
            raise FileNotFoundError(f"volume does not exists: {name}")
        return self._tagged_name(vol["lv_tags"])

    def read_only(self, readonly: bool):
        """Set the logical volume to read-only or read-write"""
        permission = "r" if readonly else "rw"
        runcmd(["lvchange", "--permission", permission, self.volname])

    def resize(self, newsize: int):
        """Resize the logical volume to a new size in bytes"""
        assert self.name is not None
        runcmd(["lvresize", "--size", f"{newsize}b", self.volname, "--yes"])

    def format_volume(self, filesystem: str, label: str | None):
        """Format the logical volume to make filesystem

        Args:
            filesystem: filesystem type; ``mkfs.<filesystem>`` must be on PATH
            label: filesystem label; the volume name is used when empty

        Raises:
            NotImplementedError: the mkfs command is missing or the filesystem
                is not one of the supported types
        """
        mkfs = f"mkfs.{filesystem}"
        if shutil.which(mkfs) is None:
            _log.error("command does not found: mkfs.%s", filesystem)
            raise NotImplementedError("not supported")

        volpath = self.volume_vol2path()
        label_option = self._LABEL_OPTIONS.get(filesystem)
        if label_option is None:
            _log.error("no such filesystem: %s", filesystem)
            raise NotImplementedError("not supported")
        runcmd([mkfs, label_option, label or self.name, volpath])