From 5ec35b4e25ac6d81d304182222538565dd95f80a Mon Sep 17 00:00:00 2001 From: Daniel Blankenberg Date: Wed, 6 May 2026 14:17:58 -0400 Subject: [PATCH] feat: add IBM Quantum Platform REST client Add QuantumPlatformV1 for low-level access to the IBM Quantum Platform / Qiskit Runtime REST API. The client follows the existing platform-services SDK style with BaseService, SDK core authenticators, environment-based new_instance() configuration, and DetailedResponse returns. It supports constructor-level instance CRN and API version defaults, plus per-call overrides for Service-CRN and IBM-API-Version. Also add focused unit tests, example coverage for CRN discovery through ResourceControllerV2, and a README service table entry. --- README.md | 1 + examples/test_quantum_platform_v1_examples.py | 213 ++++ ibm_platform_services/__init__.py | 1 + ibm_platform_services/quantum_platform_v1.py | 909 ++++++++++++++++++ test/unit/test_quantum_platform_v1.py | 331 +++++++ 5 files changed, 1455 insertions(+) create mode 100644 examples/test_quantum_platform_v1_examples.py create mode 100644 ibm_platform_services/quantum_platform_v1.py create mode 100644 test/unit/test_quantum_platform_v1.py diff --git a/README.md b/README.md index b0faf4c5..8a4da0a4 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,7 @@ Service Name | Module Name | Service Class Name [IBM Cloud Shell](https://cloud.ibm.com/apidocs/cloudshell?code=python) | ibm_cloud_shell_v1 | IbmCloudShellV1 [Open Service Broker](https://cloud.ibm.com/apidocs/resource-controller/ibm-cloud-osb-api?code=python) | open_service_broker_v1 | OpenServiceBrokerV1 [Partner Management APIs](https://cloud.ibm.com/apidocs/partner-apis/partner?code=python) | partner_management_v1 | PartnerManagementV1 +[Quantum Platform](https://quantum.cloud.ibm.com/docs/en/api/qiskit-runtime-rest) | quantum_platform_v1 | QuantumPlatformV1 [Resource Controller](https://cloud.ibm.com/apidocs/resource-controller/resource-controller?code=python) | resource_controller_v2 | ResourceControllerV2 [Resource Manager](https://cloud.ibm.com/apidocs/resource-controller/resource-manager?code=python) | resource_manager_v2 | ResourceManagerV2 [Usage Metering](https://cloud.ibm.com/apidocs/usage-metering?code=python) | usage_metering_v4 | UsageMeteringV4 diff --git a/examples/test_quantum_platform_v1_examples.py b/examples/test_quantum_platform_v1_examples.py new file mode 100644 index 00000000..cdfae7a4 --- /dev/null +++ b/examples/test_quantum_platform_v1_examples.py @@ -0,0 +1,213 @@ +# -*- coding: utf-8 -*- +# (C) Copyright IBM Corp. 2026. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Examples for QuantumPlatformV1 +""" + +import json +import os +import pytest +from ibm_cloud_sdk_core import ApiException, read_external_sources +from ibm_platform_services.global_catalog_v1 import GlobalCatalogV1 +from ibm_platform_services.quantum_platform_v1 import * +from ibm_platform_services.resource_controller_v2 import ResourceControllerV2, ResourceInstancesPager + +# +# This file provides examples of how to use the IBM Quantum Platform REST service. +# +# The following configuration properties are assumed to be defined: +# +# QUANTUM_PLATFORM_URL= +# QUANTUM_PLATFORM_AUTH_TYPE=iam +# QUANTUM_PLATFORM_AUTH_URL= +# QUANTUM_PLATFORM_APIKEY= +# QUANTUM_PLATFORM_INSTANCE_CRN= +# QUANTUM_PLATFORM_API_VERSION=2026-03-15 +# +# RESOURCE_CONTROLLER_URL= +# RESOURCE_CONTROLLER_AUTH_TYPE=iam +# RESOURCE_CONTROLLER_APIKEY= +# +# GLOBAL_CATALOG_URL= +# GLOBAL_CATALOG_AUTH_TYPE=iam +# GLOBAL_CATALOG_APIKEY= +# +# These configuration properties can be exported as environment variables, or stored +# in a configuration file and then: +# export IBM_CREDENTIALS_FILE= +# +# Qiskit credentials stored in ~/.qiskit/qiskit-ibm.json are not read automatically +# by this SDK. If qiskit-ibm-runtime is installed, callers can read saved accounts +# and use the saved token as an IBM Cloud API key: +# +# from ibm_cloud_sdk_core.authenticators import IAMAuthenticator, BearerTokenAuthenticator +# from qiskit_ibm_runtime import QiskitRuntimeService +# +# account = QiskitRuntimeService.saved_accounts(channel='ibm_cloud')['default-ibm-cloud'] +# authenticator = IAMAuthenticator(account['token']) +# quantum_platform_service = QuantumPlatformV1( +# authenticator=authenticator, +# instance_crn=account['instance'], +# ) +# resource_controller_service = ResourceControllerV2(authenticator=authenticator) +# +# Use BearerTokenAuthenticator only when you already have an issued bearer/access token, +# not for qiskit-ibm-runtime saved account tokens. +# +config_file = 'quantum_platform.env' + +quantum_platform_service = None +resource_controller_service = None +global_catalog_service = None +config = None + + +class TestQuantumPlatformV1Examples: + """ + Example Test Class for QuantumPlatformV1 + """ + + @classmethod + def setup_class(cls): + global quantum_platform_service + global resource_controller_service + global global_catalog_service + global config + + if os.path.exists(config_file): + os.environ['IBM_CREDENTIALS_FILE'] = config_file + + # begin-common + + quantum_platform_service = QuantumPlatformV1.new_instance() + resource_controller_service = ResourceControllerV2.new_instance() + global_catalog_service = GlobalCatalogV1.new_instance() + + # end-common + assert quantum_platform_service is not None + assert resource_controller_service is not None + assert global_catalog_service is not None + + config = read_external_sources(QuantumPlatformV1.DEFAULT_SERVICE_NAME) + + print('Setup complete.') + + needscredentials = pytest.mark.skipif( + not os.path.exists(config_file), reason="External configuration not available, skipping..." + ) + + @needscredentials + def test_discover_quantum_instance_crns_example(self): + """ + discover quantum instance CRNs request example + """ + try: + print('\ndiscover quantum instance CRNs result:') + # begin-discover_quantum_instance_crns + + catalog_entries = global_catalog_service.list_catalog_entries( + q='name:quantum-computing kind:service', + limit=10, + ).get_result() + quantum_service = next( + entry + for entry in catalog_entries['resources'] + if entry.get('name') == 'quantum-computing' + ) + + all_results = [] + pager = ResourceInstancesPager( + client=resource_controller_service, + type='service_instance', + resource_id=quantum_service['id'], + limit=50, + ) + + while pager.has_next(): + next_page = pager.get_next() + assert next_page is not None + all_results.extend(next_page) + + for instance in all_results: + print( + json.dumps( + { + 'name': instance.get('name'), + 'crn': instance.get('crn'), + 'region_id': instance.get('region_id'), + 'resource_plan_id': instance.get('resource_plan_id'), + }, + indent=2, + ) + ) + + # end-discover_quantum_instance_crns + except ApiException as e: + pytest.fail(str(e)) + + @needscredentials + def test_list_backends_example(self): + """ + list_backends request example + """ + try: + print('\nlist_backends() result:') + # begin-list_backends + + response = quantum_platform_service.list_backends() + backends = response.get_result() + + print(json.dumps(backends, indent=2)) + + # end-list_backends + except ApiException as e: + pytest.fail(str(e)) + + @needscredentials + def test_get_usage_example(self): + """ + get_usage request example + """ + try: + print('\nget_usage() result:') + # begin-get_usage + + response = quantum_platform_service.get_usage() + usage = response.get_result() + + print(json.dumps(usage, indent=2)) + + # end-get_usage + except ApiException as e: + pytest.fail(str(e)) + + @needscredentials + def test_find_instance_workloads_example(self): + """ + find_instance_workloads request example + """ + try: + print('\nfind_instance_workloads() result:') + # begin-find_instance_workloads + + response = quantum_platform_service.find_instance_workloads(limit=10) + workloads = response.get_result() + + print(json.dumps(workloads, indent=2)) + + # end-find_instance_workloads + except ApiException as e: + pytest.fail(str(e)) diff --git a/ibm_platform_services/__init__.py b/ibm_platform_services/__init__.py index b94687ca..3a801fa5 100644 --- a/ibm_platform_services/__init__.py +++ b/ibm_platform_services/__init__.py @@ -35,6 +35,7 @@ from .ibm_cloud_shell_v1 import IbmCloudShellV1 from .open_service_broker_v1 import OpenServiceBrokerV1 from .partner_management_v1 import PartnerManagementV1 +from .quantum_platform_v1 import QuantumPlatformV1 from .resource_controller_v2 import ResourceControllerV2 from .resource_manager_v2 import ResourceManagerV2 from .usage_metering_v4 import UsageMeteringV4 diff --git a/ibm_platform_services/quantum_platform_v1.py b/ibm_platform_services/quantum_platform_v1.py new file mode 100644 index 00000000..73ab159c --- /dev/null +++ b/ibm_platform_services/quantum_platform_v1.py @@ -0,0 +1,909 @@ +# coding: utf-8 + +# (C) Copyright IBM Corp. 2026. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +IBM Quantum Platform REST API client. + +This service exposes the low-level IBM Quantum Platform / Qiskit Runtime REST +endpoints. For Qiskit-native circuits, primitives, sessions, backends, and result +handling, use qiskit-ibm-runtime. + +API Version: 2026-03-15 +""" + +from typing import List, Optional +import json + +from ibm_cloud_sdk_core import BaseService, DetailedResponse, read_external_sources +from ibm_cloud_sdk_core.authenticators.authenticator import Authenticator +from ibm_cloud_sdk_core.get_authenticator import get_authenticator_from_environment +from ibm_cloud_sdk_core.utils import convert_model + +from .common import get_sdk_headers + +############################################################################## +# Service +############################################################################## + + +class QuantumPlatformV1(BaseService): + """The Quantum Platform V1 service.""" + + DEFAULT_SERVICE_URL = 'https://quantum.cloud.ibm.com' + DEFAULT_SERVICE_NAME = 'quantum_platform' + DEFAULT_API_VERSION = '2026-03-15' + + @classmethod + def new_instance( + cls, + service_name: str = DEFAULT_SERVICE_NAME, + ) -> 'QuantumPlatformV1': + """ + Return a new client for the Quantum Platform service using the specified + parameters and external configuration. + + In addition to standard SDK authenticator configuration, this method reads + QUANTUM_PLATFORM_INSTANCE_CRN and QUANTUM_PLATFORM_API_VERSION from the + configured environment or credentials file. + """ + config = read_external_sources(service_name) or {} + authenticator = get_authenticator_from_environment(service_name) + service = cls( + authenticator=authenticator, + instance_crn=config.get('INSTANCE_CRN'), + api_version=config.get('API_VERSION') or cls.DEFAULT_API_VERSION, + ) + service.configure_service(service_name) + return service + + def __init__( + self, + authenticator: Authenticator = None, + *, + instance_crn: Optional[str] = None, + api_version: Optional[str] = None, + ) -> None: + """ + Construct a new client for the Quantum Platform service. + + :param Authenticator authenticator: The authenticator specifies the authentication mechanism. + :param str instance_crn: (optional) Default IBM Quantum Platform instance CRN + to send as the Service-CRN header on CRN-scoped operations. + :param str api_version: (optional) Default IBM-API-Version header value. + """ + BaseService.__init__(self, service_url=self.DEFAULT_SERVICE_URL, authenticator=authenticator) + self.instance_crn = instance_crn + self.api_version = api_version or self.DEFAULT_API_VERSION + + def set_instance_crn(self, instance_crn: str) -> None: + """Set the default instance CRN used by CRN-scoped operations.""" + self.instance_crn = instance_crn + + def set_api_version(self, api_version: str) -> None: + """Set the default IBM-API-Version header used by API-versioned operations.""" + self.api_version = api_version + + def _resolve_instance_crn(self, service_crn: Optional[str], operation_id: str) -> str: + resolved = service_crn or self.instance_crn + if not resolved: + raise ValueError( + f'service_crn must be provided for {operation_id}, either as a per-call value ' + 'or as the service instance_crn default' + ) + return resolved + + def _encode_path_var(self, value: str) -> str: + return next(self.encode_path_vars(value)) + + def _prepare_headers( + self, + operation_id: str, + *, + service_crn: Optional[str] = None, + ibm_api_version: Optional[str] = None, + require_crn: bool = True, + require_api_version: bool = True, + accept: Optional[str] = 'application/json', + content_type: Optional[str] = None, + user_headers: Optional[dict] = None, + ) -> dict: + headers = {} + headers.update( + get_sdk_headers( + service_name=self.DEFAULT_SERVICE_NAME, + service_version='V1', + operation_id=operation_id, + ) + ) + if accept: + headers['Accept'] = accept + if content_type: + headers['Content-Type'] = content_type + if require_crn: + headers['Service-CRN'] = self._resolve_instance_crn(service_crn, operation_id) + if require_api_version: + headers['IBM-API-Version'] = ibm_api_version or self.api_version or self.DEFAULT_API_VERSION + if user_headers: + headers.update(user_headers) + return headers + + def _send_request( + self, + operation_id: str, + method: str, + url: str, + *, + service_crn: Optional[str] = None, + ibm_api_version: Optional[str] = None, + require_crn: bool = True, + require_api_version: bool = True, + params: Optional[dict] = None, + body: Optional[dict] = None, + accept: Optional[str] = 'application/json', + content_type: Optional[str] = None, + **kwargs, + ) -> DetailedResponse: + user_headers = kwargs.pop('headers', None) + headers = self._prepare_headers( + operation_id, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + require_crn=require_crn, + require_api_version=require_api_version, + accept=accept, + content_type=content_type or ('application/json' if body is not None else None), + user_headers=user_headers, + ) + data = json.dumps(convert_model(body)) if body is not None else None + request = self.prepare_request(method=method, url=url, headers=headers, params=params, data=data) + return self.send(request, **kwargs) + + ######################### + # Versions + ######################### + + def get_versions( + self, + *, + ibm_api_version: Optional[str] = None, + **kwargs, + ) -> DetailedResponse: + """Get supported API versions.""" + return self._send_request( + 'get_versions', + 'GET', + '/api/v1/versions', + ibm_api_version=ibm_api_version, + require_crn=False, + require_api_version=False, + **kwargs, + ) + + ######################### + # Jobs + ######################### + + def create_job( + self, + body: dict, + *, + parent_job_id: Optional[str] = None, + service_crn: Optional[str] = None, + ibm_api_version: Optional[str] = None, + **kwargs, + ) -> DetailedResponse: + """Invoke a Qiskit Runtime primitive using the raw REST request body.""" + if body is None: + raise ValueError('body must be provided') + if parent_job_id: + headers = kwargs.get('headers') or {} + headers['Parent-Job-Id'] = parent_job_id + kwargs['headers'] = headers + return self._send_request( + 'create_job', + 'POST', + '/api/v1/jobs', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + body=body, + **kwargs, + ) + + def list_jobs( + self, + *, + limit: Optional[int] = None, + offset: Optional[int] = None, + pending: Optional[bool] = None, + program: Optional[str] = None, + backend: Optional[str] = None, + created_after: Optional[str] = None, + created_before: Optional[str] = None, + sort: Optional[str] = None, + tags: Optional[List[str]] = None, + session_id: Optional[str] = None, + exclude_params: Optional[bool] = None, + service_crn: Optional[str] = None, + ibm_api_version: Optional[str] = None, + **kwargs, + ) -> DetailedResponse: + """List quantum program jobs.""" + params = { + 'limit': limit, + 'offset': offset, + 'pending': pending, + 'program': program, + 'backend': backend, + 'created_after': created_after, + 'created_before': created_before, + 'sort': sort, + 'tags': tags, + 'session_id': session_id, + 'exclude_params': exclude_params, + } + return self._send_request( + 'list_jobs', + 'GET', + '/api/v1/jobs', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + params=params, + **kwargs, + ) + + def get_job( + self, + id: str, + *, + exclude_params: Optional[bool] = None, + service_crn: Optional[str] = None, + ibm_api_version: Optional[str] = None, + **kwargs, + ) -> DetailedResponse: + """List details about the specified quantum program job.""" + if id is None: + raise ValueError('id must be provided') + url = '/api/v1/jobs/{id}'.format(id=self._encode_path_var(id)) + return self._send_request( + 'get_job', + 'GET', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + params={'exclude_params': exclude_params}, + **kwargs, + ) + + def delete_job( + self, + id: str, + *, + service_crn: Optional[str] = None, + ibm_api_version: Optional[str] = None, + **kwargs, + ) -> DetailedResponse: + """Delete the specified job and its associated data.""" + if id is None: + raise ValueError('id must be provided') + url = '/api/v1/jobs/{id}'.format(id=self._encode_path_var(id)) + return self._send_request( + 'delete_job', + 'DELETE', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + **kwargs, + ) + + def cancel_job_jid( + self, + id: str, + *, + parent_job_id: Optional[str] = None, + service_crn: Optional[str] = None, + ibm_api_version: Optional[str] = None, + **kwargs, + ) -> DetailedResponse: + """Cancel the specified job.""" + if id is None: + raise ValueError('id must be provided') + if parent_job_id: + headers = kwargs.get('headers') or {} + headers['Parent-Job-Id'] = parent_job_id + kwargs['headers'] = headers + url = '/api/v1/jobs/{id}/cancel'.format(id=self._encode_path_var(id)) + return self._send_request( + 'cancel_job_jid', + 'POST', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + **kwargs, + ) + + def get_jog_logs_jid( + self, + id: str, + *, + service_crn: Optional[str] = None, + ibm_api_version: Optional[str] = None, + **kwargs, + ) -> DetailedResponse: + """List all job logs for the specified job.""" + if id is None: + raise ValueError('id must be provided') + url = '/api/v1/jobs/{id}/logs'.format(id=self._encode_path_var(id)) + return self._send_request( + 'get_jog_logs_jid', + 'GET', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + accept='text/plain', + **kwargs, + ) + + def get_job_metrics_jid( + self, + id: str, + *, + service_crn: Optional[str] = None, + ibm_api_version: Optional[str] = None, + **kwargs, + ) -> DetailedResponse: + """Get metrics for the specified job.""" + if id is None: + raise ValueError('id must be provided') + url = '/api/v1/jobs/{id}/metrics'.format(id=self._encode_path_var(id)) + return self._send_request( + 'get_job_metrics_jid', + 'GET', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + **kwargs, + ) + + def get_job_results_jid( + self, + id: str, + *, + service_crn: Optional[str] = None, + ibm_api_version: Optional[str] = None, + **kwargs, + ) -> DetailedResponse: + """Return the final result from the specified job.""" + if id is None: + raise ValueError('id must be provided') + url = '/api/v1/jobs/{id}/results'.format(id=self._encode_path_var(id)) + return self._send_request( + 'get_job_results_jid', + 'GET', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + **kwargs, + ) + + def replace_job_tags( + self, + id: str, + tags: List[str], + *, + service_crn: Optional[str] = None, + ibm_api_version: Optional[str] = None, + **kwargs, + ) -> DetailedResponse: + """Replace tags for the specified job.""" + if id is None: + raise ValueError('id must be provided') + if tags is None: + raise ValueError('tags must be provided') + url = '/api/v1/jobs/{id}/tags'.format(id=self._encode_path_var(id)) + return self._send_request( + 'replace_job_tags', + 'PUT', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + body={'tags': tags}, + **kwargs, + ) + + ######################### + # Backends + ######################### + + def list_backends( + self, + *, + fields: Optional[str] = None, + service_crn: Optional[str] = None, + ibm_api_version: Optional[str] = None, + **kwargs, + ) -> DetailedResponse: + """List backends available to the current instance.""" + return self._send_request( + 'list_backends', + 'GET', + '/api/v1/backends', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + params={'fields': fields}, + **kwargs, + ) + + def get_backend_configuration( + self, + backend_name: str, + *, + calibration_id: Optional[str] = None, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """Get backend configuration.""" + if backend_name is None: + raise ValueError('backend_name must be provided') + url = '/api/v1/backends/{backend_name}/configuration'.format( + backend_name=self._encode_path_var(backend_name) + ) + return self._send_request( + 'get_backend_configuration', + 'GET', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + params={'calibration_id': calibration_id}, + **kwargs, + ) + + def get_backend_defaults( + self, + backend_name: str, + *, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """Get backend default settings.""" + if backend_name is None: + raise ValueError('backend_name must be provided') + url = '/api/v1/backends/{backend_name}/defaults'.format(backend_name=self._encode_path_var(backend_name)) + return self._send_request( + 'get_backend_defaults', + 'GET', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + **kwargs, + ) + + def get_backend_properties( + self, + backend_name: str, + *, + updated_before: Optional[str] = None, + calibration_id: Optional[str] = None, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """Get backend properties.""" + if backend_name is None: + raise ValueError('backend_name must be provided') + url = '/api/v1/backends/{backend_name}/properties'.format(backend_name=self._encode_path_var(backend_name)) + return self._send_request( + 'get_backend_properties', + 'GET', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + params={'updated_before': updated_before, 'calibration_id': calibration_id}, + **kwargs, + ) + + def get_backend_status( + self, + backend_name: str, + *, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """Get backend status.""" + if backend_name is None: + raise ValueError('backend_name must be provided') + url = '/api/v1/backends/{backend_name}/status'.format(backend_name=self._encode_path_var(backend_name)) + return self._send_request( + 'get_backend_status', + 'GET', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + **kwargs, + ) + + ######################### + # Sessions + ######################### + + def create_session( + self, + body: Optional[dict] = None, + *, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """Create a job session.""" + return self._send_request( + 'create_session', + 'POST', + '/api/v1/sessions', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + body=body, + **kwargs, + ) + + def get_session(self, id: str, *, service_crn=None, ibm_api_version=None, **kwargs) -> DetailedResponse: + """Get a job session.""" + if id is None: + raise ValueError('id must be provided') + url = '/api/v1/sessions/{id}'.format(id=self._encode_path_var(id)) + return self._send_request( + 'get_session', + 'GET', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + **kwargs, + ) + + def update_session( + self, + id: str, + body: dict, + *, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """Update a job session.""" + if id is None: + raise ValueError('id must be provided') + if body is None: + raise ValueError('body must be provided') + url = '/api/v1/sessions/{id}'.format(id=self._encode_path_var(id)) + return self._send_request( + 'update_session', + 'PATCH', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + body=body, + **kwargs, + ) + + def delete_session_close( + self, + id: str, + *, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """Close a job session.""" + if id is None: + raise ValueError('id must be provided') + url = '/api/v1/sessions/{id}/close'.format(id=self._encode_path_var(id)) + return self._send_request( + 'delete_session_close', + 'DELETE', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + **kwargs, + ) + + ######################### + # Instances + ######################### + + def get_instance(self, *, service_crn=None, ibm_api_version=None, **kwargs) -> DetailedResponse: + """Get current instance details.""" + return self._send_request( + 'get_instance', + 'GET', + '/api/v1/instance', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + **kwargs, + ) + + def get_instance_configuration(self, *, service_crn=None, ibm_api_version=None, **kwargs) -> DetailedResponse: + """Get instance configuration.""" + return self._send_request( + 'get_instance_configuration', + 'GET', + '/api/v1/instances/configuration', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + **kwargs, + ) + + def replace_instance_configuration( + self, + body: dict, + *, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """Update instance configuration.""" + if body is None: + raise ValueError('body must be provided') + return self._send_request( + 'replace_instance_configuration', + 'PUT', + '/api/v1/instances/configuration', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + body=body, + **kwargs, + ) + + def get_usage(self, *, service_crn=None, ibm_api_version=None, **kwargs) -> DetailedResponse: + """Get instance usage.""" + return self._send_request( + 'get_usage', + 'GET', + '/api/v1/instances/usage', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + **kwargs, + ) + + ######################### + # Tags, Accounts, Workloads, Analytics + ######################### + + def list_tags( + self, + type: str = 'job', + search: str = None, + *, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """List tags.""" + if type is None: + raise ValueError('type must be provided') + if search is None: + raise ValueError('search must be provided') + return self._send_request( + 'list_tags', + 'GET', + '/api/v1/tags', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + params={'type': type, 'search': search}, + **kwargs, + ) + + def get_account( + self, + id: str, + *, + plan_id: Optional[str] = None, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """Get account configuration.""" + if id is None: + raise ValueError('id must be provided') + url = '/api/v1/accounts/{id}'.format(id=self._encode_path_var(id)) + return self._send_request( + 'get_account', + 'GET', + url, + service_crn=service_crn, + ibm_api_version=ibm_api_version, + params={'plan_id': plan_id}, + **kwargs, + ) + + def find_instance_workloads( + self, + *, + user: Optional[str] = None, + sort: Optional[str] = None, + limit: Optional[int] = None, + previous: Optional[str] = None, + next: Optional[str] = None, + backend: Optional[str] = None, + search: Optional[str] = None, + status: Optional[List[str]] = None, + mode: Optional[str] = None, + created_after: Optional[str] = None, + created_before: Optional[str] = None, + tags: Optional[List[str]] = None, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """List user instance workloads.""" + params = { + 'user': user, + 'sort': sort, + 'limit': limit, + 'previous': previous, + 'next': next, + 'backend': backend, + 'search': search, + 'status': status, + 'mode': mode, + 'created_after': created_after, + 'created_before': created_before, + 'tags': tags, + } + return self._send_request( + 'find_instance_workloads', + 'GET', + '/api/v1/workloads', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + params=params, + **kwargs, + ) + + def analytics_filters( + self, + *, + instance: Optional[List[str]] = None, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """Get usage analytics filters.""" + return self._send_request( + 'analytics_filters', + 'GET', + '/api/v1/analytics/filters', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + params={'instance': instance}, + **kwargs, + ) + + def analytics_usage( + self, + *, + instance: Optional[List[str]] = None, + interval_start: Optional[str] = None, + interval_end: Optional[str] = None, + backend: Optional[List[str]] = None, + user_id: Optional[List[str]] = None, + simulators: Optional[bool] = None, + plan: Optional[List[str]] = None, + subscription_id: Optional[List[str]] = None, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """Get usage analytics.""" + params = { + 'instance': instance, + 'interval_start': interval_start, + 'interval_end': interval_end, + 'backend': backend, + 'user_id': user_id, + 'simulators': simulators, + 'plan': plan, + 'subscription_id': subscription_id, + } + return self._send_request( + 'analytics_usage', + 'GET', + '/api/v1/analytics/usage', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + params=params, + **kwargs, + ) + + def get_usage_analytics_grouped( + self, + group_by: str, + *, + instance: Optional[List[str]] = None, + interval_start: Optional[str] = None, + interval_end: Optional[str] = None, + backend: Optional[List[str]] = None, + user_id: Optional[List[str]] = None, + simulators: Optional[bool] = None, + plan: Optional[List[str]] = None, + subscription_id: Optional[List[str]] = None, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """Get usage analytics grouped.""" + if group_by is None: + raise ValueError('group_by must be provided') + params = { + 'group_by': group_by, + 'instance': instance, + 'interval_start': interval_start, + 'interval_end': interval_end, + 'backend': backend, + 'user_id': user_id, + 'simulators': simulators, + 'plan': plan, + 'subscription_id': subscription_id, + } + return self._send_request( + 'get_usage_analytics_grouped', + 'GET', + '/api/v1/analytics/usage_grouped', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + params=params, + **kwargs, + ) + + def get_usage_analytics_grouped_by_date( + self, + group_by: str, + *, + instance: Optional[List[str]] = None, + interval_start: Optional[str] = None, + interval_end: Optional[str] = None, + backend: Optional[List[str]] = None, + user_id: Optional[List[str]] = None, + simulators: Optional[bool] = None, + plan: Optional[List[str]] = None, + subscription_id: Optional[List[str]] = None, + service_crn=None, + ibm_api_version=None, + **kwargs, + ) -> DetailedResponse: + """Get usage analytics grouped by date.""" + if group_by is None: + raise ValueError('group_by must be provided') + params = { + 'group_by': group_by, + 'instance': instance, + 'interval_start': interval_start, + 'interval_end': interval_end, + 'backend': backend, + 'user_id': user_id, + 'simulators': simulators, + 'plan': plan, + 'subscription_id': subscription_id, + } + return self._send_request( + 'get_usage_analytics_grouped_by_date', + 'GET', + '/api/v1/analytics/usage_grouped_by_date', + service_crn=service_crn, + ibm_api_version=ibm_api_version, + params=params, + **kwargs, + ) diff --git a/test/unit/test_quantum_platform_v1.py b/test/unit/test_quantum_platform_v1.py new file mode 100644 index 00000000..6f87ad95 --- /dev/null +++ b/test/unit/test_quantum_platform_v1.py @@ -0,0 +1,331 @@ +# -*- coding: utf-8 -*- +# (C) Copyright IBM Corp. 2026. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Unit Tests for QuantumPlatformV1 +""" + +from ibm_cloud_sdk_core.authenticators.no_auth_authenticator import NoAuthAuthenticator +from ibm_cloud_sdk_core.authenticators.bearer_token_authenticator import BearerTokenAuthenticator +import json +import os +import pytest +import re +import responses +import urllib +import ibm_platform_services.quantum_platform_v1 as quantum_platform_v1 +from ibm_platform_services.quantum_platform_v1 import * + + +_base_url = 'https://quantum.cloud.ibm.com' +_service = QuantumPlatformV1( + authenticator=NoAuthAuthenticator(), + instance_crn='crn:v1:bluemix:public:quantum-computing:us-east:a/test:instance::', +) +_service.set_service_url(_base_url) + + +def preprocess_url(operation_path: str): + operation_path = urllib.parse.unquote(operation_path) + operation_path = urllib.parse.quote(operation_path, safe='/') + request_url = _base_url + operation_path + if not request_url.endswith('/'): + return request_url + return re.compile(request_url.rstrip('/') + '/+') + + +class TestNewInstance: + """ + Test Class for new_instance + """ + + def test_new_instance(self): + """ + new_instance() + """ + os.environ['TEST_SERVICE_AUTH_TYPE'] = 'noAuth' + os.environ['TEST_SERVICE_INSTANCE_CRN'] = 'crn:test' + os.environ['TEST_SERVICE_API_VERSION'] = '2026-03-15' + + service = QuantumPlatformV1.new_instance(service_name='TEST_SERVICE') + + assert service is not None + assert isinstance(service, QuantumPlatformV1) + assert service.instance_crn == 'crn:test' + assert service.api_version == '2026-03-15' + + del os.environ['TEST_SERVICE_AUTH_TYPE'] + del os.environ['TEST_SERVICE_INSTANCE_CRN'] + del os.environ['TEST_SERVICE_API_VERSION'] + + def test_new_instance_without_authenticator(self): + """ + new_instance_without_authenticator() + """ + with pytest.raises(ValueError, match='authenticator must be provided'): + QuantumPlatformV1.new_instance(service_name='TEST_SERVICE_NOT_FOUND') + + +class TestHeaders: + """ + Test Class for Quantum-specific headers + """ + + @responses.activate + def test_constructor_instance_crn_and_api_version_headers(self): + """ + list_backends() + """ + url = preprocess_url('/api/v1/backends') + responses.add(responses.GET, url, body='{"backends": []}', content_type='application/json', status=200) + + response = _service.list_backends(headers={}) + + assert response.status_code == 200 + assert len(responses.calls) == 1 + request_headers = responses.calls[0].request.headers + assert request_headers['Service-CRN'] == _service.instance_crn + assert request_headers['IBM-API-Version'] == QuantumPlatformV1.DEFAULT_API_VERSION + + @responses.activate + def test_per_call_headers_override_constructor_defaults(self): + """ + list_backends with per-call overrides + """ + url = preprocess_url('/api/v1/backends') + responses.add(responses.GET, url, body='{"backends": []}', content_type='application/json', status=200) + + response = _service.list_backends( + service_crn='crn:override', + ibm_api_version='2027-01-01', + headers={}, + ) + + assert response.status_code == 200 + request_headers = responses.calls[0].request.headers + assert request_headers['Service-CRN'] == 'crn:override' + assert request_headers['IBM-API-Version'] == '2027-01-01' + + def test_missing_crn_raises_value_error(self): + """ + list_backends without instance CRN + """ + service = QuantumPlatformV1(authenticator=NoAuthAuthenticator()) + with pytest.raises(ValueError, match='service_crn must be provided'): + service.list_backends(headers={}) + + @responses.activate + def test_versions_does_not_require_crn(self): + """ + get_versions() + """ + url = preprocess_url('/api/v1/versions') + responses.add(responses.GET, url, body='{"versions": []}', content_type='application/json', status=200) + + service = QuantumPlatformV1(authenticator=NoAuthAuthenticator()) + service.set_service_url(_base_url) + response = service.get_versions(headers={}) + + assert response.status_code == 200 + request_headers = responses.calls[0].request.headers + assert 'Service-CRN' not in request_headers + assert 'IBM-API-Version' not in request_headers + + @responses.activate + def test_eu_url_override(self): + """ + list_backends with EU URL + """ + eu_url = 'https://eu-de.quantum.cloud.ibm.com' + service = QuantumPlatformV1(authenticator=NoAuthAuthenticator(), instance_crn='crn:test') + service.set_service_url(eu_url) + responses.add( + responses.GET, + eu_url + '/api/v1/backends', + body='{"backends": []}', + content_type='application/json', + status=200, + ) + + response = service.list_backends(headers={}) + + assert response.status_code == 200 + assert len(responses.calls) == 1 + + @responses.activate + def test_bearer_token_authenticator_adds_authorization_header(self): + """ + BearerTokenAuthenticator() + """ + service = QuantumPlatformV1(authenticator=BearerTokenAuthenticator('test-token'), instance_crn='crn:test') + service.set_service_url(_base_url) + url = preprocess_url('/api/v1/backends') + responses.add(responses.GET, url, body='{"backends": []}', content_type='application/json', status=200) + + response = service.list_backends(headers={}) + + assert response.status_code == 200 + assert responses.calls[0].request.headers['Authorization'] == 'Bearer test-token' + + +class TestJobs: + """ + Test Class for job operations + """ + + @responses.activate + def test_create_job(self): + """ + create_job() + """ + url = preprocess_url('/api/v1/jobs') + responses.add(responses.POST, url, body='{"id": "job-id"}', content_type='application/json', status=201) + + body = {'program_id': 'sampler', 'backend': 'ibm_test', 'params': {'pubs': []}} + response = _service.create_job(body=body, parent_job_id='parent-id', headers={}) + + assert response.status_code == 201 + assert json.loads(str(responses.calls[0].request.body, 'utf-8')) == body + assert responses.calls[0].request.headers['Parent-Job-Id'] == 'parent-id' + + @responses.activate + def test_replace_job_tags(self): + """ + replace_job_tags() + """ + url = preprocess_url('/api/v1/jobs/job-id/tags') + responses.add(responses.PUT, url, body='{"tags": ["a"]}', content_type='application/json', status=200) + + response = _service.replace_job_tags('job-id', ['a'], headers={}) + + assert response.status_code == 200 + assert json.loads(str(responses.calls[0].request.body, 'utf-8')) == {'tags': ['a']} + + @responses.activate + def test_job_operation_id_methods(self): + """ + OpenAPI operationId methods + """ + responses.add(responses.POST, preprocess_url('/api/v1/jobs/job-id/cancel'), status=204) + responses.add( + responses.GET, + preprocess_url('/api/v1/jobs/job-id/logs'), + body='logs', + content_type='text/plain', + status=200, + ) + + cancel_response = _service.cancel_job_jid('job-id', parent_job_id='parent-id', headers={}) + logs_response = _service.get_jog_logs_jid('job-id', headers={}) + + assert cancel_response.status_code == 204 + assert logs_response.status_code == 200 + assert responses.calls[0].request.headers['Parent-Job-Id'] == 'parent-id' + assert responses.calls[1].request.headers['Accept'] == 'text/plain' + + @responses.activate + def test_sdk_headers_use_openapi_operation_id(self, monkeypatch): + """ + get_sdk_headers() receives the OpenAPI operationId. + """ + operation_ids = [] + + def mock_get_sdk_headers(service_name, service_version, operation_id): + operation_ids.append(operation_id) + return {} + + monkeypatch.setattr(quantum_platform_v1, 'get_sdk_headers', mock_get_sdk_headers) + responses.add(responses.POST, preprocess_url('/api/v1/jobs'), body='{}', status=200) + + _service.create_job({'program_id': 'sampler'}, headers={}) + + assert operation_ids == ['create_job'] + + def test_friendly_aliases_are_not_present(self): + """ + Friendly method aliases are intentionally not part of the public API. + """ + friendly_names = [ + 'run_job', + 'cancel_job', + 'list_job_logs', + 'list_job_results', + 'get_job_metrics', + 'close_session', + 'update_instance_configuration', + 'get_instance_usage', + 'get_account_configuration', + 'list_workloads', + 'get_analytics_filters', + 'get_usage_analytics', + ] + + for name in friendly_names: + assert not hasattr(QuantumPlatformV1, name) + + +class TestSessionsAndAnalytics: + """ + Test Class for session and analytics operation details + """ + + @responses.activate + def test_update_and_delete_session_close_methods(self): + """ + update_session() and delete_session_close() + """ + update_url = preprocess_url('/api/v1/sessions/session-id') + close_url = preprocess_url('/api/v1/sessions/session-id/close') + responses.add(responses.PATCH, update_url, status=204) + responses.add(responses.DELETE, close_url, status=204) + + update_response = _service.update_session('session-id', {'accepting_jobs': True}, headers={}) + close_response = _service.delete_session_close('session-id', headers={}) + + assert update_response.status_code == 204 + assert close_response.status_code == 204 + assert responses.calls[0].request.method == 'PATCH' + assert responses.calls[1].request.method == 'DELETE' + + @responses.activate + def test_grouped_analytics_path_and_query(self): + """ + get_usage_analytics_grouped() + """ + url = preprocess_url('/api/v1/analytics/usage_grouped') + responses.add(responses.GET, url, body='{}', content_type='application/json', status=200) + + response = _service.get_usage_analytics_grouped('backend', instance=['crn:test'], headers={}) + + assert response.status_code == 200 + assert 'group_by=backend' in responses.calls[0].request.url + assert 'instance=crn%3Atest' in responses.calls[0].request.url + + @responses.activate + def test_other_operation_id_methods(self): + """ + OpenAPI operationId methods for non-job operations + """ + responses.add(responses.GET, preprocess_url('/api/v1/accounts/account-id'), body='{}', status=200) + responses.add(responses.GET, preprocess_url('/api/v1/workloads'), body='{}', status=200) + responses.add(responses.GET, preprocess_url('/api/v1/analytics/usage'), body='{}', status=200) + responses.add(responses.GET, preprocess_url('/api/v1/analytics/filters'), body='{}', status=200) + responses.add(responses.GET, preprocess_url('/api/v1/instances/usage'), body='{}', status=200) + + assert _service.get_account('account-id', headers={}).status_code == 200 + assert _service.find_instance_workloads(headers={}).status_code == 200 + assert _service.analytics_usage(headers={}).status_code == 200 + assert _service.analytics_filters(headers={}).status_code == 200 + assert _service.get_usage(headers={}).status_code == 200