Coverage for cogapp / cogapp.py: 50.29%

379 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-09 06:46 -0500

1"""Cog content generation tool.""" 

2 

3import difflib 

4import glob 

5import io 

6import linecache 

7import os 

8import re 

9import shlex 

10import sys 

11import traceback 

12import types 

13 

14from .errors import ( 

15 CogCheckFailed, 

16 CogError, 

17 CogGeneratedError, 

18 CogUsageError, 

19 CogUserException, 

20) 

21from .options import CogOptions 

22from .whiteutils import common_prefix, reindent_block, white_prefix 

23from .utils import NumberedFileReader, Redirectable, change_dir, md5 

24from .hashhandler import HashHandler 

25 

26__version__ = "3.6.0" 

27 

28 

29class CogGenerator(Redirectable): 

30 """A generator pulled from a source file.""" 

31 

32 def __init__(self, options=None): 

33 super().__init__() 

34 self.markers = [] 

35 self.lines = [] 

36 self.options = options or CogOptions() 

37 

38 def parse_marker(self, line): 

39 self.markers.append(line) 

40 

41 def parse_line(self, line): 

42 self.lines.append(line.strip("\n")) 

43 

44 def get_code(self): 

45 """Extract the executable Python code from the generator.""" 

46 # If the markers and lines all have the same prefix 

47 # (end-of-line comment chars, for example), 

48 # then remove it from all the lines. 

49 pref_in = common_prefix(self.markers + self.lines) 

50 if pref_in: 

51 self.markers = [line.replace(pref_in, "", 1) for line in self.markers] 

52 self.lines = [line.replace(pref_in, "", 1) for line in self.lines] 

53 

54 return reindent_block(self.lines, "") 

55 

56 def evaluate(self, cog, globals, fname): 

57 # figure out the right whitespace prefix for the output 

58 pref_out = white_prefix(self.markers) 

59 

60 intext = self.get_code() 

61 if not intext: 

62 return "" 

63 

64 prologue = "import " + cog.cogmodulename + " as cog\n" 

65 if self.options.prologue: 65 ↛ 66line 65 didn't jump to line 66 because the condition on line 65 was never true

66 prologue += self.options.prologue + "\n" 

67 code = compile(prologue + intext, str(fname), "exec") 

68 

69 # Make sure the "cog" module has our state. 

70 cog.cogmodule.msg = self.msg 

71 cog.cogmodule.out = self.out 

72 cog.cogmodule.outl = self.outl 

73 cog.cogmodule.error = self.error 

74 

75 real_stdout = sys.stdout 

76 if self.options.print_output: 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true

77 sys.stdout = captured_stdout = io.StringIO() 

78 

79 self.outstring = "" 

80 try: 

81 eval(code, globals) 

82 except CogError: 

83 raise 

84 except: # noqa: E722 (we're just wrapping in CogUserException and rethrowing) 

85 typ, err, tb = sys.exc_info() 

86 frames = (tuple(fr) for fr in traceback.extract_tb(tb.tb_next)) 

87 frames = find_cog_source(frames, prologue) 

88 msg = "".join(traceback.format_list(frames)) 

89 msg += f"{typ.__name__}: {err}" 

90 raise CogUserException(msg) 

91 finally: 

92 sys.stdout = real_stdout 

93 

94 if self.options.print_output: 94 ↛ 95line 94 didn't jump to line 95 because the condition on line 94 was never true

95 self.outstring = captured_stdout.getvalue() 

96 

97 # We need to make sure that the last line in the output 

98 # ends with a newline, or it will be joined to the 

99 # end-output line, ruining cog's idempotency. 

100 if self.outstring and self.outstring[-1] != "\n": 

101 self.outstring += "\n" 

102 

103 return reindent_block(self.outstring, pref_out) 

104 

105 def msg(self, s): 

106 self.prout("Message: " + s) 

107 

108 def out(self, sOut="", dedent=False, trimblanklines=False): 

109 """The cog.out function.""" 

110 if trimblanklines and ("\n" in sOut): 

111 lines = sOut.split("\n") 

112 if lines[0].strip() == "": 

113 del lines[0] 

114 if lines and lines[-1].strip() == "": 

115 del lines[-1] 

116 sOut = "\n".join(lines) + "\n" 

117 if dedent: 

118 sOut = reindent_block(sOut) 

119 self.outstring += sOut 

120 

121 def outl(self, sOut="", **kw): 

122 """The cog.outl function.""" 

123 self.out(sOut, **kw) 

124 self.out("\n") 

125 

126 def error(self, msg="Error raised by cog generator."): 

127 """The cog.error function. 

128 

129 Instead of raising standard python errors, cog generators can use 

130 this function. It will display the error without a scary Python 

131 traceback. 

132 

133 """ 

134 raise CogGeneratedError(msg) 

135 

136 

137class Cog(Redirectable): 

138 """The Cog engine.""" 

139 

140 def __init__(self): 

141 super().__init__() 

142 self.options = CogOptions() 

143 self.cogmodulename = "cog" 

144 self.create_cog_module() 

145 self.check_failed = False 

146 self.hash_handler = None 

147 self._fix_end_output_patterns() 

148 

149 def _fix_end_output_patterns(self): 

150 self.hash_handler = HashHandler(self.options.end_output) 

151 

152 def show_warning(self, msg): 

153 self.prout(f"Warning: {msg}") 

154 

155 def is_begin_spec_line(self, s): 

156 return self.options.begin_spec in s 

157 

158 def is_end_spec_line(self, s): 

159 return self.options.end_spec in s and not self.is_end_output_line(s) 

160 

161 def is_end_output_line(self, s): 

162 return self.options.end_output in s 

163 

164 def create_cog_module(self): 

165 """Make a cog "module" object. 

166 

167 Imported Python modules can use "import cog" to get our state. 

168 

169 """ 

170 self.cogmodule = types.SimpleNamespace() 

171 self.cogmodule.path = [] 

172 

173 def open_output_file(self, fname): 

174 """Open an output file, taking all the details into account.""" 

175 opts = {} 

176 mode = "w" 

177 opts["encoding"] = self.options.encoding 

178 opts["newline"] = self.options.newline 

179 fdir = os.path.dirname(fname) 

180 if os.path.dirname(fdir) and not os.path.exists(fdir): 

181 os.makedirs(fdir) 

182 return open(fname, mode, **opts) 

183 

184 def open_input_file(self, fname): 

185 """Open an input file.""" 

186 if fname == "-": 

187 return sys.stdin 

188 else: 

189 return open(fname, encoding=self.options.encoding) 

190 

191 def process_file(self, file_in, file_out, fname=None, globals=None): 

192 """Process an input file object to an output file object. 

193 

194 `fileIn` and `fileOut` can be file objects, or file names. 

195 

196 """ 

197 file_name_in = fname or "" 

198 file_name_out = fname or "" 

199 file_in_to_close = file_out_to_close = None 

200 # Convert filenames to files. 

201 if isinstance(file_in, (bytes, str)): 201 ↛ 203line 201 didn't jump to line 203 because the condition on line 201 was never true

202 # Open the input file. 

203 file_name_in = file_in 

204 file_in = file_in_to_close = self.open_input_file(file_in) 

205 if isinstance(file_out, (bytes, str)): 205 ↛ 207line 205 didn't jump to line 207 because the condition on line 205 was never true

206 # Open the output file. 

207 file_name_out = file_out 

208 file_out = file_out_to_close = self.open_output_file(file_out) 

209 

210 start_dir = os.getcwd() 

211 

212 try: 

213 file_in = NumberedFileReader(file_in) 

214 

215 saw_cog = False 

216 

217 self.cogmodule.inFile = file_name_in 

218 self.cogmodule.outFile = file_name_out 

219 self.cogmodulename = "cog_" + md5(file_name_out.encode()).hexdigest() 

220 sys.modules[self.cogmodulename] = self.cogmodule 

221 # if "import cog" explicitly done in code by user, note threading will cause clashes. 

222 sys.modules["cog"] = self.cogmodule 

223 

224 # The globals dict we'll use for this file. 

225 if globals is None: 225 ↛ 229line 225 didn't jump to line 229 because the condition on line 225 was always true

226 globals = {} 

227 

228 # If there are any global defines, put them in the globals. 

229 globals.update(self.options.defines) 

230 

231 # loop over generator chunks 

232 line = file_in.readline() 

233 while line: 

234 # Find the next spec begin 

235 while line and not self.is_begin_spec_line(line): 

236 if self.is_end_output_line(line): 236 ↛ 237line 236 didn't jump to line 237 because the condition on line 236 was never true

237 raise CogError( 

238 f"Unexpected {self.options.end_output!r}", 

239 file=file_name_in, 

240 line=file_in.linenumber(), 

241 ) 

242 file_out.write(line) 

243 line = file_in.readline() 

244 if not line: 

245 break 

246 if not self.options.delete_code: 246 ↛ 250line 246 didn't jump to line 250 because the condition on line 246 was always true

247 file_out.write(line) 

248 

249 # `line` is the begin spec 

250 gen = CogGenerator(options=self.options) 

251 gen.set_output(stdout=self.stdout) 

252 gen.parse_marker(line) 

253 first_line_num = file_in.linenumber() 

254 self.cogmodule.firstLineNum = first_line_num 

255 

256 # If the spec begin is also a spec end, then process the single 

257 # line of code inside. 

258 if self.is_end_spec_line(line): 

259 beg = line.find(self.options.begin_spec) 

260 end = line.find(self.options.end_spec) 

261 if beg > end: 

262 raise CogError( 

263 "Cog code markers inverted", 

264 file=file_name_in, 

265 line=first_line_num, 

266 ) 

267 else: 

268 code = line[beg + len(self.options.begin_spec) : end].strip() 

269 gen.parse_line(code) 

270 else: 

271 # Deal with an ordinary code block. 

272 line = file_in.readline() 

273 

274 # Get all the lines in the spec 

275 while line and not self.is_end_spec_line(line): 

276 if self.is_begin_spec_line(line): 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true

277 raise CogError( 

278 f"Unexpected {self.options.begin_spec!r}", 

279 file=file_name_in, 

280 line=file_in.linenumber(), 

281 ) 

282 if self.is_end_output_line(line): 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true

283 raise CogError( 

284 f"Unexpected {self.options.end_output!r}", 

285 file=file_name_in, 

286 line=file_in.linenumber(), 

287 ) 

288 if not self.options.delete_code: 288 ↛ 290line 288 didn't jump to line 290 because the condition on line 288 was always true

289 file_out.write(line) 

290 gen.parse_line(line) 

291 line = file_in.readline() 

292 if not line: 292 ↛ 293line 292 didn't jump to line 293 because the condition on line 292 was never true

293 raise CogError( 

294 "Cog block begun but never ended.", 

295 file=file_name_in, 

296 line=first_line_num, 

297 ) 

298 

299 if not self.options.delete_code: 299 ↛ 301line 299 didn't jump to line 301 because the condition on line 299 was always true

300 file_out.write(line) 

301 gen.parse_marker(line) 

302 

303 line = file_in.readline() 

304 

305 # Eat all the lines in the output section. While reading past 

306 # them, compute the md5 hash of the old output. 

307 previous = [] 

308 while line and not self.is_end_output_line(line): 

309 if self.is_begin_spec_line(line): 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true

310 raise CogError( 

311 f"Unexpected {self.options.begin_spec!r}", 

312 file=file_name_in, 

313 line=file_in.linenumber(), 

314 ) 

315 if self.is_end_spec_line(line): 315 ↛ 316line 315 didn't jump to line 316 because the condition on line 315 was never true

316 raise CogError( 

317 f"Unexpected {self.options.end_spec!r}", 

318 file=file_name_in, 

319 line=file_in.linenumber(), 

320 ) 

321 previous.append(line) 

322 line = file_in.readline() 

323 cur_hash = self.hash_handler.compute_lines_hash(previous) 

324 

325 if not line and not self.options.eof_can_be_end: 325 ↛ 327line 325 didn't jump to line 327 because the condition on line 325 was never true

326 # We reached end of file before we found the end output line. 

327 raise CogError( 

328 f"Missing {self.options.end_output!r} before end of file.", 

329 file=file_name_in, 

330 line=file_in.linenumber(), 

331 ) 

332 

333 # Make the previous output available to the current code 

334 self.cogmodule.previous = "".join(previous) 

335 

336 # Write the output of the spec to be the new output if we're 

337 # supposed to generate code. 

338 if not self.options.no_generate: 338 ↛ 345line 338 didn't jump to line 345 because the condition on line 338 was always true

339 fname = f"<cog {file_name_in}:{first_line_num}>" 

340 gen = gen.evaluate(cog=self, globals=globals, fname=fname) 

341 gen = self.suffix_lines(gen) 

342 new_hash = self.hash_handler.compute_hash(gen) 

343 file_out.write(gen) 

344 else: 

345 new_hash = "" 

346 

347 saw_cog = True 

348 

349 # Write the ending output line 

350 if self.options.hash_output: 350 ↛ 351line 350 didn't jump to line 351 because the condition on line 350 was never true

351 try: 

352 self.hash_handler.validate_hash(line, cur_hash) 

353 except ValueError as e: 

354 raise CogError( 

355 str(e), 

356 file=file_name_in, 

357 line=file_in.linenumber(), 

358 ) 

359 line = self.hash_handler.format_end_line_with_hash( 

360 line, 

361 new_hash, 

362 add_hash=True, 

363 preserve_format=self.options.check, 

364 ) 

365 else: 

366 line = self.hash_handler.format_end_line_with_hash( 

367 line, new_hash, add_hash=False 

368 ) 

369 

370 if not self.options.delete_code: 370 ↛ 372line 370 didn't jump to line 372 because the condition on line 370 was always true

371 file_out.write(line) 

372 line = file_in.readline() 

373 

374 if not saw_cog and self.options.warn_empty: 374 ↛ 375line 374 didn't jump to line 375 because the condition on line 374 was never true

375 self.show_warning(f"no cog code found in {file_name_in}") 

376 finally: 

377 if file_in_to_close: 377 ↛ 378line 377 didn't jump to line 378 because the condition on line 377 was never true

378 file_in_to_close.close() 

379 if file_out_to_close: 379 ↛ 380line 379 didn't jump to line 380 because the condition on line 379 was never true

380 file_out_to_close.close() 

381 os.chdir(start_dir) 

382 

383 # A regex for non-empty lines, used by suffixLines. 

384 re_non_empty_lines = re.compile(r"^\s*\S+.*$", re.MULTILINE) 

385 

386 def suffix_lines(self, text): 

387 """Add suffixes to the lines in text, if our options desire it. 

388 

389 `text` is many lines, as a single string. 

390 

391 """ 

392 if self.options.suffix: 392 ↛ 394line 392 didn't jump to line 394 because the condition on line 392 was never true

393 # Find all non-blank lines, and add the suffix to the end. 

394 repl = r"\g<0>" + self.options.suffix.replace("\\", "\\\\") 

395 text = self.re_non_empty_lines.sub(repl, text) 

396 return text 

397 

398 def process_string(self, input, fname=None): 

399 """Process `input` as the text to cog. 

400 

401 Return the cogged output as a string. 

402 

403 """ 

404 file_old = io.StringIO(input) 

405 file_new = io.StringIO() 

406 self.process_file(file_old, file_new, fname=fname) 

407 return file_new.getvalue() 

408 

409 def replace_file(self, old_path, new_text): 

410 """Replace file oldPath with the contents newText""" 

411 if not os.access(old_path, os.W_OK): 

412 # Need to ensure we can write. 

413 if self.options.make_writable_cmd: 

414 # Use an external command to make the file writable. 

415 cmd = self.options.make_writable_cmd.replace("%s", old_path) 

416 with os.popen(cmd) as cmdout: 

417 self.stdout.write(cmdout.read()) 

418 if not os.access(old_path, os.W_OK): 

419 raise CogError(f"Couldn't make {old_path} writable") 

420 else: 

421 # Can't write! 

422 raise CogError(f"Can't overwrite {old_path}") 

423 f = self.open_output_file(old_path) 

424 f.write(new_text) 

425 f.close() 

426 

427 def save_include_path(self): 

428 self.saved_include = self.options.include_path[:] 

429 self.saved_sys_path = sys.path[:] 

430 

431 def restore_include_path(self): 

432 self.options.include_path = self.saved_include 

433 self.cogmodule.path = self.options.include_path 

434 sys.path = self.saved_sys_path 

435 

436 def add_to_include_path(self, include_path): 

437 self.cogmodule.path.extend(include_path) 

438 sys.path.extend(include_path) 

439 

440 def process_one_file(self, fname): 

441 """Process one filename through cog.""" 

442 

443 self.save_include_path() 

444 need_newline = False 

445 

446 try: 

447 self.add_to_include_path(self.options.include_path) 

448 # Since we know where the input file came from, 

449 # push its directory onto the include path. 

450 self.add_to_include_path([os.path.dirname(fname)]) 

451 

452 # How we process the file depends on where the output is going. 

453 if self.options.output_name: 

454 self.process_file(fname, self.options.output_name, fname) 

455 elif self.options.replace or self.options.check: 

456 # We want to replace the cog file with the output, 

457 # but only if they differ. 

458 verb = "Cogging" if self.options.replace else "Checking" 

459 if self.options.verbosity >= 2: 

460 self.prout(f"{verb} {fname}", end="") 

461 need_newline = True 

462 

463 try: 

464 file_old_file = self.open_input_file(fname) 

465 old_text = file_old_file.read() 

466 file_old_file.close() 

467 new_text = self.process_string(old_text, fname=fname) 

468 if old_text != new_text: 

469 if self.options.verbosity >= 1: 

470 if self.options.verbosity < 2: 

471 self.prout(f"{verb} {fname}", end="") 

472 self.prout(" (changed)") 

473 need_newline = False 

474 if self.options.replace: 

475 self.replace_file(fname, new_text) 

476 else: 

477 assert self.options.check 

478 self.check_failed = True 

479 if self.options.diff: 

480 old_lines = old_text.splitlines() 

481 new_lines = new_text.splitlines() 

482 diff = difflib.unified_diff( 

483 old_lines, 

484 new_lines, 

485 fromfile=f"current {fname}", 

486 tofile=f"changed {fname}", 

487 lineterm="", 

488 ) 

489 for diff_line in diff: 

490 self.prout(diff_line) 

491 finally: 

492 # The try-finally block is so we can print a partial line 

493 # with the name of the file, and print (changed) on the 

494 # same line, but also make sure to break the line before 

495 # any traceback. 

496 if need_newline: 

497 self.prout("") 

498 else: 

499 self.process_file(fname, self.stdout, fname) 

500 finally: 

501 self.restore_include_path() 

502 

503 def process_wildcards(self, fname): 

504 files = glob.glob(fname) 

505 if files: 

506 for matching_file in files: 

507 self.process_one_file(matching_file) 

508 else: 

509 self.process_one_file(fname) 

510 

511 def process_file_list(self, file_name_list): 

512 """Process the files in a file list.""" 

513 flist = self.open_input_file(file_name_list) 

514 lines = flist.readlines() 

515 flist.close() 

516 for line in lines: 

517 # Use shlex to parse the line like a shell. 

518 lex = shlex.shlex(line, posix=True) 

519 lex.whitespace_split = True 

520 lex.commenters = "#" 

521 # No escapes, so that backslash can be part of the path 

522 lex.escape = "" 

523 args = list(lex) 

524 if args: 

525 self.process_arguments(args) 

526 

527 def process_arguments(self, args): 

528 """Process one command-line.""" 

529 saved_options = self.options 

530 self.options = self.options.clone() 

531 

532 self.options.parse_args(args[1:]) 

533 

534 if args[0][0] == "@": 

535 if self.options.output_name: 

536 raise CogUsageError("Can't use -o with @file") 

537 self.process_file_list(args[0][1:]) 

538 elif args[0][0] == "&": 

539 if self.options.output_name: 

540 raise CogUsageError("Can't use -o with &file") 

541 file_list = args[0][1:] 

542 with change_dir(os.path.dirname(file_list)): 

543 self.process_file_list(os.path.basename(file_list)) 

544 else: 

545 self.process_wildcards(args[0]) 

546 

547 self.options = saved_options 

548 

549 def callable_main(self, argv): 

550 """All of command-line cog, but in a callable form. 

551 

552 This is used by main. `argv` is the equivalent of sys.argv. 

553 

554 """ 

555 argv = argv[1:] 

556 

557 # Provide help if asked for anywhere in the command line. 

558 if "-?" in argv or "-h" in argv or "--help" in argv: 

559 self.prerr(self.options.format_help(), end="") 

560 return 

561 

562 self.options.parse_args(argv) 

563 self._fix_end_output_patterns() 

564 

565 if self.options.show_version: 

566 self.prout(f"Cog version {__version__}") 

567 return 

568 

569 if self.options.args: 

570 for a in self.options.args: 

571 self.process_arguments([a]) 

572 else: 

573 raise CogUsageError("No files to process") 

574 

575 if self.check_failed: 

576 msg = "Check failed" 

577 if self.options.check_fail_msg: 

578 msg = f"{msg}: {self.options.check_fail_msg}" 

579 raise CogCheckFailed(msg) 

580 

581 def main(self, argv): 

582 """Handle the command-line execution for cog.""" 

583 

584 try: 

585 self.callable_main(argv) 

586 return 0 

587 except CogUsageError as err: 

588 self.prerr(err) 

589 self.prerr("(for help use --help)") 

590 return 2 

591 except CogGeneratedError as err: 

592 self.prerr(f"Error: {err}") 

593 return 3 

594 except CogUserException as err: 

595 self.prerr("Traceback (most recent call last):") 

596 self.prerr(err.args[0]) 

597 return 4 

598 except CogCheckFailed as err: 

599 self.prerr(err) 

600 return 5 

601 except CogError as err: 

602 self.prerr(err) 

603 return 1 

604 

605 

606def find_cog_source(frame_summary, prologue): 

607 """Find cog source lines in a frame summary list, for printing tracebacks. 

608 

609 Arguments: 

610 frame_summary: a list of 4-item tuples, as returned by traceback.extract_tb. 

611 prologue: the text of the code prologue. 

612 

613 Returns 

614 A list of 4-item tuples, updated to correct the cog entries. 

615 

616 """ 

617 prolines = prologue.splitlines() 

618 for filename, lineno, funcname, source in frame_summary: 

619 if not source: 619 ↛ 633line 619 didn't jump to line 633 because the condition on line 619 was always true

620 m = re.search(r"^<cog ([^:]+):(\d+)>$", filename) 

621 if m: 621 ↛ 622line 621 didn't jump to line 622 because the condition on line 621 was never true

622 if lineno <= len(prolines): 

623 filename = "<prologue>" 

624 source = prolines[lineno - 1] 

625 lineno -= ( 

626 1 # Because "import cog" is the first line in the prologue 

627 ) 

628 else: 

629 filename, coglineno = m.groups() 

630 coglineno = int(coglineno) 

631 lineno += coglineno - len(prolines) 

632 source = linecache.getline(filename, lineno).strip() 

633 yield filename, lineno, funcname, source 

634 

635 

636def main(): 

637 """Main function for entry_points to use.""" 

638 return Cog().main(sys.argv)