Coverage for cogapp/hashhandler.py: 36.78%
63 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-13 08:29 -0400
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-13 08:29 -0400
1"""Hash handling for cog output verification."""
3import base64
4import re
5from .utils import md5
class HashHandler:
    """Handles checksum generation and verification for cog output.

    Two checksum notations are recognised directly after the end-output
    marker: the older ``(checksum: <32 hex chars>)`` and the newer
    ``(sum: <10 base64 chars>)``.
    """

    # Error text raised when a stored checksum disagrees with the expected one.
    _EDITED_MESSAGE = "Output has been edited! Delete old checksum to unprotect."

    def __init__(self, end_output_marker):
        """Initialize the hash handler with the end output marker pattern.

        Args:
            end_output_marker: The end output marker string (e.g., "[[[end]]]")
        """
        self.end_output_marker = end_output_marker
        self._setup_patterns()

    def _setup_patterns(self):
        """Set up the regex used to detect hashes and the output template."""
        end_output = re.escape(self.end_output_marker)
        # Support both old format (checksum: 32-char hex) and new format
        # (sum: 10-char base64).
        self.re_end_output_with_hash = re.compile(
            end_output
            + r"(?P<hashsect> *\((?:checksum: (?P<hash>[a-f0-9]{32})|sum: (?P<b64hash>[A-Za-z0-9+/]{10}))\))"
        )
        # ``end_format`` is applied with the ``%`` operator, so any literal
        # "%" inside the marker must be doubled or formatting would fail.
        # Markers without "%" produce exactly the same template as before.
        self.end_format = self.end_output_marker.replace("%", "%%") + " (sum: %s)"

    def compute_hash(self, content):
        """Compute MD5 hash of the given content.

        Args:
            content: String content to hash (encoded as UTF-8 before hashing)

        Returns:
            str: Hexadecimal hash digest
        """
        hasher = md5()
        hasher.update(content.encode("utf-8"))
        return hasher.hexdigest()

    def compute_lines_hash(self, lines):
        """Compute MD5 hash of a list of lines.

        Each line is UTF-8 encoded and fed to the hasher in order.

        Args:
            lines: List of line strings

        Returns:
            str: Hexadecimal hash digest
        """
        hasher = md5()
        for line in lines:
            hasher.update(line.encode("utf-8"))
        return hasher.hexdigest()

    def hex_to_base64_hash(self, hex_hash):
        """Convert a 32-character hex hash to a 10-character base64 hash.

        Args:
            hex_hash: 32-character hexadecimal hash string

        Returns:
            str: The first 10 characters of the base64 encoding of the
                bytes described by ``hex_hash``

        Raises:
            ValueError: If ``hex_hash`` is not valid hexadecimal
                (raised by ``bytes.fromhex``)
        """
        hash_bytes = bytes.fromhex(hex_hash)
        # Only a 10-character prefix of the base64 text is stored on disk.
        return base64.b64encode(hash_bytes).decode("ascii")[:10]

    @staticmethod
    def _hash_from_match(hash_match):
        """Return ``(hash_type, hash_value)`` for a successful regex match."""
        hex_value = hash_match.group("hash")
        if hex_value:
            # Old format: checksum with hex
            return ("hex", hex_value)
        # The pattern has exactly two alternatives, so a match without the
        # hex group must have captured the base64 group.
        return ("base64", hash_match.group("b64hash"))

    def extract_hash_from_line(self, line):
        """Extract hash from an end output line if present.

        Args:
            line: The end output line to check

        Returns:
            tuple: (hash_type, hash_value) where hash_type is 'hex' or 'base64'
                and hash_value is the raw hash value, or (None, None) if not found
        """
        hash_match = self.re_end_output_with_hash.search(line)
        if hash_match is None:
            return (None, None)
        return self._hash_from_match(hash_match)

    def validate_hash(self, line, expected_hash):
        """Validate that the hash in the line matches the expected hash.

        Args:
            line: The end output line, which may or may not contain a hash
            expected_hash: The expected hash value (hex format)

        Returns:
            bool: Always True when it returns: either the line carries no
                hash, or the stored hash matches ``expected_hash``

        Raises:
            ValueError: If hash is present but doesn't match expected
        """
        hash_type, old_hash = self.extract_hash_from_line(line)
        if hash_type is None:
            return True

        if hash_type == "base64":
            # Stored value is the shortened base64 form; convert before comparing.
            expected = self.hex_to_base64_hash(expected_hash)
        else:
            expected = expected_hash

        if old_hash != expected:
            raise ValueError(self._EDITED_MESSAGE)
        return True

    def format_end_line_with_hash(
        self, line, new_hash, add_hash=True, preserve_format=False
    ):
        """Format the end output line with or without hash.

        Args:
            line: The original end output line
            new_hash: The hash to add if add_hash is True (hex format)
            add_hash: Whether to add hash to the output
            preserve_format: If True and an existing hash is found, preserve its format

        Returns:
            str: The formatted end output line
        """
        hash_match = self.re_end_output_with_hash.search(line)

        if not add_hash:
            if hash_match is None:
                return line
            # Cut out exactly the matched hash section. Removing by position
            # (rather than by text) keeps identical text that happens to
            # appear earlier in the line intact.
            start, end = hash_match.span("hashsect")
            return line[:start] + line[end:]

        if hash_match is not None and preserve_format:
            hash_type, _ = self._hash_from_match(hash_match)
            if hash_type == "hex":
                # Keep hex format
                replacement = f"{self.end_output_marker} (checksum: {new_hash})"
            else:
                # Keep base64 format
                b64_hash = self.hex_to_base64_hash(new_hash)
                replacement = f"{self.end_output_marker} (sum: {b64_hash})"
        else:
            # Use new format
            replacement = self.end_format % self.hex_to_base64_hash(new_hash)

        if hash_match is not None:
            # Replace marker + existing hash section with marker + new hash.
            return line[: hash_match.start()] + replacement + line[hash_match.end() :]

        # No existing hash: attach the new one to the first marker occurrence.
        # If the marker is absent the line comes back unchanged.
        return replacement.join(line.split(self.end_output_marker, 1))