
Complete Guide to Infrastructure as Code with Python

A comprehensive guide to implementing Infrastructure as Code using Python, covering architecture, code examples, and production-ready patterns.

Muneer Puthiya Purayil 16 min read


Managing cloud infrastructure manually through web consoles doesn't scale. Once you're past a handful of resources, you need reproducibility, version control, and automated provisioning. Python has become one of the strongest choices for Infrastructure as Code (IaC), combining a massive ecosystem with readable syntax that operations and development teams can share.

This guide covers the full landscape of Python-based IaC — from Pulumi's native Python SDK to CDK for Terraform, AWS CDK, and raw SDK automation. You'll see production-ready patterns, real code, and the tradeoffs that matter at scale.

Why Python for Infrastructure as Code

Python's dominance in IaC comes down to three factors:

Ecosystem depth. Libraries like boto3, google-cloud-*, and azure-mgmt-* give you direct API access when abstractions fall short. You can mix IaC definitions with custom logic — data lookups, conditional provisioning, integration tests — without switching languages.

Team accessibility. Most engineering organizations already have Python expertise. Data engineers, ML teams, backend developers, and SREs all read Python fluently. This reduces the bus factor on infrastructure code.

Tooling maturity. Type checking with mypy, testing with pytest, linting with ruff — the entire Python quality toolchain applies to your infrastructure definitions.

Pulumi with Python: The Native Experience

Pulumi treats infrastructure as real code, not configuration. You write standard Python, and Pulumi's engine handles the dependency graph, state management, and cloud API calls.

Setting Up a Pulumi Python Project

```bash
pulumi new aws-python --name my-infra --stack dev
```

This generates a standard Python project structure:

```
my-infra/
├── __main__.py
├── Pulumi.yaml
├── Pulumi.dev.yaml
├── requirements.txt
└── venv/
```

Provisioning a Production VPC

```python
import pulumi
import pulumi_aws as aws

config = pulumi.Config()
env = pulumi.get_stack()
cidr_block = config.require("vpc_cidr")

vpc = aws.ec2.Vpc(
    f"{env}-vpc",
    cidr_block=cidr_block,
    enable_dns_hostnames=True,
    enable_dns_support=True,
    tags={"Name": f"{env}-vpc", "Environment": env, "ManagedBy": "pulumi"},
)

availability_zones = aws.get_availability_zones(state="available")

public_subnets = []
private_subnets = []

for i, az in enumerate(availability_zones.names[:3]):
    public_subnet = aws.ec2.Subnet(
        f"{env}-public-{az}",
        vpc_id=vpc.id,
        cidr_block=f"10.0.{i * 2}.0/24",
        availability_zone=az,
        map_public_ip_on_launch=True,
        tags={"Name": f"{env}-public-{az}", "Tier": "public"},
    )
    public_subnets.append(public_subnet)

    private_subnet = aws.ec2.Subnet(
        f"{env}-private-{az}",
        vpc_id=vpc.id,
        cidr_block=f"10.0.{i * 2 + 1}.0/24",
        availability_zone=az,
        tags={"Name": f"{env}-private-{az}", "Tier": "private"},
    )
    private_subnets.append(private_subnet)

igw = aws.ec2.InternetGateway(
    f"{env}-igw",
    vpc_id=vpc.id,
    tags={"Name": f"{env}-igw"},
)

public_rt = aws.ec2.RouteTable(
    f"{env}-public-rt",
    vpc_id=vpc.id,
    routes=[aws.ec2.RouteTableRouteArgs(
        cidr_block="0.0.0.0/0",
        gateway_id=igw.id,
    )],
)

for i, subnet in enumerate(public_subnets):
    aws.ec2.RouteTableAssociation(
        f"{env}-public-rta-{i}",
        subnet_id=subnet.id,
        route_table_id=public_rt.id,
    )

pulumi.export("vpc_id", vpc.id)
pulumi.export("public_subnet_ids", [s.id for s in public_subnets])
pulumi.export("private_subnet_ids", [s.id for s in private_subnets])
```

This is standard Python. You can extract functions, create classes, write unit tests — all the things you'd do with application code.
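
For example, the subnet CIDR arithmetic above can be extracted into a pure function (a sketch — the `subnet_cidrs` name is ours) and unit-tested without touching any cloud API:

```python
import ipaddress


def subnet_cidrs(vpc_cidr: str, az_count: int) -> tuple[list[str], list[str]]:
    """Carve interleaved /24 subnets out of a VPC CIDR, matching the
    layout above: even /24s are public, odd /24s are private."""
    subnets = list(ipaddress.ip_network(vpc_cidr).subnets(new_prefix=24))
    public = [str(subnets[2 * i]) for i in range(az_count)]
    private = [str(subnets[2 * i + 1]) for i in range(az_count)]
    return public, private
```

A plain pytest can now pin the layout: `subnet_cidrs("10.0.0.0/16", 3)` returns `["10.0.0.0/24", "10.0.2.0/24", "10.0.4.0/24"]` for public and the odd /24s for private, and a change that breaks the addressing scheme fails CI before it touches AWS.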

Component Resources for Reusability

Pulumi's ComponentResource lets you build reusable infrastructure modules:

```python
import pulumi
import pulumi_aws as aws
from typing import Optional, Sequence


class DatabaseCluster(pulumi.ComponentResource):
    def __init__(
        self,
        name: str,
        vpc_id: pulumi.Input[str],
        subnet_ids: Sequence[pulumi.Input[str]],
        instance_class: str = "db.r6g.large",
        instance_count: int = 2,
        engine_version: str = "15.4",
        opts: Optional[pulumi.ResourceOptions] = None,
    ):
        super().__init__("custom:database:PostgresCluster", name, None, opts)

        self.subnet_group = aws.rds.SubnetGroup(
            f"{name}-subnet-group",
            subnet_ids=subnet_ids,
            tags={"Name": f"{name}-subnet-group"},
            opts=pulumi.ResourceOptions(parent=self),
        )

        self.security_group = aws.ec2.SecurityGroup(
            f"{name}-sg",
            vpc_id=vpc_id,
            ingress=[aws.ec2.SecurityGroupIngressArgs(
                protocol="tcp",
                from_port=5432,
                to_port=5432,
                cidr_blocks=["10.0.0.0/8"],
            )],
            egress=[aws.ec2.SecurityGroupEgressArgs(
                protocol="-1",
                from_port=0,
                to_port=0,
                cidr_blocks=["0.0.0.0/0"],
            )],
            opts=pulumi.ResourceOptions(parent=self),
        )

        self.cluster = aws.rds.Cluster(
            f"{name}-cluster",
            engine=aws.rds.EngineType.AURORA_POSTGRESQL,
            engine_version=engine_version,
            db_subnet_group_name=self.subnet_group.name,
            vpc_security_group_ids=[self.security_group.id],
            master_username="admin",
            manage_master_user_password=True,
            storage_encrypted=True,
            backup_retention_period=7,
            preferred_backup_window="03:00-04:00",
            deletion_protection=True,
            opts=pulumi.ResourceOptions(parent=self),
        )

        self.instances = []
        for i in range(instance_count):
            instance = aws.rds.ClusterInstance(
                f"{name}-instance-{i}",
                cluster_identifier=self.cluster.id,
                instance_class=instance_class,
                engine=aws.rds.EngineType.AURORA_POSTGRESQL,
                engine_version=engine_version,
                opts=pulumi.ResourceOptions(parent=self),
            )
            self.instances.append(instance)

        self.register_outputs({
            "cluster_endpoint": self.cluster.endpoint,
            "reader_endpoint": self.cluster.reader_endpoint,
        })
```

Usage becomes clean:

```python
db = DatabaseCluster(
    "production-db",
    vpc_id=vpc.id,
    subnet_ids=[s.id for s in private_subnets],
    instance_class="db.r6g.xlarge",
    instance_count=3,
)
```

CDK for Terraform (CDKTF) with Python

If your organization is invested in Terraform's ecosystem — providers, state backends, Terraform Cloud — CDKTF lets you write Python while keeping the Terraform engine underneath.

Project Setup

```bash
cdktf init --template=python --providers=aws
```

Defining an ECS Fargate Service

```python
import json

from constructs import Construct
from cdktf import App, TerraformStack, TerraformOutput, S3Backend
from cdktf_cdktf_provider_aws.provider import AwsProvider
from cdktf_cdktf_provider_aws.ecs_cluster import EcsCluster
from cdktf_cdktf_provider_aws.ecs_service import EcsService, EcsServiceNetworkConfiguration
from cdktf_cdktf_provider_aws.ecs_task_definition import EcsTaskDefinition


class FargateServiceStack(TerraformStack):
    def __init__(self, scope: Construct, id: str, env: str):
        super().__init__(scope, id)

        AwsProvider(self, "aws", region="us-east-1")

        S3Backend(
            self,
            bucket="my-terraform-state",
            key=f"ecs/{env}/terraform.tfstate",
            region="us-east-1",
            dynamodb_table="terraform-locks",
        )

        cluster = EcsCluster(
            self, "cluster",
            name=f"{env}-api-cluster",
            setting=[{"name": "containerInsights", "value": "enabled"}],
        )

        container_defs = json.dumps([{
            "name": "api",
            "image": f"123456789.dkr.ecr.us-east-1.amazonaws.com/api:{env}",
            "cpu": 512,
            "memory": 1024,
            "essential": True,
            "portMappings": [{"containerPort": 8080, "protocol": "tcp"}],
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-group": f"/ecs/{env}/api",
                    "awslogs-region": "us-east-1",
                    "awslogs-stream-prefix": "api",
                },
            },
            "healthCheck": {
                "command": ["CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"],
                "interval": 30,
                "timeout": 5,
                "retries": 3,
            },
        }])

        task_def = EcsTaskDefinition(
            self, "task",
            family=f"{env}-api",
            requires_compatibilities=["FARGATE"],
            network_mode="awsvpc",
            cpu="512",
            memory="1024",
            execution_role_arn=f"arn:aws:iam::123456789:role/{env}-ecs-execution",
            task_role_arn=f"arn:aws:iam::123456789:role/{env}-ecs-task",
            container_definitions=container_defs,
        )

        service = EcsService(
            self, "service",
            name=f"{env}-api-service",
            cluster=cluster.arn,
            task_definition=task_def.arn,
            desired_count=3,
            launch_type="FARGATE",
            network_configuration=EcsServiceNetworkConfiguration(
                subnets=["subnet-abc123", "subnet-def456"],
                security_groups=["sg-123456"],
                assign_public_ip=False,
            ),
            deployment_minimum_healthy_percent=50,
            deployment_maximum_percent=200,
        )

        TerraformOutput(self, "cluster_arn", value=cluster.arn)
        TerraformOutput(self, "service_name", value=service.name)


app = App()
FargateServiceStack(app, "production", env="production")
FargateServiceStack(app, "staging", env="staging")
app.synth()
```

CDKTF vs Pulumi: Key Differences

| Aspect | CDKTF | Pulumi |
| --- | --- | --- |
| State management | Terraform state (S3, TF Cloud) | Pulumi Cloud or self-managed |
| Provider ecosystem | All Terraform providers | Pulumi-native + Terraform bridge |
| Execution model | Synth to Terraform JSON, then terraform apply | Direct API calls via gRPC |
| Plan output | Standard terraform plan | pulumi preview |
| Drift detection | terraform plan | pulumi refresh |
| Maturity | GA since 2022 | GA since 2019 |

AWS CDK with Python

For AWS-only infrastructure, AWS CDK provides the highest-level abstractions:

```python
from aws_cdk import (
    App, Stack, Duration, RemovalPolicy,
    aws_lambda as lambda_,
    aws_apigateway as apigw,
    aws_dynamodb as dynamodb,
    aws_logs as logs,
)
from constructs import Construct


class ApiStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs):
        super().__init__(scope, id, **kwargs)

        table = dynamodb.Table(
            self, "OrdersTable",
            partition_key=dynamodb.Attribute(
                name="order_id", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="created_at", type=dynamodb.AttributeType.STRING
            ),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            point_in_time_recovery=True,
            removal_policy=RemovalPolicy.RETAIN,
        )

        table.add_global_secondary_index(
            index_name="customer-index",
            partition_key=dynamodb.Attribute(
                name="customer_id", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="created_at", type=dynamodb.AttributeType.STRING
            ),
        )

        handler = lambda_.Function(
            self, "OrderHandler",
            runtime=lambda_.Runtime.PYTHON_3_12,
            handler="main.handler",
            code=lambda_.Code.from_asset("lambda/orders"),
            memory_size=256,
            timeout=Duration.seconds(30),
            environment={
                "TABLE_NAME": table.table_name,
                "LOG_LEVEL": "INFO",
            },
            log_retention=logs.RetentionDays.TWO_WEEKS,
            tracing=lambda_.Tracing.ACTIVE,
        )

        table.grant_read_write_data(handler)

        api = apigw.RestApi(
            self, "OrdersApi",
            rest_api_name="orders-api",
            deploy_options=apigw.StageOptions(
                stage_name="v1",
                throttling_rate_limit=1000,
                throttling_burst_limit=500,
                logging_level=apigw.MethodLoggingLevel.INFO,
                metrics_enabled=True,
            ),
        )

        orders = api.root.add_resource("orders")
        orders.add_method("GET", apigw.LambdaIntegration(handler))
        orders.add_method("POST", apigw.LambdaIntegration(handler))

        order = orders.add_resource("{order_id}")
        order.add_method("GET", apigw.LambdaIntegration(handler))
```

AWS CDK's grant_* methods and L2 constructs handle IAM policies, security groups, and resource connections automatically. For pure-AWS shops, this eliminates significant boilerplate.

Testing Infrastructure Code

One of Python's strongest advantages for IaC is testability. You can use standard Python testing tools.

Unit Testing Pulumi Resources

```python
import pulumi
import pytest


class TrackingMocks(pulumi.runtime.Mocks):
    """Record every resource registration so tests can assert on it."""

    def __init__(self):
        self.resources = []

    def new_resource(self, args: pulumi.runtime.MockResourceArgs):
        self.resources.append(args)
        return [f"{args.name}-id", args.inputs]

    def call(self, args: pulumi.runtime.MockCallArgs):
        return {}


@pytest.fixture
def mocks(monkeypatch):
    monkeypatch.setenv("PULUMI_CONFIG", '{"project:vpc_cidr": "10.0.0.0/16"}')
    tracking = TrackingMocks()
    # set_mocks must run before the module under test registers any resources
    pulumi.runtime.set_mocks(tracking, preview=False)
    return tracking


@pulumi.runtime.test
def test_vpc_creates_correct_subnets(mocks):
    from infra.networking import create_vpc

    create_vpc("test", "10.0.0.0/16", 3)

    subnet_resources = [r for r in mocks.resources if "Subnet" in r.typ]
    assert len(subnet_resources) == 6  # 3 public + 3 private


@pulumi.runtime.test
def test_database_cluster_has_encryption(mocks):
    from infra.database import DatabaseCluster

    DatabaseCluster("test-db", vpc_id="vpc-123", subnet_ids=["s-1", "s-2"])

    cluster = next(r for r in mocks.resources if "rds/cluster:Cluster" in r.typ)
    # input keys arrive camelCased over the mock RPC layer
    assert cluster.inputs["storageEncrypted"] is True
    assert cluster.inputs["deletionProtection"] is True
```

Integration Testing with LocalStack

```python
import json
import os
import subprocess

import boto3
import pytest


@pytest.fixture(scope="session")
def localstack():
    """Assumes LocalStack is running on port 4566."""
    return boto3.Session(
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )


@pytest.fixture(scope="session")
def deployed_stack(localstack):
    result = subprocess.run(
        ["cdktf", "deploy", "--auto-approve", "--outputs-file", "/tmp/outputs.json"],
        env={
            **os.environ,  # keep PATH etc.; override only what we need
            "AWS_ENDPOINT_URL": "http://localhost:4566",
            "CDKTF_OUTDIR": "/tmp/cdktf-test",
        },
        capture_output=True,
        text=True,
    )
    assert result.returncode == 0, result.stderr

    with open("/tmp/outputs.json") as f:
        return json.load(f)


def test_dynamodb_table_created(localstack, deployed_stack):
    client = localstack.client("dynamodb", endpoint_url="http://localhost:4566")
    tables = client.list_tables()["TableNames"]
    assert any("Orders" in t for t in tables)


def test_lambda_function_responds(localstack, deployed_stack):
    client = localstack.client("lambda", endpoint_url="http://localhost:4566")
    response = client.invoke(
        FunctionName=deployed_stack["handler_name"],
        Payload=json.dumps({"httpMethod": "GET", "path": "/orders"}),
    )
    payload = json.loads(response["Payload"].read())
    assert payload["statusCode"] == 200
```

Python-Native Automation with boto3

Sometimes you don't need a full IaC framework. For operational scripts, boto3 with proper structure works well:

```python
import boto3
from dataclasses import dataclass, field
from typing import Optional
import logging

logger = logging.getLogger(__name__)


@dataclass
class InstanceConfig:
    name: str
    instance_type: str
    ami_id: str
    subnet_id: str
    security_group_ids: list[str]
    key_name: Optional[str] = None
    user_data: Optional[str] = None
    tags: dict[str, str] = field(default_factory=dict)
    ebs_optimized: bool = True
    monitoring: bool = True


class EC2Provisioner:
    def __init__(self, region: str = "us-east-1"):
        self.ec2 = boto3.resource("ec2", region_name=region)
        self.client = boto3.client("ec2", region_name=region)

    def launch_instance(self, config: InstanceConfig) -> str:
        all_tags = {
            "Name": config.name,
            "ManagedBy": "python-automation",
            **config.tags,
        }

        kwargs = {
            "ImageId": config.ami_id,
            "InstanceType": config.instance_type,
            "SubnetId": config.subnet_id,
            "SecurityGroupIds": config.security_group_ids,
            "EbsOptimized": config.ebs_optimized,
            "Monitoring": {"Enabled": config.monitoring},
            "TagSpecifications": [{
                "ResourceType": "instance",
                "Tags": [{"Key": k, "Value": v} for k, v in all_tags.items()],
            }],
            "MinCount": 1,
            "MaxCount": 1,
        }

        if config.key_name:
            kwargs["KeyName"] = config.key_name
        if config.user_data:
            kwargs["UserData"] = config.user_data

        instances = self.ec2.create_instances(**kwargs)
        instance = instances[0]
        instance.wait_until_running()

        logger.info(f"Launched instance {instance.id} ({config.name})")
        return instance.id

    def ensure_instance_running(self, instance_id: str) -> dict:
        instance = self.ec2.Instance(instance_id)
        state = instance.state["Name"]

        if state == "stopped":
            instance.start()
            instance.wait_until_running()
            logger.info(f"Started instance {instance_id}")
        elif state == "running":
            logger.info(f"Instance {instance_id} already running")
        else:
            raise RuntimeError(f"Instance {instance_id} in unexpected state: {state}")

        instance.reload()
        return {
            "id": instance.id,
            "state": instance.state["Name"],
            "public_ip": instance.public_ip_address,
            "private_ip": instance.private_ip_address,
        }
```


Multi-Cloud Patterns

Python excels at abstracting over cloud providers. Here's a pattern for multi-cloud storage:

```python
import pulumi
import pulumi_aws as aws
import pulumi_gcp as gcp
from abc import ABC, abstractmethod


class StorageBucket(ABC):
    @abstractmethod
    def create(self, name: str, versioning: bool = True) -> pulumi.Output[str]: ...


class AWSBucket(StorageBucket):
    def create(self, name: str, versioning: bool = True) -> pulumi.Output[str]:
        bucket = aws.s3.BucketV2(name, bucket=name)

        if versioning:
            aws.s3.BucketVersioningV2(
                f"{name}-versioning",
                bucket=bucket.id,
                versioning_configuration=aws.s3.BucketVersioningV2VersioningConfigurationArgs(
                    status="Enabled"
                ),
            )

        aws.s3.BucketServerSideEncryptionConfigurationV2(
            f"{name}-encryption",
            bucket=bucket.id,
            rules=[aws.s3.BucketServerSideEncryptionConfigurationV2RuleArgs(
                apply_server_side_encryption_by_default=aws.s3.BucketServerSideEncryptionConfigurationV2RuleApplyServerSideEncryptionByDefaultArgs(
                    sse_algorithm="aws:kms",
                ),
            )],
        )

        aws.s3.BucketPublicAccessBlock(
            f"{name}-public-block",
            bucket=bucket.id,
            block_public_acls=True,
            block_public_policy=True,
            ignore_public_acls=True,
            restrict_public_buckets=True,
        )

        return bucket.arn


class GCPBucket(StorageBucket):
    def create(self, name: str, versioning: bool = True) -> pulumi.Output[str]:
        bucket = gcp.storage.Bucket(
            name,
            name=name,
            location="US",
            uniform_bucket_level_access=True,
            versioning=gcp.storage.BucketVersioningArgs(enabled=versioning),
            encryption=gcp.storage.BucketEncryptionArgs(
                default_kms_key_name="projects/my-project/locations/global/keyRings/my-ring/cryptoKeys/my-key"
            ),
        )
        return bucket.url


def create_storage(provider: str, name: str) -> pulumi.Output[str]:
    factories = {"aws": AWSBucket, "gcp": GCPBucket}
    return factories[provider]().create(name)
```

Secrets Management

Never hardcode secrets. Here are production patterns for handling them:

```python
import json

import pulumi
import pulumi_aws as aws

config = pulumi.Config()

# Store secrets in AWS Secrets Manager
db_secret = aws.secretsmanager.Secret("db-credentials")
db_secret_version = aws.secretsmanager.SecretVersion(
    "db-credentials-v1",
    secret_id=db_secret.id,
    secret_string=config.require_secret("db_password"),
)

# Use SSM Parameter Store for non-secret config
api_endpoint = aws.ssm.Parameter(
    "api-endpoint",
    type="String",
    name="/production/api/endpoint",
    value="https://api.example.com",
)

# Reference secrets in other resources without exposing values:
# secrets use "valueFrom" (the secret's ARN); plain config uses "value"
ecs_task = aws.ecs.TaskDefinition(
    "api-task",
    family="api-task",
    container_definitions=pulumi.Output.all(
        db_secret.arn, api_endpoint.value
    ).apply(lambda args: json.dumps([{
        "name": "api",
        "image": "api:latest",
        "secrets": [
            {"name": "DB_PASSWORD", "valueFrom": args[0]},
        ],
        "environment": [
            {"name": "API_ENDPOINT", "value": args[1]},
        ],
    }])),
)
```

CI/CD Pipeline Integration

Python IaC integrates smoothly into CI/CD. Here's a GitHub Actions workflow for Pulumi:

```yaml
name: Infrastructure
on:
  push:
    branches: [main]
    paths: ['infra/**']
  pull_request:
    paths: ['infra/**']

jobs:
  preview:
    if: github.event_name == 'pull_request'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install dependencies
        run: |
          cd infra
          pip install -r requirements.txt

      - uses: pulumi/actions@v5
        with:
          command: preview
          work-dir: infra
          stack-name: staging
          comment-on-pr: true
        env:
          PULUMI_ACCESS_TOKEN: ${{ secrets.PULUMI_ACCESS_TOKEN }}
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

  deploy:
    if: github.ref == 'refs/heads/main' && github.event_name == 'push'
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install dependencies
        run: |
          cd infra
          pip install -r requirements.txt

      - uses: pulumi/actions@v5
        with:
          command: up
          work-dir: infra
          stack-name: production
        env:
          PULUMI_ACCESS_TOKEN: ${{ secrets.PULUMI_ACCESS_TOKEN }}
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
```

Performance and Scale Considerations

At scale (500+ resources per stack), Python IaC performance becomes relevant:

  • Pulumi parallelism: Pass --parallel to pulumi up (for example, pulumi up --parallel 50) to cap concurrent resource operations.
  • Large stacks: Split into micro-stacks using StackReference for cross-stack outputs. One stack per service boundary.
  • State size: Pulumi's state grows linearly with resources. At 2,000+ resources, consider splitting stacks — state operations start exceeding 30 seconds.
  • Import performance: Python's import time matters. Lazy-import cloud provider modules when defining multiple stacks in a monorepo.
```python
# Stack references for cross-stack dependencies
network_stack = pulumi.StackReference("org/networking/production")
vpc_id = network_stack.get_output("vpc_id")
subnet_ids = network_stack.get_output("private_subnet_ids")

# Use in downstream resources
db = DatabaseCluster("prod-db", vpc_id=vpc_id, subnet_ids=subnet_ids)
```
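
The lazy-import advice above can be sketched with a tiny generic proxy (illustrative only, not a Pulumi API) that defers the real import until first attribute access:

```python
import importlib


class LazyModule:
    """Defer importing a heavy module until an attribute is first accessed."""

    def __init__(self, module_name: str):
        self._name = module_name
        self._module = None

    def __getattr__(self, attr):
        if self._module is None:
            self._module = importlib.import_module(self._name)
        return getattr(self._module, attr)


# Heavy provider SDKs (e.g. pulumi_aws) would pay their import cost only in
# stacks that actually touch them; demonstrated here with stdlib json.
json_mod = LazyModule("json")
```

In a monorepo defining many stacks, wrapping each provider SDK this way keeps `pulumi preview` startup proportional to the providers a given stack actually uses.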

Common Pitfalls

1. Ignoring state drift. Run pulumi refresh before every pulumi up in CI. Manual console changes will cause conflicts otherwise.

2. Not using protect on stateful resources. Databases, S3 buckets with data, and DNS zones should always have protect=True:

```python
db = aws.rds.Cluster(
    "production-db",
    # ... config
    opts=pulumi.ResourceOptions(protect=True),
)
```

3. Circular dependencies. Pulumi detects these at runtime. If you hit them, use depends_on explicitly or restructure your component hierarchy.

4. Leaking secrets in state. Always use pulumi.Output.secret() for sensitive values. Regular outputs are stored in plaintext in state.

Conclusion

Python brings the full power of a general-purpose language to infrastructure management. Whether you choose Pulumi for its native Python experience, CDKTF to leverage existing Terraform investments, or AWS CDK for AWS-specific workloads, the patterns remain consistent: type-safe definitions, testable components, and CI/CD integration.

The key advantage isn't just syntax preference — it's that infrastructure code becomes indistinguishable from application code in your development workflow. The same review processes, testing frameworks, and quality tools apply. Teams that adopt Python for IaC often report faster onboarding and fewer production incidents from infrastructure changes.

Start with a single stack managing one service. Get comfortable with the state model, learn the component patterns, then expand. The migration from HCL to Python pays off once you need conditional logic, complex data transformations, or cross-stack orchestration that YAML and HCL handle awkwardly.


Muneer Puthiya Purayil

SaaS Architect & AI Systems Engineer. 10+ years shipping production infrastructure across fintech, automotive, e-commerce, and healthcare.
