1. Scripts - Best Practices

Overview

When developing scripts for CoreAuto workflows and event handling, it is critical to ensure they are idempotent and atomic. This means that:

  • Idempotency: Repeated execution of the script with the same payload will not cause unintended side effects or corruption of the system state.
  • Atomicity: Each script should execute as a single unit of work, ensuring either complete success or no change at all in case of failure.

Idempotency

Ensure that executing a script multiple times with the same payload does not cause unintended side effects. This is vital in both batch and real-time scenarios to prevent data corruption or workflow inconsistencies.

Implementation Tips

  • State Verification: Check the current state of the workflow or step before making updates.
  • Unique Identifiers: Use unique identifiers like ACTIONID to avoid processing the same action multiple times.
  • Safe Updates: Use conditional updates to ensure that actions are applied only when necessary.
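
For example, a minimal sketch of the conditional-update approach, assuming psycopg2 and an illustrative processed_actions table keyed by ACTIONID (table, column, and connection details are placeholders, not part of CoreAuto):

import os
import psycopg2

# Hypothetical table: processed_actions(action_id TEXT PRIMARY KEY)
action_id = os.environ.get("ACTIONID")

connection = psycopg2.connect(host="db_host", dbname="db_name", user="db_user", password="db_password")
cursor = connection.cursor()

# Conditional insert: re-running the script with the same ACTIONID is a no-op
cursor.execute(
    "INSERT INTO processed_actions (action_id) VALUES (%s) ON CONFLICT (action_id) DO NOTHING",
    (action_id,)
)
if cursor.rowcount == 0:
    print(f"Action {action_id} already processed; skipping")
else:
    print(f"Processing action {action_id}")

connection.commit()
cursor.close()
connection.close()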

Atomicity

Scripts should execute as a single, complete operation. If a failure occurs, the system should be left in its original state without partial updates.

Implementation Tips

  • Database Transactions: Wrap critical database operations in transactions to ensure atomicity.
  • Exception Handling: Capture and handle exceptions to roll back changes on failure.
  • Resource Cleanup: Remove temporary resources (e.g., files, threads) on script completion or error.
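
A compact sketch of the transaction-and-cleanup pattern these tips describe, again assuming psycopg2 and illustrative table and connection names:

import psycopg2

connection = None
cursor = None
try:
    connection = psycopg2.connect(host="db_host", dbname="db_name", user="db_user", password="db_password")
    cursor = connection.cursor()

    # Both statements belong to a single transaction: either both apply or neither does
    cursor.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s", (100, 1))
    cursor.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s", (100, 2))
    connection.commit()

except Exception as e:
    # Roll back so the system is left in its original state
    if connection:
        connection.rollback()
    print("Operation failed and was rolled back:", e)

finally:
    # Resource cleanup, even on failure
    if cursor:
        cursor.close()
    if connection:
        connection.close()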

These principles ensure that CoreAuto workflows are reliable, consistent, and fault-tolerant.

Batch Job Scripts

Batch job scripts process large datasets or perform scheduled tasks.

  • Examples:
    • Nightly database cleanups.
    • Generating reports or aggregating data.

Best Practices:

  • Validate payloads before execution.
  • Use CaWBSbatch to retrieve shared credentials for connecting to external systems like databases or APIs.
  • Log progress and errors for post-execution analysis.

Batch Script Example: PostgreSQL ETL

The following script showcases a PostgreSQL ETL process for a batch job. It uses the cawbsbatch.py library to authenticate with CoreAuto, retrieve necessary credentials, and perform ETL operations on a PostgreSQL database.

Code Example

import psycopg2
import json
import os
import cawbsbatch

# Initialize the CaWBSbatch library
response = cawbsbatch.Init()
if response['status_code'] >= 400:
    print("Initialization failed:", response)
    exit(1)

# Retrieve credentials from the keystore
response = cawbsbatch.GetKeystore("db_host,db_name,db_user,db_password")
if response['status_code'] >= 400:
    print("Failed to retrieve credentials:", response)
    exit(1)

credentials = response['answer']
db_host = credentials['db_host']
db_name = credentials['db_name']
db_user = credentials['db_user']
db_password = credentials['db_password']

# Connect to the PostgreSQL database
connection = None
cursor = None
try:
    connection = psycopg2.connect(
        host=db_host,
        database=db_name,
        user=db_user,
        password=db_password
    )
    cursor = connection.cursor()
    print("Connected to the database")

    # Extract data from the source table
    extract_query = "SELECT * FROM source_table"
    cursor.execute(extract_query)
    source_data = cursor.fetchall()

    print(f"Extracted {len(source_data)} records from source_table")

    # Transform data (example transformation: convert all text to uppercase)
    transformed_data = []
    for record in source_data:
        transformed_record = [str(item).upper() if isinstance(item, str) else item for item in record]
        transformed_data.append(transformed_record)

    # Load transformed data into the target table
    load_query = """
        INSERT INTO target_table (column1, column2, column3)
        VALUES (%s, %s, %s)
        ON CONFLICT (column1) DO UPDATE
        SET column2 = EXCLUDED.column2,
            column3 = EXCLUDED.column3
    """
    for record in transformed_data:
        cursor.execute(load_query, record)

    # Commit the transaction
    connection.commit()
    print("ETL operation completed successfully")

except Exception as e:
    # Roll back the transaction in case of failure
    if connection:
        connection.rollback()
    print("Error during ETL operation:", e)
    exit(1)

finally:
    # Close the database connection
    if cursor:
        cursor.close()
    if connection:
        connection.close()
    print("Database connection closed")

Explanation of the Code

1. Authentication

The script starts by authenticating with CoreAuto using the Init() function from the cawbsbatch.py library.

2. Credential Retrieval

It retrieves database credentials from the CoreAuto keystore using GetKeystore(). This ensures that sensitive information like database passwords is securely managed.

3. ETL Workflow

  • Extract: Fetch data from a source table.
  • Transform: Apply necessary transformations to the data (e.g., format conversion, validation).
  • Load: Insert transformed data into the target table. The script uses ON CONFLICT to handle duplicate records, ensuring idempotency.

4. Transaction Management

The script wraps database operations in a transaction. If an error occurs, the transaction is rolled back to maintain atomicity.

5. Resource Cleanup

Connections and cursors are closed at the end of the script, even in the event of an error.

Real-Time Event Scripts

Real-time scripts handle immediate responses to system events.

  • Examples:
    • Processing an uploaded image.
    • Reacting to a sensor-triggered event.

Best Practices:

  • Minimize latency by using efficient algorithms.
  • Use CaWBS to retrieve event payloads (GetEventPayload) and save results (PutStepPayload).
  • Ensure atomic operations to maintain system integrity during failures.

Example: AI Image Tagging

The following Python script illustrates idempotency and atomicity using AWS S3, OpenAI, and PostgreSQL:

Key Concepts

  • Payload Validation: Ensure all required fields are present in the payload.
  • Error Logging: Capture errors in a designated database table (tags_error) to trace failures.
  • Transactional Operations: Use database transactions to ensure atomic updates.
  • Cleanup: Remove temporary files and rollback resources (e.g., OpenAI threads, files) on failure.

Code Example

import os
import time
import boto3
import json
import io
from PIL import Image
from psd_tools import PSDImage  # For handling PSD files
import cawbs
import pg_util
from openai import OpenAI

# Initialize and validate the environment
result_txt = ''
print("Step started")

# Initialize CaWBS
response = cawbs.Init()
if response['status_code'] >= 400:
    print("Initialization failed:", response)
    exit(1)

# Get Event Payload
response = cawbs.GetEventPayload()
if response['status_code'] >= 400:
    print("Failed to fetch event payload:", response)
    exit(1)
payload = response['payload']

# Validate Payload
if 'tenant_id' not in payload or 'asset_id' not in payload:
    print("Invalid payload format")
    exit(1)

# Fetch Keystore Credentials
response = cawbs.GetKeystore("sm_maindb,sm_encrypt,main_dbname,OPENAI_API_KEY,OPENAI_API_ASSISTANT_ID")
if response['status_code'] >= 400:
    print("Keystore fetch error:", response)
    exit(1)
credentials = response['answer']

# Initialize Database
if pg_util.init(credentials['main_dbname'], credentials['sm_maindb'], credentials['sm_encrypt'],
                payload['tenant_id'], payload['asset_id']) > 0:
    print("Database initialization failed")
    exit(1)

# Process Image
file_ext = os.path.splitext(pg_util.s3_file_name)[1].lower()
allowed_formats = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.psd'}

if file_ext not in allowed_formats:
    print(f"Unsupported file extension: {file_ext}")
    pg_util.insert_tags_error(payload, 8, f"Unsupported file extension: {file_ext}")
    exit(1)

# Download image from S3
s3 = boto3.client('s3')
try:
    s3_object = s3.get_object(Bucket=pg_util.s3_bucket_name, Key=pg_util.s3_file_key)
    image_data = s3_object['Body'].read()
except Exception as e:
    print(f"S3 download error: {e}")
    pg_util.insert_tags_error(payload, 8, "S3 file not found")
    exit(1)

# Process PSD or PIL Image
if file_ext == '.psd':
    psd = PSDImage.open(io.BytesIO(image_data))
    image = psd.compose()
else:
    image = Image.open(io.BytesIO(image_data))

# Resize or convert if necessary
if image.size[0] > 512 or image.size[1] > 512:
    # thumbnail() resizes in place and preserves the aspect ratio
    image.thumbnail((512, 512), Image.Resampling.LANCZOS)
if image.mode != "RGB":
    # JPEG output requires RGB (e.g., PNGs with alpha or palette images)
    image = image.convert("RGB")

# Save Image Temporarily
tmp_dir = os.path.expanduser("~/temp")
os.makedirs(tmp_dir, exist_ok=True)
tmp_file = os.path.join(tmp_dir, f"tmp_{os.environ.get('ACTIONID')}.jpg")
image.save(tmp_file, format="JPEG")

# Send Image to OpenAI
client = OpenAI(api_key=credentials['OPENAI_API_KEY'])
try:
    # Upload the image for vision analysis
    with open(tmp_file, "rb") as image_file:
        file_response = client.files.create(file=image_file, purpose="vision")

    # Create a thread containing the image and run the tagging assistant
    thread = client.beta.threads.create(messages=[{
        "role": "user",
        "content": [
            {"type": "text", "text": "Analyze this image"},
            {"type": "image_file", "image_file": {"file_id": file_response.id}}
        ]
    }])
    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=credentials['OPENAI_API_ASSISTANT_ID']
    )
    while run.status in ("queued", "in_progress"):
        time.sleep(1)
        run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)
    if run.status != "completed":
        raise RuntimeError(f"Assistant run ended with status: {run.status}")

    # Store results in database (the latest message is the assistant's reply)
    messages = client.beta.threads.messages.list(thread_id=thread.id)
    assistant_response = messages.data[0].content[0].text.value
    pg_util.insert_tags(payload, assistant_response)

except Exception as e:
    print(f"Error with OpenAI API: {e}")
    pg_util.insert_tags_error(payload, 8, str(e))
    exit(1)

finally:
    # Cleanup resources
    if os.path.exists(tmp_file):
        os.remove(tmp_file)

Explanation of the Code

This script demonstrates how to process an image, analyze it using the OpenAI API, and store the results in a PostgreSQL database. The following steps outline its key operations:

1. Authentication

The script authenticates with CoreAuto using the Init() function from the CaWBS library. This ensures that the script has valid API tokens and is authorized to interact with CoreAuto's services. If the authentication fails, the script exits with an error message.

2. Payload Retrieval

Using the GetEventPayload() function, the script retrieves the event payload, which contains metadata about the file to be processed (e.g., tenant_id, asset_id). This step is crucial for ensuring the script operates on the correct data.

3. Credential Retrieval

The script fetches sensitive credentials (e.g., database and OpenAI API keys) securely from the CoreAuto keystore using the GetKeystore() function. This eliminates the need for hardcoding credentials, enhancing security.

4. Database Initialization

The script initializes a connection to the PostgreSQL database using the credentials retrieved earlier and uses the tenant_id and asset_id from the payload to look up the asset's details (such as its S3 location). If the connection or lookup fails, the script logs an error and exits.

5. File Validation

The script determines the file extension of the image from its S3 file name and validates that it belongs to an allowed set of formats (.jpg, .png, .psd, etc.). Unsupported file types are logged as errors, and the script exits.

6. Image Processing

  • Download: The image is downloaded from an S3 bucket using the AWS SDK (boto3).
  • Format Handling:
    • For .psd files, the script uses the psd_tools library to extract and compose the image.
    • For other formats, the Pillow library (PIL.Image) is used.
  • Resizing: If the image dimensions exceed 512 pixels in either direction, it is resized to fit within 512x512 pixels while maintaining the aspect ratio.
  • Conversion: Images with alpha channels or palette modes are converted to RGB so they can be saved as JPEG.

7. Temporary File Management

The resized image is saved temporarily to the local filesystem. The file path is determined dynamically using the ACTIONID environment variable. This ensures that temporary files are uniquely named and do not conflict with others.

8. Integration with OpenAI

  • The script initializes the OpenAI client using the API key retrieved from the keystore.
  • The resized image is uploaded to OpenAI for analysis (e.g., tagging or classification).
  • The results of the analysis are stored as a JSON object.

9. Database Interaction

The analyzed results are inserted into a PostgreSQL database using a helper function (pg_util.insert_tags). The script ensures idempotency by using database mechanisms like ON CONFLICT to handle duplicate records or update existing entries.

10. Error Handling

  • Any errors during image processing, API interaction, or database operations are logged using pg_util.insert_tags_error.
  • The script gracefully exits if critical errors occur, ensuring no partial operations are left unhandled.

11. Resource Cleanup

  • Temporary files created during the process are deleted to free up disk space.
  • Database connections and other resources are properly closed, even in the event of an error, ensuring no resource leaks.

Multi-Step Execution Across Multiple Hosts

In CoreAuto, each step of a workflow can run on a different host, allowing complex workflows to be distributed across multiple machines. Each script is responsible for a specific task within a step, and data is passed between steps using CoreAuto's API. The following example demonstrates a multi-step workflow with three steps (scripts), each executed on a different host.

Workflow Description

  1. Step 1 (host1): Execute extract.py to extract data from an API.
  2. Step 2 (host2): Execute transform.py to process and transform the data.
  3. Step 3 (host3): Execute load.py to store the transformed data in a database.

Each script handles a single, atomic operation, ensuring modularity and reusability.

Script 1: extract.py (Executed on Host1)

This script extracts data from an external API and passes it to Step 2.

import cawbs
import requests
import sys

# Initialize CaWBS
response = cawbs.Init()
if response['status_code'] >= 400:
    print("Initialization failed:", response)
    sys.exit(1)

# Extract data from API
def extract_data():
    print("Executing Step 1 on host1 (extract.py)...")

    # Example API endpoint
    api_url = "https://api.example.com/data"
    try:
        response = requests.get(api_url)
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as e:
        print(f"Failed to fetch data from API: {e}")
        cawbs.PutStepPayload({"error": "Failed to fetch data"})
        sys.exit(1)

    # Pass extracted data to the next step
    cawbs.PutStepPayload({"data": data})
    print("Data extracted and passed to Step 2.")

# Execute extraction
extract_data()


Script 2: transform.py (Executed on Host2)

This script retrieves data from Step 1, processes it, and passes the transformed data to Step 3.

import cawbs
import sys

# Initialize CaWBS
response = cawbs.Init()
if response['status_code'] >= 400:
    print("Initialization failed:", response)
    sys.exit(1)

# Transform data
def transform_data():
    print("Executing Step 2 on host2 (transform.py)...")

    # Fetch data from Step 1
    response = cawbs.GetStepPayload("step1")
    if response['status_code'] >= 400:
        print("Failed to fetch payload from Step 1:", response)
        sys.exit(1)
    data = response['payload']['data']

    # Transform the data (example: convert strings to uppercase)
    transformed_data = {key: str(value).upper() for key, value in data.items()}

    # Pass transformed data to the next step
    cawbs.PutStepPayload({"transformed_data": transformed_data})
    print("Data transformed and passed to Step 3.")

# Execute transformation
transform_data()


Script 3: load.py (Executed on Host3)

This script retrieves transformed data from Step 2 and stores it in a database.

import cawbs
import psycopg2
import sys

# Initialize CaWBS
response = cawbs.Init()
if response['status_code'] >= 400:
    print("Initialization failed:", response)
    sys.exit(1)

# Load data into the database
def load_data():
    print("Executing Step 3 on host3 (load.py)...")

    # Fetch transformed data from Step 2
    response = cawbs.GetStepPayload("step2")
    if response['status_code'] >= 400:
        print("Failed to fetch payload from Step 2:", response)
        sys.exit(1)
    transformed_data = response['payload']['transformed_data']

    # Database connection details (retrieved from CoreAuto keystore)
    db_response = cawbs.GetKeystore("db_host,db_name,db_user,db_password")
    if db_response['status_code'] >= 400:
        print("Failed to retrieve database credentials:", db_response)
        sys.exit(1)
    db_credentials = db_response['answer']

    connection = None
    cursor = None
    try:
        # Connect to PostgreSQL
        connection = psycopg2.connect(
            host=db_credentials['db_host'],
            database=db_credentials['db_name'],
            user=db_credentials['db_user'],
            password=db_credentials['db_password']
        )
        cursor = connection.cursor()

        # Insert transformed data into the database
        for key, value in transformed_data.items():
            cursor.execute(
                "INSERT INTO target_table (key, value) VALUES (%s, %s) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value",
                (key, value)
            )
        connection.commit()
        print("Data successfully stored in the database.")

    except Exception as e:
        print(f"Database error: {e}")
        sys.exit(1)

    finally:
        if cursor:
            cursor.close()
        if connection:
            connection.close()

# Execute loading
load_data()


Step Execution on Different Hosts

  1. Host1 (extract.py):
    • Extracts data from an external API.
    • Passes the data to Step 2 using cawbs.PutStepPayload.
  2. Host2 (transform.py):
    • Retrieves data from Step 1 using cawbs.GetStepPayload("step1").
    • Processes the data (e.g., transformation).
    • Passes the transformed data to Step 3 using cawbs.PutStepPayload.
  3. Host3 (load.py):
    • Retrieves transformed data from Step 2 using cawbs.GetStepPayload("step2").
    • Stores the data in a database.

Key Considerations

  1. Environment Variables
    • Each host must define the STEPNAME environment variable (step1, step2, or step3) so that the correct script runs for that step (see the dispatcher sketch after this list).
  2. Payload Passing
    • Use PutStepPayload to pass data between steps.
    • Retrieve payloads with GetStepPayload in the next step.
  3. Error Handling
    • Each script should handle errors gracefully and ensure appropriate logging.
  4. Scalability
    • Steps can scale independently by deploying them on different hosts with sufficient resources.
  5. Security
    • Use CoreAuto's GetKeystore to securely retrieve credentials and avoid hardcoding sensitive data.
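
As an illustration of the first point, the sketch below dispatches to the appropriate script based on STEPNAME. The dispatch mechanism itself is an assumption; deployments may instead configure each host to run its step's script directly.

import os
import subprocess
import sys

# Map each step name to the script deployed on that host (names from the example above)
STEP_SCRIPTS = {
    "step1": "extract.py",
    "step2": "transform.py",
    "step3": "load.py",
}

step_name = os.environ.get("STEPNAME")
script = STEP_SCRIPTS.get(step_name)
if script is None:
    print(f"Unknown or missing STEPNAME: {step_name}")
    sys.exit(1)

print(f"Running {script} for {step_name}")
sys.exit(subprocess.run([sys.executable, script]).returncode)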

Version

Library Version: 1.0.0
Last Updated: January 22, 2025