Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import datetime 

2import traceback 

3import logging 

4import xmltodict 

5import io 

6import csv 

7 

8from django.utils import timezone 

9from django.contrib.admin.widgets import AdminDateWidget 

10from django.db import transaction 

11from django import forms 

12from django.utils.safestring import mark_safe 

13from django.contrib import messages 

14 

15from .models import ( 

16 Report, 

17 ProjectSOI, 

18 RegisterChildByAgeAndGender, 

19 PresenceAndParticipation, 

20 ChildFamilyParticipation, 

21 LanguagePeopleGroupDisability, 

22 SupportPariticipationDetail, 

23 

24 MostVulnerableChildrenIndicator, 

25 MostVulnerableChildrenVulnerabilityMarker, 

26 

27 BulkImportReport, 

28) 

29from .bulk_import import ( 

30 SOI, 

31 child_family_participation as child_family_participation_importer, 

32 presence_and_participation as presence_and_participation_importer, 

33 register_child_by_age_and_gender as register_child_by_age_and_gender_importer, 

34 language_people_group_disability as language_people_group_disability_importer, 

35 support_pariticipation_detail as support_pariticipation_detail_importer, 

36 

37 most_vulnerable_children_indicator as most_vulnerable_children_indicator_importer, 

38 most_vulnerable_children_vulnerability_marker as most_vulnerable_children_vulnerability_marker_importer, 

39) 

40from .utils import delete_file 

41 

42logger = logging.getLogger(__name__) 

43 

44BULK_IMPORTER = { 

45 ProjectSOI: SOI, 

46 RegisterChildByAgeAndGender: register_child_by_age_and_gender_importer, 

47 PresenceAndParticipation: presence_and_participation_importer, 

48 ChildFamilyParticipation: child_family_participation_importer, 

49 LanguagePeopleGroupDisability: language_people_group_disability_importer, 

50 SupportPariticipationDetail: support_pariticipation_detail_importer, 

51 

52 MostVulnerableChildrenIndicator: most_vulnerable_children_indicator_importer, 

53 MostVulnerableChildrenVulnerabilityMarker: most_vulnerable_children_vulnerability_marker_importer, 

54} 

55 

56# where user needs to provide the data information (else it's optional but will be used if provided) 

57DATE_REQUIRED_MODELS = [ 

58 PresenceAndParticipation, 

59 ChildFamilyParticipation, 

60 LanguagePeopleGroupDisability, 

61] 

62 

63# Where this is no necessicty for date field from user (date isn't used) 

64DATE_NOT_REQUIRED_MODELS = [ 

65 SupportPariticipationDetail, 

66 MostVulnerableChildrenIndicator, 

67 MostVulnerableChildrenVulnerabilityMarker, 

68] 

69 

70# where the import files should be CSV (instead of xml) 

71CSV_IMPORT_MODELS = [ 

72 LanguagePeopleGroupDisability, 

73 SupportPariticipationDetail, 

74 MostVulnerableChildrenIndicator, 

75 MostVulnerableChildrenVulnerabilityMarker, 

76] 

77 

78 

79class ReportAdminForm(forms.ModelForm): 

80 date = forms.DateField( 

81 widget=AdminDateWidget(), 

82 required=False, 

83 help_text='Overrides generated date for the document.', 

84 ) 

85 

86 class Meta: 

87 model = Report 

88 fields = '__all__' 

89 

90 def __init__(self, *args, **kwargs): 

91 super().__init__(*args, **kwargs) 

92 self.original_report_file = self.instance.file 

93 if self.fields.get('file'): 93 ↛ exitline 93 didn't return from function '__init__', because the condition on line 93 was never false

94 self.fields['file'].widget.attrs = {'accept': '.xml'} 

95 

96 def clean(self): 

97 cleaned_data = super().clean() 

98 file = cleaned_data.get('file') 

99 if file and self.instance.file != file: 99 ↛ exitline 99 didn't return from function 'clean', because the condition on line 99 was never false

100 cleaned_data['data'] = Report.extract_from_file(file) 

101 try: 

102 cleaned_data['date'] = ( 

103 cleaned_data.get('date') or 

104 datetime.datetime.strptime(cleaned_data['data'].get('reportDate'), '%m/%d/%Y %H:%M:%S %p').date() 

105 ) 

106 except (TypeError, ValueError): 

107 # Default value 

108 cleaned_data['date'] = timezone.now().date() 

109 

110 def save(self, *args, **kwargs): 

111 if self.cleaned_data.get('data'): 111 ↛ 115line 111 didn't jump to line 115, because the condition on line 111 was never false

112 self.instance.data = self.cleaned_data.get('data') 

113 if self.original_report_file: 113 ↛ 114line 113 didn't jump to line 114, because the condition on line 113 was never true

114 delete_file(self.original_report_file.path) 

115 report = super().save(*args, **kwargs) 

116 return report 

117 

118 

119class BulkImportForm(forms.Form): 

120 file = forms.FileField( 

121 widget=forms.FileInput( 

122 attrs={'accept': 'text/xml'} 

123 ) 

124 ) 

125 generated_on = forms.DateField( 

126 widget=AdminDateWidget(), 

127 required=False, 

128 help_text='Overrides generated date for the document.', 

129 ) 

130 

131 def __init__(self, *args, **kwargs): 

132 bulk_model = kwargs.pop('bulk_model', 0) 

133 

134 super().__init__(*args, **kwargs) 

135 if bulk_model in DATE_NOT_REQUIRED_MODELS: 

136 self.fields['generated_on'].widget = forms.HiddenInput() 

137 elif bulk_model in DATE_REQUIRED_MODELS: 

138 self.fields['generated_on'].required = True 

139 self.fields['generated_on'].help_text = 'Required' 

140 if bulk_model in CSV_IMPORT_MODELS: 

141 self.fields['file'].widget.attrs['accept'] = 'text/csv' 

142 

143 @staticmethod 

144 def handle_uploaded_file(request, generated_on, model): 

145 file = request.FILES['file'] 

146 bulk_import_log = BulkImportReport.objects.create( 

147 report_type=model.__name__, 

148 # Provided params for import 

149 generated_on=generated_on if generated_on else None, 

150 file=request.FILES['file'], 

151 created_by=request.user, 

152 ) 

153 try: 

154 file.seek(0) 

155 if model in CSV_IMPORT_MODELS: 

156 raw_data = csv.DictReader( 

157 io.StringIO(file.read().decode('utf-8', errors='ignore')), 

158 skipinitialspace=True, 

159 ) 

160 else: 

161 raw_data = xmltodict.parse(file.open().read()) 

162 with transaction.atomic(): 

163 BULK_IMPORTER.get(model).extract(raw_data, generated_on) 

164 success_message = ( 

165 f"Successfully imported <b>{file}</b> " 

166 f"to <b>{model._meta.verbose_name.upper()}</b>" 

167 ) 

168 messages.add_message(request, messages.INFO, mark_safe(success_message)) 

169 bulk_import_log.log_message = success_message 

170 bulk_import_log.status = BulkImportReport.SUCCESS 

171 except Exception: 

172 logger.error(f'Error importing {model}', exc_info=True) 

173 error_message = ( 

174 f"Importing <b>{file}</b> failed for <b>{model._meta.verbose_name.upper()}</b>" 

175 " !! Check file structure and try again!!" 

176 f"<br /><pre>{traceback.format_exc()}</pre>" 

177 ) 

178 messages.add_message(request, messages.ERROR, mark_safe(error_message)) 

179 bulk_import_log.log_message = error_message 

180 bulk_import_log.status = BulkImportReport.FAILED 

181 bulk_import_log.save()