In this blog, we will demonstrate how to use the AWS Bedrock Batch API to run inference jobs at scale. Batch inference is priced at a 50% discount compared to on-demand invocation, and most jobs complete within 24 hours (though completion time is not guaranteed). We’ll walk through setting up the necessary AWS resources, creating inference prompts, and processing the results of batch jobs.
This cell imports the necessary libraries for AWS interactions, data manipulation, and time handling.
import boto3
import pandas as pd
import json
import time
This cell sets up the AWS session and clients for S3 and Bedrock, and loads the list of famous people from a CSV file.
# Replace with your AWS region
aws_region = 'us-east-1'
session = boto3.Session(profile_name='<AWS_PROFILE>')

# Initialize AWS clients
s3_client = session.client('s3', region_name=aws_region)
bedrock = session.client('bedrock', region_name=aws_region)

# Load the list of 100 famous people from a CSV file
df = pd.read_csv('famous_people.csv')
print("Loaded DataFrame:")
print(df.head(10))
Output:
Loaded DataFrame:
   id             prompt
0   1          Fela Kuti
1   2        Marie Curie
2   3    Albert Einstein
3   4     Nelson Mandela
4   5     Mahatma Gandhi
5   6        Frida Kahlo
6   7  Winston Churchill
7   8        Che Guevara
8   9          Bruce Lee
9  10    Serena Williams
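For reference, famous_people.csv is assumed to be a simple two-column file: an id column and a prompt column holding each person's name. An illustrative snippet matching the rows shown above:

id,prompt
1,Fela Kuti
2,Marie Curie
3,Albert Einstein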
This cell creates a list of prompts to identify the place of birth for each person in the DataFrame.
# Create an array of prompts to identify each person's place of birth
prompts = []
for index, row in df.iterrows():
    person_name = row['prompt']
    prompt = {
        "recordId": f"{index}",
        "modelInput": {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": f"Identify the place of birth for {person_name}. Format the result like <city/town>,<country>. Do not include any other information."
                        }
                    ]
                }
            ]
        }
    }
    prompts.append(json.dumps(prompt))
This cell prints the first prompt to verify its structure.
print(json.dumps(json.loads(prompts[0]), indent=4))
Output:
{ "recordId": "0", "modelInput": { "anthropic_version": "bedrock-2023-05-31", "max_tokens": 1024, "messages": [ { "role": "user", "content": [ { "type": "text", "text": "Identify the place of birth for Fela Kuti. Format the result like <city/town>,. Do not include any other information." } ] } ] } }
This cell saves the prompts to a JSON Lines file and specifies the S3 bucket and key for uploading.
file_name = f"batch_prompts.jsonl" with open(file_name, 'w') as file: for prompt in prompts: file.write(f"{prompt}\n") # Upload the JSON Lines file to Amazon S3 bucket_name = '<S3_BUCKET>' s3_input_key = f"test_job/{file_name}"
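One gotcha worth catching before the upload: Bedrock enforces a minimum number of records per batch inference job (100 at the time of writing, which is why this walkthrough uses a list of 100 people; verify the current quota for your account and region). A quick guard like the following sketch can fail fast before you submit the job:

# Hypothetical guard: Bedrock enforces a minimum record count per batch job.
# The limit was 100 records at the time of writing -- check the Service
# Quotas console for your region before relying on this value.
MIN_RECORDS = 100
if len(prompts) < MIN_RECORDS:
    raise ValueError(
        f"Batch inference requires at least {MIN_RECORDS} records; "
        f"got {len(prompts)}"
    )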
This cell uploads the prompts file to the specified S3 bucket.
try:
    s3_client.upload_file(file_name, bucket_name, s3_input_key)
    print(f"Uploaded {file_name} to s3://{bucket_name}/{s3_input_key}")
except Exception as e:
    print(f"Error uploading file to S3: {e}")
    exit(1)
Output:
Uploaded batch_prompts.jsonl to s3://<S3_BUCKET>/test_job/batch_prompts.jsonl
This cell defines the input and output S3 URIs and prepares the data configuration for the batch job.
# Define the input and output locations in S3
input_s3_uri = f"s3://{bucket_name}/{s3_input_key}"
output_s3_uri = f"s3://{bucket_name}/batch_output/"

# Prepare inputDataConfig and outputDataConfig
inputDataConfig = {
    "s3InputDataConfig": {
        "s3Uri": input_s3_uri
    }
}
outputDataConfig = {
    "s3OutputDataConfig": {
        "s3Uri": output_s3_uri
    }
}
This cell starts the batch inference job using the Bedrock client.
# Start the batch inference job using create_model_invocation_job
# Replace roleArn with your actual IAM role ARN
try:
    response = bedrock.create_model_invocation_job(
        roleArn="arn:aws:iam::<ACCOUNT_ID>:role/service-role/test-inference-role",
        modelId="anthropic.claude-3-5-sonnet-20240620-v1:0",
        jobName=f"my-batch-job-{int(time.time())}",
        inputDataConfig=inputDataConfig,
        outputDataConfig=outputDataConfig
    )
    jobArn = response.get('jobArn')
    print(f"Started batch inference job with ARN: {jobArn}")
except Exception as e:
    print(f"Error starting inference job: {e}")
    exit(1)
Output:
Started batch inference job with ARN: arn:aws:bedrock:us-east-1:<ACCOUNT_ID>:model-invocation-job/d04yfj9tl2fj
This cell monitors the batch inference job until it completes successfully.
# Monitor the batch job until completion
print("Monitoring batch job...")
while True:
    job_status_response = bedrock.get_model_invocation_job(jobIdentifier=jobArn)
    status = job_status_response['status']
    if status in ['InProgress', 'Initializing', 'Submitted', 'Validating']:
        print(f"Job {jobArn} is {status}. Waiting for completion...")
        time.sleep(30)
    elif status == 'Completed':
        print(f"Job {jobArn} completed successfully.")
        break
    elif status == 'Failed':
        print(f"Job {jobArn} failed.")
        raise RuntimeError("Job failed.")
    else:
        print(f"Job {jobArn} has unexpected status: {status}")
        time.sleep(30)
Output:
Monitoring batch job...
Job arn:aws:bedrock:us-east-1:<ACCOUNT_ID>:model-invocation-job/d04yfj9tl2fj is Submitted. Waiting for completion...
Job arn:aws:bedrock:us-east-1:<ACCOUNT_ID>:model-invocation-job/d04yfj9tl2fj is Validating. Waiting for completion...
...
Job arn:aws:bedrock:us-east-1:<ACCOUNT_ID>:model-invocation-job/d04yfj9tl2fj completed successfully.
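Since batch jobs can run for hours, you may prefer to bound the polling loop rather than waiting indefinitely. A minimal sketch, assuming a 24-hour ceiling (the exact set of terminal statuses, such as Stopped or Expired, can vary with the job's lifecycle):

MAX_WAIT_SECONDS = 24 * 60 * 60   # hypothetical ceiling; tune to your workload
POLL_INTERVAL_SECONDS = 60

deadline = time.time() + MAX_WAIT_SECONDS
while time.time() < deadline:
    status = bedrock.get_model_invocation_job(jobIdentifier=jobArn)['status']
    if status == 'Completed':
        print(f"Job {jobArn} completed successfully.")
        break
    if status in ('Failed', 'Stopped'):
        raise RuntimeError(f"Job {jobArn} ended with status: {status}")
    time.sleep(POLL_INTERVAL_SECONDS)
else:
    # while/else: runs only if the deadline passed without a break
    raise TimeoutError(f"Job {jobArn} did not finish within {MAX_WAIT_SECONDS} seconds")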
This cell downloads the results from S3 and saves them locally.
# Download and process the results
job_id = jobArn.split("/")[-1]
output_file = f"batch_output{job_id}.jsonl"
output_s3_key = f"batch_output/{job_id}/batch_prompts.jsonl.out"
print(output_s3_key)

try:
    s3_client.download_file(bucket_name, output_s3_key, output_file)
    print(f"Downloaded output to {output_file}")
except Exception as e:
    print(f"Error downloading output file from S3: {e}")
    exit(1)
Output:
batch_output/d04yfj9tl2fj/batch_prompts.jsonl.out
Downloaded output to batch_outputd04yfj9tl2fj.jsonl
This cell reads the results file and extracts the birthplaces from the model outputs.
# Read and process the results
birthplaces = []
with open(output_file, 'r') as file:
    for line in file:
        response = json.loads(line)
        # Guard against records with an empty content list
        content = response.get('modelOutput', {}).get('content', [])
        output_text = content[0].get('text', '') if content else ''
        birthplaces.append(output_text)
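The loop above relies on the output records arriving in the same order as the input. Each output record also echoes the recordId we set when building the prompts, so a more defensive variant (a sketch, not required if ordering holds) keys the results by recordId instead:

# Map each output record back to its original index via recordId,
# so the results align with the DataFrame regardless of file ordering.
birthplace_by_id = {}
with open(output_file, 'r') as file:
    for line in file:
        record = json.loads(line)
        content = record.get('modelOutput', {}).get('content', [])
        text = content[0].get('text', '') if content else ''
        birthplace_by_id[int(record['recordId'])] = text

birthplaces = [birthplace_by_id.get(i, '') for i in range(len(df))]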
This cell adds the extracted birthplaces to the DataFrame as a new column.
df['Place of Birth'] = birthplaces
This cell displays the updated DataFrame with the new ‘Place of Birth’ column.
df.head(10)
Output:
|   | id | prompt            | Place of Birth              |
|---|----|-------------------|-----------------------------|
| 0 | 1  | Fela Kuti         | Abeokuta,Nigeria            |
| 1 | 2  | Marie Curie       | Warsaw,Poland               |
| 2 | 3  | Albert Einstein   | Ulm,Germany                 |
| 3 | 4  | Nelson Mandela    | Mvezo,South Africa          |
| 4 | 5  | Mahatma Gandhi    | Porbandar,India             |
| 5 | 6  | Frida Kahlo       | Coyoacán,Mexico             |
| 6 | 7  | Winston Churchill | Woodstock,England           |
| 7 | 8  | Che Guevara       | Rosario,Argentina           |
| 8 | 9  | Bruce Lee         | San Francisco,United States |
| 9 | 10 | Serena Williams   | Saginaw, United States      |
This cell saves the updated DataFrame to a CSV file.
df.to_csv('famous_people_with_birthplaces.csv', index=False)
AWS Bedrock Batch API IAM Policy
This cell contains a sample IAM policy that grants the batch job's execution role access to the S3 bucket used for input and output. (If your bucket is encrypted with a customer-managed KMS key, the role also needs the corresponding KMS permissions, which are not shown here.)
{ "Version": "2012-10-17", "Statement": [ { "Sid": "1", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::<S3_BUCKET>", "arn:aws:s3:::<S3_BUCKET>/*", "arn:aws:s3:::<S3_BUCKET>", "arn:aws:s3:::<S3_BUCKET>/*" ], "Condition": { "StringEquals": { "s3:ResourceAccount": "<ACCOUNT ID>" } } } ] }
The IAM role used for running the AWS Bedrock batch job is defined as follows:
roleArn="arn:aws:iam::<ACCOUNT_ID>:role/service-role/test-inference-role"