Coverage for volexport/lvm2.py: 66%

194 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-20 14:19 +0000

1import shlex 

2import datetime 

3from abc import abstractmethod 

4from .util import runcmd 

5from .config import config 

6from logging import getLogger 

7from typing import Sequence, override 

8 

9_log = getLogger(__name__) 

10 

11 

12def parse(input: Sequence[str], indent: int, width: int) -> list[dict]: 

13 res = [] 

14 ent = {} 

15 for i in input: 

16 if len(i) < indent: 

17 _log.debug("short line: %s", i) 

18 continue 

19 if not i.startswith(" " * indent): 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true

20 _log.debug("no indent: %s", i) 

21 continue 

22 if i[indent : indent + 3] == "---": 

23 _log.debug("separator: %s", i) 

24 if ent: 

25 _log.debug("append: %s", i) 

26 res.append(ent) 

27 ent = {} 

28 continue 

29 if len(i) <= indent + width: 

30 _log.debug("short width: %s", i) 

31 continue 

32 if i[indent + width] != " ": 32 ↛ 33line 32 didn't jump to line 33 because the condition on line 32 was never true

33 _log.debug("not split: %s / %s", repr(i[indent + width]), i) 

34 continue 

35 name = i[indent : indent + width].strip() 

36 val = i[indent + width + 1 :].strip() 

37 ent[name] = val 

38 if ent: 

39 res.append(ent) 

40 return res 

41 

42 

43def runparse(cmd: list[str], indent: int, width: int) -> list[dict]: 

44 if config.LVM_BIN: 44 ↛ 45line 44 didn't jump to line 45 because the condition on line 44 was never true

45 cmd[0:0] = shlex.split(config.LVM_BIN) 

46 res = runcmd(cmd, root=True) 

47 return parse(res.stdout.splitlines(keepends=False), indent, width) 

48 

49 

50class Base: 

51 def __init__(self, name: str | None = None): 

52 self.name = name 

53 

54 def find_by(self, data: list[dict], keyname: str, value: str): 

55 for i in data: 

56 if i.get(keyname) == value: 

57 return i 

58 return None 

59 

60 @abstractmethod 

61 def get(self) -> dict | None: 

62 raise NotImplementedError("get") 

63 

64 @abstractmethod 

65 def getlist(self) -> list[dict]: 

66 raise NotImplementedError("list") 

67 

68 @abstractmethod 

69 def create(self) -> dict: 

70 raise NotImplementedError("create") 

71 

72 @abstractmethod 

73 def delete(self) -> None: 

74 raise NotImplementedError("delete") 

75 

76 @abstractmethod 

77 def scan(self) -> list[dict]: 

78 raise NotImplementedError("scan") 

79 

80 

81class PV(Base): 

82 @override 

83 def get(self) -> dict | None: 

84 if self.name is None: 

85 return None 

86 return self.find_by(self.getlist(), "PV Name", self.name) 

87 

88 @override 

89 def getlist(self) -> list[dict]: 

90 return runparse(["pvdisplay", "--unit", "b"], 2, 21) 

91 

92 @override 

93 def create(self) -> dict: 

94 assert self.name is not None 

95 runcmd(["pvcreate", self.name], True) 

96 res = self.get() 

97 assert res is not None 

98 return res 

99 

100 @override 

101 def delete(self) -> None: 

102 assert self.name is not None 

103 runcmd(["pvremove", self.name, "-y"], True) 

104 

105 @override 

106 def scan(self) -> list[dict]: 

107 runcmd(["pvscan"], True) 

108 return self.getlist() 

109 

110 

111class VG(Base): 

112 @override 

113 def get(self) -> dict | None: 

114 if self.name is None: 114 ↛ 115line 114 didn't jump to line 115 because the condition on line 114 was never true

115 return None 

116 res = runparse(["vgdisplay", "--unit", "b", self.name], indent=2, width=21) 

117 if len(res) == 0: 

118 return None 

119 return res[0] 

120 

121 @override 

122 def getlist(self) -> list[dict]: 

123 return runparse(["vgdisplay", "--unit", "b"], 2, 21) 

124 

125 @override 

126 def create(self, pvs: list[PV]) -> dict: 

127 assert self.name is not None 

128 runcmd(["vgcreate", self.name, *[x.name for x in pvs if x.name is not None]], True) 

129 res = self.get() 

130 assert res is not None 

131 return res 

132 

133 @override 

134 def delete(self) -> None: 

135 assert self.name is not None 

136 runcmd(["vgremove", self.name, "-y"], True) 

137 

138 @override 

139 def scan(self) -> list[dict]: 

140 runcmd(["vgscan"], True) 

141 return self.getlist() 

142 

143 def addpv(self, pv: PV): 

144 assert self.name is not None 

145 assert pv.name is not None 

146 runcmd(["vgextend", self.name, pv.name], True) 

147 

148 def delpv(self, pv: PV): 

149 assert self.name is not None 

150 assert pv.name is not None 

151 runcmd(["vgreduce", self.name, pv.name], True) 

152 

153 

154class LV(Base): 

155 def __init__(self, vgname: str, name: str | None = None): 

156 super().__init__(name) 

157 self.vgname = vgname 

158 

159 @property 

160 def volname(self): 

161 assert self.name is not None 

162 return self.vgname + "/" + self.name 

163 

164 @override 

165 def get(self) -> dict | None: 

166 if self.name is None: 

167 return None 

168 return self.find_by(self.getlist(), "LV Name", self.name) 

169 

170 @override 

171 def getlist(self) -> list[dict]: 

172 return runparse(["lvdisplay", "--unit", "b", self.vgname], 2, 22) 

173 

174 @override 

175 def create(self, size: int) -> dict: 

176 assert self.name is not None 

177 runcmd(["lvcreate", "--size", f"{size}b", self.vgname, "--name", self.name]) 

178 return dict(name=self.name, size=size, device=f"/dev/{self.vgname}/{self.name}") 

179 

180 def create_snapshot(self, size: int, parent: str) -> dict: 

181 assert self.name is not None 

182 runcmd(["lvcreate", "--snapshot", "--size", f"{size}b", "--name", self.name, f"/dev/{self.vgname}/{parent}"]) 

183 return dict(name=self.name, size=size, device=f"/dev/{self.vgname}/{self.name}") 

184 

185 def create_thinpool(self, size: int) -> dict: 

186 assert self.name is not None 

187 runcmd(["lvcreate", "--thinpool", self.name, "-L", f"{size}b", self.vgname]) 

188 return dict(name=self.name, size=size, device=f"/dev/{self.vgname}/{self.name}") 

189 

190 def create_thin(self, size: int, thinpool: str) -> dict: 

191 assert self.name is not None 

192 runcmd(["lvcreate", "--thin", "-V", f"{size}b", "-n", self.name, f"{self.vgname}/{thinpool}"]) 

193 return dict(name=self.name, size=size, device=f"/dev/{self.vgname}/{self.name}") 

194 

195 @override 

196 def delete(self) -> None: 

197 runcmd(["lvremove", self.volname]) 

198 

199 @override 

200 def scan(self) -> list[dict]: 

201 runcmd(["lvscan"], True) 

202 return self.getlist() 

203 

204 def volume_list(self): 

205 vols = self.getlist() 

206 res = [] 

207 for vol in vols: 

208 created = datetime.datetime.strptime( 

209 vol["LV Creation host, time"].split(",", 1)[-1].strip(), "%Y-%m-%d %H:%M:%S %z" 

210 ) 

211 size = int(vol["LV Size"].removesuffix(" B")) 

212 res.append(dict(name=vol["LV Name"], created=created.isoformat(), size=size, used=int(vol["# open"]))) 

213 return res 

214 

215 def volume_read(self): 

216 vols = self.getlist() 

217 for vol in vols: 

218 if vol["LV Name"] != self.name: 

219 continue 

220 created = datetime.datetime.strptime( 

221 vol["LV Creation host, time"].split(",", 1)[-1].strip(), "%Y-%m-%d %H:%M:%S %z" 

222 ) 

223 size = int(vol["LV Size"].removesuffix(" B")) 

224 return dict(name=vol["LV Name"], created=created.isoformat(), size=size, used=int(vol["# open"])) 

225 return None 

226 

227 def volume_vol2path(self): 

228 return f"/dev/{self.vgname}/{self.name}" 

229 

230 def volume_path2vol(self, name: str): 

231 if not name.startswith(f"/dev/{self.vgname}/"): 231 ↛ 232line 231 didn't jump to line 232 because the condition on line 231 was never true

232 raise Exception(f"invalid format: {name}, vg={self.vgname}") 

233 return name.removeprefix(f"/dev/{self.vgname}/") 

234 

235 def read_only(self, readonly: bool): 

236 if readonly: 

237 runcmd(["lvchange", "--permission", "r", self.volname]) 

238 else: 

239 runcmd(["lvchange", "--permission", "rw", self.volname]) 

240 

241 def resize(self, newsize: int): 

242 assert self.name is not None 

243 runcmd(["lvresize", "--size", str(newsize), self.volname])