Coverage for cogapp/cogapp.py: 46.74%

483 statements  

« prev     ^ index     » next       coverage.py v7.5.0a1.dev1, created at 2024-04-15 15:50 -0400

1""" Cog content generation tool. 

2""" 

3 

4import copy 

5import getopt 

6import glob 

7import io 

8import linecache 

9import os 

10import re 

11import shlex 

12import sys 

13import traceback 

14import types 

15 

16from .whiteutils import commonPrefix, reindentBlock, whitePrefix 

17from .utils import NumberedFileReader, Redirectable, change_dir, md5 

18 

19__version__ = "3.4.1" 

20 

21usage = """\ 

22cog - generate content with inlined Python code. 

23 

24cog [OPTIONS] [INFILE | @FILELIST | &FILELIST] ... 

25 

26INFILE is the name of an input file, '-' will read from stdin. 

27FILELIST is the name of a text file containing file names or 

28other @FILELISTs. 

29 

30For @FILELIST, paths in the file list are relative to the working 

31directory where cog was called. For &FILELIST, paths in the file 

32list are relative to the file list location. 

33 

34OPTIONS: 

35 -c Checksum the output to protect it against accidental change. 

36 -d Delete the generator code from the output file. 

37 -D name=val Define a global string available to your generator code. 

38 -e Warn if a file has no cog code in it. 

39 -I PATH Add PATH to the list of directories for data files and modules. 

40 -n ENCODING Use ENCODING when reading and writing files. 

41 -o OUTNAME Write the output to OUTNAME. 

42 -p PROLOGUE Prepend the generator source with PROLOGUE. Useful to insert an 

43 import line. Example: -p "import math" 

44 -P Use print() instead of cog.outl() for code output. 

45 -r Replace the input file with the output. 

46 -s STRING Suffix all generated output lines with STRING. 

47 -U Write the output with Unix newlines (only LF line-endings). 

48 -w CMD Use CMD if the output file needs to be made writable. 

49 A %s in the CMD will be filled with the filename. 

50 -x Excise all the generated output without running the generators. 

51 -z The end-output marker can be omitted, and is assumed at eof. 

52 -v Print the version of cog and exit. 

53 --check Check that the files would not change if run again. 

54 --markers='START END END-OUTPUT' 

55 The patterns surrounding cog inline instructions. Should 

56 include three values separated by spaces, the start, end, 

57 and end-output markers. Defaults to '[[[cog ]]] [[[end]]]'. 

58 --verbosity=VERBOSITY 

59 Control the amount of output. 2 (the default) lists all files, 

60 1 lists only changed files, 0 lists no files. 

61 -h Print this help. 

62""" 

63 

64class CogError(Exception): 

65 """ Any exception raised by Cog. 

66 """ 

67 def __init__(self, msg, file='', line=0): 

68 if file: 

69 super().__init__(f"{file}({line}): {msg}") 

70 else: 

71 super().__init__(msg) 

72 

73class CogUsageError(CogError): 

74 """ An error in usage of command-line arguments in cog. 

75 """ 

76 pass 

77 

78class CogInternalError(CogError): 

79 """ An error in the coding of Cog. Should never happen. 

80 """ 

81 pass 

82 

83class CogGeneratedError(CogError): 

84 """ An error raised by a user's cog generator. 

85 """ 

86 pass 

87 

88class CogUserException(CogError): 

89 """ An exception caught when running a user's cog generator. 

90 The argument is the traceback message to print. 

91 """ 

92 pass 

93 

94class CogCheckFailed(CogError): 

95 """ A --check failed. 

96 """ 

97 pass 

98 

99 

100class CogGenerator(Redirectable): 

101 """ A generator pulled from a source file. 

102 """ 

103 def __init__(self, options=None): 

104 super().__init__() 

105 self.markers = [] 

106 self.lines = [] 

107 self.options = options or CogOptions() 

108 

109 def parseMarker(self, l): 

110 self.markers.append(l) 

111 

112 def parseLine(self, l): 

113 self.lines.append(l.strip('\n')) 

114 

115 def getCode(self): 

116 """ Extract the executable Python code from the generator. 

117 """ 

118 # If the markers and lines all have the same prefix 

119 # (end-of-line comment chars, for example), 

120 # then remove it from all the lines. 

121 prefIn = commonPrefix(self.markers + self.lines) 

122 if prefIn: 

123 self.markers = [ l.replace(prefIn, '', 1) for l in self.markers ] 

124 self.lines = [ l.replace(prefIn, '', 1) for l in self.lines ] 

125 

126 return reindentBlock(self.lines, '') 

127 

128 def evaluate(self, cog, globals, fname): 

129 # figure out the right whitespace prefix for the output 

130 prefOut = whitePrefix(self.markers) 

131 

132 intext = self.getCode() 

133 if not intext: 

134 return '' 

135 

136 prologue = "import " + cog.cogmodulename + " as cog\n" 

137 if self.options.sPrologue: 137 ↛ 138line 137 didn't jump to line 138, because the condition on line 137 was never true

138 prologue += self.options.sPrologue + '\n' 

139 code = compile(prologue + intext, str(fname), 'exec') 

140 

141 # Make sure the "cog" module has our state. 

142 cog.cogmodule.msg = self.msg 

143 cog.cogmodule.out = self.out 

144 cog.cogmodule.outl = self.outl 

145 cog.cogmodule.error = self.error 

146 

147 real_stdout = sys.stdout 

148 if self.options.bPrintOutput: 148 ↛ 149line 148 didn't jump to line 149, because the condition on line 148 was never true

149 sys.stdout = captured_stdout = io.StringIO() 

150 

151 self.outstring = '' 

152 try: 

153 eval(code, globals) 

154 except CogError: 154 ↛ 155line 154 didn't jump to line 155, because the exception caught by line 154 didn't happen

155 raise 

156 except: 

157 typ, err, tb = sys.exc_info() 

158 frames = (tuple(fr) for fr in traceback.extract_tb(tb.tb_next)) 

159 frames = find_cog_source(frames, prologue) 

160 msg = "".join(traceback.format_list(frames)) 

161 msg += f"{typ.__name__}: {err}" 

162 raise CogUserException(msg) 

163 finally: 

164 sys.stdout = real_stdout 

165 

166 if self.options.bPrintOutput: 166 ↛ 167line 166 didn't jump to line 167, because the condition on line 166 was never true

167 self.outstring = captured_stdout.getvalue() 

168 

169 # We need to make sure that the last line in the output 

170 # ends with a newline, or it will be joined to the 

171 # end-output line, ruining cog's idempotency. 

172 if self.outstring and self.outstring[-1] != '\n': 

173 self.outstring += '\n' 

174 

175 return reindentBlock(self.outstring, prefOut) 

176 

177 def msg(self, s): 

178 self.prout("Message: "+s) 

179 

180 def out(self, sOut='', dedent=False, trimblanklines=False): 

181 """ The cog.out function. 

182 """ 

183 if trimblanklines and ('\n' in sOut): 

184 lines = sOut.split('\n') 

185 if lines[0].strip() == '': 

186 del lines[0] 

187 if lines and lines[-1].strip() == '': 

188 del lines[-1] 

189 sOut = '\n'.join(lines)+'\n' 

190 if dedent: 

191 sOut = reindentBlock(sOut) 

192 self.outstring += sOut 

193 

194 def outl(self, sOut='', **kw): 

195 """ The cog.outl function. 

196 """ 

197 self.out(sOut, **kw) 

198 self.out('\n') 

199 

200 def error(self, msg='Error raised by cog generator.'): 

201 """ The cog.error function. 

202 Instead of raising standard python errors, cog generators can use 

203 this function. It will display the error without a scary Python 

204 traceback. 

205 """ 

206 raise CogGeneratedError(msg) 

207 

208 

209class CogOptions: 

210 """ Options for a run of cog. 

211 """ 

212 def __init__(self): 

213 # Defaults for argument values. 

214 self.args = [] 

215 self.includePath = [] 

216 self.defines = {} 

217 self.bShowVersion = False 

218 self.sMakeWritableCmd = None 

219 self.bReplace = False 

220 self.bNoGenerate = False 

221 self.sOutputName = None 

222 self.bWarnEmpty = False 

223 self.bHashOutput = False 

224 self.bDeleteCode = False 

225 self.bEofCanBeEnd = False 

226 self.sSuffix = None 

227 self.bNewlines = False 

228 self.sBeginSpec = '[[[cog' 

229 self.sEndSpec = ']]]' 

230 self.sEndOutput = '[[[end]]]' 

231 self.sEncoding = "utf-8" 

232 self.verbosity = 2 

233 self.sPrologue = '' 

234 self.bPrintOutput = False 

235 self.bCheck = False 

236 

237 def __eq__(self, other): 

238 """ Comparison operator for tests to use. 

239 """ 

240 return self.__dict__ == other.__dict__ 

241 

242 def clone(self): 

243 """ Make a clone of these options, for further refinement. 

244 """ 

245 return copy.deepcopy(self) 

246 

247 def addToIncludePath(self, dirs): 

248 """ Add directories to the include path. 

249 """ 

250 dirs = dirs.split(os.pathsep) 

251 self.includePath.extend(dirs) 

252 

253 def parseArgs(self, argv): 

254 # Parse the command line arguments. 

255 try: 

256 opts, self.args = getopt.getopt( 

257 argv, 

258 'cdD:eI:n:o:rs:p:PUvw:xz', 

259 [ 

260 'check', 

261 'markers=', 

262 'verbosity=', 

263 ] 

264 ) 

265 except getopt.error as msg: 

266 raise CogUsageError(msg) 

267 

268 # Handle the command line arguments. 

269 for o, a in opts: 

270 if o == '-c': 

271 self.bHashOutput = True 

272 elif o == '-d': 

273 self.bDeleteCode = True 

274 elif o == '-D': 

275 if a.count('=') < 1: 

276 raise CogUsageError("-D takes a name=value argument") 

277 name, value = a.split('=', 1) 

278 self.defines[name] = value 

279 elif o == '-e': 

280 self.bWarnEmpty = True 

281 elif o == '-I': 

282 self.addToIncludePath(os.path.abspath(a)) 

283 elif o == '-n': 

284 self.sEncoding = a 

285 elif o == '-o': 

286 self.sOutputName = a 

287 elif o == '-r': 

288 self.bReplace = True 

289 elif o == '-s': 

290 self.sSuffix = a 

291 elif o == '-p': 

292 self.sPrologue = a 

293 elif o == '-P': 

294 self.bPrintOutput = True 

295 elif o == '-U': 

296 self.bNewlines = True 

297 elif o == '-v': 

298 self.bShowVersion = True 

299 elif o == '-w': 

300 self.sMakeWritableCmd = a 

301 elif o == '-x': 

302 self.bNoGenerate = True 

303 elif o == '-z': 

304 self.bEofCanBeEnd = True 

305 elif o == '--check': 

306 self.bCheck = True 

307 elif o == '--markers': 

308 self._parse_markers(a) 

309 elif o == '--verbosity': 

310 self.verbosity = int(a) 

311 else: 

312 # Since getopt.getopt is given a list of possible flags, 

313 # this is an internal error. 

314 raise CogInternalError(f"Don't understand argument {o}") 

315 

316 def _parse_markers(self, val): 

317 try: 

318 self.sBeginSpec, self.sEndSpec, self.sEndOutput = val.split(" ") 

319 except ValueError: 

320 raise CogUsageError( 

321 f"--markers requires 3 values separated by spaces, could not parse {val!r}" 

322 ) 

323 

324 def validate(self): 

325 """ Does nothing if everything is OK, raises CogError's if it's not. 

326 """ 

327 if self.bReplace and self.bDeleteCode: 

328 raise CogUsageError("Can't use -d with -r (or you would delete all your source!)") 

329 

330 if self.bReplace and self.sOutputName: 

331 raise CogUsageError("Can't use -o with -r (they are opposites)") 

332 

333 

334class Cog(Redirectable): 

335 """ The Cog engine. 

336 """ 

337 def __init__(self): 

338 super().__init__() 

339 self.options = CogOptions() 

340 self._fixEndOutputPatterns() 

341 self.cogmodulename = "cog" 

342 self.createCogModule() 

343 self.bCheckFailed = False 

344 

345 def _fixEndOutputPatterns(self): 

346 end_output = re.escape(self.options.sEndOutput) 

347 self.reEndOutput = re.compile(end_output + r"(?P<hashsect> *\(checksum: (?P<hash>[a-f0-9]+)\))") 

348 self.sEndFormat = self.options.sEndOutput + " (checksum: %s)" 

349 

350 def showWarning(self, msg): 

351 self.prout(f"Warning: {msg}") 

352 

353 def isBeginSpecLine(self, s): 

354 return self.options.sBeginSpec in s 

355 

356 def isEndSpecLine(self, s): 

357 return self.options.sEndSpec in s and not self.isEndOutputLine(s) 

358 

359 def isEndOutputLine(self, s): 

360 return self.options.sEndOutput in s 

361 

362 def createCogModule(self): 

363 """ Make a cog "module" object so that imported Python modules 

364 can say "import cog" and get our state. 

365 """ 

366 self.cogmodule = types.SimpleNamespace() 

367 self.cogmodule.path = [] 

368 

369 def openOutputFile(self, fname): 

370 """ Open an output file, taking all the details into account. 

371 """ 

372 opts = {} 

373 mode = "w" 

374 opts['encoding'] = self.options.sEncoding 

375 if self.options.bNewlines: 

376 opts["newline"] = "\n" 

377 fdir = os.path.dirname(fname) 

378 if os.path.dirname(fdir) and not os.path.exists(fdir): 

379 os.makedirs(fdir) 

380 return open(fname, mode, **opts) 

381 

382 def openInputFile(self, fname): 

383 """ Open an input file. 

384 """ 

385 if fname == "-": 

386 return sys.stdin 

387 else: 

388 return open(fname, encoding=self.options.sEncoding) 

389 

390 def processFile(self, fIn, fOut, fname=None, globals=None): 

391 """ Process an input file object to an output file object. 

392 fIn and fOut can be file objects, or file names. 

393 """ 

394 

395 sFileIn = fname or '' 

396 sFileOut = fname or '' 

397 fInToClose = fOutToClose = None 

398 # Convert filenames to files. 

399 if isinstance(fIn, (bytes, str)): 399 ↛ 401line 399 didn't jump to line 401, because the condition on line 399 was never true

400 # Open the input file. 

401 sFileIn = fIn 

402 fIn = fInToClose = self.openInputFile(fIn) 

403 if isinstance(fOut, (bytes, str)): 403 ↛ 405line 403 didn't jump to line 405, because the condition on line 403 was never true

404 # Open the output file. 

405 sFileOut = fOut 

406 fOut = fOutToClose = self.openOutputFile(fOut) 

407 

408 try: 

409 fIn = NumberedFileReader(fIn) 

410 

411 bSawCog = False 

412 

413 self.cogmodule.inFile = sFileIn 

414 self.cogmodule.outFile = sFileOut 

415 self.cogmodulename = 'cog_' + md5(sFileOut.encode()).hexdigest() 

416 sys.modules[self.cogmodulename] = self.cogmodule 

417 # if "import cog" explicitly done in code by user, note threading will cause clashes. 

418 sys.modules['cog'] = self.cogmodule 

419 

420 # The globals dict we'll use for this file. 

421 if globals is None: 421 ↛ 425line 421 didn't jump to line 425, because the condition on line 421 was never false

422 globals = {} 

423 

424 # If there are any global defines, put them in the globals. 

425 globals.update(self.options.defines) 

426 

427 # loop over generator chunks 

428 l = fIn.readline() 

429 while l: 

430 # Find the next spec begin 

431 while l and not self.isBeginSpecLine(l): 

432 if self.isEndSpecLine(l): 432 ↛ 433line 432 didn't jump to line 433, because the condition on line 432 was never true

433 raise CogError( 

434 f"Unexpected {self.options.sEndSpec!r}", 

435 file=sFileIn, 

436 line=fIn.linenumber(), 

437 ) 

438 if self.isEndOutputLine(l): 438 ↛ 439line 438 didn't jump to line 439, because the condition on line 438 was never true

439 raise CogError( 

440 f"Unexpected {self.options.sEndOutput!r}", 

441 file=sFileIn, 

442 line=fIn.linenumber(), 

443 ) 

444 fOut.write(l) 

445 l = fIn.readline() 

446 if not l: 

447 break 

448 if not self.options.bDeleteCode: 448 ↛ 452line 448 didn't jump to line 452, because the condition on line 448 was never false

449 fOut.write(l) 

450 

451 # l is the begin spec 

452 gen = CogGenerator(options=self.options) 

453 gen.setOutput(stdout=self.stdout) 

454 gen.parseMarker(l) 

455 firstLineNum = fIn.linenumber() 

456 self.cogmodule.firstLineNum = firstLineNum 

457 

458 # If the spec begin is also a spec end, then process the single 

459 # line of code inside. 

460 if self.isEndSpecLine(l): 

461 beg = l.find(self.options.sBeginSpec) 

462 end = l.find(self.options.sEndSpec) 

463 if beg > end: 

464 raise CogError("Cog code markers inverted", 

465 file=sFileIn, line=firstLineNum) 

466 else: 

467 sCode = l[beg+len(self.options.sBeginSpec):end].strip() 

468 gen.parseLine(sCode) 

469 else: 

470 # Deal with an ordinary code block. 

471 l = fIn.readline() 

472 

473 # Get all the lines in the spec 

474 while l and not self.isEndSpecLine(l): 

475 if self.isBeginSpecLine(l): 475 ↛ 476line 475 didn't jump to line 476, because the condition on line 475 was never true

476 raise CogError( 

477 f"Unexpected {self.options.sBeginSpec!r}", 

478 file=sFileIn, 

479 line=fIn.linenumber(), 

480 ) 

481 if self.isEndOutputLine(l): 481 ↛ 482line 481 didn't jump to line 482, because the condition on line 481 was never true

482 raise CogError( 

483 f"Unexpected {self.options.sEndOutput!r}", 

484 file=sFileIn, 

485 line=fIn.linenumber(), 

486 ) 

487 if not self.options.bDeleteCode: 487 ↛ 489line 487 didn't jump to line 489, because the condition on line 487 was never false

488 fOut.write(l) 

489 gen.parseLine(l) 

490 l = fIn.readline() 

491 if not l: 491 ↛ 492line 491 didn't jump to line 492, because the condition on line 491 was never true

492 raise CogError( 

493 "Cog block begun but never ended.", 

494 file=sFileIn, line=firstLineNum) 

495 

496 if not self.options.bDeleteCode: 496 ↛ 498line 496 didn't jump to line 498, because the condition on line 496 was never false

497 fOut.write(l) 

498 gen.parseMarker(l) 

499 

500 l = fIn.readline() 

501 

502 # Eat all the lines in the output section. While reading past 

503 # them, compute the md5 hash of the old output. 

504 previous = [] 

505 hasher = md5() 

506 while l and not self.isEndOutputLine(l): 

507 if self.isBeginSpecLine(l): 507 ↛ 508line 507 didn't jump to line 508, because the condition on line 507 was never true

508 raise CogError( 

509 f"Unexpected {self.options.sBeginSpec!r}", 

510 file=sFileIn, 

511 line=fIn.linenumber(), 

512 ) 

513 if self.isEndSpecLine(l): 513 ↛ 514line 513 didn't jump to line 514, because the condition on line 513 was never true

514 raise CogError( 

515 f"Unexpected {self.options.sEndSpec!r}", 

516 file=sFileIn, 

517 line=fIn.linenumber(), 

518 ) 

519 previous.append(l) 

520 hasher.update(l.encode("utf-8")) 

521 l = fIn.readline() 

522 curHash = hasher.hexdigest() 

523 

524 if not l and not self.options.bEofCanBeEnd: 524 ↛ 526line 524 didn't jump to line 526, because the condition on line 524 was never true

525 # We reached end of file before we found the end output line. 

526 raise CogError( 

527 f"Missing {self.options.sEndOutput!r} before end of file.", 

528 file=sFileIn, 

529 line=fIn.linenumber(), 

530 ) 

531 

532 # Make the previous output available to the current code 

533 self.cogmodule.previous = "".join(previous) 

534 

535 # Write the output of the spec to be the new output if we're 

536 # supposed to generate code. 

537 hasher = md5() 

538 if not self.options.bNoGenerate: 538 ↛ 544line 538 didn't jump to line 544, because the condition on line 538 was never false

539 sFile = f"<cog {sFileIn}:{firstLineNum}>" 

540 sGen = gen.evaluate(cog=self, globals=globals, fname=sFile) 

541 sGen = self.suffixLines(sGen) 

542 hasher.update(sGen.encode("utf-8")) 

543 fOut.write(sGen) 

544 newHash = hasher.hexdigest() 

545 

546 bSawCog = True 

547 

548 # Write the ending output line 

549 hashMatch = self.reEndOutput.search(l) 

550 if self.options.bHashOutput: 550 ↛ 551line 550 didn't jump to line 551, because the condition on line 550 was never true

551 if hashMatch: 

552 oldHash = hashMatch['hash'] 

553 if oldHash != curHash: 

554 raise CogError("Output has been edited! Delete old checksum to unprotect.", 

555 file=sFileIn, line=fIn.linenumber()) 

556 # Create a new end line with the correct hash. 

557 endpieces = l.split(hashMatch.group(0), 1) 

558 else: 

559 # There was no old hash, but we want a new hash. 

560 endpieces = l.split(self.options.sEndOutput, 1) 

561 l = (self.sEndFormat % newHash).join(endpieces) 

562 else: 

563 # We don't want hashes output, so if there was one, get rid of 

564 # it. 

565 if hashMatch: 565 ↛ 566line 565 didn't jump to line 566, because the condition on line 565 was never true

566 l = l.replace(hashMatch['hashsect'], '', 1) 

567 

568 if not self.options.bDeleteCode: 568 ↛ 570line 568 didn't jump to line 570, because the condition on line 568 was never false

569 fOut.write(l) 

570 l = fIn.readline() 

571 

572 if not bSawCog and self.options.bWarnEmpty: 572 ↛ 573line 572 didn't jump to line 573, because the condition on line 572 was never true

573 self.showWarning(f"no cog code found in {sFileIn}") 

574 finally: 

575 if fInToClose: 575 ↛ 576line 575 didn't jump to line 576, because the condition on line 575 was never true

576 fInToClose.close() 

577 if fOutToClose: 577 ↛ 578line 577 didn't jump to line 578, because the condition on line 577 was never true

578 fOutToClose.close() 

579 

580 

581 # A regex for non-empty lines, used by suffixLines. 

582 reNonEmptyLines = re.compile(r"^\s*\S+.*$", re.MULTILINE) 

583 

584 def suffixLines(self, text): 

585 """ Add suffixes to the lines in text, if our options desire it. 

586 text is many lines, as a single string. 

587 """ 

588 if self.options.sSuffix: 588 ↛ 590line 588 didn't jump to line 590, because the condition on line 588 was never true

589 # Find all non-blank lines, and add the suffix to the end. 

590 repl = r"\g<0>" + self.options.sSuffix.replace('\\', '\\\\') 

591 text = self.reNonEmptyLines.sub(repl, text) 

592 return text 

593 

594 def processString(self, sInput, fname=None): 

595 """ Process sInput as the text to cog. 

596 Return the cogged output as a string. 

597 """ 

598 fOld = io.StringIO(sInput) 

599 fNew = io.StringIO() 

600 self.processFile(fOld, fNew, fname=fname) 

601 return fNew.getvalue() 

602 

603 def replaceFile(self, sOldPath, sNewText): 

604 """ Replace file sOldPath with the contents sNewText 

605 """ 

606 if not os.access(sOldPath, os.W_OK): 

607 # Need to ensure we can write. 

608 if self.options.sMakeWritableCmd: 

609 # Use an external command to make the file writable. 

610 cmd = self.options.sMakeWritableCmd.replace('%s', sOldPath) 

611 with os.popen(cmd) as cmdout: 

612 self.stdout.write(cmdout.read()) 

613 if not os.access(sOldPath, os.W_OK): 

614 raise CogError(f"Couldn't make {sOldPath} writable") 

615 else: 

616 # Can't write! 

617 raise CogError(f"Can't overwrite {sOldPath}") 

618 f = self.openOutputFile(sOldPath) 

619 f.write(sNewText) 

620 f.close() 

621 

622 def saveIncludePath(self): 

623 self.savedInclude = self.options.includePath[:] 

624 self.savedSysPath = sys.path[:] 

625 

626 def restoreIncludePath(self): 

627 self.options.includePath = self.savedInclude 

628 self.cogmodule.path = self.options.includePath 

629 sys.path = self.savedSysPath 

630 

631 def addToIncludePath(self, includePath): 

632 self.cogmodule.path.extend(includePath) 

633 sys.path.extend(includePath) 

634 

635 def processOneFile(self, sFile): 

636 """ Process one filename through cog. 

637 """ 

638 

639 self.saveIncludePath() 

640 bNeedNewline = False 

641 

642 try: 

643 self.addToIncludePath(self.options.includePath) 

644 # Since we know where the input file came from, 

645 # push its directory onto the include path. 

646 self.addToIncludePath([os.path.dirname(sFile)]) 

647 

648 # How we process the file depends on where the output is going. 

649 if self.options.sOutputName: 

650 self.processFile(sFile, self.options.sOutputName, sFile) 

651 elif self.options.bReplace or self.options.bCheck: 

652 # We want to replace the cog file with the output, 

653 # but only if they differ. 

654 verb = "Cogging" if self.options.bReplace else "Checking" 

655 if self.options.verbosity >= 2: 

656 self.prout(f"{verb} {sFile}", end="") 

657 bNeedNewline = True 

658 

659 try: 

660 fOldFile = self.openInputFile(sFile) 

661 sOldText = fOldFile.read() 

662 fOldFile.close() 

663 sNewText = self.processString(sOldText, fname=sFile) 

664 if sOldText != sNewText: 

665 if self.options.verbosity >= 1: 

666 if self.options.verbosity < 2: 

667 self.prout(f"{verb} {sFile}", end="") 

668 self.prout(" (changed)") 

669 bNeedNewline = False 

670 if self.options.bReplace: 

671 self.replaceFile(sFile, sNewText) 

672 else: 

673 assert self.options.bCheck 

674 self.bCheckFailed = True 

675 finally: 

676 # The try-finally block is so we can print a partial line 

677 # with the name of the file, and print (changed) on the 

678 # same line, but also make sure to break the line before 

679 # any traceback. 

680 if bNeedNewline: 

681 self.prout("") 

682 else: 

683 self.processFile(sFile, self.stdout, sFile) 

684 finally: 

685 self.restoreIncludePath() 

686 

687 def processWildcards(self, sFile): 

688 files = glob.glob(sFile) 

689 if files: 

690 for sMatchingFile in files: 

691 self.processOneFile(sMatchingFile) 

692 else: 

693 self.processOneFile(sFile) 

694 

695 def processFileList(self, sFileList): 

696 """ Process the files in a file list. 

697 """ 

698 flist = self.openInputFile(sFileList) 

699 lines = flist.readlines() 

700 flist.close() 

701 for l in lines: 

702 # Use shlex to parse the line like a shell. 

703 lex = shlex.shlex(l, posix=True) 

704 lex.whitespace_split = True 

705 lex.commenters = '#' 

706 # No escapes, so that backslash can be part of the path 

707 lex.escape = '' 

708 args = list(lex) 

709 if args: 

710 self.processArguments(args) 

711 

712 def processArguments(self, args): 

713 """ Process one command-line. 

714 """ 

715 saved_options = self.options 

716 self.options = self.options.clone() 

717 

718 self.options.parseArgs(args[1:]) 

719 self.options.validate() 

720 

721 if args[0][0] == '@': 

722 if self.options.sOutputName: 

723 raise CogUsageError("Can't use -o with @file") 

724 self.processFileList(args[0][1:]) 

725 elif args[0][0] == '&': 

726 if self.options.sOutputName: 

727 raise CogUsageError("Can't use -o with &file") 

728 file_list = args[0][1:] 

729 with change_dir(os.path.dirname(file_list)): 

730 self.processFileList(os.path.basename(file_list)) 

731 else: 

732 self.processWildcards(args[0]) 

733 

734 self.options = saved_options 

735 

736 def callableMain(self, argv): 

737 """ All of command-line cog, but in a callable form. 

738 This is used by main. 

739 argv is the equivalent of sys.argv. 

740 """ 

741 argv = argv[1:] 

742 

743 # Provide help if asked for anywhere in the command line. 

744 if '-?' in argv or '-h' in argv: 

745 self.prerr(usage, end="") 

746 return 

747 

748 self.options.parseArgs(argv) 

749 self.options.validate() 

750 self._fixEndOutputPatterns() 

751 

752 if self.options.bShowVersion: 

753 self.prout(f"Cog version {__version__}") 

754 return 

755 

756 if self.options.args: 

757 for a in self.options.args: 

758 self.processArguments([a]) 

759 else: 

760 raise CogUsageError("No files to process") 

761 

762 if self.bCheckFailed: 

763 raise CogCheckFailed("Check failed") 

764 

765 def main(self, argv): 

766 """ Handle the command-line execution for cog. 

767 """ 

768 

769 try: 

770 self.callableMain(argv) 

771 return 0 

772 except CogUsageError as err: 

773 self.prerr(err) 

774 self.prerr("(for help use -h)") 

775 return 2 

776 except CogGeneratedError as err: 

777 self.prerr(f"Error: {err}") 

778 return 3 

779 except CogUserException as err: 

780 self.prerr("Traceback (most recent call last):") 

781 self.prerr(err.args[0]) 

782 return 4 

783 except CogCheckFailed as err: 

784 self.prerr(err) 

785 return 5 

786 except CogError as err: 

787 self.prerr(err) 

788 return 1 

789 

790 

791def find_cog_source(frame_summary, prologue): 

792 """Find cog source lines in a frame summary list, for printing tracebacks. 

793 

794 Arguments: 

795 frame_summary: a list of 4-item tuples, as returned by traceback.extract_tb. 

796 prologue: the text of the code prologue. 

797 

798 Returns 

799 A list of 4-item tuples, updated to correct the cog entries. 

800 

801 """ 

802 prolines = prologue.splitlines() 

803 for filename, lineno, funcname, source in frame_summary: 

804 if not source: 804 ↛ 816line 804 didn't jump to line 816, because the condition on line 804 was never false

805 m = re.search(r"^<cog ([^:]+):(\d+)>$", filename) 

806 if m: 806 ↛ 807line 806 didn't jump to line 807, because the condition on line 806 was never true

807 if lineno <= len(prolines): 

808 filename = '<prologue>' 

809 source = prolines[lineno-1] 

810 lineno -= 1 # Because "import cog" is the first line in the prologue 

811 else: 

812 filename, coglineno = m.groups() 

813 coglineno = int(coglineno) 

814 lineno += coglineno - len(prolines) 

815 source = linecache.getline(filename, lineno).strip() 

816 yield filename, lineno, funcname, source 

817 

818 

819def main(): 

820 """Main function for entry_points to use.""" 

821 return Cog().main(sys.argv)