Coverage for cogapp / cogapp.py: 100.00%
381 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-25 07:14 -0500
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-25 07:14 -0500
1"""Cog content generation tool."""
3import difflib
4import glob
5import io
6import linecache
7import os
8import re
9import shlex
10import sys
11import traceback
12import types
14from .errors import (
15 CogCheckFailed,
16 CogError,
17 CogGeneratedError,
18 CogUsageError,
19 CogUserException,
20)
21from .options import CogOptions
22from .whiteutils import common_prefix, reindent_block, white_prefix
23from .utils import NumberedFileReader, Redirectable, change_dir, md5
24from .hashhandler import HashHandler
26__version__ = "3.6.0"
class CogGenerator(Redirectable):
    """One chunk of generator code lifted out of a cog-marked file.

    Marker lines (the begin/end spec lines) and the code lines between them
    are collected separately; `evaluate` then runs the code and returns the
    text it produced.
    """

    def __init__(self, options=None):
        super().__init__()
        self.markers = []
        self.lines = []
        self.options = options or CogOptions()

    def parse_marker(self, line):
        """Record `line` as one of the marker lines surrounding the code."""
        self.markers.append(line)

    def parse_line(self, line):
        """Record `line` as generator code, minus leading/trailing newlines."""
        self.lines.append(line.strip("\n"))

    def get_code(self):
        """Return the generator's Python source, ready to compile.

        When every marker and code line starts with the same prefix (for
        example end-of-line comment characters), the first occurrence of that
        prefix is removed from each of them. The stored markers and lines are
        updated in place, then the code lines are re-indented to column zero.
        """
        shared = common_prefix(self.markers + self.lines)
        if shared:

            def without_prefix(text):
                return text.replace(shared, "", 1)

            self.markers = list(map(without_prefix, self.markers))
            self.lines = list(map(without_prefix, self.lines))

        return reindent_block(self.lines, "")

    def evaluate(self, cog, globals, fname):
        """Run the generator code and return its output as a string.

        `cog` is the owning engine: its `cogmodulename` is imported by the
        prologue and its `cogmodule` receives this generator's output
        functions. `globals` is the namespace the code runs in, and `fname`
        is the file name given to `compile` for tracebacks.

        Returns "" when there is no code. CogError exceptions pass through
        untouched; any other exception is re-raised as CogUserException
        carrying a formatted traceback.
        """
        # The output indentation comes from the markers as they are *before*
        # get_code() strips any common prefix from them.
        out_prefix = white_prefix(self.markers)

        source = self.get_code()
        if not source:
            return ""

        prologue = f"import {cog.cogmodulename} as cog\n"
        if self.options.prologue:
            prologue += self.options.prologue + "\n"
        compiled = compile(prologue + source, str(fname), "exec")

        # Point the shared "cog" module at this generator's functions.
        module = cog.cogmodule
        module.msg = self.msg
        module.out = self.out
        module.outl = self.outl
        module.error = self.error

        saved_stdout = sys.stdout
        if self.options.print_output:
            sys.stdout = captured = io.StringIO()

        self.outstring = ""
        try:
            eval(compiled, globals)
        except CogError:
            raise
        except:  # noqa: E722 (we're just wrapping in CogUserException and rethrowing)
            exc_type, exc_value, exc_tb = sys.exc_info()
            # Skip the first frame, which is the eval() call above.
            frames = (tuple(fr) for fr in traceback.extract_tb(exc_tb.tb_next))
            frames = find_cog_source(frames, prologue)
            report = "".join(traceback.format_list(frames))
            report += f"{exc_type.__name__}: {exc_value}"
            raise CogUserException(report)
        finally:
            sys.stdout = saved_stdout

        if self.options.print_output:
            self.outstring = captured.getvalue()

        # The output must finish with a newline, otherwise its last line
        # would be joined to the end-output marker and cog would no longer
        # be idempotent.
        if self.outstring and not self.outstring.endswith("\n"):
            self.outstring += "\n"

        return reindent_block(self.outstring, out_prefix)

    def msg(self, s):
        """The cog.msg function: print `s` prefixed with "Message: "."""
        self.prout("Message: " + s)

    def out(self, sOut="", dedent=False, trimblanklines=False):
        """The cog.out function: append `sOut` to the generated output.

        With `trimblanklines`, a blank first line and a blank last line are
        dropped from multi-line text and a single trailing newline is added.
        With `dedent`, the text is passed through `reindent_block`.
        """
        if trimblanklines and "\n" in sOut:
            pieces = sOut.split("\n")
            if not pieces[0].strip():
                pieces.pop(0)
            if pieces and not pieces[-1].strip():
                pieces.pop()
            sOut = "\n".join(pieces) + "\n"
        if dedent:
            sOut = reindent_block(sOut)
        self.outstring += sOut

    def outl(self, sOut="", **kw):
        """The cog.outl function: like `out`, followed by a newline."""
        self.out(sOut, **kw)
        self.out("\n")

    def error(self, msg="Error raised by cog generator."):
        """The cog.error function.

        Generator code can call this instead of raising a standard Python
        error, so the message is displayed without a scary Python traceback.

        Raises CogGeneratedError with `msg`.
        """
        raise CogGeneratedError(msg)
class Cog(Redirectable):
    """The Cog engine.

    Holds the active options and the ``cog`` module namespace exposed to
    generator code, and implements processing of single files, wildcard
    patterns, file lists and whole command lines.
    """

    # Matches every line holding at least one non-whitespace character;
    # used by suffix_lines.
    re_non_empty_lines = re.compile(r"^\s*\S+.*$", re.MULTILINE)

    def __init__(self):
        super().__init__()
        self.options = CogOptions()
        self.cogmodulename = "cog"
        self.create_cog_module()
        self.check_failed = False
        self.hash_handler = None
        self._fix_end_output_patterns()

    def _fix_end_output_patterns(self):
        """Rebuild the hash handler from the current end-output marker."""
        self.hash_handler = HashHandler(self.options.end_output)

    def show_warning(self, msg):
        """Print `msg` prefixed with "Warning: "."""
        self.prout(f"Warning: {msg}")

    def is_begin_spec_line(self, s):
        """Return True if `s` contains the begin-spec marker."""
        return self.options.begin_spec in s

    def is_end_spec_line(self, s):
        """Return True if `s` contains the end-spec marker.

        A line that also contains the end-output marker counts as an
        end-output line, not an end-spec line.
        """
        return self.options.end_spec in s and not self.is_end_output_line(s)

    def is_end_output_line(self, s):
        """Return True if `s` contains the end-output marker."""
        return self.options.end_output in s

    def create_cog_module(self):
        """Make a cog "module" object.

        Imported Python modules can use "import cog" to get our state.
        """
        self.cogmodule = types.SimpleNamespace()
        self.cogmodule.path = []

    def open_output_file(self, fname):
        """Open `fname` for writing, taking all the details into account.

        The file is opened in text mode with the configured encoding and
        newline setting. If `fname` names a parent directory, that directory
        is created first when missing, including a single-level one such as
        ``out/file.txt``.
        """
        fdir = os.path.dirname(fname)
        if fdir:
            # exist_ok avoids a race between an existence check and creation.
            os.makedirs(fdir, exist_ok=True)
        return open(
            fname,
            "w",
            encoding=self.options.encoding,
            newline=self.options.newline,
        )

    def open_input_file(self, fname):
        """Open an input file; the name "-" means standard input."""
        if fname == "-":
            return sys.stdin
        return open(fname, encoding=self.options.encoding)

    def process_file(self, file_in, file_out, fname=None, globals=None):
        """Process an input file object to an output file object.

        `file_in` and `file_out` can be file objects, or file names. Files
        opened here from names are closed before returning. `fname` is the
        name used in messages when file objects are passed. `globals` is the
        namespace shared by all generators in the file; a fresh dict is used
        when it is None.

        Raises CogError for malformed cog markup or a failed hash check.
        The working directory in effect on entry is restored on exit.
        """
        file_name_in = fname or ""
        file_name_out = fname or ""
        file_in_to_close = file_out_to_close = None

        start_dir = os.getcwd()

        try:
            # Convert filenames to files. This happens inside the try so an
            # input file already opened is closed if opening the output fails.
            if isinstance(file_in, (bytes, str)):
                file_name_in = file_in
                file_in = file_in_to_close = self.open_input_file(file_in)
            if isinstance(file_out, (bytes, str)):
                file_name_out = file_out
                file_out = file_out_to_close = self.open_output_file(file_out)

            file_in = NumberedFileReader(file_in)

            saw_cog = False

            self.cogmodule.inFile = file_name_in
            self.cogmodule.outFile = file_name_out
            self.cogmodulename = "cog_" + md5(file_name_out.encode()).hexdigest()
            sys.modules[self.cogmodulename] = self.cogmodule
            # if "import cog" explicitly done in code by user, note threading will cause clashes.
            sys.modules["cog"] = self.cogmodule

            # The globals dict we'll use for this file.
            if globals is None:
                globals = {}

            # If there are any global defines, put them in the globals.
            globals.update(self.options.defines)

            # Loop over generator chunks.
            line = file_in.readline()
            while line:
                # Copy ordinary text through until the next spec begin.
                while line and not self.is_begin_spec_line(line):
                    if self.is_end_spec_line(line):
                        raise CogError(
                            f"Unexpected {self.options.end_spec!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    if self.is_end_output_line(line):
                        raise CogError(
                            f"Unexpected {self.options.end_output!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    file_out.write(line)
                    line = file_in.readline()
                if not line:
                    break
                if not self.options.delete_code:
                    file_out.write(line)

                # `line` is the begin spec.
                gen = CogGenerator(options=self.options)
                gen.set_output(stdout=self.stdout)
                gen.parse_marker(line)
                first_line_num = file_in.linenumber()
                self.cogmodule.firstLineNum = first_line_num

                # If the spec begin is also a spec end, then process the
                # single line of code inside.
                if self.is_end_spec_line(line):
                    beg = line.find(self.options.begin_spec)
                    end = line.find(self.options.end_spec)
                    if beg > end:
                        raise CogError(
                            "Cog code markers inverted",
                            file=file_name_in,
                            line=first_line_num,
                        )
                    code = line[beg + len(self.options.begin_spec) : end].strip()
                    gen.parse_line(code)
                else:
                    # Deal with an ordinary code block.
                    line = file_in.readline()

                    # Get all the lines in the spec.
                    while line and not self.is_end_spec_line(line):
                        if self.is_begin_spec_line(line):
                            raise CogError(
                                f"Unexpected {self.options.begin_spec!r}",
                                file=file_name_in,
                                line=file_in.linenumber(),
                            )
                        if self.is_end_output_line(line):
                            raise CogError(
                                f"Unexpected {self.options.end_output!r}",
                                file=file_name_in,
                                line=file_in.linenumber(),
                            )
                        if not self.options.delete_code:
                            file_out.write(line)
                        gen.parse_line(line)
                        line = file_in.readline()
                    if not line:
                        raise CogError(
                            "Cog block begun but never ended.",
                            file=file_name_in,
                            line=first_line_num,
                        )

                    if not self.options.delete_code:
                        file_out.write(line)
                    gen.parse_marker(line)

                line = file_in.readline()

                # Eat all the lines in the output section. While reading
                # past them, collect them so the old output can be hashed.
                previous = []
                while line and not self.is_end_output_line(line):
                    if self.is_begin_spec_line(line):
                        raise CogError(
                            f"Unexpected {self.options.begin_spec!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    if self.is_end_spec_line(line):
                        raise CogError(
                            f"Unexpected {self.options.end_spec!r}",
                            file=file_name_in,
                            line=file_in.linenumber(),
                        )
                    previous.append(line)
                    line = file_in.readline()
                cur_hash = self.hash_handler.compute_lines_hash(previous)

                if not line and not self.options.eof_can_be_end:
                    # We reached end of file before we found the end output line.
                    raise CogError(
                        f"Missing {self.options.end_output!r} before end of file.",
                        file=file_name_in,
                        line=file_in.linenumber(),
                    )

                # Make the previous output available to the current code.
                self.cogmodule.previous = "".join(previous)

                # Write the output of the spec to be the new output if we're
                # supposed to generate code.
                if not self.options.no_generate:
                    code_name = f"<cog {file_name_in}:{first_line_num}>"
                    output = gen.evaluate(cog=self, globals=globals, fname=code_name)
                    output = self.suffix_lines(output)
                    new_hash = self.hash_handler.compute_hash(output)
                    file_out.write(output)
                else:
                    new_hash = ""

                saw_cog = True

                # Write the ending output line.
                if self.options.hash_output:
                    try:
                        self.hash_handler.validate_hash(line, cur_hash)
                    except ValueError as e:
                        raise CogError(
                            str(e),
                            file=file_name_in,
                            line=file_in.linenumber(),
                        ) from e
                    line = self.hash_handler.format_end_line_with_hash(
                        line,
                        new_hash,
                        add_hash=True,
                        preserve_format=self.options.check,
                    )
                else:
                    line = self.hash_handler.format_end_line_with_hash(
                        line, new_hash, add_hash=False
                    )

                if not self.options.delete_code:
                    file_out.write(line)
                line = file_in.readline()

            if not saw_cog and self.options.warn_empty:
                self.show_warning(f"no cog code found in {file_name_in}")
        finally:
            if file_in_to_close:
                file_in_to_close.close()
            if file_out_to_close:
                file_out_to_close.close()
            os.chdir(start_dir)

    def suffix_lines(self, text):
        """Add suffixes to the lines in text, if our options desire it.

        `text` is many lines, as a single string. Only non-blank lines get
        the suffix.
        """
        if self.options.suffix:
            # Backslashes are doubled so the suffix is taken literally in the
            # replacement template.
            repl = r"\g<0>" + self.options.suffix.replace("\\", "\\\\")
            text = self.re_non_empty_lines.sub(repl, text)
        return text

    def process_string(self, input, fname=None):
        """Process `input` as the text to cog.

        Return the cogged output as a string.
        """
        file_old = io.StringIO(input)
        file_new = io.StringIO()
        self.process_file(file_old, file_new, fname=fname)
        return file_new.getvalue()

    def replace_file(self, old_path, new_text):
        """Replace the file `old_path` with the contents `new_text`.

        If the file is not writable and a make-writable command is
        configured, that command is run with "%s" replaced by the path.

        Raises CogError if the file cannot be made writable.
        """
        if not os.access(old_path, os.W_OK):
            # Need to ensure we can write.
            if not self.options.make_writable_cmd:
                # Can't write!
                raise CogError(f"Can't overwrite {old_path}")
            # Use an external command to make the file writable.
            # NOTE(review): the path is substituted into a shell command
            # unquoted; the command comes from the options, not file content.
            cmd = self.options.make_writable_cmd.replace("%s", old_path)
            with os.popen(cmd) as cmdout:
                self.stdout.write(cmdout.read())
            if not os.access(old_path, os.W_OK):
                raise CogError(f"Couldn't make {old_path} writable")
        # The with-block closes the file even if the write fails.
        with self.open_output_file(old_path) as f:
            f.write(new_text)

    def save_include_path(self):
        """Remember copies of the option include path and of sys.path."""
        self.saved_include = self.options.include_path[:]
        self.saved_sys_path = sys.path[:]

    def restore_include_path(self):
        """Put back the include path and sys.path saved by save_include_path."""
        self.options.include_path = self.saved_include
        self.cogmodule.path = self.options.include_path
        sys.path = self.saved_sys_path

    def add_to_include_path(self, include_path):
        """Append the directories in `include_path` to cog.path and sys.path."""
        self.cogmodule.path.extend(include_path)
        sys.path.extend(include_path)

    def process_one_file(self, fname):
        """Process one filename through cog.

        Depending on the options, the result is written to the named output
        file, written back over the input (replace), compared with the input
        (check), or written to this object's stdout.
        """
        self.save_include_path()
        need_newline = False

        try:
            self.add_to_include_path(self.options.include_path)
            # Since we know where the input file came from,
            # push its directory onto the include path.
            self.add_to_include_path([os.path.dirname(fname)])

            # How we process the file depends on where the output is going.
            if self.options.output_name:
                self.process_file(fname, self.options.output_name, fname)
            elif self.options.replace or self.options.check:
                # We want to replace the cog file with the output,
                # but only if they differ.
                verb = "Cogging" if self.options.replace else "Checking"
                if self.options.verbosity >= 2:
                    self.prout(f"{verb} {fname}", end="")
                    need_newline = True

                try:
                    with self.open_input_file(fname) as file_old_file:
                        old_text = file_old_file.read()
                    new_text = self.process_string(old_text, fname=fname)
                    if old_text != new_text:
                        if self.options.verbosity >= 1:
                            if self.options.verbosity < 2:
                                self.prout(f"{verb} {fname}", end="")
                            self.prout(" (changed)")
                            need_newline = False
                        if self.options.replace:
                            self.replace_file(fname, new_text)
                        else:
                            assert self.options.check
                            self.check_failed = True
                            if self.options.diff:
                                old_lines = old_text.splitlines()
                                new_lines = new_text.splitlines()
                                diff = difflib.unified_diff(
                                    old_lines,
                                    new_lines,
                                    fromfile=f"current {fname}",
                                    tofile=f"changed {fname}",
                                    lineterm="",
                                )
                                for diff_line in diff:
                                    self.prout(diff_line)
                finally:
                    # The try-finally block is so we can print a partial line
                    # with the name of the file, and print (changed) on the
                    # same line, but also make sure to break the line before
                    # any traceback.
                    if need_newline:
                        self.prout("")
            else:
                self.process_file(fname, self.stdout, fname)
        finally:
            self.restore_include_path()

    def process_wildcards(self, fname):
        """Process every file matching the glob pattern `fname`.

        If nothing matches, `fname` itself is processed as a literal name.
        """
        files = glob.glob(fname)
        if files:
            for matching_file in files:
                self.process_one_file(matching_file)
        else:
            self.process_one_file(fname)

    def process_file_list(self, file_name_list):
        """Process the files in a file list.

        Each line of the list file is split like a shell command line and
        handled as one set of arguments; "#" starts a comment.
        """
        with self.open_input_file(file_name_list) as flist:
            lines = flist.readlines()
        for line in lines:
            # Use shlex to parse the line like a shell.
            lex = shlex.shlex(line, posix=True)
            lex.whitespace_split = True
            lex.commenters = "#"
            # No escapes, so that backslash can be part of the path.
            lex.escape = ""
            args = list(lex)
            if args:
                self.process_arguments(args)

    def process_arguments(self, args):
        """Process one command-line.

        `args[0]` is a file name, a wildcard pattern, "@file" (a file list)
        or "&file" (a file list processed from its own directory); the
        remaining items are options applying to this command line only.

        Raises CogUsageError if -o is combined with a file list.
        """
        saved_options = self.options
        self.options = self.options.clone()
        try:
            self.options.parse_args(args[1:])

            target = args[0]
            if target.startswith("@"):
                if self.options.output_name:
                    raise CogUsageError("Can't use -o with @file")
                self.process_file_list(target[1:])
            elif target.startswith("&"):
                if self.options.output_name:
                    raise CogUsageError("Can't use -o with &file")
                file_list = target[1:]
                with change_dir(os.path.dirname(file_list)):
                    self.process_file_list(os.path.basename(file_list))
            else:
                self.process_wildcards(target)
        finally:
            # Per-line options must not outlive this command line, even when
            # processing fails.
            self.options = saved_options

    def callable_main(self, argv):
        """All of command-line cog, but in a callable form.

        This is used by main. `argv` is the equivalent of sys.argv.

        Raises CogUsageError when there are no files to process, and
        CogCheckFailed when a check run found a file that would change.
        """
        argv = argv[1:]

        # Provide help if asked for anywhere in the command line.
        if "-?" in argv or "-h" in argv or "--help" in argv:
            self.prerr(self.options.format_help(), end="")
            return

        self.options.parse_args(argv)
        self._fix_end_output_patterns()

        if self.options.show_version:
            self.prout(f"Cog version {__version__}")
            return

        if not self.options.args:
            raise CogUsageError("No files to process")
        for a in self.options.args:
            self.process_arguments([a])

        if self.check_failed:
            msg = "Check failed"
            if self.options.check_fail_msg:
                msg = f"{msg}: {self.options.check_fail_msg}"
            raise CogCheckFailed(msg)

    def main(self, argv):
        """Handle the command-line execution for cog.

        Returns the process exit status: 0 on success, 2 for a usage error,
        3 for an error raised by generator code through cog.error, 4 for any
        other exception in generator code, 5 for a failed check, and 1 for
        any other CogError.
        """
        try:
            self.callable_main(argv)
            return 0
        except CogUsageError as err:
            self.prerr(err)
            self.prerr("(for help use --help)")
            return 2
        except CogGeneratedError as err:
            self.prerr(f"Error: {err}")
            return 3
        except CogUserException as err:
            self.prerr("Traceback (most recent call last):")
            self.prerr(err.args[0])
            return 4
        except CogCheckFailed as err:
            self.prerr(err)
            return 5
        except CogError as err:
            self.prerr(err)
            return 1
def find_cog_source(frame_summary, prologue):
    """Find cog source lines in a frame summary list, for printing tracebacks.

    Arguments:
        frame_summary: an iterable of 4-item tuples
            (filename, lineno, funcname, source), as produced from
            traceback.extract_tb.
        prologue: the text of the code prologue.

    Yields:
        4-item tuples, one per input frame. Frames without source text whose
        file name has the form ``<cog FILE:LINE>`` are corrected: frames
        inside the prologue are attributed to ``<prologue>``, the rest are
        mapped back to FILE with the line number adjusted and the source
        line looked up. All other frames are passed through unchanged.

    """
    prolines = prologue.splitlines()
    for filename, lineno, funcname, source in frame_summary:
        if not source:
            # The file part is matched greedily so that names containing a
            # colon (such as a Windows drive path) are still recognised; the
            # line number is whatever follows the last colon.
            m = re.search(r"^<cog (.+):(\d+)>$", filename)
            if m:
                if lineno <= len(prolines):
                    filename = "<prologue>"
                    source = prolines[lineno - 1]
                    # Because "import cog" is the first line in the prologue
                    lineno -= 1
                else:
                    filename, coglineno = m.groups()
                    coglineno = int(coglineno)
                    lineno += coglineno - len(prolines)
                    source = linecache.getline(filename, lineno).strip()
        yield filename, lineno, funcname, source
def main():
    """Entry point for entry_points: run cog on sys.argv, return its status."""
    engine = Cog()
    return engine.main(sys.argv)