Coverage for editrest/main.py: 90%

184 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-10-01 12:16 +0000

1import click 

2import io 

3import editor 

4import requests 

5import json 

6import yaml 

7import toml 

8import pprint 

9import ast 

10import functools 

11import jsonpatch 

12from unittest.mock import patch 

13from logging import getLogger 

14from typing import Optional 

15from .version import VERSION 

16 

17_log = getLogger(__name__) 

18 

19 

20class NotChanged(Exception): 

21 pass 

22 

23 

24@click.version_option(version=VERSION, prog_name="editrest") 

25@click.group(invoke_without_command=True) 

26@click.pass_context 

27def cli(ctx): 

28 """REST read-modify-write""" 

29 if ctx.invoked_subcommand is None: 29 ↛ 30line 29 didn't jump to line 30, because the condition on line 29 was never true

30 click.echo(ctx.get_help()) 

31 

32 

33def parse(b: bytes | str, format: str = "json"): 

34 if isinstance(b, str): 

35 b = b.encode("utf-8") 

36 if format == "json": 

37 return json.loads(b) 

38 elif format == "yaml": 

39 return yaml.safe_load(io.BytesIO(b)) 

40 elif format == "toml": 

41 return toml.loads(b.decode()) 

42 elif format == "pprint": 

43 return ast.literal_eval(b.decode()) 

44 elif format == "raw": 

45 return b 

46 raise NotImplementedError(f"invalid format {format}") 

47 

48 

49def encode(d, format: str = "json") -> bytes: 

50 if format == "json": 

51 return json.dumps(d, ensure_ascii=False, sort_keys=True, indent=2) 

52 elif format == "yaml": 

53 out = io.BytesIO() 

54 yaml.safe_dump(d, out, default_flow_style=False, encoding="utf-8") 

55 return out.getvalue() 

56 elif format == "toml": 

57 return toml.dumps(d).encode() 

58 elif format == "pprint": 

59 return pprint.pformat(d) 

60 elif format == "raw": 

61 return d 

62 raise NotImplementedError(f"invalid format {format}") 

63 

64 

65def do1(url: str, read_method: str = "GET", write_method: str = "PUT", 

66 format: str = "json", session: Optional[requests.Session] = None, 

67 dry: bool = False) -> requests.Response: 

68 if session is None: 68 ↛ 69line 68 didn't jump to line 69, because the condition on line 68 was never true

69 session = requests.Session() 

70 old_res = session.request(read_method, url) 

71 _log.debug("response %s, headers=%s", old_res, str(old_res.headers)) 

72 old_res.raise_for_status() 

73 try: 

74 data = old_res.json() 

75 except ValueError: 

76 data = old_res.content 

77 new_msg = editor.edit(contents=encode(data, format), suffix=f".{format}") 

78 new_data = parse(new_msg, format) 

79 if data == new_data: 

80 raise NotChanged("data not changed") 

81 p = jsonpatch.make_patch(data, new_data) 

82 click.echo(f"change: {p.patch}") 

83 if not dry: 

84 click.confirm('Do you want to continue?', abort=True) 

85 if isinstance(new_data, (bytes, str)): 85 ↛ 86line 85 didn't jump to line 86, because the condition on line 85 was never true

86 res = session.request(write_method, url, data=new_data) 

87 else: 

88 res = session.request(write_method, url, json=new_data) 

89 _log.debug("response %s, headers=%s", res, str(res.headers)) 

90 try: 

91 out = res.json() 

92 click.echo( 

93 f"{write_method} {url} {new_data} -> {res.status_code} {out}") 

94 except Exception: 

95 click.echo( 

96 f"{write_method} {url} {new_data} -> {res.status_code} {res.content}") 

97 res.raise_for_status() 

98 return res 

99 click.echo(f"(dry) {write_method} {encode(new_data, 'json')}") 

100 return None 

101 

102 

103def base_options(func): 

104 @click.option("--format", default="json", type=click.Choice(["json", "yaml", "toml", "pprint"]), show_default=True) 

105 @click.option("--dry/--no-dry", default=False, show_default=True) 

106 @click.option("--user", "-u", help="user:password") 

107 @click.option("--bearer", help="bearer token") 

108 @click.option("--insecure/--verify", "-k", default=False, show_default=True) 

109 @click.option("--content-type", default="application/json", show_default=True) 

110 @click.option("--accept", default="application/json", show_default=True) 

111 @click.option("--headers", "-H", multiple=True, help="'Header: value'") 

112 @click.option("--params", multiple=True, help="param=value") 

113 @click.option("--verbose/--quiet", default=None) 

114 @click.option("--proxy", "-x", help="http/https proxy") 

115 @click.option("--cacert", type=click.Path(exists=True, file_okay=True, dir_okay=False), 

116 help="CA root certificate") 

117 @click.option("--cert", type=click.Path(exists=True, file_okay=True, dir_okay=False), 

118 help="mTLS client side certificate") 

119 @click.option("--resolve", multiple=True, help="hostname:port:ipaddress") 

120 @click.option("--location", "-L", type=bool) 

121 @click.argument("url") 

122 @functools.wraps(func) 

123 def _(url, format, dry, user, bearer, insecure, content_type, accept, 

124 headers, verbose, params, proxy, cacert, cert, resolve, location, 

125 *args, **kwargs): 

126 import logging 

127 fmt = "%(asctime)s %(levelname)s %(name)s %(message)s" 

128 if verbose is None: 

129 logging.basicConfig(format=fmt, level="INFO") 

130 elif verbose: 

131 logging.basicConfig(format=fmt, level="DEBUG") 

132 else: 

133 logging.basicConfig(format=fmt, level="WARNING") 

134 session = requests.Session() 

135 session.verify = not insecure 

136 if not location: 136 ↛ 138line 136 didn't jump to line 138, because the condition on line 136 was never false

137 session.max_redirects = 0 

138 if cacert: 

139 session.verify = cacert 

140 session.headers["content-type"] = content_type 

141 session.headers["accept"] = accept 

142 for h in headers: 142 ↛ 143line 142 didn't jump to line 143, because the loop on line 142 never started

143 k, v = h.split(":", 1) 

144 session.headers[k] = v.strip() 

145 if user: 145 ↛ 146line 145 didn't jump to line 146, because the condition on line 145 was never true

146 session.auth = user.split(":", 1) 

147 if bearer: 147 ↛ 148line 147 didn't jump to line 148, because the condition on line 147 was never true

148 session.headers["Authorization"] = f"Bearer {bearer}" 

149 if proxy: 149 ↛ 150line 149 didn't jump to line 150, because the condition on line 149 was never true

150 session.proxies.update({"http": proxy, "https": proxy}) 

151 if cert: 151 ↛ 152line 151 didn't jump to line 152, because the condition on line 151 was never true

152 session.cert = cert 

153 for p in params: 153 ↛ 154line 153 didn't jump to line 154, because the loop on line 153 never started

154 k, v = p.split("=", 1) 

155 session.params[k] = v 

156 if resolve: 

157 resolve_map = {} 

158 for resolve1 in resolve: 

159 ll = resolve1.split(":", 2) 

160 if len(ll) == 3: 

161 k = (ll[0], int(ll[1])) 

162 v = ll[2] 

163 elif len(ll) == 2: 163 ↛ 167line 163 didn't jump to line 167, because the condition on line 163 was never false

164 k = ll[0] 

165 v = ll[1] 

166 else: 

167 raise Exception(r"invalid resolve option: {l}") 

168 resolve_map[k] = v 

169 # https://stackoverflow.com/questions/22609385/python-requests-library-define-specific-dns 

170 from urllib3.util import connection 

171 _orig_create_connection = connection.create_connection 

172 

173 def patched_create_connection(address, *args, **kwargs): 

174 host, port = address 

175 if (host, port) in resolve_map: 

176 hostname = resolve_map.get((host, port)) 

177 _log.debug("hostport resolve %s -> %s", address, hostname) 

178 elif host in resolve_map: 178 ↛ 182line 178 didn't jump to line 182, because the condition on line 178 was never false

179 hostname = resolve_map.get(host) 

180 _log.debug("host resolve %s -> %s", address, hostname) 

181 else: 

182 hostname = host 

183 _log.debug("raw resolve %s -> %s", address, hostname) 

184 return _orig_create_connection((hostname, port), *args, **kwargs) 

185 with patch.object(connection, "create_connection", side_effect=patched_create_connection): 

186 return func(url=url, format=format, dry=dry, session=session, *args, **kwargs) 

187 return func(url=url, format=format, dry=dry, session=session, *args, **kwargs) 

188 return _ 

189 

190 

191@cli.command() 

192@base_options 

193def get_put(url, format, dry, session): 

194 """GET url and PUT""" 

195 do1(url=url, read_method="GET", write_method="PUT", 

196 format=format, dry=dry, session=session) 

197 

198 

199@cli.command() 

200@base_options 

201def get_delete(url, format, dry, session): 

202 """GET url and DELETE""" 

203 do1(url=url, read_method="GET", write_method="DELETE", 

204 format=format, dry=dry, session=session) 

205 

206 

207@cli.command() 

208@base_options 

209def get_post(url, format, dry, session): 

210 """GET url and POST""" 

211 do1(url=url, read_method="GET", write_method="POST", 

212 format=format, dry=dry, session=session) 

213 

214 

215@cli.command() 

216@base_options 

217def get_patch(url, format, dry, session): 

218 """GET url and PATCH""" 

219 do1(url=url, read_method="GET", write_method="PATCH", 

220 format=format, dry=dry, session=session) 

221 

222 

223@cli.command() 

224@base_options 

225@click.option("--read-method", default="GET", show_default=True) 

226@click.option("--write-method", default="PUT", show_default=True) 

227def run(url, format, dry, session, read_method, write_method): 

228 """{read-method} url and {write-method}""" 

229 do1(url=url, read_method=read_method, write_method=write_method, 

230 format=format, dry=dry, session=session) 

231 

232 

233if __name__ == "__main__": 233 ↛ 234line 233 didn't jump to line 234, because the condition on line 233 was never true

234 cli()