Coverage for cogapp / cogapp.py: 50.29%
379 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-09 06:46 -0500
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-09 06:46 -0500
1"""Cog content generation tool."""
3import difflib
4import glob
5import io
6import linecache
7import os
8import re
9import shlex
10import sys
11import traceback
12import types
14from .errors import (
15 CogCheckFailed,
16 CogError,
17 CogGeneratedError,
18 CogUsageError,
19 CogUserException,
20)
21from .options import CogOptions
22from .whiteutils import common_prefix, reindent_block, white_prefix
23from .utils import NumberedFileReader, Redirectable, change_dir, md5
24from .hashhandler import HashHandler
26__version__ = "3.6.0"
29class CogGenerator(Redirectable):
30 """A generator pulled from a source file."""
32 def __init__(self, options=None):
33 super().__init__()
34 self.markers = []
35 self.lines = []
36 self.options = options or CogOptions()
38 def parse_marker(self, line):
39 self.markers.append(line)
41 def parse_line(self, line):
42 self.lines.append(line.strip("\n"))
44 def get_code(self):
45 """Extract the executable Python code from the generator."""
46 # If the markers and lines all have the same prefix
47 # (end-of-line comment chars, for example),
48 # then remove it from all the lines.
49 pref_in = common_prefix(self.markers + self.lines)
50 if pref_in:
51 self.markers = [line.replace(pref_in, "", 1) for line in self.markers]
52 self.lines = [line.replace(pref_in, "", 1) for line in self.lines]
54 return reindent_block(self.lines, "")
56 def evaluate(self, cog, globals, fname):
57 # figure out the right whitespace prefix for the output
58 pref_out = white_prefix(self.markers)
60 intext = self.get_code()
61 if not intext:
62 return ""
64 prologue = "import " + cog.cogmodulename + " as cog\n"
65 if self.options.prologue: 65 ↛ 66line 65 didn't jump to line 66 because the condition on line 65 was never true
66 prologue += self.options.prologue + "\n"
67 code = compile(prologue + intext, str(fname), "exec")
69 # Make sure the "cog" module has our state.
70 cog.cogmodule.msg = self.msg
71 cog.cogmodule.out = self.out
72 cog.cogmodule.outl = self.outl
73 cog.cogmodule.error = self.error
75 real_stdout = sys.stdout
76 if self.options.print_output: 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true
77 sys.stdout = captured_stdout = io.StringIO()
79 self.outstring = ""
80 try:
81 eval(code, globals)
82 except CogError:
83 raise
84 except: # noqa: E722 (we're just wrapping in CogUserException and rethrowing)
85 typ, err, tb = sys.exc_info()
86 frames = (tuple(fr) for fr in traceback.extract_tb(tb.tb_next))
87 frames = find_cog_source(frames, prologue)
88 msg = "".join(traceback.format_list(frames))
89 msg += f"{typ.__name__}: {err}"
90 raise CogUserException(msg)
91 finally:
92 sys.stdout = real_stdout
94 if self.options.print_output: 94 ↛ 95line 94 didn't jump to line 95 because the condition on line 94 was never true
95 self.outstring = captured_stdout.getvalue()
97 # We need to make sure that the last line in the output
98 # ends with a newline, or it will be joined to the
99 # end-output line, ruining cog's idempotency.
100 if self.outstring and self.outstring[-1] != "\n":
101 self.outstring += "\n"
103 return reindent_block(self.outstring, pref_out)
105 def msg(self, s):
106 self.prout("Message: " + s)
108 def out(self, sOut="", dedent=False, trimblanklines=False):
109 """The cog.out function."""
110 if trimblanklines and ("\n" in sOut):
111 lines = sOut.split("\n")
112 if lines[0].strip() == "":
113 del lines[0]
114 if lines and lines[-1].strip() == "":
115 del lines[-1]
116 sOut = "\n".join(lines) + "\n"
117 if dedent:
118 sOut = reindent_block(sOut)
119 self.outstring += sOut
121 def outl(self, sOut="", **kw):
122 """The cog.outl function."""
123 self.out(sOut, **kw)
124 self.out("\n")
126 def error(self, msg="Error raised by cog generator."):
127 """The cog.error function.
129 Instead of raising standard python errors, cog generators can use
130 this function. It will display the error without a scary Python
131 traceback.
133 """
134 raise CogGeneratedError(msg)
137class Cog(Redirectable):
138 """The Cog engine."""
140 def __init__(self):
141 super().__init__()
142 self.options = CogOptions()
143 self.cogmodulename = "cog"
144 self.create_cog_module()
145 self.check_failed = False
146 self.hash_handler = None
147 self._fix_end_output_patterns()
149 def _fix_end_output_patterns(self):
150 self.hash_handler = HashHandler(self.options.end_output)
152 def show_warning(self, msg):
153 self.prout(f"Warning: {msg}")
155 def is_begin_spec_line(self, s):
156 return self.options.begin_spec in s
158 def is_end_spec_line(self, s):
159 return self.options.end_spec in s and not self.is_end_output_line(s)
161 def is_end_output_line(self, s):
162 return self.options.end_output in s
164 def create_cog_module(self):
165 """Make a cog "module" object.
167 Imported Python modules can use "import cog" to get our state.
169 """
170 self.cogmodule = types.SimpleNamespace()
171 self.cogmodule.path = []
173 def open_output_file(self, fname):
174 """Open an output file, taking all the details into account."""
175 opts = {}
176 mode = "w"
177 opts["encoding"] = self.options.encoding
178 opts["newline"] = self.options.newline
179 fdir = os.path.dirname(fname)
180 if os.path.dirname(fdir) and not os.path.exists(fdir):
181 os.makedirs(fdir)
182 return open(fname, mode, **opts)
184 def open_input_file(self, fname):
185 """Open an input file."""
186 if fname == "-":
187 return sys.stdin
188 else:
189 return open(fname, encoding=self.options.encoding)
191 def process_file(self, file_in, file_out, fname=None, globals=None):
192 """Process an input file object to an output file object.
194 `fileIn` and `fileOut` can be file objects, or file names.
196 """
197 file_name_in = fname or ""
198 file_name_out = fname or ""
199 file_in_to_close = file_out_to_close = None
200 # Convert filenames to files.
201 if isinstance(file_in, (bytes, str)): 201 ↛ 203line 201 didn't jump to line 203 because the condition on line 201 was never true
202 # Open the input file.
203 file_name_in = file_in
204 file_in = file_in_to_close = self.open_input_file(file_in)
205 if isinstance(file_out, (bytes, str)): 205 ↛ 207line 205 didn't jump to line 207 because the condition on line 205 was never true
206 # Open the output file.
207 file_name_out = file_out
208 file_out = file_out_to_close = self.open_output_file(file_out)
210 start_dir = os.getcwd()
212 try:
213 file_in = NumberedFileReader(file_in)
215 saw_cog = False
217 self.cogmodule.inFile = file_name_in
218 self.cogmodule.outFile = file_name_out
219 self.cogmodulename = "cog_" + md5(file_name_out.encode()).hexdigest()
220 sys.modules[self.cogmodulename] = self.cogmodule
221 # if "import cog" explicitly done in code by user, note threading will cause clashes.
222 sys.modules["cog"] = self.cogmodule
224 # The globals dict we'll use for this file.
225 if globals is None: 225 ↛ 229line 225 didn't jump to line 229 because the condition on line 225 was always true
226 globals = {}
228 # If there are any global defines, put them in the globals.
229 globals.update(self.options.defines)
231 # loop over generator chunks
232 line = file_in.readline()
233 while line:
234 # Find the next spec begin
235 while line and not self.is_begin_spec_line(line):
236 if self.is_end_output_line(line): 236 ↛ 237line 236 didn't jump to line 237 because the condition on line 236 was never true
237 raise CogError(
238 f"Unexpected {self.options.end_output!r}",
239 file=file_name_in,
240 line=file_in.linenumber(),
241 )
242 file_out.write(line)
243 line = file_in.readline()
244 if not line:
245 break
246 if not self.options.delete_code: 246 ↛ 250line 246 didn't jump to line 250 because the condition on line 246 was always true
247 file_out.write(line)
249 # `line` is the begin spec
250 gen = CogGenerator(options=self.options)
251 gen.set_output(stdout=self.stdout)
252 gen.parse_marker(line)
253 first_line_num = file_in.linenumber()
254 self.cogmodule.firstLineNum = first_line_num
256 # If the spec begin is also a spec end, then process the single
257 # line of code inside.
258 if self.is_end_spec_line(line):
259 beg = line.find(self.options.begin_spec)
260 end = line.find(self.options.end_spec)
261 if beg > end:
262 raise CogError(
263 "Cog code markers inverted",
264 file=file_name_in,
265 line=first_line_num,
266 )
267 else:
268 code = line[beg + len(self.options.begin_spec) : end].strip()
269 gen.parse_line(code)
270 else:
271 # Deal with an ordinary code block.
272 line = file_in.readline()
274 # Get all the lines in the spec
275 while line and not self.is_end_spec_line(line):
276 if self.is_begin_spec_line(line): 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true
277 raise CogError(
278 f"Unexpected {self.options.begin_spec!r}",
279 file=file_name_in,
280 line=file_in.linenumber(),
281 )
282 if self.is_end_output_line(line): 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true
283 raise CogError(
284 f"Unexpected {self.options.end_output!r}",
285 file=file_name_in,
286 line=file_in.linenumber(),
287 )
288 if not self.options.delete_code: 288 ↛ 290line 288 didn't jump to line 290 because the condition on line 288 was always true
289 file_out.write(line)
290 gen.parse_line(line)
291 line = file_in.readline()
292 if not line: 292 ↛ 293line 292 didn't jump to line 293 because the condition on line 292 was never true
293 raise CogError(
294 "Cog block begun but never ended.",
295 file=file_name_in,
296 line=first_line_num,
297 )
299 if not self.options.delete_code: 299 ↛ 301line 299 didn't jump to line 301 because the condition on line 299 was always true
300 file_out.write(line)
301 gen.parse_marker(line)
303 line = file_in.readline()
305 # Eat all the lines in the output section. While reading past
306 # them, compute the md5 hash of the old output.
307 previous = []
308 while line and not self.is_end_output_line(line):
309 if self.is_begin_spec_line(line): 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true
310 raise CogError(
311 f"Unexpected {self.options.begin_spec!r}",
312 file=file_name_in,
313 line=file_in.linenumber(),
314 )
315 if self.is_end_spec_line(line): 315 ↛ 316line 315 didn't jump to line 316 because the condition on line 315 was never true
316 raise CogError(
317 f"Unexpected {self.options.end_spec!r}",
318 file=file_name_in,
319 line=file_in.linenumber(),
320 )
321 previous.append(line)
322 line = file_in.readline()
323 cur_hash = self.hash_handler.compute_lines_hash(previous)
325 if not line and not self.options.eof_can_be_end: 325 ↛ 327line 325 didn't jump to line 327 because the condition on line 325 was never true
326 # We reached end of file before we found the end output line.
327 raise CogError(
328 f"Missing {self.options.end_output!r} before end of file.",
329 file=file_name_in,
330 line=file_in.linenumber(),
331 )
333 # Make the previous output available to the current code
334 self.cogmodule.previous = "".join(previous)
336 # Write the output of the spec to be the new output if we're
337 # supposed to generate code.
338 if not self.options.no_generate: 338 ↛ 345line 338 didn't jump to line 345 because the condition on line 338 was always true
339 fname = f"<cog {file_name_in}:{first_line_num}>"
340 gen = gen.evaluate(cog=self, globals=globals, fname=fname)
341 gen = self.suffix_lines(gen)
342 new_hash = self.hash_handler.compute_hash(gen)
343 file_out.write(gen)
344 else:
345 new_hash = ""
347 saw_cog = True
349 # Write the ending output line
350 if self.options.hash_output: 350 ↛ 351line 350 didn't jump to line 351 because the condition on line 350 was never true
351 try:
352 self.hash_handler.validate_hash(line, cur_hash)
353 except ValueError as e:
354 raise CogError(
355 str(e),
356 file=file_name_in,
357 line=file_in.linenumber(),
358 )
359 line = self.hash_handler.format_end_line_with_hash(
360 line,
361 new_hash,
362 add_hash=True,
363 preserve_format=self.options.check,
364 )
365 else:
366 line = self.hash_handler.format_end_line_with_hash(
367 line, new_hash, add_hash=False
368 )
370 if not self.options.delete_code: 370 ↛ 372line 370 didn't jump to line 372 because the condition on line 370 was always true
371 file_out.write(line)
372 line = file_in.readline()
374 if not saw_cog and self.options.warn_empty: 374 ↛ 375line 374 didn't jump to line 375 because the condition on line 374 was never true
375 self.show_warning(f"no cog code found in {file_name_in}")
376 finally:
377 if file_in_to_close: 377 ↛ 378line 377 didn't jump to line 378 because the condition on line 377 was never true
378 file_in_to_close.close()
379 if file_out_to_close: 379 ↛ 380line 379 didn't jump to line 380 because the condition on line 379 was never true
380 file_out_to_close.close()
381 os.chdir(start_dir)
383 # A regex for non-empty lines, used by suffixLines.
384 re_non_empty_lines = re.compile(r"^\s*\S+.*$", re.MULTILINE)
386 def suffix_lines(self, text):
387 """Add suffixes to the lines in text, if our options desire it.
389 `text` is many lines, as a single string.
391 """
392 if self.options.suffix: 392 ↛ 394line 392 didn't jump to line 394 because the condition on line 392 was never true
393 # Find all non-blank lines, and add the suffix to the end.
394 repl = r"\g<0>" + self.options.suffix.replace("\\", "\\\\")
395 text = self.re_non_empty_lines.sub(repl, text)
396 return text
398 def process_string(self, input, fname=None):
399 """Process `input` as the text to cog.
401 Return the cogged output as a string.
403 """
404 file_old = io.StringIO(input)
405 file_new = io.StringIO()
406 self.process_file(file_old, file_new, fname=fname)
407 return file_new.getvalue()
409 def replace_file(self, old_path, new_text):
410 """Replace file oldPath with the contents newText"""
411 if not os.access(old_path, os.W_OK):
412 # Need to ensure we can write.
413 if self.options.make_writable_cmd:
414 # Use an external command to make the file writable.
415 cmd = self.options.make_writable_cmd.replace("%s", old_path)
416 with os.popen(cmd) as cmdout:
417 self.stdout.write(cmdout.read())
418 if not os.access(old_path, os.W_OK):
419 raise CogError(f"Couldn't make {old_path} writable")
420 else:
421 # Can't write!
422 raise CogError(f"Can't overwrite {old_path}")
423 f = self.open_output_file(old_path)
424 f.write(new_text)
425 f.close()
427 def save_include_path(self):
428 self.saved_include = self.options.include_path[:]
429 self.saved_sys_path = sys.path[:]
431 def restore_include_path(self):
432 self.options.include_path = self.saved_include
433 self.cogmodule.path = self.options.include_path
434 sys.path = self.saved_sys_path
436 def add_to_include_path(self, include_path):
437 self.cogmodule.path.extend(include_path)
438 sys.path.extend(include_path)
440 def process_one_file(self, fname):
441 """Process one filename through cog."""
443 self.save_include_path()
444 need_newline = False
446 try:
447 self.add_to_include_path(self.options.include_path)
448 # Since we know where the input file came from,
449 # push its directory onto the include path.
450 self.add_to_include_path([os.path.dirname(fname)])
452 # How we process the file depends on where the output is going.
453 if self.options.output_name:
454 self.process_file(fname, self.options.output_name, fname)
455 elif self.options.replace or self.options.check:
456 # We want to replace the cog file with the output,
457 # but only if they differ.
458 verb = "Cogging" if self.options.replace else "Checking"
459 if self.options.verbosity >= 2:
460 self.prout(f"{verb} {fname}", end="")
461 need_newline = True
463 try:
464 file_old_file = self.open_input_file(fname)
465 old_text = file_old_file.read()
466 file_old_file.close()
467 new_text = self.process_string(old_text, fname=fname)
468 if old_text != new_text:
469 if self.options.verbosity >= 1:
470 if self.options.verbosity < 2:
471 self.prout(f"{verb} {fname}", end="")
472 self.prout(" (changed)")
473 need_newline = False
474 if self.options.replace:
475 self.replace_file(fname, new_text)
476 else:
477 assert self.options.check
478 self.check_failed = True
479 if self.options.diff:
480 old_lines = old_text.splitlines()
481 new_lines = new_text.splitlines()
482 diff = difflib.unified_diff(
483 old_lines,
484 new_lines,
485 fromfile=f"current {fname}",
486 tofile=f"changed {fname}",
487 lineterm="",
488 )
489 for diff_line in diff:
490 self.prout(diff_line)
491 finally:
492 # The try-finally block is so we can print a partial line
493 # with the name of the file, and print (changed) on the
494 # same line, but also make sure to break the line before
495 # any traceback.
496 if need_newline:
497 self.prout("")
498 else:
499 self.process_file(fname, self.stdout, fname)
500 finally:
501 self.restore_include_path()
503 def process_wildcards(self, fname):
504 files = glob.glob(fname)
505 if files:
506 for matching_file in files:
507 self.process_one_file(matching_file)
508 else:
509 self.process_one_file(fname)
511 def process_file_list(self, file_name_list):
512 """Process the files in a file list."""
513 flist = self.open_input_file(file_name_list)
514 lines = flist.readlines()
515 flist.close()
516 for line in lines:
517 # Use shlex to parse the line like a shell.
518 lex = shlex.shlex(line, posix=True)
519 lex.whitespace_split = True
520 lex.commenters = "#"
521 # No escapes, so that backslash can be part of the path
522 lex.escape = ""
523 args = list(lex)
524 if args:
525 self.process_arguments(args)
527 def process_arguments(self, args):
528 """Process one command-line."""
529 saved_options = self.options
530 self.options = self.options.clone()
532 self.options.parse_args(args[1:])
534 if args[0][0] == "@":
535 if self.options.output_name:
536 raise CogUsageError("Can't use -o with @file")
537 self.process_file_list(args[0][1:])
538 elif args[0][0] == "&":
539 if self.options.output_name:
540 raise CogUsageError("Can't use -o with &file")
541 file_list = args[0][1:]
542 with change_dir(os.path.dirname(file_list)):
543 self.process_file_list(os.path.basename(file_list))
544 else:
545 self.process_wildcards(args[0])
547 self.options = saved_options
549 def callable_main(self, argv):
550 """All of command-line cog, but in a callable form.
552 This is used by main. `argv` is the equivalent of sys.argv.
554 """
555 argv = argv[1:]
557 # Provide help if asked for anywhere in the command line.
558 if "-?" in argv or "-h" in argv or "--help" in argv:
559 self.prerr(self.options.format_help(), end="")
560 return
562 self.options.parse_args(argv)
563 self._fix_end_output_patterns()
565 if self.options.show_version:
566 self.prout(f"Cog version {__version__}")
567 return
569 if self.options.args:
570 for a in self.options.args:
571 self.process_arguments([a])
572 else:
573 raise CogUsageError("No files to process")
575 if self.check_failed:
576 msg = "Check failed"
577 if self.options.check_fail_msg:
578 msg = f"{msg}: {self.options.check_fail_msg}"
579 raise CogCheckFailed(msg)
581 def main(self, argv):
582 """Handle the command-line execution for cog."""
584 try:
585 self.callable_main(argv)
586 return 0
587 except CogUsageError as err:
588 self.prerr(err)
589 self.prerr("(for help use --help)")
590 return 2
591 except CogGeneratedError as err:
592 self.prerr(f"Error: {err}")
593 return 3
594 except CogUserException as err:
595 self.prerr("Traceback (most recent call last):")
596 self.prerr(err.args[0])
597 return 4
598 except CogCheckFailed as err:
599 self.prerr(err)
600 return 5
601 except CogError as err:
602 self.prerr(err)
603 return 1
606def find_cog_source(frame_summary, prologue):
607 """Find cog source lines in a frame summary list, for printing tracebacks.
609 Arguments:
610 frame_summary: a list of 4-item tuples, as returned by traceback.extract_tb.
611 prologue: the text of the code prologue.
613 Returns
614 A list of 4-item tuples, updated to correct the cog entries.
616 """
617 prolines = prologue.splitlines()
618 for filename, lineno, funcname, source in frame_summary:
619 if not source: 619 ↛ 633line 619 didn't jump to line 633 because the condition on line 619 was always true
620 m = re.search(r"^<cog ([^:]+):(\d+)>$", filename)
621 if m: 621 ↛ 622line 621 didn't jump to line 622 because the condition on line 621 was never true
622 if lineno <= len(prolines):
623 filename = "<prologue>"
624 source = prolines[lineno - 1]
625 lineno -= (
626 1 # Because "import cog" is the first line in the prologue
627 )
628 else:
629 filename, coglineno = m.groups()
630 coglineno = int(coglineno)
631 lineno += coglineno - len(prolines)
632 source = linecache.getline(filename, lineno).strip()
633 yield filename, lineno, funcname, source
636def main():
637 """Main function for entry_points to use."""
638 return Cog().main(sys.argv)