Axellero.io

Best Practices

Best practices for JavaScript and Python code execution in Axellero workflows.

Code Execution Best Practices

This guide covers essential best practices for writing robust, maintainable, and secure code in Axellero workflows.

Both JavaScript and Python nodes have full access to the Workflow Context, providing complete access to node data, variables, constants, and system information.

General Principles

1. Single Responsibility

Each code execution node should have one clear purpose:

✅ Good:

// Node purpose: Calculate order totals
// Orders arrive via workflow variables; default to [] when unset.
var orders = ctx.vars.orders || [];
// NOTE(review): processingUser and workspace are read but never used below —
// presumably kept to illustrate context access; confirm or drop for a minimal example.
var processingUser = ctx.user.login;
var workspace = ctx.workspace_slug;
var total = 0;

// `|| 0` guards against a missing/undefined amount on an individual order.
for (var i = 0; i < orders.length; i++) {
  total += orders[i].amount || 0;
}

return { orderTotal: total };

❌ Bad:

// Doing too many things: validation, calculation, formatting, logging
var orders = ctx.vars.orders || [];
var total = 0;
var validOrders = [];
var errors = [];

// ... 50 lines of mixed logic

2. Defensive Programming

Always validate inputs and handle edge cases:

✅ Good:

var userData = ctx.vars.userData;
var currentUser = ctx.user;
var nodeResults = ctx.nodes.dataSource.outputs;

// Validate input exists
if (!userData) {
  return { error: "No user data provided" };
}

// Validate required fields
if (!userData.email || !userData.name) {
  return { 
    error: "Missing required fields",
    required: ["email", "name"]
  };
}

// Process safely
return {
  processedUser: {
    email: userData.email.toLowerCase().trim(),
    name: userData.name.trim()
  }
};

3. Clear Error Handling

Provide meaningful error messages and maintain workflow continuity:

✅ Good:

# NOTE(review): this example returns values directly, but the other Python
# examples in this guide assign to a `result` variable instead — confirm
# which convention the Python execution node actually expects.
try:
    # Input arrives via workflow variables: args['vars']['inputData'].
    data = args.get('vars', {}).get('inputData')
    
    # Missing input is reported as a structured failure, not an exception,
    # so downstream nodes can branch on 'success'.
    if not data:
        return {
            'success': False,
            'error': 'No input data found in workflow variables',
            'suggestion': 'Ensure previous nodes set inputData variable'
        }
    
    # Process data...
    result = process_data(data)
    
    return {
        'success': True,
        'result': result
    }
    
# Expected, recoverable data problems are labeled 'validation' so callers
# can distinguish them from unexpected failures.
except ValueError as e:
    return {
        'success': False,
        'error': f'Data validation error: {str(e)}',
        'error_type': 'validation'
    }
# Anything else is surfaced as a 'system' error rather than crashing the node.
except Exception as e:
    return {
        'success': False,
        'error': f'Unexpected error: {str(e)}',
        'error_type': 'system'
    }

JavaScript-Specific Best Practices

JavaScript Sandbox Compatibility

Always write ES5.1 compatible code:

✅ Good:

// ES5.1 compatible
var users = ctx.vars.users || [];
var processingUser = ctx.user.login;
var hasAdminRole = ctx.user.roles.indexOf('admin') !== -1;
var activeUsers = [];

for (var i = 0; i < users.length; i++) {
  var user = users[i];
  if (user.active === true) {
    activeUsers.push({
      id: user.id,
      name: user.name,
      status: 'active'
    });
  }
}

function isValidEmail(email) {
  return email && email.indexOf('@') !== -1 && email.indexOf('.') !== -1;
}

return { activeUsers: activeUsers };

❌ Bad:

// Modern JavaScript - will fail in the ES5.1 sandbox:
// const/let, arrow functions, and optional chaining are not available.
const users = ctx.vars.users || [];
const activeUsers = users
  .filter(user => user.active)
  .map(user => ({
    id: user.id,
    name: user.name,
    status: 'active'
  }));

const isValidEmail = email => email?.includes('@') && email?.includes('.');

Context Variable Management

Use consistent patterns for context variables:

✅ Good:

// Read from context
var config = ctx.consts.CONFIG || ctx.vars.config || {};
var items = ctx.vars.items || [];
var systemInfo = {
  workspace: ctx.workspace_slug,
  user: ctx.user.login
};

// Process data
var processed = processItems(items, config);

// Update context with clear naming
// Note: Results are automatically stored in workflow context
// JavaScript nodes don't need to manually update ctx.vars

// Return summary
return {
  itemsProcessed: processed.items.length,
  errors: processed.errors.length,
  success: processed.errors.length === 0
};

function processItems(items, config) {
  // Implementation here
  return {
    items: [],
    stats: {},
    errors: []
  };
}

Memory Management

Be mindful of memory usage in JavaScript:

✅ Good:

var items = ctx.vars.largeDataset || [];
var processingContext = {
  user: ctx.user.login,
  workspace: ctx.workspace_id
};
var batchSize = 1000;
var processed = [];

// Process in batches
for (var i = 0; i < items.length; i += batchSize) {
  var batch = items.slice(i, i + batchSize);
  var batchResult = processBatch(batch);
  processed = processed.concat(batchResult);
  
  // Log progress for long operations
  if (i % (batchSize * 10) === 0) {
    console.log('Processed ' + i + ' items');
  }
}

function processBatch(batch) {
  var result = [];
  for (var i = 0; i < batch.length; i++) {
    if (batch[i] && batch[i].active) {
      result.push({ id: batch[i].id, processed: true });
    }
  }
  return result;
}

return { processedCount: processed.length };

Python-Specific Best Practices

Context Access Patterns

Use consistent patterns for accessing workflow data:

✅ Good:

def get_workflow_data(args, key, default=None):
    """Look up ``key`` in the workflow variables, falling back to ``default``."""
    return args.get('vars', {}).get(key, default)

def validate_required_data(args, required_keys):
    """Check that every key in ``required_keys`` is present and non-None.

    Returns a (is_valid, missing_keys) tuple.
    """
    vars_data = args.get('vars', {})
    # .get() returning None covers both "key absent" and "key set to None".
    missing = [key for key in required_keys if vars_data.get(key) is None]
    return len(missing) == 0, missing

# Usage
is_valid, missing = validate_required_data(args, ['userData', 'config'])
if not is_valid:
    result = {
        'error': f'Missing required data: {missing}',
        'success': False
    }
else:
    user_data = get_workflow_data(args, 'userData', {})
    config = get_workflow_data(args, 'config', {})
    
    # Process data...

Type Safety

Use type checking for robust data processing:

✅ Good:

def safe_convert_to_number(value, default=0):
    """Safely convert ``value`` to a float.

    Returns ``default`` when the value is neither numeric nor a parseable
    numeric string, instead of raising.
    """
    try:
        if isinstance(value, (int, float)):
            return float(value)
        elif isinstance(value, str):
            return float(value.strip())
        else:
            return default
    except (ValueError, AttributeError):
        return default

def process_numeric_data(items):
    """Process a list of dicts with numeric validation.

    Returns a ``(processed, errors)`` tuple. Each processed entry records the
    converted value, the original raw value, and a ``valid`` flag that is True
    only when the original value was actually convertible.
    """
    processed = []
    errors = []

    for i, item in enumerate(items):
        try:
            if not isinstance(item, dict):
                errors.append(f"Item {i}: Expected dict, got {type(item)}")
                continue

            # Use None as the conversion sentinel: the original check
            # (value != 0 or original == 0) misreported a convertible "0"
            # as invalid. Detecting failure via the sentinel fixes that.
            converted = safe_convert_to_number(item.get('value'), None)
            is_valid = converted is not None
            processed.append({
                'id': item.get('id', f'item_{i}'),
                'value': converted if is_valid else 0,
                'original': item.get('value'),
                'valid': is_valid
            })

        except Exception as e:
            errors.append(f"Item {i}: {str(e)}")

    return processed, errors

# Usage
items = args.get('vars', {}).get('items', [])
processed_items, processing_errors = process_numeric_data(items)

result = {
    'processed': processed_items,
    'errors': processing_errors,
    'success': len(processing_errors) == 0
}

Built-in Library Usage

Leverage Python's standard library for efficient data processing:

✅ Good:

import collections
import statistics
import itertools

def process_data_efficiently(data):
    """Group item values by category and compute per-category statistics."""
    if not data:
        return {'error': 'No data provided'}

    # Bucket values by category; non-dict entries are ignored.
    buckets = collections.defaultdict(list)
    for entry in data:
        if isinstance(entry, dict):
            buckets[entry.get('category', 'unknown')].append(entry.get('value', 0))

    # Summarize each non-empty bucket with the stdlib statistics module.
    summary = {}
    for name, values in buckets.items():
        if not values:
            continue
        summary[name] = {
            'count': len(values),
            'mean': statistics.mean(values),
            'median': statistics.median(values),
            'total': sum(values)
        }

    return summary

# Usage
raw_data = args.get('vars', {}).get('data', [])
processed = process_data_efficiently(raw_data)

result = {
    'analysis': processed,
    'method': 'built-in-libraries'
}

Security Best Practices

Input Sanitization

Always sanitize external data:

✅ JavaScript:

/**
 * Escape characters that are dangerous in HTML contexts.
 *
 * @param {*} str - Candidate value to sanitize.
 * @returns {string} The escaped string, or '' for non-string input.
 */
function sanitizeString(str) {
  if (typeof str !== 'string') {
    return '';
  }

  // Build the escape table once per call instead of once per matched
  // character (the original allocated it inside the replace callback).
  var escapeMap = {
    '<': '&lt;',
    '>': '&gt;',
    '"': '&quot;',
    "'": '&#x27;',
    '&': '&amp;'
  };

  return str.replace(/[<>"'&]/g, function (match) {
    return escapeMap[match];
  });
}

var userInput = ctx.vars.userInput || '';
var sanitizationRules = ctx.consts.SANITIZATION_RULES || {};
var cleanInput = sanitizeString(userInput);

// Use cleanInput for processing

✅ Python:

import re

def sanitize_input(text):
    """Sanitize user input: strip HTML tags, escape angle brackets, cap length."""
    # Non-string input (None, numbers, ...) sanitizes to the empty string.
    if not isinstance(text, str):
        return ''

    # Drop anything that looks like an HTML tag.
    without_tags = re.sub(r'<[^>]+>', '', text)

    # Escape any remaining angle brackets.
    escaped = without_tags.replace('<', '&lt;').replace('>', '&gt;')

    # Cap the result at 1000 characters.
    return escaped[:1000]

user_input = args.get('vars', {}).get('userInput', '')
clean_input = sanitize_input(user_input)

Secret Management

Never hardcode secrets:

❌ Bad:

var apiKey = "sk-1234567890abcdef"; // Never do this!
var dbPassword = "mypassword123"; // Never do this!

✅ Good:

// Get secrets from workflow variables (set by secure nodes)
var apiKey = ctx.consts.API_KEY || ctx.vars.apiKey;
var dbConfig = ctx.consts.DB_CONFIG || ctx.vars.databaseConfig;

if (!apiKey) {
  return { error: "API key not configured" };
}

// Use secrets safely

Performance Optimization

Efficient Data Processing

Optimize for large datasets:

✅ JavaScript:

// Process a large dataset in fixed-size chunks to bound per-iteration work.
function processLargeDataset(items) {
  var chunkSize = 1000;
  var results = [];
  
  for (var i = 0; i < items.length; i += chunkSize) {
    var chunk = items.slice(i, i + chunkSize);
    var chunkResults = processChunk(chunk);
    results = results.concat(chunkResults);
    
    // Periodic progress log for long runs. NOTE(review): this does NOT
    // "allow other operations to run" — the loop is fully synchronous.
    if (i % 5000 === 0) {
      console.log('Processed ' + i + ' items');
    }
  }
  
  return results;
}

// Keep only active entries, doubling each one's value.
function processChunk(chunk) {
  var processed = [];
  for (var i = 0; i < chunk.length; i++) {
    if (chunk[i] && chunk[i].active) {
      processed.push({
        id: chunk[i].id,
        value: chunk[i].value * 2
      });
    }
  }
  return processed;
}

✅ Python:

def process_large_dataset(items, chunk_size=1000):
    """Lazily yield processed chunks of ``items``, ``chunk_size`` at a time."""
    for start in range(0, len(items), chunk_size):
        window = items[start:start + chunk_size]

        # Keep only active entries, doubling each one's value.
        transformed = [
            {'id': entry.get('id'), 'value': entry.get('value', 0) * 2}
            for entry in window
            if entry.get('active')
        ]

        yield transformed

        # Progress log every chunk_size * 10 items (fires on the first chunk too).
        if start % (chunk_size * 10) == 0:
            print(f"Processed {start:,} items")

# Usage
items = args.get('vars', {}).get('largeDataset', [])
all_results = []

for chunk_result in process_large_dataset(items):
    all_results.extend(chunk_result)

result = {
    'processed': all_results,
    'count': len(all_results)
}

Memory Efficiency

Use memory-efficient patterns:

✅ Python:

def calculate_statistics_efficiently(data):
    """Calculate summary statistics in one pass without storing all values.

    Only int/float ``value`` fields are counted (note: bool is a subclass of
    int, so True/False would be counted as 1/0). Returns a stats dict, or an
    error dict when no numeric values are found.
    """
    count = 0
    sum_val = 0
    sum_squares = 0
    min_val = float('inf')
    max_val = float('-inf')

    for item in data:
        value = item.get('value', 0)
        if isinstance(value, (int, float)):
            count += 1
            sum_val += value
            sum_squares += value * value
            min_val = min(min_val, value)
            max_val = max(max_val, value)

    if count == 0:
        return {'error': 'No numeric values found'}

    mean = sum_val / count
    # The sum-of-squares formula can dip slightly below zero from
    # floating-point rounding; clamp so std_dev never becomes complex.
    variance = max(0.0, (sum_squares / count) - (mean * mean))

    return {
        'count': count,
        'sum': sum_val,
        'mean': mean,
        'variance': variance,
        'std_dev': variance ** 0.5,
        'min': min_val,
        'max': max_val
    }

Debugging and Monitoring

Logging Best Practices

Use appropriate logging levels:

✅ JavaScript:

// Process items with per-item error isolation and leveled logging:
// info for lifecycle, warn for skipped input, debug for progress, error for failures.
function processWithLogging(items) {
  console.info('Starting processing of ' + items.length + ' items');
  
  var results = [];
  var errors = [];
  
  for (var i = 0; i < items.length; i++) {
    try {
      var item = items[i];
      
      // Invalid entries are skipped with a warning rather than failing the run.
      if (!item || !item.id) {
        console.warn('Skipping invalid item at index ' + i);
        continue;
      }
      
      // NOTE(review): transformItem is assumed to be defined elsewhere in the node.
      var processed = transformItem(item);
      results.push(processed);
      
      if (i % 100 === 0) {
        console.debug('Processed ' + i + ' items so far');
      }
      
    } catch (error) {
      // A single bad item is recorded and processing continues.
      console.error('Error processing item ' + i + ': ' + error.message);
      errors.push({ index: i, error: error.message });
    }
  }
  
  console.info('Processing complete: ' + results.length + ' successful, ' + errors.length + ' errors');
  
  return { results: results, errors: errors };
}

✅ Python:

import time  # required for time.time() below — missing from the original snippet


def process_with_monitoring(items):
    """Process items with comprehensive monitoring.

    Logs progress every 100 items, records per-item errors instead of
    aborting the run, and returns results plus summary stats.
    """
    print(f"Starting processing of {len(items)} items")

    results = []
    errors = []
    start_time = time.time()

    for i, item in enumerate(items):
        try:
            # Skip anything that is not a dict with an 'id' — logged, not fatal.
            if not isinstance(item, dict) or 'id' not in item:
                print(f"Warning: Skipping invalid item at index {i}")
                continue

            processed = transform_item(item)
            results.append(processed)

            # Progress reporting every 100 items, with a rough throughput rate.
            if (i + 1) % 100 == 0:
                elapsed = time.time() - start_time
                rate = (i + 1) / elapsed
                print(f"Processed {i + 1} items ({rate:.1f} items/sec)")

        except Exception as e:
            error_msg = f"Error processing item {i}: {str(e)}"
            print(f"Error: {error_msg}")
            errors.append({'index': i, 'error': error_msg})

    total_time = time.time() - start_time
    print(f"Processing complete: {len(results)} successful, {len(errors)} errors in {total_time:.2f}s")

    return {
        'results': results,
        'errors': errors,
        'stats': {
            'total_items': len(items),
            'successful': len(results),
            'failed': len(errors),
            'processing_time': total_time
        }
    }

def transform_item(item):
    """Transform a single item with validation (placeholder implementation)."""
    # Implementation here
    return item

Testing and Validation

Return Value Validation

Always validate your return values:

✅ Good:

// Validates the shape of a node result object before it is returned.
// Yields { valid: true } on success, or { valid: false, error: <reason> }.
function validateResult(result) {
  var failure = function (message) {
    return { valid: false, error: message };
  };

  if (!result) {
    return failure('Result is null or undefined');
  }
  if (typeof result !== 'object') {
    return failure('Result must be an object');
  }

  // Add specific validations for your use case
  if (result.items && !Array.isArray(result.items)) {
    return failure('items must be an array');
  }

  return { valid: true };
}

// Before returning
var result = processData();
var validation = validateResult(result);

if (!validation.valid) {
  console.error('Invalid result: ' + validation.error);
  return { error: validation.error };
}

return result;

Unit Testing Patterns

Structure your code for testability:

✅ Good:

// Pure functions that can be tested
function calculateTotal(orders) {
  var total = 0;
  for (var i = 0; i < orders.length; i++) {
    if (orders[i] && typeof orders[i].amount === 'number') {
      total += orders[i].amount;
    }
  }
  return total;
}

function validateOrder(order) {
  var errors = [];
  
  if (!order) {
    errors.push('Order is required');
  } else {
    if (!order.id) errors.push('Order ID is required');
    if (typeof order.amount !== 'number') errors.push('Order amount must be a number');
    if (order.amount < 0) errors.push('Order amount cannot be negative');
  }
  
  return { valid: errors.length === 0, errors: errors };
}

// Main execution logic
var orders = ctx.vars.orders || [];
var userPermissions = ctx.user.roles;
var workspaceConfig = ctx.consts.WORKSPACE_CONFIG;
var validOrders = [];
var totalErrors = [];

for (var i = 0; i < orders.length; i++) {
  var validation = validateOrder(orders[i]);
  if (validation.valid) {
    validOrders.push(orders[i]);
  } else {
    totalErrors = totalErrors.concat(validation.errors);
  }
}

var total = calculateTotal(validOrders);

return {
  total: total,
  validOrders: validOrders.length,
  totalOrders: orders.length,
  errors: totalErrors
};

Common Anti-Patterns to Avoid

❌ Avoid Global State Modification

// Don't modify globals or shared state
// ❌ NEVER manipulate ctx directly in JavaScript nodes
// Results are automatically handled by the workflow engine

❌ Avoid Silent Failures

// Don't ignore errors silently
try {
  processData();
} catch (e) {
  // Silent failure - bad!
}

// Do this instead
try {
  processData();
} catch (e) {
  console.error('Processing failed:', e.message);
  return { error: e.message, success: false };
}

❌ Avoid Blocking Operations

# Don't use blocking operations without feedback
import time

# Bad - blocks without feedback
time.sleep(60)

# Better - provide progress updates
for i in range(60):
    time.sleep(1)
    if i % 10 == 0:
        print(f"Waiting... {60-i} seconds remaining")

Following these best practices will help you write robust, maintainable, and efficient code for your Axellero workflows.