Přeskočit na hlavní obsah

Správa výpočetních a datových zdrojů Qiskit Serverless

Verze balíčků

Kód na této stránce byl vyvinut s použitím následujících požadavků. Doporučujeme používat tyto nebo novější verze.

qiskit[all]~=2.0.0
qiskit-ibm-runtime~=0.37.0
qiskit-serverless~=0.22.0

S Qiskit Serverless můžeš spravovat výpočetní a datové zdroje napříč svým Qiskit patternem, včetně CPU, QPU a dalších výpočetních akcelerátorů.

Nastavení podrobných stavů

Serverless úlohy procházejí během pracovního postupu několika fázemi. Ve výchozím nastavení jsou pomocí job.status() dostupné následující stavy:

  • QUEUED: úloha čeká ve frontě na klasické zdroje
  • INITIALIZING: úloha se nastavuje
  • RUNNING: úloha právě běží na klasických zdrojích
  • DONE: úloha byla úspěšně dokončena

Můžeš také nastavit vlastní stavy, které blíže popisují konkrétní fázi pracovního postupu, jak je uvedeno níže.

# Added by doQumentation — required packages for this notebook
!pip install -q qiskit qiskit-ibm-runtime qiskit-serverless
# This cell is hidden from users, it just creates a new folder
from pathlib import Path

Path("./source_files").mkdir(exist_ok=True)
%%writefile ./source_files/status_example.py

from qiskit_serverless import update_status, Job

## If your function has a mapping stage, particularly application functions, you can set the status to "RUNNING: MAPPING" as follows:
update_status(Job.MAPPING)

## While handling transpilation, error suppression, and so forth, you can set the status to "RUNNING: OPTIMIZING_FOR_HARDWARE":
update_status(Job.OPTIMIZING_HARDWARE)

## After you submit jobs to Qiskit Runtime, the underlying quantum job will be queued. You can set status to "RUNNING: WAITING_FOR_QPU":
update_status(Job.WAITING_QPU)

## When the Qiskit Runtime job starts running on the QPU, set the following status "RUNNING: EXECUTING_QPU":
update_status(Job.EXECUTING_QPU)

## Once QPU is completed and post-processing has begun, set the status "RUNNING: POST_PROCESSING":
update_status(Job.POST_PROCESSING)
Writing ./source_files/status_example.py

Po úspěšném dokončení této úlohy (pomocí save_result()) bude stav automaticky aktualizován na DONE.

Paralelní pracovní postupy

Pro klasické úlohy, které lze paralelizovat, použij dekorátor @distribute_task k definování výpočetních požadavků potřebných pro provedení úlohy. Začni tím, že si připomeneš příklad transpile_remote.py z tématu Napište svůj první program Qiskit Serverless s následujícím kódem.

Následující kód vyžaduje, abys již měl/a uložené přihlašovací údaje.

%%writefile ./source_files/transpile_remote.py

from qiskit.transpiler import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_serverless import distribute_task

service = QiskitRuntimeService()

@distribute_task(target={"cpu": 1})
def transpile_remote(circuit, optimization_level, backend):
"""Transpiles an abstract circuit (or list of circuits) into an ISA circuit for a given backend."""
pass_manager = generate_preset_pass_manager(
optimization_level=optimization_level,
backend=service.backend(backend)
)
isa_circuit = pass_manager.run(circuit)
return isa_circuit
Writing ./source_files/transpile_remote.py

V tomto příkladu jsi ozdobil/a funkci transpile_remote() dekorátorem @distribute_task(target={"cpu": 1}). Po spuštění vytvoří asynchronní paralelní pracovní úlohu s jediným jádrem CPU a vrátí odkaz pro sledování tohoto workera. Pro získání výsledku předej odkaz funkci get(). To lze využít ke spuštění více paralelních úloh:

%%writefile --append ./source_files/transpile_remote.py

from time import time
from qiskit_serverless import get, get_arguments, save_result, update_status, Job

# Get arguments
arguments = get_arguments()
circuit = arguments.get("circuit")
optimization_level = arguments.get("optimization_level")
backend = arguments.get("backend")
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Start distributed transpilation

update_status(Job.OPTIMIZING_HARDWARE)

start_time = time()
transpile_worker_references = [
transpile_remote(circuit, optimization_level, backend)
for circuit in arguments.get("circuit_list")
]

transpiled_circuits = get(transpile_worker_references)
end_time = time()
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Save result, with metadata

result = {
"circuits": transpiled_circuits,
"metadata": {
"resource_usage": {
"RUNNING: OPTIMIZING_FOR_HARDWARE": {
"CPU_TIME": end_time - start_time,
"QPU_TIME": 0,
},
}
},
}

save_result(result)
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It uploads the serverless program and checks it runs.

def test_serverless_job(title, entrypoint):
# Import in function to stop them interfering with user-facing code
from qiskit.circuit.random import random_circuit
from qiskit_serverless import IBMServerlessClient, QiskitFunction
import time
import uuid

title += "_" + uuid.uuid4().hex[:8]
serverless = IBMServerlessClient()
transpile_remote_demo = QiskitFunction(
title=title,
entrypoint=entrypoint,
working_dir="./source_files/",
)
serverless.upload(transpile_remote_demo)
job = serverless.get(title).run(
circuit=random_circuit(3, 3),
circuit_list=[random_circuit(3, 3) for _ in range(3)],
backend="ibm_torino",
optimization_level=1,
)
for retry in range(25):
time.sleep(5)
status = job.status()
if status == "DONE":
print("Job completed successfully")
return
if status not in [
"QUEUED",
"INITIALIZING",
"RUNNING",
"RUNNING: OPTIMIZING_FOR_HARDWARE",
"DONE",
]:
raise Exception(
f"Unexpected job status '{status}'.\nHere's the logs:\n"
+ job.logs()
)
print(f"Waiting for job (status '{status}')")
raise Exception("Job did not complete in time")

test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully

Prozkoumání různých konfigurací úloh

Pomocí @distribute_task() můžeš flexibilně přidělovat CPU, GPU a paměť svým úlohám. Pro Qiskit Serverless na IBM Quantum® Platform je každý program vybaven 16 jádry CPU a 32 GB RAM, které lze dynamicky přidělovat podle potřeby.

Jádra CPU lze přidělovat jako celá jádra nebo dokonce jako zlomkové přídělky, jak je znázorněno níže.

Paměť se přiděluje v počtu bajtů. Připomeň si, že v kilobajtu je 1024 bajtů, v megabajtu 1024 kilobajtů a v gigabajtu 1024 megabajtů. Chceš-li svému workeru přidělit 2 GB paměti, musíš zadat "mem": 2 * 1024 * 1024 * 1024.

%%writefile --append ./source_files/transpile_remote.py

@distribute_task(target={
"cpu": 16,
"mem": 2 * 1024 * 1024 * 1024
})
def transpile_remote(circuit, optimization_level, backend):
return None
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It checks the distributed program works.
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully

Správa dat napříč programem

Qiskit Serverless ti umožňuje spravovat soubory v adresáři /data napříč všemi svými programy. To zahrnuje několik omezení:

  • V současnosti jsou podporovány pouze soubory tar a h5
  • Jedná se pouze o ploché úložiště /data, nelze vytvářet podadresáře /data/folder/

Níže je ukázáno, jak nahrávat soubory. Ujisti se, že ses ověřil/a v Qiskit Serverless pomocí svého IBM Quantum účtu (pokyny viz Nasazení na IBM Quantum Platform).

import tarfile
from qiskit_serverless import IBMServerlessClient

# Create a tar
filename = "transpile_demo.tar"
file = tarfile.open(filename, "w")
file.add("./source_files/transpile_remote.py")
file.close()

# Get a reference to a QiskitFunction
serverless = IBMServerlessClient()
transpile_remote_demo = next(
program
for program in serverless.list()
if program.title == "transpile_remote_serverless"
)

# Upload the tar to Serverless data directory
serverless.file_upload(file=filename, function=transpile_remote_demo)
'{"message":"/usr/src/app/media/5e1f442128cdf60018496a04/transpile_demo.tar"}'

Dále můžeš zobrazit seznam všech souborů ve svém adresáři data. Tato data jsou přístupná všem programům.

serverless.files(function=transpile_remote_demo)
['classifier_name.pkl.tar', 'output.json.tar', 'transpile_demo.tar']

Z programu to lze provést pomocí file_download() pro stažení souboru do prostředí programu a rozbalení archivu tar.

%%writefile ./source_files/extract_tarfile.py

import tarfile
from qiskit_serverless import IBMServerlessClient

serverless = IBMServerlessClient(token="<YOUR_API_KEY>") # Use the 44-character API_KEY you created and saved from the IBM Quantum Platform Home dashboard
files = serverless.files()
demo_file = files[0]
downloaded_tar = serverless.file_download(demo_file)

with tarfile.open(downloaded_tar, 'r') as tar:
tar.extractall()

V tuto chvíli může tvůj program pracovat se soubory stejně jako v lokálním experimentu. Funkce file_upload(), file_download() a file_delete() lze volat z lokálního experimentu nebo z nahraného programu pro konzistentní a flexibilní správu dat.

Další kroky

Doporučení