Coverage for cogapp / cogapp.py: 100.00%

381 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-25 07:14 -0500

1"""Cog content generation tool.""" 

2 

3import difflib 

4import glob 

5import io 

6import linecache 

7import os 

8import re 

9import shlex 

10import sys 

11import traceback 

12import types 

13 

14from .errors import ( 

15 CogCheckFailed, 

16 CogError, 

17 CogGeneratedError, 

18 CogUsageError, 

19 CogUserException, 

20) 

21from .options import CogOptions 

22from .whiteutils import common_prefix, reindent_block, white_prefix 

23from .utils import NumberedFileReader, Redirectable, change_dir, md5 

24from .hashhandler import HashHandler 

25 

26__version__ = "3.6.0" 

27 

28 

class CogGenerator(Redirectable):
    """A generator pulled from a source file.

    Collects the marker lines and the code lines of one cog block, then
    compiles and runs that code, returning the text it emitted.
    """

    def __init__(self, options=None):
        """Create an empty generator.

        Args:
            options: the options to use; a fresh `CogOptions()` is created
                when none (or a falsy value) is given.
        """
        super().__init__()
        self.markers = []
        self.lines = []
        self.options = options or CogOptions()
        # Defined up front so out()/outl() are usable even if they are
        # called before evaluate() has had a chance to reset the buffer.
        self.outstring = ""

    def parse_marker(self, line):
        """Record `line` as one of the marker lines delimiting the block."""
        self.markers.append(line)

    def parse_line(self, line):
        """Record `line` as a line of generator code, minus its newlines."""
        self.lines.append(line.strip("\n"))

    def get_code(self):
        """Extract the executable Python code from the generator.

        Note that this rewrites `self.markers` and `self.lines` in place
        when a common prefix is removed.
        """
        # If the markers and lines all have the same prefix
        # (end-of-line comment chars, for example),
        # then remove it from all the lines.
        pref_in = common_prefix(self.markers + self.lines)
        if pref_in:
            self.markers = [line.replace(pref_in, "", 1) for line in self.markers]
            self.lines = [line.replace(pref_in, "", 1) for line in self.lines]

        return reindent_block(self.lines, "")

    def evaluate(self, cog, globals, fname):
        """Run the generator code and return the text it produced.

        Args:
            cog: the `Cog` engine; its `cogmodulename` and `cogmodule`
                attributes are used to expose the cog functions to the code.
            globals: the globals dict the code is executed in.
            fname: the name to compile the code under (shown in tracebacks).

        Returns:
            The generated text, re-indented to the whitespace prefix of the
            markers, or "" if the block holds no code.

        Raises:
            CogError: re-raised unchanged if the code raises one.
            CogUserException: wraps any other exception raised by the code,
                carrying a formatted traceback as its message.
        """
        # Figure out the right whitespace prefix for the output.  This must
        # happen before get_code(), which may rewrite self.markers.
        pref_out = white_prefix(self.markers)

        intext = self.get_code()
        if not intext:
            return ""

        prologue = "import " + cog.cogmodulename + " as cog\n"
        if self.options.prologue:
            prologue += self.options.prologue + "\n"
        code = compile(prologue + intext, str(fname), "exec")

        # Make sure the "cog" module has our state.
        cog.cogmodule.msg = self.msg
        cog.cogmodule.out = self.out
        cog.cogmodule.outl = self.outl
        cog.cogmodule.error = self.error

        real_stdout = sys.stdout
        if self.options.print_output:
            sys.stdout = captured_stdout = io.StringIO()

        self.outstring = ""
        try:
            # NOTE: this executes code taken from the file being processed;
            # that is the purpose of the tool, so only cog trusted files.
            eval(code, globals)
        except CogError:
            raise
        except:  # noqa: E722 (we're just wrapping in CogUserException and rethrowing)
            typ, err, tb = sys.exc_info()
            # tb.tb_next skips this evaluate() frame itself.
            frames = (tuple(fr) for fr in traceback.extract_tb(tb.tb_next))
            frames = find_cog_source(frames, prologue)
            msg = "".join(traceback.format_list(frames))
            msg += f"{typ.__name__}: {err}"
            raise CogUserException(msg)
        finally:
            sys.stdout = real_stdout

        if self.options.print_output:
            self.outstring = captured_stdout.getvalue()

        # We need to make sure that the last line in the output
        # ends with a newline, or it will be joined to the
        # end-output line, ruining cog's idempotency.
        if self.outstring and self.outstring[-1] != "\n":
            self.outstring += "\n"

        return reindent_block(self.outstring, pref_out)

    def msg(self, s):
        """The cog.msg function: print `s` prefixed with "Message: "."""
        self.prout("Message: " + s)

    def out(self, sOut="", dedent=False, trimblanklines=False):
        """The cog.out function.

        Args:
            sOut: the text to append to the output.
            dedent: if true, pass the text through `reindent_block` first.
            trimblanklines: if true and the text is multi-line, drop a blank
                first line and a blank last line, then end it with a newline.
        """
        if trimblanklines and ("\n" in sOut):
            lines = sOut.split("\n")
            if lines[0].strip() == "":
                del lines[0]
            # `lines` may now be empty, so guard the look at the last line.
            if lines and lines[-1].strip() == "":
                del lines[-1]
            sOut = "\n".join(lines) + "\n"
        if dedent:
            sOut = reindent_block(sOut)
        self.outstring += sOut

    def outl(self, sOut="", **kw):
        """The cog.outl function: like `out`, then emit a newline."""
        self.out(sOut, **kw)
        self.out("\n")

    def error(self, msg="Error raised by cog generator."):
        """The cog.error function.

        Instead of raising standard python errors, cog generators can use
        this function.  It will display the error without a scary Python
        traceback.

        Raises:
            CogGeneratedError: always, carrying `msg`.
        """
        raise CogGeneratedError(msg)

135 

136 

class Cog(Redirectable):
    """The Cog engine.

    Holds the active options and the "cog" module object exposed to
    generator code, and implements processing of files and command lines.
    """

    def __init__(self):
        super().__init__()
        self.options = CogOptions()
        self.cogmodulename = "cog"
        self.create_cog_module()
        self.check_failed = False
        self.hash_handler = None
        self._fix_end_output_patterns()

    def _fix_end_output_patterns(self):
        """Rebuild the hash handler from the current end-output marker."""
        self.hash_handler = HashHandler(self.options.end_output)

    def show_warning(self, msg):
        """Print `msg` prefixed with "Warning: "."""
        self.prout(f"Warning: {msg}")

    def is_begin_spec_line(self, s):
        """Return True if `s` contains the begin-spec marker."""
        return self.options.begin_spec in s

    def is_end_spec_line(self, s):
        """Return True if `s` contains the end-spec marker.

        A line that also contains the end-output marker does not count.
        """
        return self.options.end_spec in s and not self.is_end_output_line(s)

    def is_end_output_line(self, s):
        """Return True if `s` contains the end-output marker."""
        return self.options.end_output in s

    def create_cog_module(self):
        """Make a cog "module" object.

        Imported Python modules can use "import cog" to get our state.

        """
        self.cogmodule = types.SimpleNamespace()
        self.cogmodule.path = []

    def open_output_file(self, fname):
        """Open an output file, taking all the details into account.

        The file is opened for writing with the configured encoding and
        newline options.  A missing parent directory is created first.
        """
        fdir = os.path.dirname(fname)
        # Test `fdir` itself, not its parent: checking the parent skipped
        # creation for a single-level relative directory like "out/x.txt".
        if fdir and not os.path.exists(fdir):
            # exist_ok tolerates the directory appearing between the check
            # and the creation.
            os.makedirs(fdir, exist_ok=True)
        return open(
            fname,
            "w",
            encoding=self.options.encoding,
            newline=self.options.newline,
        )

    def open_input_file(self, fname):
        """Open an input file; the name "-" means standard input."""
        if fname == "-":
            return sys.stdin
        else:
            return open(fname, encoding=self.options.encoding)

    def process_file(self, file_in, file_out, fname=None, globals=None):
        """Process an input file object to an output file object.

        `file_in` and `file_out` can be file objects, or file names.

        Args:
            file_in: the input, as a file object or a file name.
            file_out: the output, as a file object or a file name.
            fname: the name to report for the files when they are passed as
                file objects.
            globals: the globals dict generator code runs in; a new dict is
                used when None.  The configured defines are added to it.

        Raises:
            CogError: if the cog markers in the input are malformed, or a
                hash check fails.
        """
        file_name_in = fname or ""
        file_name_out = fname or ""
        file_in_to_close = file_out_to_close = None

        start_dir = os.getcwd()

        try:
            # Convert filenames to files.  This happens inside the `try` so
            # an input opened here is still closed if opening the output
            # fails.
            if isinstance(file_in, (bytes, str)):
                # Open the input file.
                file_name_in = file_in
                file_in = file_in_to_close = self.open_input_file(file_in)
            if isinstance(file_out, (bytes, str)):
                # Open the output file.
                file_name_out = file_out
                file_out = file_out_to_close = self.open_output_file(file_out)

            file_in = NumberedFileReader(file_in)

            saw_cog = False

            self.cogmodule.inFile = file_name_in
            self.cogmodule.outFile = file_name_out
            self.cogmodulename = "cog_" + md5(file_name_out.encode()).hexdigest()
            sys.modules[self.cogmodulename] = self.cogmodule
            # if "import cog" explicitly done in code by user, note threading will cause clashes.
            sys.modules["cog"] = self.cogmodule

            # The globals dict we'll use for this file.
            if globals is None:
                globals = {}

            # If there are any global defines, put them in the globals.
            globals.update(self.options.defines)

            # loop over generator chunks
            line = file_in.readline()
            while line:
                # Find the next spec begin
                while line and not self.is_begin_spec_line(line):
                    if self.is_end_spec_line(line):
                        raise CogError(
                            f"Unexpected {self.options.end_spec!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    if self.is_end_output_line(line):
                        raise CogError(
                            f"Unexpected {self.options.end_output!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    file_out.write(line)
                    line = file_in.readline()
                if not line:
                    break
                if not self.options.delete_code:
                    file_out.write(line)

                # `line` is the begin spec
                gen = CogGenerator(options=self.options)
                gen.set_output(stdout=self.stdout)
                gen.parse_marker(line)
                first_line_num = file_in.linenumber()
                self.cogmodule.firstLineNum = first_line_num

                # If the spec begin is also a spec end, then process the single
                # line of code inside.
                if self.is_end_spec_line(line):
                    beg = line.find(self.options.begin_spec)
                    end = line.find(self.options.end_spec)
                    if beg > end:
                        raise CogError(
                            "Cog code markers inverted",
                            file=file_name_in,
                            line=first_line_num,
                        )
                    else:
                        code = line[beg + len(self.options.begin_spec) : end].strip()
                        gen.parse_line(code)
                else:
                    # Deal with an ordinary code block.
                    line = file_in.readline()

                    # Get all the lines in the spec
                    while line and not self.is_end_spec_line(line):
                        if self.is_begin_spec_line(line):
                            raise CogError(
                                f"Unexpected {self.options.begin_spec!r}",
                                file=file_name_in,
                                line=file_in.linenumber(),
                            )
                        if self.is_end_output_line(line):
                            raise CogError(
                                f"Unexpected {self.options.end_output!r}",
                                file=file_name_in,
                                line=file_in.linenumber(),
                            )
                        if not self.options.delete_code:
                            file_out.write(line)
                        gen.parse_line(line)
                        line = file_in.readline()
                    if not line:
                        raise CogError(
                            "Cog block begun but never ended.",
                            file=file_name_in,
                            line=first_line_num,
                        )

                    if not self.options.delete_code:
                        file_out.write(line)
                    gen.parse_marker(line)

                line = file_in.readline()

                # Eat all the lines in the output section.  While reading past
                # them, compute the md5 hash of the old output.
                previous = []
                while line and not self.is_end_output_line(line):
                    if self.is_begin_spec_line(line):
                        raise CogError(
                            f"Unexpected {self.options.begin_spec!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    if self.is_end_spec_line(line):
                        raise CogError(
                            f"Unexpected {self.options.end_spec!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    previous.append(line)
                    line = file_in.readline()
                cur_hash = self.hash_handler.compute_lines_hash(previous)

                if not line and not self.options.eof_can_be_end:
                    # We reached end of file before we found the end output line.
                    raise CogError(
                        f"Missing {self.options.end_output!r} before end of file.",
                        file=file_name_in,
                        line=file_in.linenumber(),
                    )

                # Make the previous output available to the current code
                self.cogmodule.previous = "".join(previous)

                # Write the output of the spec to be the new output if we're
                # supposed to generate code.
                if not self.options.no_generate:
                    cog_fname = f"<cog {file_name_in}:{first_line_num}>"
                    output = gen.evaluate(cog=self, globals=globals, fname=cog_fname)
                    output = self.suffix_lines(output)
                    new_hash = self.hash_handler.compute_hash(output)
                    file_out.write(output)
                else:
                    new_hash = ""

                saw_cog = True

                # Write the ending output line
                if self.options.hash_output:
                    try:
                        self.hash_handler.validate_hash(line, cur_hash)
                    except ValueError as e:
                        raise CogError(
                            str(e),
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    line = self.hash_handler.format_end_line_with_hash(
                        line,
                        new_hash,
                        add_hash=True,
                        preserve_format=self.options.check,
                    )
                else:
                    line = self.hash_handler.format_end_line_with_hash(
                        line, new_hash, add_hash=False
                    )

                if not self.options.delete_code:
                    file_out.write(line)
                line = file_in.readline()

            if not saw_cog and self.options.warn_empty:
                self.show_warning(f"no cog code found in {file_name_in}")
        finally:
            # Only close the files this method opened itself.
            if file_in_to_close:
                file_in_to_close.close()
            if file_out_to_close:
                file_out_to_close.close()
            # Restore the working directory in case it changed meanwhile.
            os.chdir(start_dir)

    # A regex for non-empty lines, used by suffix_lines.
    re_non_empty_lines = re.compile(r"^\s*\S+.*$", re.MULTILINE)

    def suffix_lines(self, text):
        """Add suffixes to the lines in text, if our options desire it.

        `text` is many lines, as a single string.

        """
        if self.options.suffix:
            # Find all non-blank lines, and add the suffix to the end.
            # Backslashes are doubled so the suffix is taken literally in
            # the replacement template.
            repl = r"\g<0>" + self.options.suffix.replace("\\", "\\\\")
            text = self.re_non_empty_lines.sub(repl, text)
        return text

    def process_string(self, input, fname=None):
        """Process `input` as the text to cog.

        Return the cogged output as a string.

        """
        file_old = io.StringIO(input)
        file_new = io.StringIO()
        self.process_file(file_old, file_new, fname=fname)
        return file_new.getvalue()

    def replace_file(self, old_path, new_text):
        """Replace the file at `old_path` with the contents `new_text`.

        Raises:
            CogError: if the file is not writable and cannot be made so.
        """
        if not os.access(old_path, os.W_OK):
            # Need to ensure we can write.
            if self.options.make_writable_cmd:
                # Use an external command to make the file writable.
                # NOTE(review): the path is substituted into a shell command
                # unquoted; confirm paths with shell metacharacters are not
                # a concern for callers.
                cmd = self.options.make_writable_cmd.replace("%s", old_path)
                with os.popen(cmd) as cmdout:
                    self.stdout.write(cmdout.read())
                if not os.access(old_path, os.W_OK):
                    raise CogError(f"Couldn't make {old_path} writable")
            else:
                # Can't write!
                raise CogError(f"Can't overwrite {old_path}")
        # The context manager closes the file even if the write fails.
        with self.open_output_file(old_path) as f:
            f.write(new_text)

    def save_include_path(self):
        """Remember copies of the include path and `sys.path`."""
        self.saved_include = self.options.include_path[:]
        self.saved_sys_path = sys.path[:]

    def restore_include_path(self):
        """Put back the paths remembered by `save_include_path`."""
        self.options.include_path = self.saved_include
        # NOTE(review): after this, cogmodule.path is the same list object
        # as options.include_path, so a later add_to_include_path() call
        # with options.include_path extends that list with itself. Confirm
        # whether the resulting duplicate entries matter.
        self.cogmodule.path = self.options.include_path
        sys.path = self.saved_sys_path

    def add_to_include_path(self, include_path):
        """Append the directories in `include_path` to both search paths."""
        self.cogmodule.path.extend(include_path)
        sys.path.extend(include_path)

    def process_one_file(self, fname):
        """Process one filename through cog.

        Where the output goes depends on the options: to the named output
        file, back over the input (replace), nowhere (check only), or to
        this object's stdout.
        """

        self.save_include_path()
        need_newline = False

        try:
            self.add_to_include_path(self.options.include_path)
            # Since we know where the input file came from,
            # push its directory onto the include path.
            self.add_to_include_path([os.path.dirname(fname)])

            # How we process the file depends on where the output is going.
            if self.options.output_name:
                self.process_file(fname, self.options.output_name, fname)
            elif self.options.replace or self.options.check:
                # We want to replace the cog file with the output,
                # but only if they differ.
                verb = "Cogging" if self.options.replace else "Checking"
                if self.options.verbosity >= 2:
                    self.prout(f"{verb} {fname}", end="")
                    need_newline = True

                try:
                    # The context manager closes the input even if the
                    # read fails.
                    with self.open_input_file(fname) as file_old_file:
                        old_text = file_old_file.read()
                    new_text = self.process_string(old_text, fname=fname)
                    if old_text != new_text:
                        if self.options.verbosity >= 1:
                            if self.options.verbosity < 2:
                                self.prout(f"{verb} {fname}", end="")
                            self.prout(" (changed)")
                            need_newline = False
                        if self.options.replace:
                            self.replace_file(fname, new_text)
                        else:
                            assert self.options.check
                            self.check_failed = True
                            if self.options.diff:
                                old_lines = old_text.splitlines()
                                new_lines = new_text.splitlines()
                                diff = difflib.unified_diff(
                                    old_lines,
                                    new_lines,
                                    fromfile=f"current {fname}",
                                    tofile=f"changed {fname}",
                                    lineterm="",
                                )
                                for diff_line in diff:
                                    self.prout(diff_line)
                finally:
                    # The try-finally block is so we can print a partial line
                    # with the name of the file, and print (changed) on the
                    # same line, but also make sure to break the line before
                    # any traceback.
                    if need_newline:
                        self.prout("")
            else:
                self.process_file(fname, self.stdout, fname)
        finally:
            self.restore_include_path()

    def process_wildcards(self, fname):
        """Process every file matching the glob pattern `fname`.

        If nothing matches, `fname` itself is processed as a plain name.
        """
        files = glob.glob(fname)
        if files:
            for matching_file in files:
                self.process_one_file(matching_file)
        else:
            self.process_one_file(fname)

    def process_file_list(self, file_name_list):
        """Process the files in a file list.

        Each non-empty line of the list file is split like a shell command
        line and handled as its own set of arguments.
        """
        # The context manager closes the list file even if reading fails.
        with self.open_input_file(file_name_list) as flist:
            lines = flist.readlines()
        for line in lines:
            # Use shlex to parse the line like a shell.
            lex = shlex.shlex(line, posix=True)
            lex.whitespace_split = True
            lex.commenters = "#"
            # No escapes, so that backslash can be part of the path
            lex.escape = ""
            args = list(lex)
            if args:
                self.process_arguments(args)

    def process_arguments(self, args):
        """Process one command-line.

        `args[0]` names what to process: "@file" is a file list, "&file" is
        a file list processed from within its own directory, and anything
        else is a file name or wildcard.  The remaining items are options
        that apply to this command line only.

        Raises:
            CogUsageError: if an output name is combined with a file list.
        """
        saved_options = self.options
        self.options = self.options.clone()

        # Restore the options even when processing fails, so one bad
        # command line cannot leave its private options installed.
        try:
            self.options.parse_args(args[1:])

            # startswith() also copes with an empty argument, where
            # indexing the first character would raise IndexError.
            target = args[0]
            if target.startswith("@"):
                if self.options.output_name:
                    raise CogUsageError("Can't use -o with @file")
                self.process_file_list(target[1:])
            elif target.startswith("&"):
                if self.options.output_name:
                    raise CogUsageError("Can't use -o with &file")
                file_list = target[1:]
                with change_dir(os.path.dirname(file_list)):
                    self.process_file_list(os.path.basename(file_list))
            else:
                self.process_wildcards(target)
        finally:
            self.options = saved_options

    def callable_main(self, argv):
        """All of command-line cog, but in a callable form.

        This is used by main.  `argv` is the equivalent of sys.argv.

        Raises:
            CogUsageError: if there are no files to process.
            CogCheckFailed: if a check found a file that would change.
        """
        argv = argv[1:]

        # Provide help if asked for anywhere in the command line.
        if "-?" in argv or "-h" in argv or "--help" in argv:
            self.prerr(self.options.format_help(), end="")
            return

        self.options.parse_args(argv)
        # The end-output marker may have changed, so rebuild the handler.
        self._fix_end_output_patterns()

        if self.options.show_version:
            self.prout(f"Cog version {__version__}")
            return

        if self.options.args:
            for a in self.options.args:
                self.process_arguments([a])
        else:
            raise CogUsageError("No files to process")

        if self.check_failed:
            msg = "Check failed"
            if self.options.check_fail_msg:
                msg = f"{msg}: {self.options.check_fail_msg}"
            raise CogCheckFailed(msg)

    def main(self, argv):
        """Handle the command-line execution for cog.

        Returns:
            The process exit status: 0 on success, 2 for a usage error,
            3 for an error raised by generator code through cog.error,
            4 for any other exception in generator code, 5 for a failed
            check, and 1 for any other cog error.
        """

        try:
            self.callable_main(argv)
            return 0
        except CogUsageError as err:
            self.prerr(err)
            self.prerr("(for help use --help)")
            return 2
        except CogGeneratedError as err:
            self.prerr(f"Error: {err}")
            return 3
        except CogUserException as err:
            self.prerr("Traceback (most recent call last):")
            self.prerr(err.args[0])
            return 4
        except CogCheckFailed as err:
            self.prerr(err)
            return 5
        except CogError as err:
            self.prerr(err)
            return 1

610 

611 

def find_cog_source(frame_summary, prologue):
    """Find cog source lines in a frame summary list, for printing tracebacks.

    Frames that already carry source text, or whose filename is not of the
    form "<cog FILE:LINE>", are passed through unchanged.

    Arguments:
        frame_summary: an iterable of 4-item tuples (filename, lineno,
            funcname, source), as returned by traceback.extract_tb.
        prologue: the text of the code prologue.

    Yields:
        4-item tuples, updated to correct the cog entries.

    """
    prolines = prologue.splitlines()
    for filename, lineno, funcname, source in frame_summary:
        if not source:
            # Match greedily up to the *last* colon so that file names which
            # themselves contain a colon (a Windows drive letter, say) are
            # still recognized.
            m = re.search(r"^<cog (.+):(\d+)>$", filename)
            if m:
                if lineno <= len(prolines):
                    # The frame is inside the prologue prepended to the code.
                    filename = "<prologue>"
                    source = prolines[lineno - 1]
                    lineno -= (
                        1  # Because "import cog" is the first line in the prologue
                    )
                else:
                    # Map the line number within the compiled code back to
                    # a line number in the original file.
                    filename, coglineno = m.groups()
                    coglineno = int(coglineno)
                    lineno += coglineno - len(prolines)
                    source = linecache.getline(filename, lineno).strip()
        yield filename, lineno, funcname, source

640 

641 

def main():
    """Entry point for entry_points: run cog on sys.argv, return its status."""
    engine = Cog()
    exit_status = engine.main(sys.argv)
    return exit_status

644 return Cog().main(sys.argv)