Skip to content
This repository was archived by the owner on May 14, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions nexpose/json_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,17 @@ def get_id(data, id_field_name):
return data # assume the data is the id


def load_urls(json_dict, url_loader):
def load_urls(json_dict, url_loader, ignore_error=False):
from urllib.error import HTTPError
assert isinstance(json_dict, dict)
for key in list(json_dict.keys()):
if isinstance(json_dict[key], dict):
if json_dict[key].get('json', None) is not None:
raise ValueError('json_dict[' + key + '] already contains a json-element')
url = json_dict[key].get('url', None)
if url is not None:
json_dict[key]['json'] = url_loader(url)
try:
json_dict[key]['json'] = url_loader(url)
except HTTPError:
if not ignore_error:
raise
58 changes: 46 additions & 12 deletions nexpose/nexpose.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import urllib.request
import urllib.parse
import urllib.error
import csv
import io
import base64
import json
from .json_utils import load_urls
Expand Down Expand Up @@ -620,15 +622,20 @@ def RequestReportDelete(self, report_id, reportconfiguration_id=0):
else:
return self.ExecuteBasicOnReport("ReportDeleteRequest", report_id)

def RequestReportAdhocGenerate(self, id):
def RequestReportAdhocGenerate(self, id, format='raw-xml-v2', template_id='audit-report'):
request = """
<AdhocReportConfig format="raw-xml-v2" template-id="audit-report">
<AdhocReportConfig format="{format}" template-id="{template_id}">
<Filters>
<filter type="scan" id="{0}" />
<filter type="scan" id="{scan_id}" />

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where does scan_id come from?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It comes from id as part of the method signature. It's always been there, I just renamed it for clarity.

</Filters>
</AdhocReportConfig>
"""
return self.ExecuteBasicWithElement("ReportAdhocGenerateRequest", {}, as_xml(request.format(id)))
request_data = request.format(
format=format,
template_id=template_id,
scan_id=id,
)
return self.ExecuteBasicWithElement("ReportAdhocGenerateRequest", {}, as_xml(request_data))
raise NotImplementedError() # TODO

#
Expand Down Expand Up @@ -1403,7 +1410,7 @@ def GetSiteAssetSummaries(self, site_or_id):
object_creator = lambda xml_data: AssetSummary.CreateFromXML(xml_data, site_id=xml_data.getparent().attrib['site-id'])
return request_and_create_objects_from_xml(requestor, 'SiteDevices/device', object_creator)

def GetAssetDetails(self, asset_or_id):
def GetAssetDetails(self, asset_or_id, ignore_details_error=False):
"""
Get detailed information of an asset.
Requires the 2.1 API!
Expand All @@ -1412,7 +1419,11 @@ def GetAssetDetails(self, asset_or_id):
asset_or_id = asset_or_id.id
sub_url = APIURL_ASSETS.format(asset_or_id)
json_dict = self.ExecutePagedGet_v21(sub_url)
load_urls(json_dict, self.ExecutePagedGet_v21)
if 'tags' not in json_dict:
json_dict['tags'] = {
'url': json_dict['url'] + '/tags'
}
load_urls(json_dict, self.ExecutePagedGet_v21, ignore_error=ignore_details_error)
return AssetDetails.CreateFromJSON(json_dict)

def DeleteAsset(self, asset_or_id):
Expand Down Expand Up @@ -2032,22 +2043,45 @@ def DownloadReport(self, report_or_id, callback_function=None, block_size=DEFAUL
reader = self.GetReportStreamReader(report_or_id)
return DownloadFromStreamReader(reader, callback_function, block_size)

def GenerateScanReport(self, scan_or_id):
def GenerateScanReport(self, scan_or_id, format='raw-xml-v2', template_id='audit-report'):
"""
Generate a report of a scan.
"""
if isinstance(scan_or_id, ScanSummary):
scan_or_id = scan_or_id.id
data = self.RequestReportAdhocGenerate(scan_or_id)
data = self.RequestReportAdhocGenerate(scan_or_id, format, template_id)
data = self.VerifySuccess(data)
data = data.tail.replace('\r', '').strip().split('\n')
assert data[1] == 'Content-Type: text/xml; name=report.xml'
assert data[2] == 'Content-Transfer-Encoding: base64'
assert data[3] == ''
assert data[0] == data[-1][:-2]
boundary_top = data[0]
content_type = data[1]
encoding = data[2]
body = ''.join(data[4:-1])
boundary_bottom = data[-1]
if boundary_top != boundary_bottom[:-2]:
raise ValueError("Invalid boundary")

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced the assertions as per Python recommendation: asserts could be compiled out so throwing exceptions is the preferred method here.

if encoding != 'Content-Transfer-Encoding: base64':
raise ValueError("Unexpected encoding")
if format == 'raw-xml-v2':
return self._ParseScanReportXML(body, content_type)
elif format == 'csv':
return self._ParseScanReportCSV(body, content_type)
else:
return data

@staticmethod
def _ParseScanReportXML(body, content_type):
if content_type != 'Content-Type: text/xml; name=report.xml':
raise ValueError("Invalid content type")
return as_xml(base64.urlsafe_b64decode(body))

@staticmethod
def _ParseScanReportCSV(body, content_type):
if content_type != 'Content-Type: text/csv; name=report.csv':
raise ValueError("Invalid content type")
csv_ = base64.urlsafe_b64decode(body).decode('utf8')
report_data = csv.DictReader(io.StringIO(csv_))
return report_data

#
# The following functions implement the Role Management API:
# =========================================================
Expand Down
59 changes: 55 additions & 4 deletions nexpose/nexpose_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from future import standard_library
standard_library.install_aliases()

from .nexpose_tag import Tag


class AssetHostTypes(object):
Empty = ''
Expand All @@ -22,7 +24,10 @@ def InitializeFromXML(self, xml_data):

def InitializeFromJSON(self, json_dict):
self.id = json_dict['id']
self.risk_score = json_dict['assessment']['json']['risk_score']
try:
self.risk_score = json_dict['assessment']['json']['risk_score']
except KeyError:
pass

def __init__(self):
self.id = 0
Expand Down Expand Up @@ -66,13 +71,38 @@ def CreateFromJSON(json_dict):
details.host_type = host_type
details.os_name = json_dict["os_name"]
details.os_cpe = json_dict["os_cpe"]
details.last_scan_id = json_dict['assessment']['json']['last_scan_id']
details.last_scan_date = json_dict['assessment']['json']['last_scan_date']
try:
assessment = json_dict['assessment']['json']
except KeyError:
pass
else:
details.last_scan_id = assessment['last_scan_id']
details.last_scan_date = assessment['last_scan_date']

try:
tags = json_dict['tags']['json']['resources']
except KeyError:
pass
else:
for tag in tags:
details.tags.append(Tag.CreateFromJSON(tag))

details.unique_identifiers = []
try:
unique_identifiers_data = json_dict['unique_identifiers']['json']
except KeyError:
# Unique Identifiers not fetched
pass
else:
for identifier in unique_identifiers_data:
details.unique_identifiers.append(
UniqueIdentifier.CreateFromJSON(identifier)
)

# TODO:
# ----begin
details.files = []
details.vulnerability_instances = []
details.unique_identifiers = []
details.group_accounts = []
details.user_accounts = []
details.vulnerabilities = []
Expand Down Expand Up @@ -101,3 +131,24 @@ def __init__(self):
self.vulnerabilities = []
self.software = []
self.services = []
self.tags = []


class UniqueIdentifier(object):

def __init__(self):
self.source = ''
self.id = ''

@staticmethod
def CreateFromJSON(json_dict):
unique_identifier = UniqueIdentifier()
unique_identifier.source = json_dict['source']
unique_identifier.id = json_dict['id']
return unique_identifier

def __repr__(self):
return '<UniqueIdentifier {type}: {id}>'.format(
type=self.source,
id=self.id,
)