Coverage for claims\services.py: 80%
101 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-14 18:28 -0400
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-14 18:28 -0400
1import csv
2import logging
3from datetime import datetime
4from decimal import Decimal, InvalidOperation
5from pathlib import Path
6from typing import Any, Dict, List, Tuple, Union
8from django.db import transaction, IntegrityError
10from .models import Claim, ClaimDetail
12logger = logging.getLogger(__name__)
14# A type alias for our summary dictionary
15LoadSummary = Dict[str, int]
18class ClaimDataIngestor:
19 """
20 A service class to handle the ingestion of claim data from CSV files.
21 This class encapsulates the logic for parsing, validating, and saving
22 claim and claim detail data to the database.
23 """
25 def __init__(
26 self,
27 claims_csv_path: Union[Path, str],
28 details_csv_path: Union[Path, str],
29 delimiter: str = ",",
30 mode: str = "append",
31 ):
32 """
33 Initializes the ingestor with paths to the data files.
35 Args:
36 claims_csv_path: The file path for the main claims data.
37 details_csv_path: The file path for the claim details data.
38 delimiter: The character used to separate values in the CSV files.
39 """
40 self.claims_csv_path = Path(claims_csv_path)
41 self.details_csv_path = Path(details_csv_path)
42 self.delimiter = delimiter
43 self.mode = mode # 'append' (default) or 'overwrite'
44 self.claims_created = 0
45 self.claims_updated = 0
46 self.claims_skipped = 0
47 self.details_created = 0
48 self.details_updated = 0
49 self.details_skipped = 0
50 self.errors: List[str] = []
52 def _log_error(self, row_num: int, file_name: str, error_msg: str) -> None:
53 """Helper method to format and store an error message."""
54 full_error_msg = f"Error in {file_name} at row {row_num}: {error_msg}"
55 logger.error(full_error_msg) # Log as an ERROR
56 self.errors.append(full_error_msg) # Keep for the command's summary
58 @transaction.atomic
59 def run(self) -> Tuple[LoadSummary, List[str]]:
60 """
61 Executes the full data loading process within a single database transaction.
62 If any part of the process fails, the transaction is rolled back.
64 Returns:
65 A tuple containing a summary dictionary of the load results and a
66 list of any errors encountered.
67 """
68 if self.mode == "overwrite":
69 self._purge_existing_data()
71 self._load_claims()
72 self._load_claim_details()
74 summary: LoadSummary = {
75 "claims_created": self.claims_created,
76 "claims_updated": self.claims_updated,
77 "claims_skipped": self.claims_skipped,
78 "details_created": self.details_created,
79 "details_updated": self.details_updated,
80 "details_skipped": self.details_skipped,
81 }
82 return summary, self.errors
84 def _purge_existing_data(self) -> None:
85 """Deletes all existing claim-related data prior to overwrite reload."""
86 logger.info("Overwrite mode: purging existing Claim data (cascade deletes details/notes)...")
87 # Deleting Claims will cascade to ClaimDetail and Note via FK/OneToOne settings
88 Claim.objects.all().delete()
90 def _parse_claim_row(self, row: Dict[str, str]) -> Dict[str, Any]:
91 """Parses and validates a single row from the claims CSV file."""
92 claim_id = int(row["id"])
93 billed_amount = Decimal(row["billed_amount"])
94 paid_amount = Decimal(row["paid_amount"])
95 discharge_date = datetime.strptime(row["discharge_date"], "%Y-%m-%d").date()
97 return {
98 "id": claim_id,
99 "patient_name": row["patient_name"],
100 "billed_amount": billed_amount,
101 "paid_amount": paid_amount,
102 "status": row["status"].upper(),
103 "insurer_name": row["insurer_name"],
104 "discharge_date": discharge_date,
105 }
107 def _write_claim(self, claim_data: Dict[str, Any]) -> None:
108 """Create a Claim according to mode semantics."""
109 claim_id = claim_data.pop("id")
110 if self.mode == "append":
111 if Claim.objects.filter(id=claim_id).exists():
112 self.claims_skipped += 1
113 return
114 Claim.objects.create(id=claim_id, **claim_data)
115 self.claims_created += 1
116 return
117 # overwrite mode: table was purged; create fresh rows only
118 try:
119 Claim.objects.create(id=claim_id, **claim_data)
120 self.claims_created += 1
121 except IntegrityError as e:
122 # Duplicate IDs within the same CSV or DB state mismatch
123 self._log_error(0, self.claims_csv_path.name, f"Integrity error for claim {claim_id}: {e}")
125 def _load_claims(self) -> None:
126 """Loads the main claim records from the provided CSV file."""
127 logger.info(f"Processing the main claims file: {self.claims_csv_path.name}")
128 try:
129 with open(self.claims_csv_path, mode="r", encoding="utf-8") as f:
130 reader = csv.DictReader(f, delimiter=self.delimiter)
131 for i, row in enumerate(reader, start=2):
132 try:
133 claim_data = self._parse_claim_row(row)
134 self._write_claim(claim_data)
135 except (ValueError, InvalidOperation, KeyError) as e:
136 self._log_error(i, self.claims_csv_path.name, str(e))
137 except FileNotFoundError:
138 logger.critical(f"File not found: {self.claims_csv_path}")
139 self.errors.append(f"File not found: {self.claims_csv_path}")
140 raise
142 def _load_claim_details(self) -> None:
143 """Loads the claim detail records from the provided CSV file."""
144 logger.info(f"Processing the details file: {self.details_csv_path.name}")
145 try:
146 with open(self.details_csv_path, mode="r", encoding="utf-8") as f:
147 reader = csv.DictReader(f, delimiter=self.delimiter)
148 for i, row in enumerate(reader, start=2):
149 try:
150 claim_id = int(row["claim_id"])
151 claim = Claim.objects.get(id=claim_id)
153 defaults = {
154 "cpt_codes": row["cpt_codes"],
155 "denial_reason": row.get("denial_reason", ""),
156 }
157 if self.mode == "append":
158 if ClaimDetail.objects.filter(claim=claim).exists():
159 self.details_skipped += 1
160 else:
161 ClaimDetail.objects.create(claim=claim, **defaults)
162 self.details_created += 1
163 else:
164 # overwrite mode: table was purged by deleting Claims
165 try:
166 ClaimDetail.objects.create(claim=claim, **defaults)
167 self.details_created += 1
168 except IntegrityError as e:
169 self._log_error(
170 i,
171 self.details_csv_path.name,
172 f"Integrity error for claim detail {claim.id}: {e}",
173 )
174 except Claim.DoesNotExist:
175 self._log_error(
176 i,
177 self.details_csv_path.name,
178 f"Claim with id={claim_id} not found.",
179 )
180 except (ValueError, KeyError) as e:
181 self._log_error(i, self.details_csv_path.name, str(e))
182 except FileNotFoundError:
183 logger.critical(f"File not found: {self.details_csv_path}")
184 self.errors.append(f"File not found: {self.details_csv_path}")
185 raise