Coverage for cogapp/cogapp.py: 46.66%

483 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-12-26 11:29 -0500

1"""Cog content generation tool.""" 

2 

3import copy 

4import getopt 

5import glob 

6import io 

7import linecache 

8import os 

9import re 

10import shlex 

11import sys 

12import traceback 

13import types 

14 

15from .whiteutils import common_prefix, reindent_block, white_prefix 

16from .utils import NumberedFileReader, Redirectable, change_dir, md5 

17 

18__version__ = "3.4.1" 

19 

20usage = """\ 

21cog - generate content with inlined Python code. 

22 

23cog [OPTIONS] [INFILE | @FILELIST | &FILELIST] ... 

24 

25INFILE is the name of an input file, '-' will read from stdin. 

26FILELIST is the name of a text file containing file names or 

27other @FILELISTs. 

28 

29For @FILELIST, paths in the file list are relative to the working 

30directory where cog was called. For &FILELIST, paths in the file 

31list are relative to the file list location. 

32 

33OPTIONS: 

34 -c Checksum the output to protect it against accidental change. 

35 -d Delete the generator code from the output file. 

36 -D name=val Define a global string available to your generator code. 

37 -e Warn if a file has no cog code in it. 

38 -I PATH Add PATH to the list of directories for data files and modules. 

39 -n ENCODING Use ENCODING when reading and writing files. 

40 -o OUTNAME Write the output to OUTNAME. 

41 -p PROLOGUE Prepend the generator source with PROLOGUE. Useful to insert an 

42 import line. Example: -p "import math" 

43 -P Use print() instead of cog.outl() for code output. 

44 -r Replace the input file with the output. 

45 -s STRING Suffix all generated output lines with STRING. 

46 -U Write the output with Unix newlines (only LF line-endings). 

47 -w CMD Use CMD if the output file needs to be made writable. 

48 A %s in the CMD will be filled with the filename. 

49 -x Excise all the generated output without running the generators. 

50 -z The end-output marker can be omitted, and is assumed at eof. 

51 -v Print the version of cog and exit. 

52 --check Check that the files would not change if run again. 

53 --markers='START END END-OUTPUT' 

54 The patterns surrounding cog inline instructions. Should 

55 include three values separated by spaces, the start, end, 

56 and end-output markers. Defaults to '[[[cog ]]] [[[end]]]'. 

57 --verbosity=VERBOSITY 

58 Control the amount of output. 2 (the default) lists all files, 

59 1 lists only changed files, 0 lists no files. 

60 -h Print this help. 

61""" 

62 

63 

64class CogError(Exception): 

65 """Any exception raised by Cog.""" 

66 

67 def __init__(self, msg, file="", line=0): 

68 if file: 

69 super().__init__(f"{file}({line}): {msg}") 

70 else: 

71 super().__init__(msg) 

72 

73 

74class CogUsageError(CogError): 

75 """An error in usage of command-line arguments in cog.""" 

76 

77 pass 

78 

79 

80class CogInternalError(CogError): 

81 """An error in the coding of Cog. Should never happen.""" 

82 

83 pass 

84 

85 

86class CogGeneratedError(CogError): 

87 """An error raised by a user's cog generator.""" 

88 

89 pass 

90 

91 

92class CogUserException(CogError): 

93 """An exception caught when running a user's cog generator. 

94 

95 The argument is the traceback message to print. 

96 

97 """ 

98 

99 pass 

100 

101 

102class CogCheckFailed(CogError): 

103 """A --check failed.""" 

104 

105 pass 

106 

107 

108class CogGenerator(Redirectable): 

109 """A generator pulled from a source file.""" 

110 

111 def __init__(self, options=None): 

112 super().__init__() 

113 self.markers = [] 

114 self.lines = [] 

115 self.options = options or CogOptions() 

116 

117 def parse_marker(self, line): 

118 self.markers.append(line) 

119 

120 def parse_line(self, line): 

121 self.lines.append(line.strip("\n")) 

122 

123 def get_code(self): 

124 """Extract the executable Python code from the generator.""" 

125 # If the markers and lines all have the same prefix 

126 # (end-of-line comment chars, for example), 

127 # then remove it from all the lines. 

128 pref_in = common_prefix(self.markers + self.lines) 

129 if pref_in: 

130 self.markers = [line.replace(pref_in, "", 1) for line in self.markers] 

131 self.lines = [line.replace(pref_in, "", 1) for line in self.lines] 

132 

133 return reindent_block(self.lines, "") 

134 

135 def evaluate(self, cog, globals, fname): 

136 # figure out the right whitespace prefix for the output 

137 pref_out = white_prefix(self.markers) 

138 

139 intext = self.get_code() 

140 if not intext: 

141 return "" 

142 

143 prologue = "import " + cog.cogmodulename + " as cog\n" 

144 if self.options.prologue: 144 ↛ 145line 144 didn't jump to line 145 because the condition on line 144 was never true

145 prologue += self.options.prologue + "\n" 

146 code = compile(prologue + intext, str(fname), "exec") 

147 

148 # Make sure the "cog" module has our state. 

149 cog.cogmodule.msg = self.msg 

150 cog.cogmodule.out = self.out 

151 cog.cogmodule.outl = self.outl 

152 cog.cogmodule.error = self.error 

153 

154 real_stdout = sys.stdout 

155 if self.options.print_output: 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true

156 sys.stdout = captured_stdout = io.StringIO() 

157 

158 self.outstring = "" 

159 try: 

160 eval(code, globals) 

161 except CogError: 

162 raise 

163 except: # noqa: E722 (we're just wrapping in CogUserException and rethrowing) 

164 typ, err, tb = sys.exc_info() 

165 frames = (tuple(fr) for fr in traceback.extract_tb(tb.tb_next)) 

166 frames = find_cog_source(frames, prologue) 

167 msg = "".join(traceback.format_list(frames)) 

168 msg += f"{typ.__name__}: {err}" 

169 raise CogUserException(msg) 

170 finally: 

171 sys.stdout = real_stdout 

172 

173 if self.options.print_output: 173 ↛ 174line 173 didn't jump to line 174 because the condition on line 173 was never true

174 self.outstring = captured_stdout.getvalue() 

175 

176 # We need to make sure that the last line in the output 

177 # ends with a newline, or it will be joined to the 

178 # end-output line, ruining cog's idempotency. 

179 if self.outstring and self.outstring[-1] != "\n": 

180 self.outstring += "\n" 

181 

182 return reindent_block(self.outstring, pref_out) 

183 

184 def msg(self, s): 

185 self.prout("Message: " + s) 

186 

187 def out(self, sOut="", dedent=False, trimblanklines=False): 

188 """The cog.out function.""" 

189 if trimblanklines and ("\n" in sOut): 

190 lines = sOut.split("\n") 

191 if lines[0].strip() == "": 

192 del lines[0] 

193 if lines and lines[-1].strip() == "": 

194 del lines[-1] 

195 sOut = "\n".join(lines) + "\n" 

196 if dedent: 

197 sOut = reindent_block(sOut) 

198 self.outstring += sOut 

199 

200 def outl(self, sOut="", **kw): 

201 """The cog.outl function.""" 

202 self.out(sOut, **kw) 

203 self.out("\n") 

204 

205 def error(self, msg="Error raised by cog generator."): 

206 """The cog.error function. 

207 

208 Instead of raising standard python errors, cog generators can use 

209 this function. It will display the error without a scary Python 

210 traceback. 

211 

212 """ 

213 raise CogGeneratedError(msg) 

214 

215 

216class CogOptions: 

217 """Options for a run of cog.""" 

218 

219 def __init__(self): 

220 # Defaults for argument values. 

221 self.args = [] 

222 self.include_path = [] 

223 self.defines = {} 

224 self.show_version = False 

225 self.make_writable_cmd = None 

226 self.replace = False 

227 self.no_generate = False 

228 self.output_name = None 

229 self.warn_empty = False 

230 self.hash_output = False 

231 self.delete_code = False 

232 self.eof_can_be_end = False 

233 self.suffix = None 

234 self.newlines = False 

235 self.begin_spec = "[[[cog" 

236 self.end_spec = "]]]" 

237 self.end_output = "[[[end]]]" 

238 self.encoding = "utf-8" 

239 self.verbosity = 2 

240 self.prologue = "" 

241 self.print_output = False 

242 self.check = False 

243 

244 def __eq__(self, other): 

245 """Comparison operator for tests to use.""" 

246 return self.__dict__ == other.__dict__ 

247 

248 def clone(self): 

249 """Make a clone of these options, for further refinement.""" 

250 return copy.deepcopy(self) 

251 

252 def add_to_include_path(self, dirs): 

253 """Add directories to the include path.""" 

254 dirs = dirs.split(os.pathsep) 

255 self.include_path.extend(dirs) 

256 

257 def parse_args(self, argv): 

258 # Parse the command line arguments. 

259 try: 

260 opts, self.args = getopt.getopt( 

261 argv, 

262 "cdD:eI:n:o:rs:p:PUvw:xz", 

263 [ 

264 "check", 

265 "markers=", 

266 "verbosity=", 

267 ], 

268 ) 

269 except getopt.error as msg: 

270 raise CogUsageError(msg) 

271 

272 # Handle the command line arguments. 

273 for o, a in opts: 

274 if o == "-c": 

275 self.hash_output = True 

276 elif o == "-d": 

277 self.delete_code = True 

278 elif o == "-D": 

279 if a.count("=") < 1: 

280 raise CogUsageError("-D takes a name=value argument") 

281 name, value = a.split("=", 1) 

282 self.defines[name] = value 

283 elif o == "-e": 

284 self.warn_empty = True 

285 elif o == "-I": 

286 self.add_to_include_path(os.path.abspath(a)) 

287 elif o == "-n": 

288 self.encoding = a 

289 elif o == "-o": 

290 self.output_name = a 

291 elif o == "-r": 

292 self.replace = True 

293 elif o == "-s": 

294 self.suffix = a 

295 elif o == "-p": 

296 self.prologue = a 

297 elif o == "-P": 

298 self.print_output = True 

299 elif o == "-U": 

300 self.newlines = True 

301 elif o == "-v": 

302 self.show_version = True 

303 elif o == "-w": 

304 self.make_writable_cmd = a 

305 elif o == "-x": 

306 self.no_generate = True 

307 elif o == "-z": 

308 self.eof_can_be_end = True 

309 elif o == "--check": 

310 self.check = True 

311 elif o == "--markers": 

312 self._parse_markers(a) 

313 elif o == "--verbosity": 

314 self.verbosity = int(a) 

315 else: 

316 # Since getopt.getopt is given a list of possible flags, 

317 # this is an internal error. 

318 raise CogInternalError(f"Don't understand argument {o}") 

319 

320 def _parse_markers(self, val): 

321 try: 

322 self.begin_spec, self.end_spec, self.end_output = val.split(" ") 

323 except ValueError: 

324 raise CogUsageError( 

325 f"--markers requires 3 values separated by spaces, could not parse {val!r}" 

326 ) 

327 

328 def validate(self): 

329 """Does nothing if everything is OK, raises CogError's if it's not.""" 

330 if self.replace and self.delete_code: 

331 raise CogUsageError( 

332 "Can't use -d with -r (or you would delete all your source!)" 

333 ) 

334 

335 if self.replace and self.output_name: 

336 raise CogUsageError("Can't use -o with -r (they are opposites)") 

337 

338 

339class Cog(Redirectable): 

340 """The Cog engine.""" 

341 

342 def __init__(self): 

343 super().__init__() 

344 self.options = CogOptions() 

345 self._fix_end_output_patterns() 

346 self.cogmodulename = "cog" 

347 self.create_cog_module() 

348 self.check_failed = False 

349 

350 def _fix_end_output_patterns(self): 

351 end_output = re.escape(self.options.end_output) 

352 self.re_end_output = re.compile( 

353 end_output + r"(?P<hashsect> *\(checksum: (?P<hash>[a-f0-9]+)\))" 

354 ) 

355 self.end_format = self.options.end_output + " (checksum: %s)" 

356 

357 def show_warning(self, msg): 

358 self.prout(f"Warning: {msg}") 

359 

360 def is_begin_spec_line(self, s): 

361 return self.options.begin_spec in s 

362 

363 def is_end_spec_line(self, s): 

364 return self.options.end_spec in s and not self.is_end_output_line(s) 

365 

366 def is_end_output_line(self, s): 

367 return self.options.end_output in s 

368 

369 def create_cog_module(self): 

370 """Make a cog "module" object. 

371 

372 Imported Python modules can use "import cog" to get our state. 

373 

374 """ 

375 self.cogmodule = types.SimpleNamespace() 

376 self.cogmodule.path = [] 

377 

378 def open_output_file(self, fname): 

379 """Open an output file, taking all the details into account.""" 

380 opts = {} 

381 mode = "w" 

382 opts["encoding"] = self.options.encoding 

383 if self.options.newlines: 

384 opts["newline"] = "\n" 

385 fdir = os.path.dirname(fname) 

386 if os.path.dirname(fdir) and not os.path.exists(fdir): 

387 os.makedirs(fdir) 

388 return open(fname, mode, **opts) 

389 

390 def open_input_file(self, fname): 

391 """Open an input file.""" 

392 if fname == "-": 

393 return sys.stdin 

394 else: 

395 return open(fname, encoding=self.options.encoding) 

396 

397 def process_file(self, file_in, file_out, fname=None, globals=None): 

398 """Process an input file object to an output file object. 

399 

400 `fileIn` and `fileOut` can be file objects, or file names. 

401 

402 """ 

403 file_name_in = fname or "" 

404 file_name_out = fname or "" 

405 file_in_to_close = file_out_to_close = None 

406 # Convert filenames to files. 

407 if isinstance(file_in, (bytes, str)): 407 ↛ 409line 407 didn't jump to line 409 because the condition on line 407 was never true

408 # Open the input file. 

409 file_name_in = file_in 

410 file_in = file_in_to_close = self.open_input_file(file_in) 

411 if isinstance(file_out, (bytes, str)): 411 ↛ 413line 411 didn't jump to line 413 because the condition on line 411 was never true

412 # Open the output file. 

413 file_name_out = file_out 

414 file_out = file_out_to_close = self.open_output_file(file_out) 

415 

416 try: 

417 file_in = NumberedFileReader(file_in) 

418 

419 saw_cog = False 

420 

421 self.cogmodule.inFile = file_name_in 

422 self.cogmodule.outFile = file_name_out 

423 self.cogmodulename = "cog_" + md5(file_name_out.encode()).hexdigest() 

424 sys.modules[self.cogmodulename] = self.cogmodule 

425 # if "import cog" explicitly done in code by user, note threading will cause clashes. 

426 sys.modules["cog"] = self.cogmodule 

427 

428 # The globals dict we'll use for this file. 

429 if globals is None: 429 ↛ 433line 429 didn't jump to line 433 because the condition on line 429 was always true

430 globals = {} 

431 

432 # If there are any global defines, put them in the globals. 

433 globals.update(self.options.defines) 

434 

435 # loop over generator chunks 

436 line = file_in.readline() 

437 while line: 

438 # Find the next spec begin 

439 while line and not self.is_begin_spec_line(line): 

440 if self.is_end_spec_line(line): 440 ↛ 441line 440 didn't jump to line 441 because the condition on line 440 was never true

441 raise CogError( 

442 f"Unexpected {self.options.end_spec!r}", 

443 file=file_name_in, 

444 line=file_in.linenumber(), 

445 ) 

446 if self.is_end_output_line(line): 446 ↛ 447line 446 didn't jump to line 447 because the condition on line 446 was never true

447 raise CogError( 

448 f"Unexpected {self.options.end_output!r}", 

449 file=file_name_in, 

450 line=file_in.linenumber(), 

451 ) 

452 file_out.write(line) 

453 line = file_in.readline() 

454 if not line: 

455 break 

456 if not self.options.delete_code: 456 ↛ 460line 456 didn't jump to line 460 because the condition on line 456 was always true

457 file_out.write(line) 

458 

459 # l is the begin spec 

460 gen = CogGenerator(options=self.options) 

461 gen.set_output(stdout=self.stdout) 

462 gen.parse_marker(line) 

463 first_line_num = file_in.linenumber() 

464 self.cogmodule.firstLineNum = first_line_num 

465 

466 # If the spec begin is also a spec end, then process the single 

467 # line of code inside. 

468 if self.is_end_spec_line(line): 

469 beg = line.find(self.options.begin_spec) 

470 end = line.find(self.options.end_spec) 

471 if beg > end: 

472 raise CogError( 

473 "Cog code markers inverted", 

474 file=file_name_in, 

475 line=first_line_num, 

476 ) 

477 else: 

478 code = line[beg + len(self.options.begin_spec) : end].strip() 

479 gen.parse_line(code) 

480 else: 

481 # Deal with an ordinary code block. 

482 line = file_in.readline() 

483 

484 # Get all the lines in the spec 

485 while line and not self.is_end_spec_line(line): 

486 if self.is_begin_spec_line(line): 486 ↛ 487line 486 didn't jump to line 487 because the condition on line 486 was never true

487 raise CogError( 

488 f"Unexpected {self.options.begin_spec!r}", 

489 file=file_name_in, 

490 line=file_in.linenumber(), 

491 ) 

492 if self.is_end_output_line(line): 492 ↛ 493line 492 didn't jump to line 493 because the condition on line 492 was never true

493 raise CogError( 

494 f"Unexpected {self.options.end_output!r}", 

495 file=file_name_in, 

496 line=file_in.linenumber(), 

497 ) 

498 if not self.options.delete_code: 498 ↛ 500line 498 didn't jump to line 500 because the condition on line 498 was always true

499 file_out.write(line) 

500 gen.parse_line(line) 

501 line = file_in.readline() 

502 if not line: 502 ↛ 503line 502 didn't jump to line 503 because the condition on line 502 was never true

503 raise CogError( 

504 "Cog block begun but never ended.", 

505 file=file_name_in, 

506 line=first_line_num, 

507 ) 

508 

509 if not self.options.delete_code: 509 ↛ 511line 509 didn't jump to line 511 because the condition on line 509 was always true

510 file_out.write(line) 

511 gen.parse_marker(line) 

512 

513 line = file_in.readline() 

514 

515 # Eat all the lines in the output section. While reading past 

516 # them, compute the md5 hash of the old output. 

517 previous = [] 

518 hasher = md5() 

519 while line and not self.is_end_output_line(line): 

520 if self.is_begin_spec_line(line): 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true

521 raise CogError( 

522 f"Unexpected {self.options.begin_spec!r}", 

523 file=file_name_in, 

524 line=file_in.linenumber(), 

525 ) 

526 if self.is_end_spec_line(line): 526 ↛ 527line 526 didn't jump to line 527 because the condition on line 526 was never true

527 raise CogError( 

528 f"Unexpected {self.options.end_spec!r}", 

529 file=file_name_in, 

530 line=file_in.linenumber(), 

531 ) 

532 previous.append(line) 

533 hasher.update(line.encode("utf-8")) 

534 line = file_in.readline() 

535 cur_hash = hasher.hexdigest() 

536 

537 if not line and not self.options.eof_can_be_end: 537 ↛ 539line 537 didn't jump to line 539 because the condition on line 537 was never true

538 # We reached end of file before we found the end output line. 

539 raise CogError( 

540 f"Missing {self.options.end_output!r} before end of file.", 

541 file=file_name_in, 

542 line=file_in.linenumber(), 

543 ) 

544 

545 # Make the previous output available to the current code 

546 self.cogmodule.previous = "".join(previous) 

547 

548 # Write the output of the spec to be the new output if we're 

549 # supposed to generate code. 

550 hasher = md5() 

551 if not self.options.no_generate: 551 ↛ 557line 551 didn't jump to line 557 because the condition on line 551 was always true

552 fname = f"<cog {file_name_in}:{first_line_num}>" 

553 gen = gen.evaluate(cog=self, globals=globals, fname=fname) 

554 gen = self.suffix_lines(gen) 

555 hasher.update(gen.encode("utf-8")) 

556 file_out.write(gen) 

557 new_hash = hasher.hexdigest() 

558 

559 saw_cog = True 

560 

561 # Write the ending output line 

562 hash_match = self.re_end_output.search(line) 

563 if self.options.hash_output: 563 ↛ 564line 563 didn't jump to line 564 because the condition on line 563 was never true

564 if hash_match: 

565 old_hash = hash_match["hash"] 

566 if old_hash != cur_hash: 

567 raise CogError( 

568 "Output has been edited! Delete old checksum to unprotect.", 

569 file=file_name_in, 

570 line=file_in.linenumber(), 

571 ) 

572 # Create a new end line with the correct hash. 

573 endpieces = line.split(hash_match.group(0), 1) 

574 else: 

575 # There was no old hash, but we want a new hash. 

576 endpieces = line.split(self.options.end_output, 1) 

577 line = (self.end_format % new_hash).join(endpieces) 

578 else: 

579 # We don't want hashes output, so if there was one, get rid of 

580 # it. 

581 if hash_match: 581 ↛ 582line 581 didn't jump to line 582 because the condition on line 581 was never true

582 line = line.replace(hash_match["hashsect"], "", 1) 

583 

584 if not self.options.delete_code: 584 ↛ 586line 584 didn't jump to line 586 because the condition on line 584 was always true

585 file_out.write(line) 

586 line = file_in.readline() 

587 

588 if not saw_cog and self.options.warn_empty: 588 ↛ 589line 588 didn't jump to line 589 because the condition on line 588 was never true

589 self.show_warning(f"no cog code found in {file_name_in}") 

590 finally: 

591 if file_in_to_close: 591 ↛ 592line 591 didn't jump to line 592 because the condition on line 591 was never true

592 file_in_to_close.close() 

593 if file_out_to_close: 593 ↛ 594line 593 didn't jump to line 594 because the condition on line 593 was never true

594 file_out_to_close.close() 

595 

596 # A regex for non-empty lines, used by suffixLines. 

597 re_non_empty_lines = re.compile(r"^\s*\S+.*$", re.MULTILINE) 

598 

599 def suffix_lines(self, text): 

600 """Add suffixes to the lines in text, if our options desire it. 

601 

602 `text` is many lines, as a single string. 

603 

604 """ 

605 if self.options.suffix: 605 ↛ 607line 605 didn't jump to line 607 because the condition on line 605 was never true

606 # Find all non-blank lines, and add the suffix to the end. 

607 repl = r"\g<0>" + self.options.suffix.replace("\\", "\\\\") 

608 text = self.re_non_empty_lines.sub(repl, text) 

609 return text 

610 

611 def process_string(self, input, fname=None): 

612 """Process `input` as the text to cog. 

613 

614 Return the cogged output as a string. 

615 

616 """ 

617 file_old = io.StringIO(input) 

618 file_new = io.StringIO() 

619 self.process_file(file_old, file_new, fname=fname) 

620 return file_new.getvalue() 

621 

622 def replace_file(self, old_path, new_text): 

623 """Replace file oldPath with the contents newText""" 

624 if not os.access(old_path, os.W_OK): 

625 # Need to ensure we can write. 

626 if self.options.make_writable_cmd: 

627 # Use an external command to make the file writable. 

628 cmd = self.options.make_writable_cmd.replace("%s", old_path) 

629 with os.popen(cmd) as cmdout: 

630 self.stdout.write(cmdout.read()) 

631 if not os.access(old_path, os.W_OK): 

632 raise CogError(f"Couldn't make {old_path} writable") 

633 else: 

634 # Can't write! 

635 raise CogError(f"Can't overwrite {old_path}") 

636 f = self.open_output_file(old_path) 

637 f.write(new_text) 

638 f.close() 

639 

640 def save_include_path(self): 

641 self.saved_include = self.options.include_path[:] 

642 self.saved_sys_path = sys.path[:] 

643 

644 def restore_include_path(self): 

645 self.options.include_path = self.saved_include 

646 self.cogmodule.path = self.options.include_path 

647 sys.path = self.saved_sys_path 

648 

649 def add_to_include_path(self, include_path): 

650 self.cogmodule.path.extend(include_path) 

651 sys.path.extend(include_path) 

652 

653 def process_one_file(self, fname): 

654 """Process one filename through cog.""" 

655 

656 self.save_include_path() 

657 need_newline = False 

658 

659 try: 

660 self.add_to_include_path(self.options.include_path) 

661 # Since we know where the input file came from, 

662 # push its directory onto the include path. 

663 self.add_to_include_path([os.path.dirname(fname)]) 

664 

665 # How we process the file depends on where the output is going. 

666 if self.options.output_name: 

667 self.process_file(fname, self.options.output_name, fname) 

668 elif self.options.replace or self.options.check: 

669 # We want to replace the cog file with the output, 

670 # but only if they differ. 

671 verb = "Cogging" if self.options.replace else "Checking" 

672 if self.options.verbosity >= 2: 

673 self.prout(f"{verb} {fname}", end="") 

674 need_newline = True 

675 

676 try: 

677 file_old_file = self.open_input_file(fname) 

678 old_text = file_old_file.read() 

679 file_old_file.close() 

680 new_text = self.process_string(old_text, fname=fname) 

681 if old_text != new_text: 

682 if self.options.verbosity >= 1: 

683 if self.options.verbosity < 2: 

684 self.prout(f"{verb} {fname}", end="") 

685 self.prout(" (changed)") 

686 need_newline = False 

687 if self.options.replace: 

688 self.replace_file(fname, new_text) 

689 else: 

690 assert self.options.check 

691 self.check_failed = True 

692 finally: 

693 # The try-finally block is so we can print a partial line 

694 # with the name of the file, and print (changed) on the 

695 # same line, but also make sure to break the line before 

696 # any traceback. 

697 if need_newline: 

698 self.prout("") 

699 else: 

700 self.process_file(fname, self.stdout, fname) 

701 finally: 

702 self.restore_include_path() 

703 

704 def process_wildcards(self, fname): 

705 files = glob.glob(fname) 

706 if files: 

707 for matching_file in files: 

708 self.process_one_file(matching_file) 

709 else: 

710 self.process_one_file(fname) 

711 

712 def process_file_list(self, file_name_list): 

713 """Process the files in a file list.""" 

714 flist = self.open_input_file(file_name_list) 

715 lines = flist.readlines() 

716 flist.close() 

717 for line in lines: 

718 # Use shlex to parse the line like a shell. 

719 lex = shlex.shlex(line, posix=True) 

720 lex.whitespace_split = True 

721 lex.commenters = "#" 

722 # No escapes, so that backslash can be part of the path 

723 lex.escape = "" 

724 args = list(lex) 

725 if args: 

726 self.process_arguments(args) 

727 

728 def process_arguments(self, args): 

729 """Process one command-line.""" 

730 saved_options = self.options 

731 self.options = self.options.clone() 

732 

733 self.options.parse_args(args[1:]) 

734 self.options.validate() 

735 

736 if args[0][0] == "@": 

737 if self.options.output_name: 

738 raise CogUsageError("Can't use -o with @file") 

739 self.process_file_list(args[0][1:]) 

740 elif args[0][0] == "&": 

741 if self.options.output_name: 

742 raise CogUsageError("Can't use -o with &file") 

743 file_list = args[0][1:] 

744 with change_dir(os.path.dirname(file_list)): 

745 self.process_file_list(os.path.basename(file_list)) 

746 else: 

747 self.process_wildcards(args[0]) 

748 

749 self.options = saved_options 

750 

751 def callable_main(self, argv): 

752 """All of command-line cog, but in a callable form. 

753 

754 This is used by main. `argv` is the equivalent of sys.argv. 

755 

756 """ 

757 argv = argv[1:] 

758 

759 # Provide help if asked for anywhere in the command line. 

760 if "-?" in argv or "-h" in argv: 

761 self.prerr(usage, end="") 

762 return 

763 

764 self.options.parse_args(argv) 

765 self.options.validate() 

766 self._fix_end_output_patterns() 

767 

768 if self.options.show_version: 

769 self.prout(f"Cog version {__version__}") 

770 return 

771 

772 if self.options.args: 

773 for a in self.options.args: 

774 self.process_arguments([a]) 

775 else: 

776 raise CogUsageError("No files to process") 

777 

778 if self.check_failed: 

779 raise CogCheckFailed("Check failed") 

780 

781 def main(self, argv): 

782 """Handle the command-line execution for cog.""" 

783 

784 try: 

785 self.callable_main(argv) 

786 return 0 

787 except CogUsageError as err: 

788 self.prerr(err) 

789 self.prerr("(for help use -h)") 

790 return 2 

791 except CogGeneratedError as err: 

792 self.prerr(f"Error: {err}") 

793 return 3 

794 except CogUserException as err: 

795 self.prerr("Traceback (most recent call last):") 

796 self.prerr(err.args[0]) 

797 return 4 

798 except CogCheckFailed as err: 

799 self.prerr(err) 

800 return 5 

801 except CogError as err: 

802 self.prerr(err) 

803 return 1 

804 

805 

806def find_cog_source(frame_summary, prologue): 

807 """Find cog source lines in a frame summary list, for printing tracebacks. 

808 

809 Arguments: 

810 frame_summary: a list of 4-item tuples, as returned by traceback.extract_tb. 

811 prologue: the text of the code prologue. 

812 

813 Returns 

814 A list of 4-item tuples, updated to correct the cog entries. 

815 

816 """ 

817 prolines = prologue.splitlines() 

818 for filename, lineno, funcname, source in frame_summary: 

819 if not source: 819 ↛ 833line 819 didn't jump to line 833 because the condition on line 819 was always true

820 m = re.search(r"^<cog ([^:]+):(\d+)>$", filename) 

821 if m: 821 ↛ 822line 821 didn't jump to line 822 because the condition on line 821 was never true

822 if lineno <= len(prolines): 

823 filename = "<prologue>" 

824 source = prolines[lineno - 1] 

825 lineno -= ( 

826 1 # Because "import cog" is the first line in the prologue 

827 ) 

828 else: 

829 filename, coglineno = m.groups() 

830 coglineno = int(coglineno) 

831 lineno += coglineno - len(prolines) 

832 source = linecache.getline(filename, lineno).strip() 

833 yield filename, lineno, funcname, source 

834 

835 

836def main(): 

837 """Main function for entry_points to use.""" 

838 return Cog().main(sys.argv)