Coverage for claims\services.py: 80%

101 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-14 18:28 -0400

1import csv 

2import logging 

3from datetime import datetime 

4from decimal import Decimal, InvalidOperation 

5from pathlib import Path 

6from typing import Any, Dict, List, Tuple, Union 

7 

8from django.db import transaction, IntegrityError 

9 

10from .models import Claim, ClaimDetail 

11 

# Shape of the summary returned by a load run: counter name -> count.
LoadSummary = Dict[str, int]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

16 

17 

class ClaimDataIngestor:
    """
    A service class to handle the ingestion of claim data from CSV files.

    This class encapsulates the logic for parsing, validating, and saving
    claim and claim detail data to the database. Row-level problems (bad
    values, missing columns, unknown claim ids, integrity errors in overwrite
    mode) are recorded in ``self.errors`` and the offending row is skipped;
    they do not abort the load.
    """

    def __init__(
        self,
        claims_csv_path: Union[Path, str],
        details_csv_path: Union[Path, str],
        delimiter: str = ",",
        mode: str = "append",
    ):
        """
        Initializes the ingestor with paths to the data files.

        Args:
            claims_csv_path: The file path for the main claims data.
            details_csv_path: The file path for the claim details data.
            delimiter: The character used to separate values in the CSV files.
            mode: ``"append"`` (default) skips rows whose record already
                exists; ``"overwrite"`` deletes all existing claims before
                loading. Any value other than ``"append"`` takes the
                create-and-log-integrity-errors path, but only the exact
                value ``"overwrite"`` triggers the purge.
        """
        self.claims_csv_path = Path(claims_csv_path)
        self.details_csv_path = Path(details_csv_path)
        self.delimiter = delimiter
        self.mode = mode  # 'append' (default) or 'overwrite'
        self.claims_created = 0
        self.claims_updated = 0
        self.claims_skipped = 0
        self.details_created = 0
        self.details_updated = 0
        self.details_skipped = 0
        self.errors: List[str] = []

    def _log_error(self, row_num: int, file_name: str, error_msg: str) -> None:
        """Helper method to format and store an error message."""
        full_error_msg = f"Error in {file_name} at row {row_num}: {error_msg}"
        logger.error(full_error_msg)  # Log as an ERROR
        self.errors.append(full_error_msg)  # Keep for the command's summary

    @staticmethod
    def _require_value(row: Dict[str, Any], field: str) -> str:
        """
        Returns ``row[field]``, rejecting values the CSV reader left unset.

        ``csv.DictReader`` fills the missing trailing columns of a short row
        with ``None``. Converting that ``None`` would raise ``TypeError`` or
        ``AttributeError``, which the loaders do not catch, so it is turned
        into a ``ValueError`` here and reported like any other bad row.

        Raises:
            KeyError: If the column is absent from the file altogether.
            ValueError: If the column exists but this row has no value for it.
        """
        value = row[field]
        if value is None:
            raise ValueError(f"Missing value for required field '{field}'")
        return value

    @transaction.atomic
    def run(self) -> Tuple[LoadSummary, List[str]]:
        """
        Executes the full data loading process within a single database transaction.

        An exception that escapes this method (for example a missing input
        file) rolls the whole transaction back. Row-level errors are only
        recorded; the rows that did load are still committed.

        Returns:
            A tuple containing a summary dictionary of the load results and a
            list of any errors encountered.
        """
        if self.mode == "overwrite":
            self._purge_existing_data()

        self._load_claims()
        self._load_claim_details()

        summary: LoadSummary = {
            "claims_created": self.claims_created,
            "claims_updated": self.claims_updated,
            "claims_skipped": self.claims_skipped,
            "details_created": self.details_created,
            "details_updated": self.details_updated,
            "details_skipped": self.details_skipped,
        }
        return summary, self.errors

    def _purge_existing_data(self) -> None:
        """Deletes all existing claim-related data prior to overwrite reload."""
        logger.info("Overwrite mode: purging existing Claim data (cascade deletes details/notes)...")
        # Deleting Claims will cascade to ClaimDetail and Note via FK/OneToOne settings
        Claim.objects.all().delete()

    def _parse_claim_row(self, row: Dict[str, str]) -> Dict[str, Any]:
        """
        Parses and validates a single row from the claims CSV file.

        Raises:
            KeyError: If an expected column is absent.
            ValueError: If a value is missing or is not a valid integer/date.
            InvalidOperation: If an amount is not a valid decimal.
        """
        claim_id = int(self._require_value(row, "id"))
        billed_amount = Decimal(self._require_value(row, "billed_amount"))
        paid_amount = Decimal(self._require_value(row, "paid_amount"))
        discharge_date = datetime.strptime(
            self._require_value(row, "discharge_date"), "%Y-%m-%d"
        ).date()

        return {
            "id": claim_id,
            "patient_name": row["patient_name"],
            "billed_amount": billed_amount,
            "paid_amount": paid_amount,
            "status": self._require_value(row, "status").upper(),
            "insurer_name": row["insurer_name"],
            "discharge_date": discharge_date,
        }

    def _write_claim(self, claim_data: Dict[str, Any], row_num: int = 0) -> None:
        """
        Create a Claim according to mode semantics.

        Args:
            claim_data: Parsed claim fields; the ``"id"`` key is popped from it.
            row_num: CSV row the data came from, used when reporting errors.
        """
        claim_id = claim_data.pop("id")
        if self.mode == "append":
            if Claim.objects.filter(id=claim_id).exists():
                self.claims_skipped += 1
                return
            Claim.objects.create(id=claim_id, **claim_data)
            self.claims_created += 1
            return
        # overwrite mode: table was purged; create fresh rows only
        try:
            # The nested atomic() is a savepoint: without it, catching the
            # IntegrityError would leave the outer transaction from run()
            # marked for rollback and every later query would fail.
            with transaction.atomic():
                Claim.objects.create(id=claim_id, **claim_data)
        except IntegrityError as e:
            # Duplicate IDs within the same CSV or DB state mismatch
            self._log_error(row_num, self.claims_csv_path.name, f"Integrity error for claim {claim_id}: {e}")
        else:
            self.claims_created += 1

    def _load_claims(self) -> None:
        """
        Loads the main claim records from the provided CSV file.

        Raises:
            FileNotFoundError: If the claims file does not exist (re-raised
                after being logged and recorded in ``self.errors``).
        """
        logger.info(f"Processing the main claims file: {self.claims_csv_path.name}")
        try:
            # newline="" lets the csv module handle line endings itself, so
            # quoted fields containing newlines are parsed correctly.
            with open(self.claims_csv_path, mode="r", encoding="utf-8", newline="") as f:
                reader = csv.DictReader(f, delimiter=self.delimiter)
                # start=2: row 1 of the file is the header.
                for i, row in enumerate(reader, start=2):
                    try:
                        claim_data = self._parse_claim_row(row)
                        self._write_claim(claim_data, row_num=i)
                    except (ValueError, InvalidOperation, KeyError) as e:
                        self._log_error(i, self.claims_csv_path.name, str(e))
        except FileNotFoundError:
            logger.critical(f"File not found: {self.claims_csv_path}")
            self.errors.append(f"File not found: {self.claims_csv_path}")
            raise

    def _write_claim_detail(self, claim: Any, defaults: Dict[str, Any], row_num: int) -> None:
        """Create a ClaimDetail for ``claim`` according to mode semantics."""
        if self.mode == "append":
            if ClaimDetail.objects.filter(claim=claim).exists():
                self.details_skipped += 1
            else:
                ClaimDetail.objects.create(claim=claim, **defaults)
                self.details_created += 1
            return
        # overwrite mode: table was purged by deleting Claims
        try:
            # Savepoint so a caught IntegrityError does not poison the outer
            # transaction opened by run().
            with transaction.atomic():
                ClaimDetail.objects.create(claim=claim, **defaults)
        except IntegrityError as e:
            self._log_error(
                row_num,
                self.details_csv_path.name,
                f"Integrity error for claim detail {claim.id}: {e}",
            )
        else:
            self.details_created += 1

    def _load_claim_details(self) -> None:
        """
        Loads the claim detail records from the provided CSV file.

        Raises:
            FileNotFoundError: If the details file does not exist (re-raised
                after being logged and recorded in ``self.errors``).
        """
        logger.info(f"Processing the details file: {self.details_csv_path.name}")
        try:
            with open(self.details_csv_path, mode="r", encoding="utf-8", newline="") as f:
                reader = csv.DictReader(f, delimiter=self.delimiter)
                # start=2: row 1 of the file is the header.
                for i, row in enumerate(reader, start=2):
                    try:
                        claim_id = int(self._require_value(row, "claim_id"))
                        claim = Claim.objects.get(id=claim_id)

                        defaults = {
                            "cpt_codes": row["cpt_codes"],
                            "denial_reason": row.get("denial_reason", ""),
                        }
                        self._write_claim_detail(claim, defaults, i)
                    except Claim.DoesNotExist:
                        self._log_error(
                            i,
                            self.details_csv_path.name,
                            f"Claim with id={claim_id} not found.",
                        )
                    except (ValueError, KeyError) as e:
                        self._log_error(i, self.details_csv_path.name, str(e))
        except FileNotFoundError:
            logger.critical(f"File not found: {self.details_csv_path}")
            self.errors.append(f"File not found: {self.details_csv_path}")
            raise

91 """Parses and validates a single row from the claims CSV file.""" 

92 claim_id = int(row["id"]) 

93 billed_amount = Decimal(row["billed_amount"]) 

94 paid_amount = Decimal(row["paid_amount"]) 

95 discharge_date = datetime.strptime(row["discharge_date"], "%Y-%m-%d").date() 

96 

97 return { 

98 "id": claim_id, 

99 "patient_name": row["patient_name"], 

100 "billed_amount": billed_amount, 

101 "paid_amount": paid_amount, 

102 "status": row["status"].upper(), 

103 "insurer_name": row["insurer_name"], 

104 "discharge_date": discharge_date, 

105 } 

106 

107 def _write_claim(self, claim_data: Dict[str, Any]) -> None: 

108 """Create a Claim according to mode semantics.""" 

109 claim_id = claim_data.pop("id") 

110 if self.mode == "append": 

111 if Claim.objects.filter(id=claim_id).exists(): 

112 self.claims_skipped += 1 

113 return 

114 Claim.objects.create(id=claim_id, **claim_data) 

115 self.claims_created += 1 

116 return 

117 # overwrite mode: table was purged; create fresh rows only 

118 try: 

119 Claim.objects.create(id=claim_id, **claim_data) 

120 self.claims_created += 1 

121 except IntegrityError as e: 

122 # Duplicate IDs within the same CSV or DB state mismatch 

123 self._log_error(0, self.claims_csv_path.name, f"Integrity error for claim {claim_id}: {e}") 

124 

125 def _load_claims(self) -> None: 

126 """Loads the main claim records from the provided CSV file.""" 

127 logger.info(f"Processing the main claims file: {self.claims_csv_path.name}") 

128 try: 

129 with open(self.claims_csv_path, mode="r", encoding="utf-8") as f: 

130 reader = csv.DictReader(f, delimiter=self.delimiter) 

131 for i, row in enumerate(reader, start=2): 

132 try: 

133 claim_data = self._parse_claim_row(row) 

134 self._write_claim(claim_data) 

135 except (ValueError, InvalidOperation, KeyError) as e: 

136 self._log_error(i, self.claims_csv_path.name, str(e)) 

137 except FileNotFoundError: 

138 logger.critical(f"File not found: {self.claims_csv_path}") 

139 self.errors.append(f"File not found: {self.claims_csv_path}") 

140 raise 

141 

142 def _load_claim_details(self) -> None: 

143 """Loads the claim detail records from the provided CSV file.""" 

144 logger.info(f"Processing the details file: {self.details_csv_path.name}") 

145 try: 

146 with open(self.details_csv_path, mode="r", encoding="utf-8") as f: 

147 reader = csv.DictReader(f, delimiter=self.delimiter) 

148 for i, row in enumerate(reader, start=2): 

149 try: 

150 claim_id = int(row["claim_id"]) 

151 claim = Claim.objects.get(id=claim_id) 

152 

153 defaults = { 

154 "cpt_codes": row["cpt_codes"], 

155 "denial_reason": row.get("denial_reason", ""), 

156 } 

157 if self.mode == "append": 

158 if ClaimDetail.objects.filter(claim=claim).exists(): 

159 self.details_skipped += 1 

160 else: 

161 ClaimDetail.objects.create(claim=claim, **defaults) 

162 self.details_created += 1 

163 else: 

164 # overwrite mode: table was purged by deleting Claims 

165 try: 

166 ClaimDetail.objects.create(claim=claim, **defaults) 

167 self.details_created += 1 

168 except IntegrityError as e: 

169 self._log_error( 

170 i, 

171 self.details_csv_path.name, 

172 f"Integrity error for claim detail {claim.id}: {e}", 

173 ) 

174 except Claim.DoesNotExist: 

175 self._log_error( 

176 i, 

177 self.details_csv_path.name, 

178 f"Claim with id={claim_id} not found.", 

179 ) 

180 except (ValueError, KeyError) as e: 

181 self._log_error(i, self.details_csv_path.name, str(e)) 

182 except FileNotFoundError: 

183 logger.critical(f"File not found: {self.details_csv_path}") 

184 self.errors.append(f"File not found: {self.details_csv_path}") 

185 raise