Skip to content
This repository was archived by the owner on May 14, 2021. It is now read-only.
156 changes: 155 additions & 1 deletion nexpose/nexpose.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from .nexpose_credential import Credential_FTP, Credential_HTTP, Credential_SNMP, Credential_SNMPV3, Credential_SSH, Credential_SSH_KEY, Credential_Telnet
from .nexpose_discoveryconnection import DiscoveryConnectionProtocol, DiscoveryConnectionSummary, DiscoveryConnectionConfiguration
from .nexpose_engine import EngineStatus, EnginePriority, EngineBase, EngineSummary, EngineConfiguration
from .nexpose_eso import Configuration as EsoConfiguration
from .nexpose_integration_options import IntegrationOption
from .nexpose_node import NodeScanStatus, NodeBase, Node
from .nexpose_privileges import AssetGroupPrivileges, GlobalPrivileges, SitePrivileges
from .nexpose_report import ReportStatus, ReportTemplate, ReportConfigurationSummary, ReportConfiguration, ReportSummary
Expand Down Expand Up @@ -80,6 +82,7 @@ def CreateHeadersWithSessionCookie(session_id):
def CreateHeadersWithSessionCookieAndCustomHeader(session_id):
headers = CreateHeadersWithSessionCookie(session_id)
headers["nexposeCCSessionID"] = "{0}".format(session_id)
headers['X-Requested-With'] = 'XMLHttpRequest'
return headers


Expand All @@ -88,7 +91,7 @@ def ExecuteGet_JSON(session_id, uri, sub_url, timeout, options=None):
if options is None:
options = {}
options = ["{0}={1}".format(a[0], a[1]) for a in iter(options.items())]
headers = CreateHeadersWithSessionCookie(session_id)
headers = CreateHeadersWithSessionCookieAndCustomHeader(session_id)
# headers["Accept-Encoding"] = "utf-8"
if sub_url.startswith('http'): # TODO: refactor uri & sub_url so that json_utils.resolve_urls can work better
uri = sub_url
Expand Down Expand Up @@ -144,6 +147,12 @@ def ExecuteDelete_JSON(session_id, uri, sub_url, timeout):
return False


def ExecuteDelete_JSON_ESO(session_id, uri, sub_url, timeout):
headers = CreateHeadersWithSessionCookieAndCustomHeader(session_id)
uri = uri + sub_url
return ExecuteWebRequest(uri, None, headers, timeout, lambda: 'DELETE')


def ExecutePagedGet_JSON(session_id, uri, sub_url, timeout, per_page=2147483647):
options = {}
options["per_page"] = per_page # NOTA: API 2.0 defaults this to 500 if not set
Expand Down Expand Up @@ -235,6 +244,9 @@ def DownloadFromStreamReader(reader, callback_function=None, block_size=DEFAULT_
APIURL_ASSETS = "assets/{0}/"
APIURL_ASSETGROUPS = "asset_groups/{0}/"

ESOURL_CONFIGMANAGER = "/configuration-manager/api/service/"
ESOURL_INTEG_OPTIONS = "/integration-manager-service/api/integration-options/"


class NexposeException(Exception):
def __init__(self, message):
Expand Down Expand Up @@ -273,6 +285,7 @@ def __init__(self, host, port):
self._URI_root = BuildURI_root(host, port)
self._URI_APIv2d0 = BuildURI_APIv2d0(host, port)
self._URI_APIv2d1 = BuildURI_APIv2d1(host, port)
self._URI_APIESO = BuildRootURI(host, port) + 'eso'
self.timeout = 60

#
Expand Down Expand Up @@ -1203,10 +1216,20 @@ def ExecutePagedGet_v20(self, sub_url):
def ExecutePagedGet_v21(self, sub_url):
return self.ExecutePagedGet_vXX(sub_url, self._URI_APIv2d1)

def ExecutePagedGet_ESO(self, sub_url):
self._RequireAnOpenSession()
result = ExecuteGet_JSON(self._session_id, self._URI_APIESO, sub_url, self.timeout)
return json.loads(result)

def ExecutePost(self, sub_url, post_data):
self._RequireAnOpenSession()
return ExecutePost_JSON(self._session_id, self._URI_APIv2d0, sub_url, self.timeout, post_data)

def ExecutePost_ESO(self, sub_url, post_data):
self._RequireAnOpenSession()
response = ExecutePost_JSON(self._session_id, self._URI_APIESO, sub_url, self.timeout, post_data)
return json.loads(response)

def ExecutePut(self, sub_url, post_data):
self._RequireAnOpenSession()
return ExecutePut_JSON(self._session_id, self._URI_APIv2d0, sub_url, self.timeout, post_data)
Expand All @@ -1215,6 +1238,10 @@ def ExecuteDelete(self, sub_url):
self._RequireAnOpenSession()
return ExecuteDelete_JSON(self._session_id, self._URI_APIv2d0, sub_url, self.timeout)

def ExecuteDelete_ESO(self, sub_url):
self._RequireAnOpenSession()
return ExecuteDelete_JSON_ESO(self._session_id, self._URI_APIESO, sub_url, self.timeout)

def ExecuteFormPost(self, sub_url, post_data):
self._RequireAnOpenSession()
return ExecuteWithPostData_FORM(self._session_id, self._URI_root, sub_url, self.timeout, post_data)
Expand Down Expand Up @@ -2624,3 +2651,130 @@ def DeleteBackup(self, backup_or_filename):
backup_or_filename = backup_or_filename.name
parameters = {'backupid': backup_or_filename}
return self.ExecuteMaintenanceCommand('backupRestore', 'deleteBackup', parameters)

#
# The following functions implement the ESO Configuration API
# They are used for new-style Discovery connections
# =========================================================================

def GetEsoServices(self):
sub_url = ESOURL_CONFIGMANAGER
json_dict = self.ExecutePagedGet_ESO(sub_url)
return json_dict

def GetEsoConfigurationType(self, service_name):
sub_url = ESOURL_CONFIGMANAGER + 'configurationType/{}'.format(service_name)
json_dict = self.ExecutePagedGet_ESO(sub_url)
return json_dict

def GetEsoServiceConfigurations(self, service_name):
sub_url = ESOURL_CONFIGMANAGER + 'configuration/{}/'.format(service_name)
json_list = self.ExecutePagedGet_ESO(sub_url)
services = [EsoConfiguration.CreateFromJSON(i) for i in json_list]
return services

def GetEsoServiceConfigurationByName(self, service_name, config_name):
for config in self.GetEsoServiceConfigurations(service_name):
if config.name == config_name:
return config
else:
raise KeyError("Config {} not found".format(config_name))

def GetEsoServiceConfiguration(self, config_or_id):
if isinstance(config_or_id, EsoConfiguration):
config_or_id = config_or_id.id
sub_url = ESOURL_CONFIGMANAGER + 'configuration/id/{}'.format(config_or_id)
json_dict = self.ExecutePagedGet_ESO(sub_url)
return EsoConfiguration.CreateFromJSON(json_dict)

def SaveEsoServiceConfiguration(self, config, save_integration_options=False):
self._RequireInstanceOf(config, EsoConfiguration)
sub_url = ESOURL_CONFIGMANAGER + 'configuration'
payload = config.as_json()
response = self.ExecutePost_ESO(sub_url, payload)
config_id = int(response['data'])
if config_id < 1:
raise RuntimeError("API returned invalid configID ({}) while attempting to create configuration.".format(config_id))
config.id = config_id
if save_integration_options:
for opt in config.integration_options:
self.SaveIntegrationOption(opt)
self.StartIntegrationOption(opt)
return config_id

def DeleteEsoServiceConfiguration(self, config_or_id, delete_integration_options=False):
if isinstance(config_or_id, EsoConfiguration):
if delete_integration_options:
for opt in config_or_id.integration_options:
self.DeleteIntegrationOption(opt)
config_or_id = config_or_id.id
elif delete_integration_options:
raise TypeError("Need config option if deleting integration options")
sub_url = ESOURL_CONFIGMANAGER + 'configuration/{}'.format(config_or_id)
result = self.ExecuteDelete_ESO(sub_url)
if result != 'success':
raise RuntimeError("Failed to delete configuration with ID: {}".format(config_or_id))

def PreviewEsoServiceConfiguration(self, config):
self._RequireInstanceOf(config, EsoConfiguration)
sub_url = ESOURL_CONFIGMANAGER + 'configuration/preview'
payload = config.as_json()
response_body = self.ExecutePost_ESO(sub_url, payload)
return response_body['previewAssets']

def TestEsoServiceConfiguration(self, config):
self._RequireInstanceOf(config, EsoConfiguration)
sub_url = ESOURL_CONFIGMANAGER + 'configuration/test'
payload = config.as_json()
response_body = self.ExecutePost_ESO(sub_url, payload)
return response_body['data']

#
# Implement the integrations manager API
# =========================================================================

def GetIntegrationOptions(self):
sub_url = ESOURL_INTEG_OPTIONS + 'options-and-states'
json_list = self.ExecutePagedGet_ESO(sub_url)
integration_options = [
IntegrationOption.CreateFromJSON(i['integrationOption'])
for i in json_list]
return integration_options

def SaveIntegrationOption(self, option):
self._RequireInstanceOf(option, IntegrationOption)
sub_url = ESOURL_INTEG_OPTIONS
payload = option.as_json()
response = self.ExecutePost_ESO(sub_url, payload)
option.update(response['data'])
return option.id

def DeleteIntegrationOption(self, option_or_id):
if isinstance(option_or_id, IntegrationOption):
option_or_id = option_or_id.id
sub_url = ESOURL_INTEG_OPTIONS + '{}/state'.format(option_or_id)
result = json.loads(self.ExecuteDelete_ESO(sub_url))
msg = result['data']
if 'being stopped' not in msg:
raise RuntimeError("Failed to delete step with ID: {}: {}".format(option_or_id, msg))

def GetIntegrationOption(self, option_id):
for option in self.GetIntegrationOptions():
if option.id == option_id:
return option
else:
raise KeyError("Option {} not found".format(option_or_id))

def GetIntegrationOptionStatus(self, option_or_id):
if isinstance(option_or_id, IntegrationOption):
option_or_id = option_or_id.id
sub_url = ESOURL_INTEG_OPTIONS + '{}/status'.format(option_or_id)
result = self.ExecutePagedGet_ESO(sub_url)
return result['data']

def StartIntegrationOption(self, option_or_id):
if isinstance(option_or_id, IntegrationOption):
option_or_id = option_or_id.id
sub_url = ESOURL_INTEG_OPTIONS + '{}/state'.format(option_or_id)
result = self.ExecutePost_ESO(sub_url, {})
return result['data']
Loading