Coverage for cogapp/hashhandler.py: 36.78%

63 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-13 08:29 -0400

1"""Hash handling for cog output verification.""" 

2 

3import base64 

4import re 

5from .utils import md5 

6 

7 

class HashHandler:
    """Generate, detect, and verify the checksum on a cog end-output line.

    Two on-disk formats are recognized after the end-output marker:

    * old: ``(checksum: <32 lowercase hex chars>)``
    * new: ``(sum: <10 base64 chars>)``

    Attributes set up by the constructor:
        end_output_marker: The literal end-output marker string.
        re_end_output_with_hash: Compiled pattern matching the marker followed
            by a hash section in either format (groups ``hashsect``, ``hash``,
            ``b64hash``).
        end_format: ``%``-format template producing the marker followed by a
            new-format hash section; takes the base64 hash as its argument.
    """

    # Single source for the message raised when a stored checksum is stale.
    _EDITED_MESSAGE = "Output has been edited! Delete old checksum to unprotect."

    def __init__(self, end_output_marker):
        """Initialize the hash handler with the end output marker pattern.

        Args:
            end_output_marker: The end output marker string (e.g., "[[[end]]]")
        """
        self.end_output_marker = end_output_marker
        self._setup_patterns()

    def _setup_patterns(self):
        """Set up regex patterns for hash detection and formatting."""
        end_output = re.escape(self.end_output_marker)
        # Support both old format (checksum: 32-char hex) and new format (sum: 10-char base64)
        self.re_end_output_with_hash = re.compile(
            end_output
            + r"(?P<hashsect> *\((?:checksum: (?P<hash>[a-f0-9]{32})|sum: (?P<b64hash>[A-Za-z0-9+/]{10}))\))"
        )
        # end_format is applied with the % operator, so any literal "%" in the
        # marker must be doubled or the marker would be corrupted (or the
        # formatting would raise) when the hash is substituted in.
        self.end_format = self.end_output_marker.replace("%", "%%") + " (sum: %s)"

    def compute_hash(self, content):
        """Compute MD5 hash of the given content.

        Args:
            content: String content to hash

        Returns:
            str: Hexadecimal hash digest
        """
        return self.compute_lines_hash((content,))

    def compute_lines_hash(self, lines):
        """Compute MD5 hash of a list of lines.

        Each line is UTF-8 encoded and fed to the hasher in order, so the
        result equals the hash of the concatenated lines.

        Args:
            lines: List of line strings

        Returns:
            str: Hexadecimal hash digest
        """
        hasher = md5()
        for line in lines:
            hasher.update(line.encode("utf-8"))
        return hasher.hexdigest()

    def hex_to_base64_hash(self, hex_hash):
        """Convert a 32-character hex hash to a 10-character base64 hash.

        Args:
            hex_hash: 32-character hexadecimal hash string

        Returns:
            str: 10-character base64 hash string

        Raises:
            ValueError: If hex_hash is not valid hexadecimal.
        """
        hash_bytes = bytes.fromhex(hex_hash)
        # Only the first 10 base64 characters are kept as the short checksum.
        return base64.b64encode(hash_bytes).decode("ascii")[:10]

    def extract_hash_from_line(self, line):
        """Extract hash from an end output line if present.

        Args:
            line: The end output line to check

        Returns:
            tuple: (hash_type, hash_value) where hash_type is 'hex' or 'base64'
                and hash_value is the raw hash value, or (None, None) if not found
        """
        hash_match = self.re_end_output_with_hash.search(line)
        if hash_match is None:
            return (None, None)

        hex_hash = hash_match.group("hash")
        if hex_hash:
            # Old format: checksum with hex
            return ("hex", hex_hash)
        # The pattern is an alternation of exactly two groups, so a match
        # without "hash" always carries "b64hash" (new format).
        return ("base64", hash_match.group("b64hash"))

    def validate_hash(self, line, expected_hash):
        """Validate that the hash in the line matches the expected hash.

        Args:
            line: The end output line containing the hash
            expected_hash: The expected hash value (hex format)

        Returns:
            bool: True if the hash matches or no hash is present. A mismatch
                is never reported via the return value; it raises instead.

        Raises:
            ValueError: If hash is present but doesn't match expected
        """
        hash_type, old_hash = self.extract_hash_from_line(line)
        if hash_type is None:
            return True

        # Compare in the format stored on the line; the expected hash is only
        # converted when the line uses the base64 format.
        if hash_type == "hex":
            expected = expected_hash
        else:
            expected = self.hex_to_base64_hash(expected_hash)

        if old_hash != expected:
            raise ValueError(self._EDITED_MESSAGE)
        return True

    def format_end_line_with_hash(
        self, line, new_hash, add_hash=True, preserve_format=False
    ):
        """Format the end output line with or without hash.

        Args:
            line: The original end output line
            new_hash: The hash to add if add_hash is True (hex format)
            add_hash: Whether to add hash to the output
            preserve_format: If True and an existing hash is found, preserve its format

        Returns:
            str: The formatted end output line. With add_hash False, any hash
                section following the marker is stripped. A line that contains
                neither the marker nor a hash section is returned unchanged.
        """
        hash_match = self.re_end_output_with_hash.search(line)

        if not add_hash:
            if hash_match is None:
                return line
            # Cut out exactly the matched hash section. Replacing by text
            # could hit an identical string earlier in the line instead.
            start, end = hash_match.span("hashsect")
            return line[:start] + line[end:]

        if preserve_format and hash_match:
            if hash_match.group("hash"):
                # Keep hex format
                formatted_hash = f" (checksum: {new_hash})"
            else:
                # Keep base64 format
                formatted_hash = f" (sum: {self.hex_to_base64_hash(new_hash)})"
            new_end = self.end_output_marker + formatted_hash
        else:
            # Use new format
            new_end = self.end_format % self.hex_to_base64_hash(new_hash)

        if hash_match:
            # Replace existing marker + hash
            return line[: hash_match.start()] + new_end + line[hash_match.end() :]

        # Add new hash after the first occurrence of the bare marker, if any.
        head, marker, tail = line.partition(self.end_output_marker)
        if not marker:
            return line
        return head + new_end + tail