Coverage for cogapp/cogapp.py: 45.23%

496 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-21 15:20 -0400

1"""Cog content generation tool.""" 

2 

3import copy 

4import difflib 

5import getopt 

6import glob 

7import io 

8import linecache 

9import os 

10import re 

11import shlex 

12import sys 

13import traceback 

14import types 

15 

16from .whiteutils import common_prefix, reindent_block, white_prefix 

17from .utils import NumberedFileReader, Redirectable, change_dir, md5 

18from .hashhandler import HashHandler 

19 

20__version__ = "3.6.0" 

21 

22usage = """\ 

23cog - generate content with inlined Python code. 

24 

25cog [OPTIONS] [INFILE | @FILELIST | &FILELIST] ... 

26 

27INFILE is the name of an input file, '-' will read from stdin. 

28FILELIST is the name of a text file containing file names or 

29other @FILELISTs. 

30 

31For @FILELIST, paths in the file list are relative to the working 

32directory where cog was called. For &FILELIST, paths in the file 

33list are relative to the file list location. 

34 

35OPTIONS: 

36 -c Checksum the output to protect it against accidental change. 

37 -d Delete the Python code from the output file. 

38 -D name=val Define a global string available to your Python code. 

39 -e Warn if a file has no cog code in it. 

40 -I PATH Add PATH to the list of directories for data files and modules. 

41 -n ENCODING Use ENCODING when reading and writing files. 

42 -o OUTNAME Write the output to OUTNAME. 

43 -p PROLOGUE Prepend the Python source with PROLOGUE. Useful to insert an 

44 import line. Example: -p "import math" 

45 -P Use print() instead of cog.outl() for code output. 

46 -r Replace the input file with the output. 

47 -s STRING Suffix all generated output lines with STRING. 

48 -U Write the output with Unix newlines (only LF line-endings). 

49 -w CMD Use CMD if the output file needs to be made writable. 

50 A %s in the CMD will be filled with the filename. 

51 -x Excise all the generated output without running the Python. 

52 -z The end-output marker can be omitted, and is assumed at eof. 

53 -v Print the version of cog and exit. 

54 --check Check that the files would not change if run again. 

55 --check-fail-msg='MSG' 

56 If --check fails, include MSG in the output to help devs 

57 understand how to run cog in your project. 

58 --diff With --check, show a diff of what failed the check. 

59 --markers='START END END-OUTPUT' 

60 The patterns surrounding cog inline instructions. Should 

61 include three values separated by spaces, the start, end, 

62 and end-output markers. Defaults to '[[[cog ]]] [[[end]]]'. 

63 --verbosity=VERBOSITY 

64 Control the amount of output. 2 (the default) lists all files, 

65 1 lists only changed files, 0 lists no files. 

66 -h, --help Print this help. 

67""" 

68 

69 

class CogError(Exception):
    """Base class for every error cog raises deliberately.

    When `file` is non-empty the message is prefixed with a
    ``file(line): `` location; otherwise `msg` is used unchanged.
    """

    def __init__(self, msg, file="", line=0):
        text = f"{file}({line}): {msg}" if file else msg
        super().__init__(text)

78 

79 

class CogUsageError(CogError):
    """Raised when cog's command-line arguments are used incorrectly."""

84 

85 

class CogInternalError(CogError):
    """Raised for a mistake in cog's own code; not expected to occur."""

90 

91 

class CogGeneratedError(CogError):
    """Raised on behalf of the user's Python code (see `cog.error`)."""

96 

97 

class CogUserException(CogError):
    """Wraps an exception that escaped from the user's Python code.

    The single argument is the already-formatted traceback text to print.
    """

106 

107 

class CogCheckFailed(CogError):
    """Raised when a --check run finds files that would change."""

112 

113 

class CogGenerator(Redirectable):
    """One chunk of generator code lifted out of a cog-processed file.

    Collects the marker lines and the code lines of a single cog block,
    then compiles and runs the code, capturing whatever it emits.
    """

    def __init__(self, options=None):
        super().__init__()
        self.markers = []
        self.lines = []
        self.options = options or CogOptions()

    def parse_marker(self, line):
        """Remember `line` as one of the marker lines around the code."""
        self.markers.append(line)

    def parse_line(self, line):
        """Remember `line` as a line of generator code, minus its newlines."""
        self.lines.append(line.strip("\n"))

    def get_code(self):
        """Return the runnable Python text of this generator.

        A prefix shared by every marker and code line (end-of-line comment
        characters, for example) is removed once from each of them before
        the code lines are re-indented.
        """
        shared = common_prefix(self.markers + self.lines)
        if shared:
            self.markers = [m.replace(shared, "", 1) for m in self.markers]
            self.lines = [ln.replace(shared, "", 1) for ln in self.lines]
        return reindent_block(self.lines, "")

    def evaluate(self, cog, globals, fname):
        """Run the generator code and return its re-indented output text.

        `cog` supplies the cog module object and its name, `globals` is the
        namespace the code runs in, and `fname` is the file name used when
        compiling.  CogError propagates unchanged; any other exception is
        re-raised as CogUserException carrying a formatted traceback.
        """
        # Measured before get_code(), which rewrites self.markers.
        indent = white_prefix(self.markers)

        source = self.get_code()
        if not source:
            return ""

        prologue = f"import {cog.cogmodulename} as cog\n"
        extra = self.options.prologue
        if extra:
            prologue += extra + "\n"
        compiled = compile(prologue + source, str(fname), "exec")

        # Point the shared "cog" module at this generator's functions.
        module = cog.cogmodule
        module.msg = self.msg
        module.out = self.out
        module.outl = self.outl
        module.error = self.error

        saved_stdout = sys.stdout
        if self.options.print_output:
            capture = io.StringIO()
            sys.stdout = capture

        self.outstring = ""
        try:
            eval(compiled, globals)
        except CogError:
            raise
        except BaseException as exc:
            # Everything else is wrapped in CogUserException and rethrown.
            raw_frames = traceback.extract_tb(exc.__traceback__.tb_next)
            frames = find_cog_source((tuple(fr) for fr in raw_frames), prologue)
            report = "".join(traceback.format_list(frames))
            report += f"{type(exc).__name__}: {exc}"
            raise CogUserException(report)
        finally:
            sys.stdout = saved_stdout

        if self.options.print_output:
            self.outstring = capture.getvalue()

        # The output must end with a newline, otherwise its last line would
        # be joined to the end-output line and cog would stop being
        # idempotent.
        if self.outstring and not self.outstring.endswith("\n"):
            self.outstring += "\n"

        return reindent_block(self.outstring, indent)

    def msg(self, s):
        """The cog.msg function: print `s` prefixed with "Message: "."""
        self.prout("Message: " + s)

    def out(self, sOut="", dedent=False, trimblanklines=False):
        """The cog.out function: append `sOut` to the collected output.

        With `trimblanklines`, a blank first and/or last line of multi-line
        text is dropped.  With `dedent`, the text is passed through
        reindent_block first.
        """
        if trimblanklines and "\n" in sOut:
            pieces = sOut.split("\n")
            if not pieces[0].strip():
                pieces.pop(0)
            if pieces and not pieces[-1].strip():
                pieces.pop()
            sOut = "\n".join(pieces) + "\n"
        if dedent:
            sOut = reindent_block(sOut)
        self.outstring += sOut

    def outl(self, sOut="", **kw):
        """The cog.outl function: like `out`, followed by a newline."""
        self.out(sOut, **kw)
        self.out("\n")

    def error(self, msg="Error raised by cog generator."):
        """The cog.error function.

        Generators can call this instead of raising a standard Python
        error, so the problem is reported without a Python traceback.
        """
        raise CogGeneratedError(msg)

220 

221 

class CogOptions:
    """Options for a run of cog.

    Holds the default for every command-line setting and can update
    itself from an argv-style list of arguments.
    """

    # Switches that take no value and simply turn an attribute on.
    _FLAG_ATTRS = {
        "-c": "hash_output",
        "-d": "delete_code",
        "-e": "warn_empty",
        "-r": "replace",
        "-P": "print_output",
        "-U": "newlines",
        "-v": "show_version",
        "-x": "no_generate",
        "-z": "eof_can_be_end",
        "--check": "check",
        "--diff": "diff",
    }

    # Switches whose argument is stored verbatim on an attribute.
    _VALUE_ATTRS = {
        "-n": "encoding",
        "-o": "output_name",
        "-s": "suffix",
        "-p": "prologue",
        "-w": "make_writable_cmd",
        "--check-fail-msg": "check_fail_msg",
    }

    def __init__(self):
        # Defaults for argument values.
        self.args = []
        self.include_path = []
        self.defines = {}
        self.show_version = False
        self.make_writable_cmd = None
        self.replace = False
        self.no_generate = False
        self.output_name = None
        self.warn_empty = False
        self.hash_output = False
        self.delete_code = False
        self.eof_can_be_end = False
        self.suffix = None
        self.newlines = False
        self.begin_spec = "[[[cog"
        self.end_spec = "]]]"
        self.end_output = "[[[end]]]"
        self.encoding = "utf-8"
        self.verbosity = 2
        self.prologue = ""
        self.print_output = False
        self.check = False
        self.check_fail_msg = None
        self.diff = False

    def __eq__(self, other):
        """Comparison operator for tests to use.

        Two CogOptions are equal when all their attributes are equal.
        Comparing with any other kind of object is left to Python's
        default handling instead of failing on a missing `__dict__`.
        """
        if not isinstance(other, CogOptions):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def clone(self):
        """Make an independent deep copy of these options."""
        return copy.deepcopy(self)

    def add_to_include_path(self, dirs):
        """Add directories to the include path.

        `dirs` is a single string; several directories are separated by
        `os.pathsep`.
        """
        self.include_path.extend(dirs.split(os.pathsep))

    def parse_args(self, argv):
        """Update the options from the command-line arguments in `argv`.

        Positional arguments are stored in `self.args`.

        Raises:
            CogUsageError: for an unknown switch, a -D value without "=",
                a malformed --markers value or a non-integer --verbosity.
            CogInternalError: if getopt reports a switch this method does
                not handle.
        """
        try:
            opts, self.args = getopt.getopt(
                argv,
                "cdD:eI:n:o:rs:p:PUvw:xz",
                [
                    "check",
                    "check-fail-msg=",
                    "diff",
                    "markers=",
                    "verbosity=",
                ],
            )
        except getopt.error as msg:
            raise CogUsageError(msg) from msg

        for o, a in opts:
            if o in self._FLAG_ATTRS:
                setattr(self, self._FLAG_ATTRS[o], True)
            elif o in self._VALUE_ATTRS:
                setattr(self, self._VALUE_ATTRS[o], a)
            elif o == "-D":
                if "=" not in a:
                    raise CogUsageError("-D takes a name=value argument")
                # Only the first "=" separates name from value.
                name, value = a.split("=", 1)
                self.defines[name] = value
            elif o == "-I":
                self.add_to_include_path(os.path.abspath(a))
            elif o == "--markers":
                self._parse_markers(a)
            elif o == "--verbosity":
                try:
                    self.verbosity = int(a)
                except ValueError:
                    # Report bad input as a usage error, not a traceback.
                    raise CogUsageError(
                        f"--verbosity requires an integer, could not parse {a!r}"
                    ) from None
            else:
                # Since getopt.getopt is given a list of possible flags,
                # this is an internal error.
                raise CogInternalError(f"Don't understand argument {o}")

    def _parse_markers(self, val):
        """Split `val` on single spaces into the three marker strings."""
        try:
            self.begin_spec, self.end_spec, self.end_output = val.split(" ")
        except ValueError:
            raise CogUsageError(
                f"--markers requires 3 values separated by spaces, could not parse {val!r}"
            ) from None

    def validate(self):
        """Does nothing if everything is OK, raises CogError's if it's not."""
        if self.replace and self.delete_code:
            raise CogUsageError(
                "Can't use -d with -r (or you would delete all your source!)"
            )

        if self.replace and self.output_name:
            raise CogUsageError("Can't use -o with -r (they are opposites)")

        if self.diff and not self.check:
            raise CogUsageError("Can't use --diff without --check")

354 

355 

class Cog(Redirectable):
    """The Cog engine.

    Reads files containing cog blocks, runs the embedded generator code
    and writes the files back out with freshly generated output.
    """

    def __init__(self):
        super().__init__()
        self.options = CogOptions()
        self.cogmodulename = "cog"
        self.create_cog_module()
        self.check_failed = False
        self.hash_handler = None
        self._fix_end_output_patterns()

    def _fix_end_output_patterns(self):
        """Rebuild the hash handler from the current end-output marker."""
        self.hash_handler = HashHandler(self.options.end_output)

    def show_warning(self, msg):
        """Print `msg` prefixed with "Warning: "."""
        self.prout(f"Warning: {msg}")

    def is_begin_spec_line(self, s):
        """True if `s` contains the begin-spec marker."""
        return self.options.begin_spec in s

    def is_end_spec_line(self, s):
        """True if `s` contains the end-spec marker but not the end-output marker."""
        return self.options.end_spec in s and not self.is_end_output_line(s)

    def is_end_output_line(self, s):
        """True if `s` contains the end-output marker."""
        return self.options.end_output in s

    def create_cog_module(self):
        """Make a cog "module" object.

        Imported Python modules can use "import cog" to get our state.

        """
        self.cogmodule = types.SimpleNamespace()
        self.cogmodule.path = []

    def open_output_file(self, fname):
        """Open `fname` for writing, honoring the encoding and newline options.

        A missing parent directory is created first.
        """
        opts = {}
        mode = "w"
        opts["encoding"] = self.options.encoding
        if self.options.newlines:
            opts["newline"] = "\n"
        fdir = os.path.dirname(fname)
        # Test fdir itself: testing its parent would skip directory creation
        # for a single-level path such as "out/file.txt".
        if fdir and not os.path.exists(fdir):
            os.makedirs(fdir)
        return open(fname, mode, **opts)

    def open_input_file(self, fname):
        """Open an input file; "-" means standard input."""
        if fname == "-":
            return sys.stdin
        else:
            return open(fname, encoding=self.options.encoding)

    def process_file(self, file_in, file_out, fname=None, globals=None):
        """Process an input file object to an output file object.

        `file_in` and `file_out` can be file objects, or file names.
        `fname` is the name used in messages when file objects are given.
        `globals` is the namespace the generator code runs in; a fresh
        dict is used when it is None.

        Raises CogError for malformed cog markup.
        """
        file_name_in = fname or ""
        file_name_out = fname or ""
        file_in_to_close = file_out_to_close = None

        start_dir = os.getcwd()

        try:
            # Convert filenames to files.  This happens inside the try so
            # that an input file opened here is closed even if opening the
            # output file fails.
            if isinstance(file_in, (bytes, str)):
                # Open the input file.
                file_name_in = file_in
                file_in = file_in_to_close = self.open_input_file(file_in)
            if isinstance(file_out, (bytes, str)):
                # Open the output file.
                file_name_out = file_out
                file_out = file_out_to_close = self.open_output_file(file_out)

            file_in = NumberedFileReader(file_in)

            saw_cog = False

            self.cogmodule.inFile = file_name_in
            self.cogmodule.outFile = file_name_out
            self.cogmodulename = "cog_" + md5(file_name_out.encode()).hexdigest()
            sys.modules[self.cogmodulename] = self.cogmodule
            # if "import cog" explicitly done in code by user, note threading will cause clashes.
            sys.modules["cog"] = self.cogmodule

            # The globals dict we'll use for this file.
            if globals is None:
                globals = {}

            # If there are any global defines, put them in the globals.
            globals.update(self.options.defines)

            # loop over generator chunks
            line = file_in.readline()
            while line:
                # Find the next spec begin
                while line and not self.is_begin_spec_line(line):
                    if self.is_end_spec_line(line):
                        raise CogError(
                            f"Unexpected {self.options.end_spec!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    if self.is_end_output_line(line):
                        raise CogError(
                            f"Unexpected {self.options.end_output!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    file_out.write(line)
                    line = file_in.readline()
                if not line:
                    break
                if not self.options.delete_code:
                    file_out.write(line)

                # `line` is the begin spec
                gen = CogGenerator(options=self.options)
                gen.set_output(stdout=self.stdout)
                gen.parse_marker(line)
                first_line_num = file_in.linenumber()
                self.cogmodule.firstLineNum = first_line_num

                # If the spec begin is also a spec end, then process the single
                # line of code inside.
                if self.is_end_spec_line(line):
                    beg = line.find(self.options.begin_spec)
                    end = line.find(self.options.end_spec)
                    if beg > end:
                        raise CogError(
                            "Cog code markers inverted",
                            file=file_name_in,
                            line=first_line_num,
                        )
                    else:
                        code = line[beg + len(self.options.begin_spec) : end].strip()
                        gen.parse_line(code)
                else:
                    # Deal with an ordinary code block.
                    line = file_in.readline()

                    # Get all the lines in the spec
                    while line and not self.is_end_spec_line(line):
                        if self.is_begin_spec_line(line):
                            raise CogError(
                                f"Unexpected {self.options.begin_spec!r}",
                                file=file_name_in,
                                line=file_in.linenumber(),
                            )
                        if self.is_end_output_line(line):
                            raise CogError(
                                f"Unexpected {self.options.end_output!r}",
                                file=file_name_in,
                                line=file_in.linenumber(),
                            )
                        if not self.options.delete_code:
                            file_out.write(line)
                        gen.parse_line(line)
                        line = file_in.readline()
                    if not line:
                        raise CogError(
                            "Cog block begun but never ended.",
                            file=file_name_in,
                            line=first_line_num,
                        )

                    if not self.options.delete_code:
                        file_out.write(line)
                    gen.parse_marker(line)

                line = file_in.readline()

                # Eat all the lines in the output section.  While reading past
                # them, compute the md5 hash of the old output.
                previous = []
                while line and not self.is_end_output_line(line):
                    if self.is_begin_spec_line(line):
                        raise CogError(
                            f"Unexpected {self.options.begin_spec!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    if self.is_end_spec_line(line):
                        raise CogError(
                            f"Unexpected {self.options.end_spec!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    previous.append(line)
                    line = file_in.readline()
                cur_hash = self.hash_handler.compute_lines_hash(previous)

                if not line and not self.options.eof_can_be_end:
                    # We reached end of file before we found the end output line.
                    raise CogError(
                        f"Missing {self.options.end_output!r} before end of file.",
                        file=file_name_in,
                        line=file_in.linenumber(),
                    )

                # Make the previous output available to the current code
                self.cogmodule.previous = "".join(previous)

                # Write the output of the spec to be the new output if we're
                # supposed to generate code.
                if not self.options.no_generate:
                    fname = f"<cog {file_name_in}:{first_line_num}>"
                    gen = gen.evaluate(cog=self, globals=globals, fname=fname)
                    gen = self.suffix_lines(gen)
                    new_hash = self.hash_handler.compute_hash(gen)
                    file_out.write(gen)
                else:
                    new_hash = ""

                saw_cog = True

                # Write the ending output line
                if self.options.hash_output:
                    try:
                        self.hash_handler.validate_hash(line, cur_hash)
                    except ValueError as e:
                        raise CogError(
                            str(e),
                            file=file_name_in,
                            line=file_in.linenumber(),
                        ) from e
                    line = self.hash_handler.format_end_line_with_hash(
                        line,
                        new_hash,
                        add_hash=True,
                        preserve_format=self.options.check,
                    )
                else:
                    line = self.hash_handler.format_end_line_with_hash(
                        line, new_hash, add_hash=False
                    )

                if not self.options.delete_code:
                    file_out.write(line)
                line = file_in.readline()

            if not saw_cog and self.options.warn_empty:
                self.show_warning(f"no cog code found in {file_name_in}")
        finally:
            if file_in_to_close:
                file_in_to_close.close()
            if file_out_to_close:
                file_out_to_close.close()
            # Return to the directory we started in.
            os.chdir(start_dir)

    # A regex for non-empty lines, used by suffix_lines.
    re_non_empty_lines = re.compile(r"^\s*\S+.*$", re.MULTILINE)

    def suffix_lines(self, text):
        """Add suffixes to the lines in text, if our options desire it.

        `text` is many lines, as a single string.

        """
        if self.options.suffix:
            # Find all non-blank lines, and add the suffix to the end.
            # Backslashes are doubled so the suffix is taken literally in
            # the replacement template.
            repl = r"\g<0>" + self.options.suffix.replace("\\", "\\\\")
            text = self.re_non_empty_lines.sub(repl, text)
        return text

    def process_string(self, input, fname=None):
        """Process `input` as the text to cog.

        Return the cogged output as a string.

        """
        file_old = io.StringIO(input)
        file_new = io.StringIO()
        self.process_file(file_old, file_new, fname=fname)
        return file_new.getvalue()

    def replace_file(self, old_path, new_text):
        """Replace the file `old_path` with the contents `new_text`.

        If the file is not writable, the -w command (when given) is run to
        make it writable.

        Raises CogError if the file cannot be made writable.
        """
        if not os.access(old_path, os.W_OK):
            # Need to ensure we can write.
            if self.options.make_writable_cmd:
                # Use an external command to make the file writable.
                # NOTE(review): the file name is substituted unquoted into a
                # shell command line; names containing shell metacharacters
                # will be interpreted by the shell.
                cmd = self.options.make_writable_cmd.replace("%s", old_path)
                with os.popen(cmd) as cmdout:
                    self.stdout.write(cmdout.read())
                if not os.access(old_path, os.W_OK):
                    raise CogError(f"Couldn't make {old_path} writable")
            else:
                # Can't write!
                raise CogError(f"Can't overwrite {old_path}")
        # The with-block closes the file even if the write fails.
        with self.open_output_file(old_path) as f:
            f.write(new_text)

    def save_include_path(self):
        """Snapshot the include path and sys.path for later restoration."""
        self.saved_include = self.options.include_path[:]
        self.saved_sys_path = sys.path[:]

    def restore_include_path(self):
        """Put back the include path and sys.path saved by save_include_path."""
        self.options.include_path = self.saved_include
        self.cogmodule.path = self.options.include_path
        sys.path = self.saved_sys_path

    def add_to_include_path(self, include_path):
        """Append the directories in `include_path` to cog.path and sys.path."""
        self.cogmodule.path.extend(include_path)
        sys.path.extend(include_path)

    def process_one_file(self, fname):
        """Process one filename through cog."""

        self.save_include_path()
        need_newline = False

        try:
            self.add_to_include_path(self.options.include_path)
            # Since we know where the input file came from,
            # push its directory onto the include path.
            self.add_to_include_path([os.path.dirname(fname)])

            # How we process the file depends on where the output is going.
            if self.options.output_name:
                self.process_file(fname, self.options.output_name, fname)
            elif self.options.replace or self.options.check:
                # We want to replace the cog file with the output,
                # but only if they differ.
                verb = "Cogging" if self.options.replace else "Checking"
                if self.options.verbosity >= 2:
                    self.prout(f"{verb} {fname}", end="")
                    need_newline = True

                try:
                    # The with-block closes the file even if reading fails.
                    with self.open_input_file(fname) as file_old_file:
                        old_text = file_old_file.read()
                    new_text = self.process_string(old_text, fname=fname)
                    if old_text != new_text:
                        if self.options.verbosity >= 1:
                            if self.options.verbosity < 2:
                                self.prout(f"{verb} {fname}", end="")
                            self.prout(" (changed)")
                            need_newline = False
                        if self.options.replace:
                            self.replace_file(fname, new_text)
                        else:
                            assert self.options.check
                            self.check_failed = True
                            if self.options.diff:
                                old_lines = old_text.splitlines()
                                new_lines = new_text.splitlines()
                                diff = difflib.unified_diff(
                                    old_lines,
                                    new_lines,
                                    fromfile=f"current {fname}",
                                    tofile=f"changed {fname}",
                                    lineterm="",
                                )
                                for diff_line in diff:
                                    self.prout(diff_line)
                finally:
                    # The try-finally block is so we can print a partial line
                    # with the name of the file, and print (changed) on the
                    # same line, but also make sure to break the line before
                    # any traceback.
                    if need_newline:
                        self.prout("")
            else:
                self.process_file(fname, self.stdout, fname)
        finally:
            self.restore_include_path()

    def process_wildcards(self, fname):
        """Process every file matching the glob pattern `fname`.

        If nothing matches, `fname` itself is processed as a plain name.
        """
        files = glob.glob(fname)
        if files:
            for matching_file in files:
                self.process_one_file(matching_file)
        else:
            self.process_one_file(fname)

    def process_file_list(self, file_name_list):
        """Process the files in a file list."""
        # The with-block closes the list file even if reading fails.
        with self.open_input_file(file_name_list) as flist:
            lines = flist.readlines()
        for line in lines:
            # Use shlex to parse the line like a shell.
            lex = shlex.shlex(line, posix=True)
            lex.whitespace_split = True
            lex.commenters = "#"
            # No escapes, so that backslash can be part of the path
            lex.escape = ""
            args = list(lex)
            if args:
                self.process_arguments(args)

    def process_arguments(self, args):
        """Process one command-line.

        `args[0]` is a file name, an "@FILELIST" or an "&FILELIST"; the
        remaining items are options that apply only to this command line.
        """
        saved_options = self.options
        self.options = self.options.clone()

        try:
            self.options.parse_args(args[1:])
            self.options.validate()

            target = args[0]
            # startswith() also copes with an empty argument string.
            if target.startswith("@"):
                if self.options.output_name:
                    raise CogUsageError("Can't use -o with @file")
                self.process_file_list(target[1:])
            elif target.startswith("&"):
                if self.options.output_name:
                    raise CogUsageError("Can't use -o with &file")
                file_list = target[1:]
                # os.path.dirname() gives "" for a bare file name; fall back
                # to the current directory so change_dir never receives an
                # empty path.
                with change_dir(os.path.dirname(file_list) or os.curdir):
                    self.process_file_list(os.path.basename(file_list))
            else:
                self.process_wildcards(target)
        finally:
            # Per-command-line options must not leak out, even on error.
            self.options = saved_options

    def callable_main(self, argv):
        """All of command-line cog, but in a callable form.

        This is used by main.  `argv` is the equivalent of sys.argv.

        Raises CogUsageError when no files are given and CogCheckFailed
        when a --check run found files that would change.
        """
        argv = argv[1:]

        # Provide help if asked for anywhere in the command line.
        if "-?" in argv or "-h" in argv or "--help" in argv:
            self.prerr(usage, end="")
            return

        self.options.parse_args(argv)
        self.options.validate()
        # The markers may have changed, so rebuild the hash handler.
        self._fix_end_output_patterns()

        if self.options.show_version:
            self.prout(f"Cog version {__version__}")
            return

        if self.options.args:
            for a in self.options.args:
                self.process_arguments([a])
        else:
            raise CogUsageError("No files to process")

        if self.check_failed:
            msg = "Check failed"
            if self.options.check_fail_msg:
                msg = f"{msg}: {self.options.check_fail_msg}"
            raise CogCheckFailed(msg)

    def main(self, argv):
        """Handle the command-line execution for cog.

        Returns the process exit status: 0 on success, 2 for usage errors,
        3 for errors raised through cog.error, 4 for exceptions in user
        code, 5 for a failed --check, and 1 for any other CogError.
        """

        try:
            self.callable_main(argv)
            return 0
        except CogUsageError as err:
            self.prerr(err)
            self.prerr("(for help use --help)")
            return 2
        except CogGeneratedError as err:
            self.prerr(f"Error: {err}")
            return 3
        except CogUserException as err:
            self.prerr("Traceback (most recent call last):")
            self.prerr(err.args[0])
            return 4
        except CogCheckFailed as err:
            self.prerr(err)
            return 5
        except CogError as err:
            self.prerr(err)
            return 1

832 

833 

def find_cog_source(frame_summary, prologue):
    """Find cog source lines in a frame summary list, for printing tracebacks.

    Arguments:
        frame_summary: an iterable of 4-item tuples
            (filename, lineno, funcname, source), as produced from
            traceback.extract_tb.
        prologue: the text of the code prologue.

    Yields:
        4-item tuples, updated to correct the cog entries.  Frames that
        already have source text, or whose file name is not of the form
        "<cog FILE:LINE>", are yielded unchanged.

    """
    prolines = prologue.splitlines()
    for filename, lineno, funcname, source in frame_summary:
        if not source:
            # Greedy match so that only the last ":<digits>" is taken as the
            # line number; file names may contain colons themselves (for
            # example a Windows drive letter).
            m = re.search(r"^<cog (.+):(\d+)>$", filename)
            if m:
                if lineno <= len(prolines):
                    filename = "<prologue>"
                    source = prolines[lineno - 1]
                    # Because "import cog" is the first line in the prologue
                    lineno -= 1
                else:
                    filename, coglineno = m.groups()
                    coglineno = int(coglineno)
                    # Translate from a line in the compiled text to a line
                    # in the original file.
                    lineno += coglineno - len(prolines)
                    source = linecache.getline(filename, lineno).strip()
        yield filename, lineno, funcname, source

862 

863 

def main():
    """Console entry point: run a new Cog on sys.argv and return its status."""
    engine = Cog()
    return engine.main(sys.argv)