Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Dec 17, 2025

📄 3,144% (31.44x) speedup for AES.encrypt in skyvern/forge/sdk/encrypt/aes.py

⏱️ Runtime : 15.0 seconds 464 milliseconds (best of 20 runs)

📝 Explanation and details

The optimized code achieves a 31x speedup (3144% improvement) and 4x throughput increase (300% improvement) through two key optimizations:

Primary Optimization: Key Derivation Caching

The most significant performance gain comes from caching the expensive PBKDF2 key derivation. In the original code, _derive_key() performed 100,000 iterations of PBKDF2-HMAC-SHA256 on every single encryption call, consuming 99.9% of execution time (15.07 seconds).

The optimized version adds:

self._derived_key: bytes | None = None

def _derive_key(self) -> bytes:
    if self._derived_key is None:
        # Only compute once per AES instance
        self._derived_key = kdf.derive(self.secret_key)
    return self._derived_key

This reduces key derivation from 1,273 expensive operations to just 39, since the derived key is identical for all encryptions using the same AES instance (same secret_key and salt).

Secondary Optimization: Padding Performance

The _pad method was optimized from:

padding = bytes([padding_length] * padding_length)  # Creates intermediate list

to:

padding = bytes([padding_length]) * padding_length  # Direct bytes multiplication

This avoids creating an intermediate list object, reducing both memory allocation and execution time by ~27% for the padding operation.

Impact Analysis

  • High-volume scenarios benefit most - the line profiler shows the optimization is particularly effective when the same AES instance encrypts multiple messages
  • Throughput improvements are substantial for sustained encryption workloads, increasing from 6,365 to 25,460 operations/second
  • Test results show consistent performance gains across all test cases, with the largest improvements in concurrent and high-volume scenarios (500+ encryptions)
  • The optimization maintains identical cryptographic security and deterministic output behavior

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 1313 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Generated Regression Tests and Runtime
import asyncio  # used to run async functions
# function to test
# Copied EXACTLY as provided, DO NOT MODIFY
import base64
import hashlib

import pytest  # used for our unit tests
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from skyvern.forge.sdk.encrypt.aes import AES

# ----------------- UNIT TESTS -----------------

# Basic Test Cases

@pytest.mark.asyncio
async def test_encrypt_basic_string():
    """Test encryption of a basic string with default salt and iv."""
    aes = AES(secret_key="mysecretkey")
    plaintext = "hello world"
    ciphertext = await aes.encrypt(plaintext)
    # Should be valid base64
    base64.b64decode(ciphertext)

@pytest.mark.asyncio
async def test_encrypt_empty_string():
    """Test encryption of an empty string."""
    aes = AES(secret_key="mysecretkey")
    plaintext = ""
    ciphertext = await aes.encrypt(plaintext)
    base64.b64decode(ciphertext)

@pytest.mark.asyncio
async def test_encrypt_unicode_string():
    """Test encryption of a unicode string."""
    aes = AES(secret_key="mysecretkey")
    plaintext = "こんにちは世界🌏"
    ciphertext = await aes.encrypt(plaintext)
    base64.b64decode(ciphertext)

@pytest.mark.asyncio
async def test_encrypt_with_custom_salt_and_iv():
    """Test encryption with custom salt and iv."""
    aes = AES(secret_key="mysecretkey", salt="mysalt", iv="myiv")
    plaintext = "custom salt and iv"
    ciphertext = await aes.encrypt(plaintext)
    base64.b64decode(ciphertext)

@pytest.mark.asyncio
async def test_encrypt_consistency_with_same_params():
    """Encrypting the same plaintext with the same key/salt/iv should produce the same ciphertext."""
    aes = AES(secret_key="mysecretkey", salt="mysalt", iv="myiv")
    plaintext = "repeatable"
    ciphertext1 = await aes.encrypt(plaintext)
    ciphertext2 = await aes.encrypt(plaintext)

@pytest.mark.asyncio
async def test_encrypt_different_keys_produce_different_ciphertext():
    """Different secret keys should produce different ciphertexts for the same plaintext."""
    aes1 = AES(secret_key="key1")
    aes2 = AES(secret_key="key2")
    plaintext = "sameplaintext"
    ciphertext1 = await aes1.encrypt(plaintext)
    ciphertext2 = await aes2.encrypt(plaintext)

# Edge Test Cases

@pytest.mark.asyncio
async def test_encrypt_long_string():
    """Test encryption of a long string (edge case)."""
    aes = AES(secret_key="mysecretkey")
    plaintext = "A" * 512  # 512 bytes
    ciphertext = await aes.encrypt(plaintext)
    base64.b64decode(ciphertext)

@pytest.mark.asyncio
async def test_encrypt_non_ascii_bytes():
    """Test encryption of a string containing non-ascii bytes."""
    aes = AES(secret_key="mysecretkey")
    plaintext = "üñîçødë"  # contains non-ascii characters
    ciphertext = await aes.encrypt(plaintext)
    base64.b64decode(ciphertext)

@pytest.mark.asyncio
async def test_encrypt_concurrent_execution():
    """Test concurrent encryption of multiple plaintexts."""
    aes = AES(secret_key="mysecretkey")
    plaintexts = ["one", "two", "three", "four", "five"]
    # Run all encryptions concurrently
    results = await asyncio.gather(*(aes.encrypt(pt) for pt in plaintexts))
    for ct in results:
        base64.b64decode(ct)

@pytest.mark.asyncio
async def test_encrypt_exception_handling():
    """Test that encrypt raises a descriptive exception on invalid input."""
    aes = AES(secret_key="mysecretkey")
    # Simulate invalid input by passing a non-string (should raise exception)
    with pytest.raises(Exception) as excinfo:
        await aes.encrypt(None)  # type: ignore

@pytest.mark.asyncio
async def test_encrypt_empty_secret_key():
    """Test encryption with an empty secret key (edge case)."""
    aes = AES(secret_key="")
    plaintext = "test"
    ciphertext = await aes.encrypt(plaintext)
    base64.b64decode(ciphertext)

@pytest.mark.asyncio
async def test_encrypt_empty_salt_and_iv():
    """Test encryption with empty salt and iv (should fallback to defaults)."""
    aes = AES(secret_key="mysecretkey", salt="", iv="")
    plaintext = "test"
    ciphertext = await aes.encrypt(plaintext)
    base64.b64decode(ciphertext)

# Large Scale Test Cases

@pytest.mark.asyncio
async def test_encrypt_large_scale_concurrent():
    """Test encrypting a large number of plaintexts concurrently."""
    aes = AES(secret_key="mysecretkey")
    plaintexts = [f"msg_{i}" for i in range(100)]
    results = await asyncio.gather(*(aes.encrypt(pt) for pt in plaintexts))
    for ct in results:
        base64.b64decode(ct)

@pytest.mark.asyncio
async def test_encrypt_large_string():
    """Test encryption of a very large string (edge case, but bounded)."""
    aes = AES(secret_key="mysecretkey")
    plaintext = "B" * 1024  # 1KB string
    ciphertext = await aes.encrypt(plaintext)
    base64.b64decode(ciphertext)

# Throughput Test Cases

@pytest.mark.asyncio
async def test_AES_encrypt_throughput_small_load():
    """Throughput: Test encrypting a small batch of messages quickly."""
    aes = AES(secret_key="mysecretkey")
    plaintexts = [f"small_{i}" for i in range(10)]
    results = await asyncio.gather(*(aes.encrypt(pt) for pt in plaintexts))
    for ct in results:
        base64.b64decode(ct)

@pytest.mark.asyncio
async def test_AES_encrypt_throughput_medium_load():
    """Throughput: Test encrypting a medium batch of messages quickly."""
    aes = AES(secret_key="mysecretkey")
    plaintexts = [f"medium_{i}" for i in range(100)]
    results = await asyncio.gather(*(aes.encrypt(pt) for pt in plaintexts))
    for ct in results:
        base64.b64decode(ct)

@pytest.mark.asyncio
async def test_AES_encrypt_throughput_high_volume():
    """Throughput: Test encrypting a high volume of messages quickly (bounded)."""
    aes = AES(secret_key="mysecretkey")
    plaintexts = [f"high_{i}" for i in range(500)]
    results = await asyncio.gather(*(aes.encrypt(pt) for pt in plaintexts))
    for ct in results:
        base64.b64decode(ct)

@pytest.mark.asyncio
async def test_AES_encrypt_throughput_varied_lengths():
    """Throughput: Test encrypting messages of varied lengths."""
    aes = AES(secret_key="mysecretkey")
    plaintexts = ["short", "medium" * 10, "long" * 100, "x" * 512, "y" * 1024]
    results = await asyncio.gather(*(aes.encrypt(pt) for pt in plaintexts))
    for ct in results:
        base64.b64decode(ct)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
import asyncio  # used to run async functions
# function to test
import base64
import hashlib

import pytest  # used for our unit tests
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from skyvern.forge.sdk.encrypt.aes import AES

# ------------------- UNIT TESTS -------------------

# Basic Test Cases

@pytest.mark.asyncio
async def test_encrypt_basic_string():
    """Test encrypting a basic ASCII string."""
    aes = AES(secret_key="mysecret")
    result = await aes.encrypt("hello world")
    # Should decode to bytes of correct length (multiple of 16)
    decoded = base64.b64decode(result)

@pytest.mark.asyncio
async def test_encrypt_empty_string():
    """Test encrypting an empty string."""
    aes = AES(secret_key="mysecret")
    result = await aes.encrypt("")
    decoded = base64.b64decode(result)

@pytest.mark.asyncio
async def test_encrypt_unicode_string():
    """Test encrypting a string with unicode characters."""
    aes = AES(secret_key="mysecret")
    plaintext = "你好,世界🌍"
    result = await aes.encrypt(plaintext)
    decoded = base64.b64decode(result)

@pytest.mark.asyncio
async def test_encrypt_with_custom_salt_iv():
    """Test encrypting with custom salt and IV."""
    aes = AES(secret_key="mysecret", salt="mysalt", iv="myiv")
    result = await aes.encrypt("test123")
    decoded = base64.b64decode(result)

@pytest.mark.asyncio
async def test_encrypt_same_input_same_output_with_same_key():
    """Test deterministic output for same input/key/salt/iv."""
    aes1 = AES(secret_key="key", salt="salt", iv="iv")
    aes2 = AES(secret_key="key", salt="salt", iv="iv")
    plaintext = "repeatable"
    # Should produce same output
    out1 = await aes1.encrypt(plaintext)
    out2 = await aes2.encrypt(plaintext)

@pytest.mark.asyncio
async def test_encrypt_same_input_different_key():
    """Test different output for same input with different key."""
    aes1 = AES(secret_key="key1")
    aes2 = AES(secret_key="key2")
    plaintext = "repeatable"
    out1 = await aes1.encrypt(plaintext)
    out2 = await aes2.encrypt(plaintext)

# Edge Test Cases

@pytest.mark.asyncio
async def test_encrypt_non_ascii_edge():
    """Test encrypting string with edge-case unicode (emoji, accents)."""
    aes = AES(secret_key="edgekey")
    plaintext = "éèêëēėę 😃🚀"
    result = await aes.encrypt(plaintext)
    decoded = base64.b64decode(result)

@pytest.mark.asyncio
async def test_encrypt_invalid_input_type_raises():
    """Test that non-string input raises an exception."""
    aes = AES(secret_key="key")
    with pytest.raises(Exception) as excinfo:
        await aes.encrypt(12345)  # Not a string

@pytest.mark.asyncio
async def test_encrypt_invalid_key_type_raises():
    """Test that non-string key raises an exception at init."""
    with pytest.raises(AttributeError):
        AES(secret_key=12345)  # Not a string

@pytest.mark.asyncio
async def test_encrypt_concurrent_execution():
    """Test concurrent encryptions with different inputs."""
    aes = AES(secret_key="concurrent")
    inputs = ["alpha", "beta", "gamma", "delta", "epsilon"]
    results = await asyncio.gather(*(aes.encrypt(x) for x in inputs))
    for res in results:
        pass

@pytest.mark.asyncio
async def test_encrypt_concurrent_same_input():
    """Test concurrent encryptions with same input."""
    aes = AES(secret_key="concurrent")
    inputs = ["same"] * 5
    results = await asyncio.gather(*(aes.encrypt(x) for x in inputs))

@pytest.mark.asyncio
async def test_encrypt_iv_and_salt_edge_cases():
    """Test encrypting with empty salt and iv strings."""
    aes = AES(secret_key="key", salt="", iv="")
    result = await aes.encrypt("edgecase")
    decoded = base64.b64decode(result)

# Large Scale Test Cases

@pytest.mark.asyncio
async def test_encrypt_large_string():
    """Test encrypting a large string."""
    aes = AES(secret_key="largekey")
    plaintext = "A" * 4096  # 4KB of 'A'
    result = await aes.encrypt(plaintext)
    decoded = base64.b64decode(result)

@pytest.mark.asyncio
async def test_encrypt_many_concurrent_large_scale():
    """Test many concurrent encryptions with unique inputs."""
    aes = AES(secret_key="scale")
    inputs = [f"input_{i}" for i in range(50)]
    results = await asyncio.gather(*(aes.encrypt(x) for x in inputs))
    for res in results:
        pass

# Throughput Test Cases

@pytest.mark.asyncio
async def test_AES_encrypt_throughput_small_load():
    """Throughput test: small load (10 encryptions)."""
    aes = AES(secret_key="throughput")
    inputs = [f"msg_{i}" for i in range(10)]
    results = await asyncio.gather(*(aes.encrypt(x) for x in inputs))

@pytest.mark.asyncio
async def test_AES_encrypt_throughput_medium_load():
    """Throughput test: medium load (100 encryptions)."""
    aes = AES(secret_key="throughput")
    inputs = [f"msg_{i}" for i in range(100)]
    results = await asyncio.gather(*(aes.encrypt(x) for x in inputs))

@pytest.mark.asyncio
async def test_AES_encrypt_throughput_large_load():
    """Throughput test: large load (250 encryptions)."""
    aes = AES(secret_key="throughput")
    inputs = [f"msg_{i}" for i in range(250)]
    results = await asyncio.gather(*(aes.encrypt(x) for x in inputs))

@pytest.mark.asyncio
async def test_AES_encrypt_throughput_varied_lengths():
    """Throughput test: encrypt strings of varied lengths."""
    aes = AES(secret_key="throughput")
    inputs = [
        "short",
        "medium" * 10,
        "long" * 100,
        "unicode" + "🌟" * 50,
        "A" * 512,
        "B" * 1024,
        "C" * 2048,
    ]
    results = await asyncio.gather(*(aes.encrypt(x) for x in inputs))
    for res in results:
        decoded = base64.b64decode(res)

@pytest.mark.asyncio
async def test_AES_encrypt_throughput_sustained_execution():
    """Throughput test: sustained execution pattern."""
    aes = AES(secret_key="throughput")
    # Simulate sustained load: 5 rounds of 20 encryptions
    for round in range(5):
        inputs = [f"round{round}_msg_{i}" for i in range(20)]
        results = await asyncio.gather(*(aes.encrypt(x) for x in inputs))
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes git checkout codeflash/optimize-AES.encrypt-mjaa28jy and push.

Codeflash Static Badge

The optimized code achieves a **31x speedup (3144% improvement)** and **4x throughput increase (300% improvement)** through two key optimizations:

## **Primary Optimization: Key Derivation Caching**
The most significant performance gain comes from caching the expensive PBKDF2 key derivation. In the original code, `_derive_key()` performed 100,000 iterations of PBKDF2-HMAC-SHA256 on **every single encryption call**, consuming 99.9% of execution time (15.07 seconds). 

The optimized version adds:
```python
self._derived_key: bytes | None = None

def _derive_key(self) -> bytes:
    if self._derived_key is None:
        # Only compute once per AES instance
        self._derived_key = kdf.derive(self.secret_key)
    return self._derived_key
```

This reduces key derivation from 1,273 expensive operations to just 39, since the derived key is identical for all encryptions using the same AES instance (same secret_key and salt).

## **Secondary Optimization: Padding Performance**
The `_pad` method was optimized from:
```python
padding = bytes([padding_length] * padding_length)  # Creates intermediate list
```
to:
```python
padding = bytes([padding_length]) * padding_length  # Direct bytes multiplication
```

This avoids creating an intermediate list object, reducing both memory allocation and execution time by ~27% for the padding operation.

## **Impact Analysis**
- **High-volume scenarios** benefit most - the line profiler shows the optimization is particularly effective when the same AES instance encrypts multiple messages
- **Throughput improvements** are substantial for sustained encryption workloads, increasing from 6,365 to 25,460 operations/second
- **Test results** show consistent performance gains across all test cases, with the largest improvements in concurrent and high-volume scenarios (500+ encryptions)
- The optimization maintains **identical cryptographic security** and deterministic output behavior
@codeflash-ai codeflash-ai bot requested a review from mashraf-222 December 17, 2025 17:20
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Dec 17, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant