1. Scripts - best practices
Overview
When developing scripts for CoreAuto workflows and event handling, it is critical to ensure they are idempotent and atomic. This means that:
- Idempotency: Repeated execution of the script with the same payload will not cause unintended side effects or corruption of the system state.
- Atomicity: Each script should execute as a single unit of work, ensuring either complete success or no change at all in case of failure.
Idempotency
Ensure that executing a script multiple times with the same payload does not cause unintended side effects. This is vital in both batch and real-time scenarios to prevent data corruption or workflow inconsistencies.
Implementation Tips
- State Verification: Check the current state of the workflow or step before making updates.
- Unique Identifiers: Use unique identifiers, such as `ACTIONID`, to avoid processing the same action multiple times (see the sketch below).
- Safe Updates: Use conditional updates to ensure that actions are applied only when necessary.
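For example, recording the `ACTIONID` with a conditional insert lets a rerun detect that the action was already handled. The sketch below is illustrative only: the `processed_actions` table and the connection parameters are assumptions, and a real script would pull credentials from the keystore as shown in the examples later in this document.

```python
import os
import psycopg2

# Hypothetical connection parameters; real scripts retrieve these from the keystore
connection = psycopg2.connect(host="localhost", dbname="coreauto", user="app", password="secret")
action_id = os.environ.get('ACTIONID')  # unique id CoreAuto assigns to this action

with connection, connection.cursor() as cursor:
    # Conditional insert: a rerun with the same ACTIONID inserts nothing
    cursor.execute(
        "INSERT INTO processed_actions (action_id) VALUES (%s) "
        "ON CONFLICT (action_id) DO NOTHING",
        (action_id,)
    )
    if cursor.rowcount == 0:
        print(f"Action {action_id} already processed; skipping")
connection.close()
```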
Atomicity
Scripts should execute as a single, complete operation. If a failure occurs, the system should be left in its original state without partial updates.
Implementation Tips
- Database Transactions: Wrap critical database operations in transactions to ensure atomicity (see the sketch after this list).
- Exception Handling: Capture and handle exceptions to roll back changes on failure.
- Resource Cleanup: Remove temporary resources (e.g., files, threads) on script completion or error.
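A minimal sketch of the transaction pattern, assuming a hypothetical `accounts` table and placeholder connection details: `psycopg2`'s connection context manager commits the transaction if the block succeeds and rolls it back if it raises, so either both updates apply or neither does.

```python
import psycopg2

# Hypothetical connection and table; shown only to illustrate the transaction pattern
connection = psycopg2.connect(host="localhost", dbname="coreauto", user="app", password="secret")
try:
    # The connection context manager wraps the block in a transaction:
    # commit on success, rollback if any statement raises
    with connection, connection.cursor() as cursor:
        cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = %s", (1,))
        cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = %s", (2,))
finally:
    connection.close()  # resource cleanup happens regardless of outcome
```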
These principles ensure that CoreAuto workflows are reliable, consistent, and fault-tolerant.
Batch Job Scripts
Batch job scripts process large datasets or perform scheduled tasks.
- Examples:
- Nightly database cleanups.
- Generating reports or aggregating data.
Best Practices:
- Validate payloads before execution.
- Use CaWBSbatch to retrieve shared credentials for connecting to external systems like databases or APIs.
- Log progress and errors for post-execution analysis.
Batch Script Example: PostgreSQL ETL
The following script showcases a PostgreSQL ETL process for a batch job. It uses the `cawbsbatch.py` library to authenticate with CoreAuto, retrieve the necessary credentials, and perform ETL operations on a PostgreSQL database.
Code Example
```python
import psycopg2
import cawbsbatch

# Initialize the CaWBS library
response = cawbsbatch.Init()
if response['status_code'] >= 400:
    print("Initialization failed:", response)
    exit(1)

# Retrieve credentials from the keystore
response = cawbsbatch.GetKeystore("db_host,db_name,db_user,db_password")
if response['status_code'] >= 400:
    print("Failed to retrieve credentials:", response)
    exit(1)

credentials = response['answer']
db_host = credentials['db_host']
db_name = credentials['db_name']
db_user = credentials['db_user']
db_password = credentials['db_password']

# Initialize handles so the except/finally blocks are safe even if connect fails
connection = None
cursor = None

# Connect to the PostgreSQL database
try:
    connection = psycopg2.connect(
        host=db_host,
        database=db_name,
        user=db_user,
        password=db_password
    )
    cursor = connection.cursor()
    print("Connected to the database")

    # Extract data from the source table
    extract_query = "SELECT * FROM source_table"
    cursor.execute(extract_query)
    source_data = cursor.fetchall()
    print(f"Extracted {len(source_data)} records from source_table")

    # Transform data (example transformation: convert all text to uppercase)
    transformed_data = []
    for record in source_data:
        transformed_record = [str(item).upper() if isinstance(item, str) else item for item in record]
        transformed_data.append(transformed_record)

    # Load transformed data into the target table
    load_query = """
        INSERT INTO target_table (column1, column2, column3)
        VALUES (%s, %s, %s)
        ON CONFLICT (column1) DO UPDATE
        SET column2 = EXCLUDED.column2,
            column3 = EXCLUDED.column3
    """
    for record in transformed_data:
        cursor.execute(load_query, record)

    # Commit the transaction
    connection.commit()
    print("ETL operation completed successfully")
except Exception as e:
    # Roll back the transaction in case of failure
    if connection:
        connection.rollback()
    print("Error during ETL operation:", e)
finally:
    # Close the database connection
    if cursor:
        cursor.close()
    if connection:
        connection.close()
        print("Database connection closed")
```
Explanation of the Code
1. Authentication
The script starts by authenticating with CoreAuto using the `Init()` function from the `cawbsbatch.py` library.
2. Credential Retrieval
It retrieves database credentials from the CoreAuto keystore using `GetKeystore()`. This ensures that sensitive information, like database passwords, is securely managed.
3. ETL Workflow
- Extract: Fetch data from a source table.
- Transform: Apply necessary transformations to the data (e.g., format conversion, validation).
- Load: Insert transformed data into the target table. The script uses `ON CONFLICT` to handle duplicate records, ensuring idempotency.
4. Transaction Management
The script wraps database operations in a transaction. If an error occurs, the transaction is rolled back to maintain atomicity.
5. Resource Cleanup
Connections and cursors are closed at the end of the script, even in the event of an error.
Real-Time Event Scripts
Real-time scripts handle immediate responses to system events.
- Examples:
- Processing an uploaded image.
- Reacting to a sensor-triggered event.
Best Practices:
- Minimize latency by using efficient algorithms.
- Use CaWBS to retrieve event payloads (`GetEventPayload`) and save results (`PutStepPayload`); a minimal skeleton follows this list.
- Ensure atomic operations to maintain system integrity during failures.
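A minimal skeleton of that retrieve-process-save pattern (the processing step and the `result` key are placeholders):

```python
import sys
import cawbs

# Authenticate with CoreAuto
response = cawbs.Init()
if response['status_code'] >= 400:
    print("Initialization failed:", response)
    sys.exit(1)

# Fetch the event payload that triggered this step
response = cawbs.GetEventPayload()
if response['status_code'] >= 400:
    print("Failed to fetch event payload:", response)
    sys.exit(1)
payload = response['payload']

# ... process the payload here ...
result = {"result": "processed"}  # placeholder for the real output

# Publish the result for downstream steps
cawbs.PutStepPayload(result)
```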
Example: AI Image Tagging
The following Python script illustrates idempotency and atomicity using AWS S3, OpenAI, and PostgreSQL:
Key Concepts
- Payload Validation: Ensure all required fields are present in the payload.
- Error Logging: Capture errors in a designated database table (`tags_error`) to trace failures.
- Transactional Operations: Use database transactions to ensure atomic updates.
- Cleanup: Remove temporary files and roll back resources (e.g., OpenAI threads, files) on failure.
Code Example
```python
import os
import boto3
import io
from PIL import Image
from psd_tools import PSDImage  # For handling PSD files
import cawbs
import pg_util
from openai import OpenAI

print("Step started")

# Initialize CaWBS
response = cawbs.Init()
if response['status_code'] >= 400:
    print("Initialization failed:", response)
    exit(1)

# Get Event Payload
response = cawbs.GetEventPayload()
if response['status_code'] >= 400:
    print("Failed to fetch event payload:", response)
    exit(1)
payload = response['payload']

# Validate Payload
if 'tenant_id' not in payload or 'asset_id' not in payload:
    print("Invalid payload format")
    exit(1)

# Fetch Keystore Credentials
response = cawbs.GetKeystore("sm_maindb,sm_encrypt,main_dbname,OPENAI_API_KEY,OPENAI_API_ASSISTANT_ID")
if response['status_code'] >= 400:
    print("Keystore fetch error:", response)
    exit(1)
credentials = response['answer']

# Initialize Database
if pg_util.init(credentials['main_dbname'], credentials['sm_maindb'], credentials['sm_encrypt'],
                payload['tenant_id'], payload['asset_id']) > 0:
    print("Database initialization failed")
    exit(1)

# Validate the file extension
file_ext = os.path.splitext(pg_util.s3_file_name)[1].lower()
allowed_formats = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.psd'}
if file_ext not in allowed_formats:
    print(f"Unsupported file extension: {file_ext}")
    pg_util.insert_tags_error(payload, 8, f"Unsupported file extension: {file_ext}")
    exit(1)

# Download image from S3
s3 = boto3.client('s3')
try:
    s3_object = s3.get_object(Bucket=pg_util.s3_bucket_name, Key=pg_util.s3_file_key)
    image_data = s3_object['Body'].read()
except Exception as e:
    print(f"S3 download error: {e}")
    pg_util.insert_tags_error(payload, 8, "S3 file not found")
    exit(1)

# Open the image with psd_tools (PSD) or Pillow (everything else)
if file_ext == '.psd':
    psd = PSDImage.open(io.BytesIO(image_data))
    image = psd.compose()
else:
    image = Image.open(io.BytesIO(image_data))

# Resize if necessary (thumbnail preserves the aspect ratio)
if image.size[0] > 512 or image.size[1] > 512:
    image.thumbnail((512, 512), Image.Resampling.LANCZOS)

# Convert to RGB so the image can be saved as JPEG (e.g., RGBA PNGs)
if image.mode != 'RGB':
    image = image.convert('RGB')

# Save the image temporarily, named after the unique ACTIONID
tmp_dir = os.path.expanduser("~/temp")
os.makedirs(tmp_dir, exist_ok=True)
tmp_file = os.path.join(tmp_dir, f"tmp_{os.environ.get('ACTIONID')}.jpg")
image.save(tmp_file, format="JPEG")

# Send Image to OpenAI
client = OpenAI(api_key=credentials['OPENAI_API_KEY'])
try:
    # Upload the image for vision analysis
    with open(tmp_file, "rb") as f:
        file_response = client.files.create(file=f, purpose="vision")
    # Ask the configured assistant to analyze the image in a new thread
    thread = client.beta.threads.create(messages=[{
        "role": "user",
        "content": [
            {"type": "text", "text": "Analyze this image"},
            {"type": "image_file", "image_file": {"file_id": file_response.id}},
        ],
    }])
    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id,
        assistant_id=credentials['OPENAI_API_ASSISTANT_ID'],
    )
    if run.status != "completed":
        raise RuntimeError(f"Assistant run ended with status: {run.status}")
    messages = client.beta.threads.messages.list(thread_id=thread.id)
    assistant_response = messages.data[0].content[0].text.value
    # Store results in the database
    pg_util.insert_tags(payload, assistant_response)
except Exception as e:
    print(f"Error with OpenAI API: {e}")
    pg_util.insert_tags_error(payload, 8, str(e))
    exit(1)
finally:
    # Cleanup resources
    if os.path.exists(tmp_file):
        os.remove(tmp_file)
```
Explanation of the Code
This script demonstrates how to process an image, analyze it using the OpenAI API, and store the results in a PostgreSQL database. The following steps outline its key operations:
1. Authentication
The script authenticates with CoreAuto using the `Init()` function from the CaWBS library. This ensures that the script has valid API tokens and is authorized to interact with CoreAuto's services. If authentication fails, the script exits with an error message.
2. Payload Retrieval
Using the `GetEventPayload()` function, the script retrieves the event payload, which contains metadata about the file to be processed (e.g., `tenant_id`, `asset_id`). This step is crucial for ensuring the script operates on the correct data.
3. Credential Retrieval
The script fetches sensitive credentials (e.g., database and OpenAI API keys) securely from the CoreAuto keystore using the `GetKeystore()` function. This eliminates the need for hardcoding credentials, enhancing security.
4. Database Initialization
The script initializes a connection to the PostgreSQL database using the credentials retrieved earlier. It also validates that the `tenant_id` and `asset_id` from the payload match the database structure. If the connection or validation fails, the script logs an error and exits.
5. File Validation
The script determines the file extension of the image from its S3 file name and validates that it belongs to an allowed set of formats (`.jpg`, `.png`, `.psd`, etc.). Unsupported file types are logged as errors, and the script exits.
6. Image Processing
- Download: The image is downloaded from an S3 bucket using the AWS SDK (`boto3`).
- Format Handling:
  - For `.psd` files, the script uses the `psd_tools` library to extract and compose the image.
  - For other formats, the `Pillow` library (`PIL.Image`) is used.
- Resizing: If the image dimensions exceed 512 pixels in either direction, it is resized to fit within 512x512 pixels while maintaining the aspect ratio.
7. Temporary File Management
The resized image is saved temporarily to the local filesystem. The file path is determined dynamically using the `ACTIONID` environment variable. This ensures that temporary files are uniquely named and do not conflict with others.
8. Integration with OpenAI
- The script initializes the OpenAI client using the API key retrieved from the keystore.
- The resized image is uploaded to OpenAI for analysis (e.g., tagging or classification).
- The results of the analysis are stored as a JSON object.
9. Database Interaction
The analyzed results are inserted into a PostgreSQL database using a helper function (`pg_util.insert_tags`). The script ensures idempotency by using database mechanisms like `ON CONFLICT` to handle duplicate records or update existing entries.
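`pg_util` is a project-specific helper, so its internals are not shown in this document. As a hypothetical sketch, the upsert it issues might look like the following (the `asset_tags` table and its columns are assumptions, not the actual schema):

```python
# Hypothetical sketch of the upsert pg_util.insert_tags might issue
UPSERT_TAGS = """
    INSERT INTO asset_tags (tenant_id, asset_id, tags)
    VALUES (%s, %s, %s)
    ON CONFLICT (tenant_id, asset_id) DO UPDATE
    SET tags = EXCLUDED.tags
"""

def insert_tags(cursor, payload, tags_json):
    # Rerunning the step for the same asset updates the row instead of duplicating it
    cursor.execute(UPSERT_TAGS, (payload['tenant_id'], payload['asset_id'], tags_json))
```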
10. Error Handling
- Any errors during image processing, API interaction, or database operations are logged using `pg_util.insert_tags_error`.
- The script exits gracefully if critical errors occur, ensuring no partial operations are left unhandled.
11. Resource Cleanup
- Temporary files created during the process are deleted to free up disk space.
- Database connections and other resources are properly closed, even in the event of an error, ensuring no resource leaks.
Multi-Step Execution Across Multiple Hosts
In CoreAuto, each step can be executed on a different host, enabling complex workflows to be distributed across multiple machines. Each script is responsible for a specific task within a step, and data is passed between steps using CoreAuto's API. This section demonstrates a multi-step workflow involving three steps (scripts) executed on different hosts.
Workflow Description
- Step 1 (host1): Execute `extract.py` to extract data from an API.
- Step 2 (host2): Execute `transform.py` to process and transform the data.
- Step 3 (host3): Execute `load.py` to store the transformed data in a database.
Each script handles a single, atomic operation, ensuring modularity and reusability.
Script 1: `extract.py` (Executed on Host1)
This script extracts data from an external API and passes it to Step 2.
```python
import cawbs
import requests
import sys

# Initialize CaWBS
response = cawbs.Init()
if response['status_code'] >= 400:
    print("Initialization failed:", response)
    sys.exit(1)

# Extract data from API
def extract_data():
    print("Executing Step 1 on host1 (extract.py)...")
    # Example API endpoint
    api_url = "https://api.example.com/data"
    try:
        response = requests.get(api_url)
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as e:
        print(f"Failed to fetch data from API: {e}")
        cawbs.PutStepPayload({"error": "Failed to fetch data"})
        sys.exit(1)
    # Pass extracted data to the next step
    cawbs.PutStepPayload({"data": data})
    print("Data extracted and passed to Step 2.")

# Execute extraction
extract_data()
```
Script 2: `transform.py` (Executed on Host2)
This script retrieves data from Step 1, processes it, and passes the transformed data to Step 3.
```python
import cawbs
import sys

# Initialize CaWBS
response = cawbs.Init()
if response['status_code'] >= 400:
    print("Initialization failed:", response)
    sys.exit(1)

# Transform data
def transform_data():
    print("Executing Step 2 on host2 (transform.py)...")
    # Fetch data from Step 1
    response = cawbs.GetStepPayload("step1")
    if response['status_code'] >= 400:
        print("Failed to fetch payload from Step 1:", response)
        sys.exit(1)
    data = response['payload']['data']
    # Transform the data (example: convert strings to uppercase)
    transformed_data = {key: str(value).upper() for key, value in data.items()}
    # Pass transformed data to the next step
    cawbs.PutStepPayload({"transformed_data": transformed_data})
    print("Data transformed and passed to Step 3.")

# Execute transformation
transform_data()
```
Script 3: `load.py` (Executed on Host3)
This script retrieves transformed data from Step 2 and stores it in a database.
```python
import cawbs
import psycopg2
import sys

# Initialize CaWBS
response = cawbs.Init()
if response['status_code'] >= 400:
    print("Initialization failed:", response)
    sys.exit(1)

# Load data into the database
def load_data():
    print("Executing Step 3 on host3 (load.py)...")
    # Fetch transformed data from Step 2
    response = cawbs.GetStepPayload("step2")
    if response['status_code'] >= 400:
        print("Failed to fetch payload from Step 2:", response)
        sys.exit(1)
    transformed_data = response['payload']['transformed_data']

    # Database connection details (retrieved from the CoreAuto keystore)
    db_response = cawbs.GetKeystore("db_host,db_name,db_user,db_password")
    if db_response['status_code'] >= 400:
        print("Failed to retrieve database credentials:", db_response)
        sys.exit(1)
    db_credentials = db_response['answer']

    # Initialize handles so the finally block is safe even if connect fails
    connection = None
    cursor = None
    try:
        # Connect to PostgreSQL
        connection = psycopg2.connect(
            host=db_credentials['db_host'],
            database=db_credentials['db_name'],
            user=db_credentials['db_user'],
            password=db_credentials['db_password']
        )
        cursor = connection.cursor()
        # Upsert transformed data so reruns stay idempotent
        for key, value in transformed_data.items():
            cursor.execute(
                "INSERT INTO target_table (key, value) VALUES (%s, %s) "
                "ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value",
                (key, value)
            )
        connection.commit()
        print("Data successfully stored in the database.")
    except Exception as e:
        print(f"Database error: {e}")
        sys.exit(1)
    finally:
        if cursor:
            cursor.close()
        if connection:
            connection.close()

# Execute loading
load_data()
```
Step Execution on Different Hosts
- Host1 (`extract.py`):
  - Extracts data from an external API.
  - Passes the data to Step 2 using `cawbs.PutStepPayload`.
- Host2 (`transform.py`):
  - Retrieves data from Step 1 using `cawbs.GetStepPayload("step1")`.
  - Processes the data (e.g., transformation).
  - Passes the transformed data to Step 3 using `cawbs.PutStepPayload`.
- Host3 (`load.py`):
  - Retrieves transformed data from Step 2 using `cawbs.GetStepPayload("step2")`.
  - Stores the data in a database.
Key Considerations
- Environment Variables
  - Each host must define the `STEPNAME` environment variable (`step1`, `step2`, or `step3`) to determine which script to execute; a hypothetical dispatch sketch follows this list.
- Payload Passing
  - Use `PutStepPayload` to pass data between steps.
  - Retrieve payloads with `GetStepPayload` in the next step.
- Error Handling
  - Each script should handle errors gracefully and ensure appropriate logging.
- Scalability
  - Steps can scale independently by deploying them on different hosts with sufficient resources.
- Security
  - Use CoreAuto's `GetKeystore` to securely retrieve credentials and avoid hardcoding sensitive data.
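To illustrate the `STEPNAME` convention, a hypothetical per-host dispatcher might select the script to run as follows (the mapping and the launch mechanism are assumptions for illustration, not documented CoreAuto behavior):

```python
import os
import subprocess
import sys

# Hypothetical mapping from step name to script; adjust to the actual deployment
STEP_SCRIPTS = {
    "step1": "extract.py",
    "step2": "transform.py",
    "step3": "load.py",
}

step = os.environ.get("STEPNAME")
script = STEP_SCRIPTS.get(step)
if script is None:
    print(f"Unknown or missing STEPNAME: {step!r}")
    sys.exit(1)

# Run the selected step script with the same Python interpreter
sys.exit(subprocess.run([sys.executable, script]).returncode)
```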
Version
Library Version: 1.0.0
Last Updated: January 22, 2025