Coverage for cogapp/cogapp.py: 45.75%

490 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-13 08:29 -0400

1"""Cog content generation tool.""" 

2 

3import copy 

4import difflib 

5import getopt 

6import glob 

7import io 

8import linecache 

9import os 

10import re 

11import shlex 

12import sys 

13import traceback 

14import types 

15 

16from .whiteutils import common_prefix, reindent_block, white_prefix 

17from .utils import NumberedFileReader, Redirectable, change_dir, md5 

18from .hashhandler import HashHandler 

19 

20__version__ = "3.5.1" 

21 

22usage = """\ 

23cog - generate content with inlined Python code. 

24 

25cog [OPTIONS] [INFILE | @FILELIST | &FILELIST] ... 

26 

27INFILE is the name of an input file, '-' will read from stdin. 

28FILELIST is the name of a text file containing file names or 

29other @FILELISTs. 

30 

31For @FILELIST, paths in the file list are relative to the working 

32directory where cog was called. For &FILELIST, paths in the file 

33list are relative to the file list location. 

34 

35OPTIONS: 

36 -c Checksum the output to protect it against accidental change. 

37 -d Delete the Python code from the output file. 

38 -D name=val Define a global string available to your Python code. 

39 -e Warn if a file has no cog code in it. 

40 -I PATH Add PATH to the list of directories for data files and modules. 

41 -n ENCODING Use ENCODING when reading and writing files. 

42 -o OUTNAME Write the output to OUTNAME. 

43 -p PROLOGUE Prepend the Python source with PROLOGUE. Useful to insert an 

44 import line. Example: -p "import math" 

45 -P Use print() instead of cog.outl() for code output. 

46 -r Replace the input file with the output. 

47 -s STRING Suffix all generated output lines with STRING. 

48 -U Write the output with Unix newlines (only LF line-endings). 

49 -w CMD Use CMD if the output file needs to be made writable. 

50 A %s in the CMD will be filled with the filename. 

51 -x Excise all the generated output without running the Python. 

52 -z The end-output marker can be omitted, and is assumed at eof. 

53 -v Print the version of cog and exit. 

54 --check Check that the files would not change if run again. 

55 --diff With --check, show a diff of what failed the check. 

56 --markers='START END END-OUTPUT' 

57 The patterns surrounding cog inline instructions. Should 

58 include three values separated by spaces, the start, end, 

59 and end-output markers. Defaults to '[[[cog ]]] [[[end]]]'. 

60 --verbosity=VERBOSITY 

61 Control the amount of output. 2 (the default) lists all files, 

62 1 lists only changed files, 0 lists no files. 

63 -h, --help Print this help. 

64""" 

65 

66 

class CogError(Exception):
    """Base class for every exception cog raises itself.

    Arguments:
        msg: the error text.
        file: optional name of the file the error refers to.
        line: line number within `file`; only used when `file` is given.

    When `file` is non-empty the exception message is formatted as
    ``file(line): msg``; otherwise it is `msg` unchanged.

    """

    def __init__(self, msg, file="", line=0):
        # Only decorate the message with a location when we actually have one.
        text = f"{file}({line}): {msg}" if file else msg
        super().__init__(text)

75 

76 

class CogUsageError(CogError):
    """Raised when cog's command-line arguments are used incorrectly."""

81 

82 

class CogInternalError(CogError):
    """Raised for a mistake in cog's own code; should never happen."""

87 

88 

class CogGeneratedError(CogError):
    """Raised on behalf of a user's Python code (see `CogGenerator.error`)."""

93 

94 

class CogUserException(CogError):
    """Wraps an exception that escaped from a user's Python code.

    The single argument is the already-formatted traceback text to print.

    """

103 

104 

class CogCheckFailed(CogError):
    """Raised when a --check run finds a file that would change."""

109 

110 

class CogGenerator(Redirectable):
    """One chunk of generator code lifted out of a source file.

    The marker lines that delimit the chunk are collected with
    `parse_marker`, the code lines between them with `parse_line`, and
    `evaluate` runs the code and returns the text it produced.

    """

    def __init__(self, options=None):
        super().__init__()
        self.options = options or CogOptions()
        self.markers = []
        self.lines = []

    def parse_marker(self, line):
        """Remember `line` as one of the marker lines around the code."""
        self.markers.append(line)

    def parse_line(self, line):
        """Remember `line` as a line of code, without its newline characters."""
        self.lines.append(line.strip("\n"))

    def get_code(self):
        """Return the executable Python code held by this generator.

        If every marker and code line starts with the same prefix (for
        example end-of-line comment characters), its first occurrence is
        removed from each of them.  Note that this rewrites `self.markers`
        and `self.lines` in the process.

        """
        shared = common_prefix(self.markers + self.lines)
        if shared:

            def without_prefix(text):
                return text.replace(shared, "", 1)

            self.markers = list(map(without_prefix, self.markers))
            self.lines = list(map(without_prefix, self.lines))

        return reindent_block(self.lines, "")

    def evaluate(self, cog, globals, fname):
        """Run the generator's code and return the text it generated.

        Arguments:
            cog: the `Cog` engine; supplies `cogmodulename` and `cogmodule`.
            globals: the globals dict the code is executed in.
            fname: the name the code is compiled under.

        Returns:
            The generated text passed through `reindent_block` with the
            whitespace prefix of the marker lines, or "" when there is no code.

        Raises:
            CogError: re-raised unchanged when the code raises one.
            CogUserException: wrapping any other exception raised by the code.

        """
        # Must be computed before get_code(), which rewrites self.markers.
        out_prefix = white_prefix(self.markers)

        source = self.get_code()
        if not source:
            return ""

        prologue = "".join(("import ", cog.cogmodulename, " as cog\n"))
        if self.options.prologue:
            prologue += self.options.prologue + "\n"
        code = compile(prologue + source, str(fname), "exec")

        # Point the shared "cog" module at this generator's state.
        module = cog.cogmodule
        module.msg = self.msg
        module.out = self.out
        module.outl = self.outl
        module.error = self.error

        # With print_output, stdout is captured for the duration of the run.
        capture = io.StringIO() if self.options.print_output else None
        saved_stdout = sys.stdout
        if capture is not None:
            sys.stdout = capture

        self.outstring = ""
        try:
            eval(code, globals)
        except CogError:
            raise
        except:  # noqa: E722 (we're just wrapping in CogUserException and rethrowing)
            exc_type, exc, tb = sys.exc_info()
            # tb.tb_next skips the frame of this method itself.
            raw_frames = map(tuple, traceback.extract_tb(tb.tb_next))
            fixed_frames = find_cog_source(raw_frames, prologue)
            report = "".join(traceback.format_list(fixed_frames))
            report += f"{exc_type.__name__}: {exc}"
            raise CogUserException(report)
        finally:
            sys.stdout = saved_stdout

        if capture is not None:
            self.outstring = capture.getvalue()

        # The output must finish with a newline, otherwise its last line
        # would be glued onto the end-output line and cog would no longer
        # be idempotent.
        if self.outstring and not self.outstring.endswith("\n"):
            self.outstring += "\n"

        return reindent_block(self.outstring, out_prefix)

    def msg(self, s):
        """The cog.msg function: print `s` prefixed with "Message: "."""
        self.prout("Message: " + s)

    def out(self, sOut="", dedent=False, trimblanklines=False):
        """The cog.out function: append `sOut` to the generated output.

        With `trimblanklines`, a blank first line and a blank last line are
        dropped from multi-line text and a trailing newline is added.  With
        `dedent`, the text is passed through `reindent_block` first.

        """
        if trimblanklines and "\n" in sOut:
            pieces = sOut.split("\n")
            if not pieces[0].strip():
                pieces.pop(0)
            if pieces and not pieces[-1].strip():
                pieces.pop()
            sOut = "\n".join(pieces) + "\n"
        if dedent:
            sOut = reindent_block(sOut)
        self.outstring += sOut

    def outl(self, sOut="", **kw):
        """The cog.outl function: like `out`, followed by a newline."""
        self.out(sOut, **kw)
        self.out("\n")

    def error(self, msg="Error raised by cog generator."):
        """The cog.error function.

        Generators can call this instead of raising a standard Python
        error; it raises `CogGeneratedError`, which `Cog.main` reports
        without a traceback.

        """
        raise CogGeneratedError(msg)

217 

218 

class CogOptions:
    """Options for a run of cog.

    Every option is a plain attribute initialised to its default in
    `__init__`; `parse_args` overwrites them from a command line and
    `validate` rejects combinations that make no sense together.

    """

    def __init__(self):
        # Defaults for argument values.
        self.args = []
        self.include_path = []
        self.defines = {}
        self.show_version = False
        self.make_writable_cmd = None
        self.replace = False
        self.no_generate = False
        self.output_name = None
        self.warn_empty = False
        self.hash_output = False
        self.delete_code = False
        self.eof_can_be_end = False
        self.suffix = None
        self.newlines = False
        self.begin_spec = "[[[cog"
        self.end_spec = "]]]"
        self.end_output = "[[[end]]]"
        self.encoding = "utf-8"
        self.verbosity = 2
        self.prologue = ""
        self.print_output = False
        self.check = False
        self.diff = False

    def __eq__(self, other):
        """Comparison operator for tests to use."""
        return self.__dict__ == other.__dict__

    def clone(self):
        """Make a clone of these options, for further refinement."""
        return copy.deepcopy(self)

    def add_to_include_path(self, dirs):
        """Add directories to the include path.

        `dirs` is a single string; several directories may be joined with
        `os.pathsep`.

        """
        self.include_path.extend(dirs.split(os.pathsep))

    def parse_args(self, argv):
        """Parse the command-line arguments in `argv` into this object.

        Non-option arguments are stored in `self.args`.

        Raises:
            CogUsageError: for an unknown option, a malformed -D or
                --markers value, or a non-integer --verbosity.
            CogInternalError: if getopt returns an option this method does
                not handle.

        """
        try:
            opts, self.args = getopt.getopt(
                argv,
                "cdD:eI:n:o:rs:p:PUvw:xz",
                [
                    "check",
                    "diff",
                    "markers=",
                    "verbosity=",
                ],
            )
        except getopt.error as err:
            raise CogUsageError(err) from err

        # Handle the command line arguments.
        for o, a in opts:
            if o == "-c":
                self.hash_output = True
            elif o == "-d":
                self.delete_code = True
            elif o == "-D":
                if a.count("=") < 1:
                    raise CogUsageError("-D takes a name=value argument")
                name, value = a.split("=", 1)
                self.defines[name] = value
            elif o == "-e":
                self.warn_empty = True
            elif o == "-I":
                self.add_to_include_path(os.path.abspath(a))
            elif o == "-n":
                self.encoding = a
            elif o == "-o":
                self.output_name = a
            elif o == "-r":
                self.replace = True
            elif o == "-s":
                self.suffix = a
            elif o == "-p":
                self.prologue = a
            elif o == "-P":
                self.print_output = True
            elif o == "-U":
                self.newlines = True
            elif o == "-v":
                self.show_version = True
            elif o == "-w":
                self.make_writable_cmd = a
            elif o == "-x":
                self.no_generate = True
            elif o == "-z":
                self.eof_can_be_end = True
            elif o == "--check":
                self.check = True
            elif o == "--diff":
                self.diff = True
            elif o == "--markers":
                self._parse_markers(a)
            elif o == "--verbosity":
                # A non-numeric value is a usage mistake, not a crash.
                try:
                    self.verbosity = int(a)
                except ValueError:
                    raise CogUsageError(
                        f"--verbosity requires an integer, could not parse {a!r}"
                    ) from None
            else:
                # Since getopt.getopt is given a list of possible flags,
                # this is an internal error.
                raise CogInternalError(f"Don't understand argument {o}")

    def _parse_markers(self, val):
        """Split a --markers value into the begin, end and end-output specs."""
        try:
            self.begin_spec, self.end_spec, self.end_output = val.split(" ")
        except ValueError:
            raise CogUsageError(
                f"--markers requires 3 values separated by spaces, could not parse {val!r}"
            ) from None

    def validate(self):
        """Does nothing if everything is OK, raises CogError's if it's not."""
        if self.replace and self.delete_code:
            raise CogUsageError(
                "Can't use -d with -r (or you would delete all your source!)"
            )

        if self.replace and self.output_name:
            raise CogUsageError("Can't use -o with -r (they are opposites)")

        if self.diff and not self.check:
            raise CogUsageError("Can't use --diff without --check")

347 

348 

class Cog(Redirectable):
    """The Cog engine.

    Holds the options for a run, the namespace object that generator code
    sees as the ``cog`` module, and the methods that process single files,
    file lists and whole command lines.

    """

    def __init__(self):
        super().__init__()
        self.options = CogOptions()
        self.cogmodulename = "cog"
        self.create_cog_module()
        self.check_failed = False
        self.hash_handler = None
        self._fix_end_output_patterns()

    def _fix_end_output_patterns(self):
        """Rebuild the hash handler for the current end-output marker."""
        self.hash_handler = HashHandler(self.options.end_output)

    def show_warning(self, msg):
        """Print `msg` as a warning."""
        self.prout(f"Warning: {msg}")

    def is_begin_spec_line(self, s):
        """True if `s` contains the begin-spec marker."""
        return self.options.begin_spec in s

    def is_end_spec_line(self, s):
        """True if `s` contains the end-spec marker but not the end-output marker."""
        return self.options.end_spec in s and not self.is_end_output_line(s)

    def is_end_output_line(self, s):
        """True if `s` contains the end-output marker."""
        return self.options.end_output in s

    def create_cog_module(self):
        """Make a cog "module" object.

        Imported Python modules can use "import cog" to get our state.

        """
        self.cogmodule = types.SimpleNamespace()
        self.cogmodule.path = []

    def open_output_file(self, fname):
        """Open an output file, taking all the details into account.

        The file is opened for writing with the configured encoding, with
        "\\n" newlines when the newlines option is set.  A missing parent
        directory is created first.

        """
        opts = {}
        mode = "w"
        opts["encoding"] = self.options.encoding
        if self.options.newlines:
            opts["newline"] = "\n"
        fdir = os.path.dirname(fname)
        # Test fdir itself: testing its parent would skip creating a
        # single-level relative directory such as "sub" in "sub/out.txt".
        if fdir and not os.path.exists(fdir):
            os.makedirs(fdir)
        return open(fname, mode, **opts)

    def open_input_file(self, fname):
        """Open an input file; the name "-" means standard input."""
        if fname == "-":
            return sys.stdin
        else:
            return open(fname, encoding=self.options.encoding)

    def process_file(self, file_in, file_out, fname=None, globals=None):
        """Process an input file object to an output file object.

        `file_in` and `file_out` can be file objects, or file names.
        `fname` is the name to report for file objects, and `globals` is an
        optional dict to run the generator code in; the options' defines
        are added to it.

        Raises:
            CogError: when the cog markers in the input are malformed, or a
                checksum on an end-output line fails validation.

        """
        file_name_in = fname or ""
        file_name_out = fname or ""
        file_in_to_close = file_out_to_close = None

        start_dir = os.getcwd()

        try:
            # Convert filenames to files.  This happens inside the try so
            # the input is closed again if opening the output fails.
            if isinstance(file_in, (bytes, str)):
                # Open the input file.
                file_name_in = file_in
                file_in = file_in_to_close = self.open_input_file(file_in)
            if isinstance(file_out, (bytes, str)):
                # Open the output file.
                file_name_out = file_out
                file_out = file_out_to_close = self.open_output_file(file_out)

            file_in = NumberedFileReader(file_in)

            saw_cog = False

            self.cogmodule.inFile = file_name_in
            self.cogmodule.outFile = file_name_out
            self.cogmodulename = "cog_" + md5(file_name_out.encode()).hexdigest()
            sys.modules[self.cogmodulename] = self.cogmodule
            # if "import cog" explicitly done in code by user, note threading will cause clashes.
            sys.modules["cog"] = self.cogmodule

            # The globals dict we'll use for this file.
            if globals is None:
                globals = {}

            # If there are any global defines, put them in the globals.
            globals.update(self.options.defines)

            # loop over generator chunks
            line = file_in.readline()
            while line:
                # Find the next spec begin
                while line and not self.is_begin_spec_line(line):
                    if self.is_end_spec_line(line):
                        raise CogError(
                            f"Unexpected {self.options.end_spec!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    if self.is_end_output_line(line):
                        raise CogError(
                            f"Unexpected {self.options.end_output!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    file_out.write(line)
                    line = file_in.readline()
                if not line:
                    break
                if not self.options.delete_code:
                    file_out.write(line)

                # `line` is the begin spec
                gen = CogGenerator(options=self.options)
                gen.set_output(stdout=self.stdout)
                gen.parse_marker(line)
                first_line_num = file_in.linenumber()
                self.cogmodule.firstLineNum = first_line_num

                # If the spec begin is also a spec end, then process the single
                # line of code inside.
                if self.is_end_spec_line(line):
                    beg = line.find(self.options.begin_spec)
                    end = line.find(self.options.end_spec)
                    if beg > end:
                        raise CogError(
                            "Cog code markers inverted",
                            file=file_name_in,
                            line=first_line_num,
                        )
                    else:
                        code = line[beg + len(self.options.begin_spec) : end].strip()
                        gen.parse_line(code)
                else:
                    # Deal with an ordinary code block.
                    line = file_in.readline()

                    # Get all the lines in the spec
                    while line and not self.is_end_spec_line(line):
                        if self.is_begin_spec_line(line):
                            raise CogError(
                                f"Unexpected {self.options.begin_spec!r}",
                                file=file_name_in,
                                line=file_in.linenumber(),
                            )
                        if self.is_end_output_line(line):
                            raise CogError(
                                f"Unexpected {self.options.end_output!r}",
                                file=file_name_in,
                                line=file_in.linenumber(),
                            )
                        if not self.options.delete_code:
                            file_out.write(line)
                        gen.parse_line(line)
                        line = file_in.readline()
                    if not line:
                        raise CogError(
                            "Cog block begun but never ended.",
                            file=file_name_in,
                            line=first_line_num,
                        )

                    if not self.options.delete_code:
                        file_out.write(line)
                    gen.parse_marker(line)

                line = file_in.readline()

                # Eat all the lines in the output section.  While reading past
                # them, collect them so the hash of the old output can be
                # computed.
                previous = []
                while line and not self.is_end_output_line(line):
                    if self.is_begin_spec_line(line):
                        raise CogError(
                            f"Unexpected {self.options.begin_spec!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    if self.is_end_spec_line(line):
                        raise CogError(
                            f"Unexpected {self.options.end_spec!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    previous.append(line)
                    line = file_in.readline()
                cur_hash = self.hash_handler.compute_lines_hash(previous)

                if not line and not self.options.eof_can_be_end:
                    # We reached end of file before we found the end output line.
                    raise CogError(
                        f"Missing {self.options.end_output!r} before end of file.",
                        file=file_name_in,
                        line=file_in.linenumber(),
                    )

                # Make the previous output available to the current code
                self.cogmodule.previous = "".join(previous)

                # Write the output of the spec to be the new output if we're
                # supposed to generate code.
                if not self.options.no_generate:
                    code_name = f"<cog {file_name_in}:{first_line_num}>"
                    generated = gen.evaluate(
                        cog=self, globals=globals, fname=code_name
                    )
                    generated = self.suffix_lines(generated)
                    new_hash = self.hash_handler.compute_hash(generated)
                    file_out.write(generated)
                else:
                    new_hash = ""

                saw_cog = True

                # Write the ending output line
                if self.options.hash_output:
                    try:
                        self.hash_handler.validate_hash(line, cur_hash)
                    except ValueError as e:
                        raise CogError(
                            str(e),
                            file=file_name_in,
                            line=file_in.linenumber(),
                        ) from e
                    line = self.hash_handler.format_end_line_with_hash(
                        line,
                        new_hash,
                        add_hash=True,
                        preserve_format=self.options.check,
                    )
                else:
                    line = self.hash_handler.format_end_line_with_hash(
                        line, new_hash, add_hash=False
                    )

                if not self.options.delete_code:
                    file_out.write(line)
                line = file_in.readline()

            if not saw_cog and self.options.warn_empty:
                self.show_warning(f"no cog code found in {file_name_in}")
        finally:
            if file_in_to_close:
                file_in_to_close.close()
            if file_out_to_close:
                file_out_to_close.close()
            # Generator code may have changed directory; always go back.
            os.chdir(start_dir)

    # A regex for non-empty lines, used by suffix_lines.
    re_non_empty_lines = re.compile(r"^\s*\S+.*$", re.MULTILINE)

    def suffix_lines(self, text):
        """Add suffixes to the lines in text, if our options desire it.

        `text` is many lines, as a single string.

        """
        if self.options.suffix:
            # Find all non-blank lines, and add the suffix to the end.
            # Backslashes are doubled so the suffix is taken literally in
            # the replacement template.
            repl = r"\g<0>" + self.options.suffix.replace("\\", "\\\\")
            text = self.re_non_empty_lines.sub(repl, text)
        return text

    def process_string(self, input, fname=None):
        """Process `input` as the text to cog.

        Return the cogged output as a string.

        """
        file_old = io.StringIO(input)
        file_new = io.StringIO()
        self.process_file(file_old, file_new, fname=fname)
        return file_new.getvalue()

    def replace_file(self, old_path, new_text):
        """Replace the file `old_path` with the contents `new_text`.

        If the file is not writable and a make-writable command is
        configured, the command is run first with "%s" replaced by the
        file name.

        Raises:
            CogError: if the file is not writable and cannot be made so.

        """
        if not os.access(old_path, os.W_OK):
            # Need to ensure we can write.
            if self.options.make_writable_cmd:
                # Use an external command to make the file writable.
                # NOTE: os.popen runs the command through the shell and the
                # file name is substituted unquoted.
                cmd = self.options.make_writable_cmd.replace("%s", old_path)
                with os.popen(cmd) as cmdout:
                    self.stdout.write(cmdout.read())
                if not os.access(old_path, os.W_OK):
                    raise CogError(f"Couldn't make {old_path} writable")
            else:
                # Can't write!
                raise CogError(f"Can't overwrite {old_path}")
        # The with block closes the file even if the write fails.
        with self.open_output_file(old_path) as f:
            f.write(new_text)

    def save_include_path(self):
        """Remember copies of the include path and sys.path."""
        self.saved_include = self.options.include_path[:]
        self.saved_sys_path = sys.path[:]

    def restore_include_path(self):
        """Put back the paths remembered by `save_include_path`."""
        self.options.include_path = self.saved_include
        self.cogmodule.path = self.options.include_path
        sys.path = self.saved_sys_path

    def add_to_include_path(self, include_path):
        """Append the directories in `include_path` to cog.path and sys.path."""
        self.cogmodule.path.extend(include_path)
        sys.path.extend(include_path)

    def process_one_file(self, fname):
        """Process one filename through cog.

        Where the output goes depends on the options: to the -o file, back
        into the input file (-r), nowhere (--check), or to stdout.

        """
        self.save_include_path()
        need_newline = False

        try:
            self.add_to_include_path(self.options.include_path)
            # Since we know where the input file came from,
            # push its directory onto the include path.
            self.add_to_include_path([os.path.dirname(fname)])

            # How we process the file depends on where the output is going.
            if self.options.output_name:
                self.process_file(fname, self.options.output_name, fname)
            elif self.options.replace or self.options.check:
                # We want to replace the cog file with the output,
                # but only if they differ.
                verb = "Cogging" if self.options.replace else "Checking"
                if self.options.verbosity >= 2:
                    self.prout(f"{verb} {fname}", end="")
                    need_newline = True

                try:
                    with self.open_input_file(fname) as file_old_file:
                        old_text = file_old_file.read()
                    new_text = self.process_string(old_text, fname=fname)
                    if old_text != new_text:
                        if self.options.verbosity >= 1:
                            if self.options.verbosity < 2:
                                self.prout(f"{verb} {fname}", end="")
                            # NOTE(review): this literal was recovered from a
                            # whitespace-collapsed listing; confirm its leading
                            # spacing against the original source.
                            self.prout(" (changed)")
                            need_newline = False
                        if self.options.replace:
                            self.replace_file(fname, new_text)
                        else:
                            assert self.options.check
                            self.check_failed = True
                            if self.options.diff:
                                old_lines = old_text.splitlines()
                                new_lines = new_text.splitlines()
                                diff = difflib.unified_diff(
                                    old_lines,
                                    new_lines,
                                    fromfile=f"current {fname}",
                                    tofile=f"changed {fname}",
                                    lineterm="",
                                )
                                for diff_line in diff:
                                    self.prout(diff_line)
                finally:
                    # The try-finally block is so we can print a partial line
                    # with the name of the file, and print (changed) on the
                    # same line, but also make sure to break the line before
                    # any traceback.
                    if need_newline:
                        self.prout("")
            else:
                self.process_file(fname, self.stdout, fname)
        finally:
            self.restore_include_path()

    def process_wildcards(self, fname):
        """Process every file matching the glob `fname`, or `fname` itself if none match."""
        files = glob.glob(fname)
        if files:
            for matching_file in files:
                self.process_one_file(matching_file)
        else:
            self.process_one_file(fname)

    def process_file_list(self, file_name_list):
        """Process the files in a file list.

        Each non-empty line of the list is split like a shell command line
        ("#" starts a comment) and handed to `process_arguments`.

        """
        with self.open_input_file(file_name_list) as flist:
            lines = flist.readlines()
        for line in lines:
            # Use shlex to parse the line like a shell.
            lex = shlex.shlex(line, posix=True)
            lex.whitespace_split = True
            lex.commenters = "#"
            # No escapes, so that backslash can be part of the path
            lex.escape = ""
            args = list(lex)
            if args:
                self.process_arguments(args)

    def process_arguments(self, args):
        """Process one command-line.

        `args[0]` is a file name, an @FILELIST or an &FILELIST; the rest of
        `args` are options that apply to this command line only.

        Raises:
            CogUsageError: for invalid options, or -o combined with a file list.

        """
        saved_options = self.options
        self.options = self.options.clone()
        try:
            self.options.parse_args(args[1:])
            self.options.validate()

            target = args[0]
            if target.startswith("@"):
                if self.options.output_name:
                    raise CogUsageError("Can't use -o with @file")
                self.process_file_list(target[1:])
            elif target.startswith("&"):
                if self.options.output_name:
                    raise CogUsageError("Can't use -o with &file")
                file_list = target[1:]
                with change_dir(os.path.dirname(file_list)):
                    self.process_file_list(os.path.basename(file_list))
            else:
                self.process_wildcards(target)
        finally:
            # Per-line options must not leak out, even when processing fails.
            self.options = saved_options

    def callable_main(self, argv):
        """All of command-line cog, but in a callable form.

        This is used by main.  `argv` is the equivalent of sys.argv.

        Raises:
            CogUsageError: for invalid options or when no files are given.
            CogCheckFailed: when a --check run found a file that would change.

        """
        argv = argv[1:]

        # Provide help if asked for anywhere in the command line.
        if "-?" in argv or "-h" in argv or "--help" in argv:
            self.prerr(usage, end="")
            return

        self.options.parse_args(argv)
        self.options.validate()
        self._fix_end_output_patterns()

        if self.options.show_version:
            self.prout(f"Cog version {__version__}")
            return

        if self.options.args:
            for a in self.options.args:
                self.process_arguments([a])
        else:
            raise CogUsageError("No files to process")

        if self.check_failed:
            raise CogCheckFailed("Check failed")

    def main(self, argv):
        """Handle the command-line execution for cog.

        Returns the process exit status: 0 on success, 2 for usage errors,
        3 for errors raised through cog.error, 4 for exceptions in user
        code, 5 for a failed --check, and 1 for any other CogError.

        """
        try:
            self.callable_main(argv)
            return 0
        except CogUsageError as err:
            self.prerr(err)
            self.prerr("(for help use --help)")
            return 2
        except CogGeneratedError as err:
            self.prerr(f"Error: {err}")
            return 3
        except CogUserException as err:
            self.prerr("Traceback (most recent call last):")
            self.prerr(err.args[0])
            return 4
        except CogCheckFailed as err:
            self.prerr(err)
            return 5
        except CogError as err:
            self.prerr(err)
            return 1

822 

823 

def find_cog_source(frame_summary, prologue):
    """Find cog source lines in a frame summary list, for printing tracebacks.

    Arguments:
        frame_summary: an iterable of 4-item tuples
            (filename, lineno, funcname, source), as produced from
            traceback.extract_tb.
        prologue: the text of the code prologue.

    Yields:
        4-item tuples, one per input frame.  Frames that have no source
        text and whose filename has the form ``<cog FILE:LINE>`` are
        corrected: a frame inside the prologue is reported against
        ``<prologue>``, any other against FILE with its line number moved
        to the position in that file.  All other frames pass through
        unchanged.

    """
    prolines = prologue.splitlines()
    for filename, lineno, funcname, source in frame_summary:
        if not source:
            # The greedy (.+) splits at the LAST ":digits>", so file names
            # that contain a colon themselves (such as Windows drive
            # letters) are still recognised.
            m = re.search(r"^<cog (.+):(\d+)>$", filename, re.DOTALL)
            if m:
                if lineno <= len(prolines):
                    filename = "<prologue>"
                    source = prolines[lineno - 1]
                    # Because "import cog" is the first line in the prologue
                    lineno -= 1
                else:
                    filename, coglineno = m.groups()
                    coglineno = int(coglineno)
                    lineno += coglineno - len(prolines)
                    source = linecache.getline(filename, lineno).strip()
        yield filename, lineno, funcname, source

852 

853 

def main():
    """Entry point for entry_points: run cog on sys.argv, return its exit status."""
    engine = Cog()
    return engine.main(sys.argv)