Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
import copy
import fnmatch
from logging import getLogger
from typing import Generator, Optional, Tuple

import braceexpand
# Module-level logger shared by the classes in this file.
log = getLogger(__name__)
class Input:
    """Abstract source of files for a pipeline.

    Subclasses must implement :meth:`walk` and :meth:`readfile`; the base
    class only stores the input location.
    """

    def __init__(self, ifn: str, **kwargs):
        """Remember the input location.

        Args:
            ifn: Identifier of the input (stored as ``self.ifn``).
            **kwargs: Accepted and ignored by the base class so subclasses
                can take extra options.
        """
        self.ifn = ifn

    def walk(self) -> Generator[Tuple[str, int, float], None, None]:
        """Yield one ``(name, mode, ts)`` tuple per input file.

        ``Pipeline.render`` logs ``mode`` in octal and forwards ``ts`` to the
        output unchanged.
        NOTE(review): presumably ``mode`` is the permission bits and ``ts``
        a modification timestamp -- confirm against concrete subclasses.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError("walk")

    def readfile(self, fn: str) -> str:
        """Return the content of the input file named ``fn``.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError("readfile")
class Filter:
    """Abstract text filter that renders templates and file-name patterns.

    Subclasses must implement :meth:`render`. :meth:`renderfn` and
    :meth:`strtr` are helpers available to every subclass.
    """

    def __init__(self, **kwargs):
        """Accept and ignore arbitrary keyword options."""

    def render(self, s: str, vals: dict) -> str:
        """Render the template text ``s`` using the values in ``vals``.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError("render")

    def renderfn(self, s: str, vals: dict) -> Generator[str, None, None]:
        """Render a file-name pattern and brace-expand the result.

        Args:
            s: File-name pattern; passed through :meth:`render` first.
            vals: Values handed to :meth:`render`.

        Returns:
            Whatever ``braceexpand.braceexpand`` produces for the rendered
            pattern (one name per expansion).
        """
        return braceexpand.braceexpand(self.render(s, vals))

    def strtr(self, strng: str, replace: dict) -> str:
        """Replace substrings of ``strng`` in a single left-to-right pass.

        Modelled on PHP's ``strtr`` with an array argument
        (https://stackoverflow.com/questions/10931150/phps-strtr-for-python):
        replaced text is never scanned again, so ``{"a": "b", "b": "a"}``
        swaps the two letters instead of collapsing them.

        At each position the first key of ``replace`` (in dict iteration
        order) that matches wins. Note that PHP tries the longest key first;
        this implementation does not.

        Args:
            strng: Text to transform.
            replace: Mapping of search string to replacement. Empty keys are
                ignored, as in PHP.

        Returns:
            The transformed text.
        """
        # An empty key matches everywhere without consuming input, which
        # would keep the scan from ever advancing; drop such keys up front.
        pairs = [(old, new) for old, new in replace.items() if old]

        pieces = []
        pos, end = 0, len(strng)
        while pos < end:
            for old, new in pairs:
                if strng.startswith(old, pos):
                    pieces.append(new)
                    pos += len(old)
                    break
            else:
                # No key matches here: copy one character verbatim.
                pieces.append(strng[pos])
                pos += 1
        return "".join(pieces)
class Output:
    """Abstract destination for rendered files.

    Subclasses must implement :meth:`writefile` and may extend
    :meth:`finish`.
    """

    def __init__(self, ofn: str, **kwargs):
        """Remember the output location.

        Args:
            ofn: Identifier of the output (stored as ``self.ofn``).
            **kwargs: Accepted and ignored by the base class so subclasses
                can take extra options.
        """
        self.ofn = ofn

    def writefile(self, fn: str, content: str, mode: int,
                  ts: Optional[float] = None):
        """Store ``content`` under the name ``fn``.

        Args:
            fn: Name of the file to write.
            content: Text to store.
            mode: Value taken from the input's ``walk()``.
                NOTE(review): presumably permission bits -- confirm.
            ts: Optional value taken from the input's ``walk()``.
                NOTE(review): presumably a timestamp -- confirm.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError("writefile")

    def finish(self):
        """Signal that no more files will be written.

        The base implementation only emits a debug log line.
        """
        log.debug("finished %s", self.ofn)
class Pipeline:
    """Connect an input, a filter and an output into one rendering run."""

    def __init__(self, inp: Input, filt: Filter, outp: Output,
                 passpat: Optional[str] = None):
        """Store the pipeline stages.

        Args:
            inp: Source of files (provides ``walk`` and ``readfile``).
            filt: Filter used to render names and content.
            outp: Destination of the resulting files.
            passpat: Optional ``fnmatch`` pattern. Files whose rendered name
                matches it are copied without rendering their content.
        """
        self.inp = inp
        self.filt = filt
        self.outp = outp
        self.passpat = passpat

    def render(self, vals: dict):
        """Render every input file into the output.

        Each name reported by the input is treated as a pattern that the
        filter expands into one or more output names. For every output name
        the content is either copied verbatim (name matches ``passpat``) or
        rendered with a private copy of ``vals`` extended by ``"fname"``.
        The output's ``finish()`` is called once at the end.

        Args:
            vals: Template values; never modified by this method.
        """
        for fnpat, mode, ts in self.inp.walk():
            log.debug("walk %s (%o)", fnpat, mode)
            content = self.inp.readfile(fnpat)
            for fn in self.filt.renderfn(fnpat, vals):
                if self.passpat is not None and fnmatch.fnmatch(fn, self.passpat):
                    # Pass-through: keep the content untouched, but store it
                    # under the rendered name that was actually matched, not
                    # under the raw pattern (which would write the literal
                    # pattern name once per expansion).
                    self.outp.writefile(fn, content, mode, ts)
                    continue
                # Deep copy so "fname" (and anything the filter changes)
                # never leaks into the caller's dict or the next file.
                v = copy.deepcopy(vals)
                v["fname"] = fn
                ocont = self.filt.render(content, v)
                log.debug("write %s", fn)
                self.outp.writefile(fn, ocont, mode, ts)
        self.outp.finish()