diff --git a/nexpose/json_utils.py b/nexpose/json_utils.py index 2a4ed86..a9918b6 100644 --- a/nexpose/json_utils.py +++ b/nexpose/json_utils.py @@ -28,7 +28,8 @@ def get_id(data, id_field_name): return data # assume the data is the id -def load_urls(json_dict, url_loader): +def load_urls(json_dict, url_loader, ignore_error=False): + from urllib.error import HTTPError assert isinstance(json_dict, dict) for key in list(json_dict.keys()): if isinstance(json_dict[key], dict): @@ -36,4 +37,8 @@ def load_urls(json_dict, url_loader): raise ValueError('json_dict[' + key + '] already contains a json-element') url = json_dict[key].get('url', None) if url is not None: - json_dict[key]['json'] = url_loader(url) + try: + json_dict[key]['json'] = url_loader(url) + except HTTPError: + if not ignore_error: + raise diff --git a/nexpose/nexpose.py b/nexpose/nexpose.py index 5657f01..c9e70c5 100644 --- a/nexpose/nexpose.py +++ b/nexpose/nexpose.py @@ -6,6 +6,8 @@ import urllib.request import urllib.parse import urllib.error +import csv +import io import base64 import json from .json_utils import load_urls @@ -620,15 +622,20 @@ def RequestReportDelete(self, report_id, reportconfiguration_id=0): else: return self.ExecuteBasicOnReport("ReportDeleteRequest", report_id) - def RequestReportAdhocGenerate(self, id): + def RequestReportAdhocGenerate(self, id, format='raw-xml-v2', template_id='audit-report'): request = """ - + - + """ - return self.ExecuteBasicWithElement("ReportAdhocGenerateRequest", {}, as_xml(request.format(id))) + request_data = request.format( + format=format, + template_id=template_id, + scan_id=id, + ) + return self.ExecuteBasicWithElement("ReportAdhocGenerateRequest", {}, as_xml(request_data)) raise NotImplementedError() # TODO # @@ -1403,7 +1410,7 @@ def GetSiteAssetSummaries(self, site_or_id): object_creator = lambda xml_data: AssetSummary.CreateFromXML(xml_data, site_id=xml_data.getparent().attrib['site-id']) return request_and_create_objects_from_xml(requestor, 'SiteDevices/device', object_creator) - def GetAssetDetails(self, asset_or_id): + def GetAssetDetails(self, asset_or_id, ignore_details_error=False): """ Get detailed information of an asset. Requires the 2.1 API! @@ -1412,7 +1419,11 @@ def GetAssetDetails(self, asset_or_id): asset_or_id = asset_or_id.id sub_url = APIURL_ASSETS.format(asset_or_id) json_dict = self.ExecutePagedGet_v21(sub_url) - load_urls(json_dict, self.ExecutePagedGet_v21) + if 'tags' not in json_dict: + json_dict['tags'] = { + 'url': json_dict['url'] + '/tags' + } + load_urls(json_dict, self.ExecutePagedGet_v21, ignore_error=ignore_details_error) return AssetDetails.CreateFromJSON(json_dict) def DeleteAsset(self, asset_or_id): @@ -2032,22 +2043,45 @@ def DownloadReport(self, report_or_id, callback_function=None, block_size=DEFAUL reader = self.GetReportStreamReader(report_or_id) return DownloadFromStreamReader(reader, callback_function, block_size) - def GenerateScanReport(self, scan_or_id): + def GenerateScanReport(self, scan_or_id, format='raw-xml-v2', template_id='audit-report'): """ Generate a report of a scan. """ if isinstance(scan_or_id, ScanSummary): scan_or_id = scan_or_id.id - data = self.RequestReportAdhocGenerate(scan_or_id) + data = self.RequestReportAdhocGenerate(scan_or_id, format, template_id) data = self.VerifySuccess(data) data = data.tail.replace('\r', '').strip().split('\n') - assert data[1] == 'Content-Type: text/xml; name=report.xml' - assert data[2] == 'Content-Transfer-Encoding: base64' - assert data[3] == '' - assert data[0] == data[-1][:-2] + boundary_top = data[0] + content_type = data[1] + encoding = data[2] body = ''.join(data[4:-1]) + boundary_bottom = data[-1] + if boundary_top != boundary_bottom[:-2]: + raise ValueError("Invalid boundary") + if encoding != 'Content-Transfer-Encoding: base64': + raise ValueError("Unexpected encoding") + if format == 'raw-xml-v2': + return self._ParseScanReportXML(body, content_type) + elif format == 'csv': + return self._ParseScanReportCSV(body, content_type) + else: + return data + + @staticmethod + def _ParseScanReportXML(body, content_type): + if content_type != 'Content-Type: text/xml; name=report.xml': + raise ValueError("Invalid content type") return as_xml(base64.urlsafe_b64decode(body)) + @staticmethod + def _ParseScanReportCSV(body, content_type): + if content_type != 'Content-Type: text/csv; name=report.csv': + raise ValueError("Invalid content type") + csv_ = base64.urlsafe_b64decode(body).decode('utf8') + report_data = csv.DictReader(io.StringIO(csv_)) + return report_data + # # The following functions implement the Role Management API: # ========================================================= diff --git a/nexpose/nexpose_asset.py b/nexpose/nexpose_asset.py index 1fca4b6..70b8b13 100644 --- a/nexpose/nexpose_asset.py +++ b/nexpose/nexpose_asset.py @@ -6,6 +6,8 @@ from future import standard_library standard_library.install_aliases() +from .nexpose_tag import Tag + class AssetHostTypes(object): Empty = '' @@ -22,7 +24,10 @@ def InitializeFromXML(self, xml_data): def InitializeFromJSON(self, json_dict): self.id = json_dict['id'] - self.risk_score = json_dict['assessment']['json']['risk_score'] + try: + self.risk_score = json_dict['assessment']['json']['risk_score'] + except KeyError: + pass def __init__(self): self.id = 0 @@ -66,13 +71,38 @@ def CreateFromJSON(json_dict): details.host_type = host_type details.os_name = json_dict["os_name"] details.os_cpe = json_dict["os_cpe"] - details.last_scan_id = json_dict['assessment']['json']['last_scan_id'] - details.last_scan_date = json_dict['assessment']['json']['last_scan_date'] + try: + assessment = json_dict['assessment']['json'] + except KeyError: + pass + else: + details.last_scan_id = assessment['last_scan_id'] + details.last_scan_date = assessment['last_scan_date'] + + try: + tags = json_dict['tags']['json']['resources'] + except KeyError: + pass + else: + for tag in tags: + details.tags.append(Tag.CreateFromJSON(tag)) + + details.unique_identifiers = [] + try: + unique_identifiers_data = json_dict['unique_identifiers']['json'] + except KeyError: + # Unique Identifiers not fetched + pass + else: + for identifier in unique_identifiers_data: + details.unique_identifiers.append( + UniqueIdentifier.CreateFromJSON(identifier) + ) + # TODO: # ----begin details.files = [] details.vulnerability_instances = [] - details.unique_identifiers = [] details.group_accounts = [] details.user_accounts = [] details.vulnerabilities = [] @@ -101,3 +131,24 @@ def __init__(self): self.vulnerabilities = [] self.software = [] self.services = [] + self.tags = [] + + +class UniqueIdentifier(object): + + def __init__(self): + self.source = '' + self.id = '' + + @staticmethod + def CreateFromJSON(json_dict): + unique_identifier = UniqueIdentifier() + unique_identifier.source = json_dict['source'] + unique_identifier.id = json_dict['id'] + return unique_identifier + + def __repr__(self): + return ''.format( + type=self.source, + id=self.id, + )