User Guides¶
Configuring SDK for your project¶
The acme-portal VSCode extension looks in the `.acme_portal_sdk` dot directory at the root of your open workspace for the following Python files:
- `flow_finder.py`
- `deployment_finder.py`
- `flow_deploy.py`
- `deployment_promote.py`
and attempts to find instances of any child classes of:
- `flow_finder.py` -> `FlowFinder`
- `deployment_finder.py` -> `DeploymentFinder`
- `flow_deploy.py` -> `DeployWorkflow`
- `deployment_promote.py` -> `PromoteWorkflow`
acme-portal then uses these instances to delegate UI operations performed in the VSCode extension to the appropriate SDK class, e.g. pressing the Deploy button in the UI triggers a call to the `DeployWorkflow.run` method.
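For example, a minimal `flow_finder.py` only needs to subclass the base class and expose a module-level instance of it. The sketch below (with the illustrative name `MinimalFlowFinder`) is the smallest file the extension can discover:

```python
# .acme_portal_sdk/flow_finder.py - minimal sketch of a discoverable file
from typing import List

from acme_portal_sdk.flow_finder import FlowFinder, FlowDetails


class MinimalFlowFinder(FlowFinder):
    """Smallest possible finder: reports no flows yet."""

    def find_flows(self) -> List[FlowDetails]:
        return []


# acme-portal discovers this module-level instance, not the class itself
flow_finder = MinimalFlowFinder()
```

The other three files follow the same pattern: define a subclass, then create one instance of it at module level.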
Using platform-specific implementations¶
For specific platform implementations:
- Prefect: See the Prefect Support documentation for details on using the built-in Prefect implementations (a wiring sketch follows this list)
- Airflow: See the Airflow Support documentation for details on using the built-in Airflow implementations
- GitHub Workflows: See the GitHub Workflows Guide for using GitHub Actions as a deployment provider
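As a concrete example, wiring the built-in Prefect flow finder into the dot directory is typically a one-liner (a sketch; `src/flows` is an assumed path that you should adjust to wherever your flows live):

```python
# .acme_portal_sdk/flow_finder.py - delegating to the built-in Prefect implementation
from acme_portal_sdk.prefect.flow_finder import PrefectFlowFinder

# Scan the given directory for Prefect flows
flow_finder = PrefectFlowFinder("src/flows")
```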
Creating Custom Workflow Implementations¶
The SDK provides base classes that you can extend to create custom implementations for your specific needs. The sections below subclass each base class with sample functionality that returns custom data.
Overview of Base Classes¶
The SDK defines four main base classes that need to be implemented:
- `FlowFinder` - Discovers flows in your codebase
- `DeploymentFinder` - Finds existing deployments
- `DeployWorkflow` - Handles flow deployment operations
- `PromoteWorkflow` - Manages deployment promotion between environments
Custom FlowFinder Implementation¶
Create a custom `FlowFinder` subclass to discover flows in your codebase:
```python
# .acme_portal_sdk/flow_finder.py
from typing import List

from acme_portal_sdk.flow_finder import FlowFinder, FlowDetails


class CustomFlowFinder(FlowFinder):
    """Custom flow finder that discovers flows with custom metadata."""

    def find_flows(self) -> List[FlowDetails]:
        # Sample implementation that returns flows with custom metadata
        flows = []

        # Example: discover critical batch processing flows
        flows.append(FlowDetails(
            name="data_ingestion",
            original_name="data-ingestion",
            description="Daily data ingestion pipeline",
            id="flow_001",
            source_path="/src/pipelines/ingestion.py",
            source_relative="pipelines/ingestion.py",
            grouping=["data", "batch"],
            child_attributes={
                # Implementation-specific attributes go here
                "obj_type": "function",
                "obj_name": "run_data_ingestion",
                "obj_parent_type": "module",
                "obj_parent": "pipelines.ingestion",
                "module": "pipelines.ingestion",
                "import_path": "pipelines.ingestion",
                # Custom metadata for your specific needs
                "priority": 10,  # High priority
                "category": "critical",
            }
        ))

        # Example: discover standard processing flows
        flows.append(FlowDetails(
            name="data_transformation",
            original_name="data-transformation",
            description="Transform raw data into analytics format",
            id="flow_002",
            source_path="/src/pipelines/transform.py",
            source_relative="pipelines/transform.py",
            grouping=["data", "processing"],
            child_attributes={
                # Implementation-specific attributes go here
                "obj_type": "function",
                "obj_name": "transform_data",
                "obj_parent_type": "module",
                "obj_parent": "pipelines.transform",
                "module": "pipelines.transform",
                "import_path": "pipelines.transform",
                # Custom metadata for your specific needs
                "priority": 5,  # Medium priority
                "category": "standard",
            }
        ))

        return flows


# Create the instance
flow_finder = CustomFlowFinder()
```
Important: The `child_attributes` field should be used to store any custom metadata or implementation-specific attributes like `obj_type`, `obj_name`, etc. Platform implementations like `PrefectFlowFinder` and `AirflowFlowFinder` automatically populate these attributes when discovering flows.
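Because the custom metadata lives in `child_attributes`, downstream code can filter or order discovered flows by it. A short usage sketch based on the `CustomFlowFinder` above:

```python
flows = flow_finder.find_flows()

# Select only the flows the finder tagged as critical
critical_flows = [f for f in flows if f.child_attributes.get("category") == "critical"]

# Order all flows by the custom priority value, highest first
by_priority = sorted(flows, key=lambda f: f.child_attributes.get("priority", 0), reverse=True)
```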
API Changes and Migration¶
For information about breaking API changes in v1.0.0 and detailed migration instructions, see the API Migration Guide.
The guide covers:
- Complete breakdown of architectural changes
- Migration instructions for different user types
- Before/after code examples
- Validation and testing guidance
- How the changes make it easier to integrate with different workflow platforms
Migration Guide for FlowDetails API Changes¶
Starting with version 1.0.0, the `FlowDetails` class has been simplified to make several attributes optional that were previously required. This is a breaking change that affects how you work with `FlowDetails` objects.
What Changed¶
Previously required attributes that have moved into `child_attributes`:
- `obj_type` - Type of object defining the flow (e.g., function, method)
- `obj_name` - Name of the object defining the flow
- `obj_parent_type` - Type of container for the object defining the flow
- `obj_parent` - Name of the container for the flow object
- `module` - Module name where the flow is defined
- `import_path` - Python import path to the source file
If You Use Prefect Implementation¶
If you're using the default `PrefectFlowFinder`, no changes are required in your setup. The Prefect implementation automatically puts these attributes in `child_attributes` when discovering flows.
To access these attributes from discovered flows:
```python
from acme_portal_sdk.prefect.flow_finder import PrefectFlowFinder

finder = PrefectFlowFinder("path/to/flows")
flows = finder.find_flows()

for flow in flows:
    # Access implementation-specific attributes via child_attributes
    obj_type = flow.child_attributes.get("obj_type")        # "function" or "method"
    obj_name = flow.child_attributes.get("obj_name")        # function/method name
    module = flow.child_attributes.get("module")            # module name
    import_path = flow.child_attributes.get("import_path")  # import path
```
If You Create FlowDetails Manually¶
If you manually create `FlowDetails` objects, you need to update your code:
Before (v0.x):
```python
flow = FlowDetails(
    name="my_flow",
    original_name="my-flow",
    description="Example flow",
    obj_type="function",
    obj_name="my_function",
    obj_parent_type="module",
    obj_parent="my_module",
    id="flow_123",
    module="my_module",
    source_path="/path/to/file.py",
    source_relative="file.py",
    import_path="my_module.file"
)
```
After (v1.0+):
```python
flow = FlowDetails(
    name="my_flow",
    original_name="my-flow",
    description="Example flow",
    id="flow_123",
    source_path="/path/to/file.py",
    source_relative="file.py",
    child_attributes={
        "obj_type": "function",
        "obj_name": "my_function",
        "obj_parent_type": "module",
        "obj_parent": "my_module",
        "module": "my_module",
        "import_path": "my_module.file"
    }
)
```
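If you construct `FlowDetails` objects in many places, a small helper can perform the move mechanically. This is a hypothetical convenience function, not part of the SDK:

```python
from typing import Any, Dict

# Keys that were top-level in v0.x but belong in child_attributes in v1.0+
_MOVED_KEYS = ("obj_type", "obj_name", "obj_parent_type",
               "obj_parent", "module", "import_path")


def to_v1_kwargs(old_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Convert v0.x-style FlowDetails keyword arguments to the v1.0+ layout."""
    new_kwargs = {k: v for k, v in old_kwargs.items() if k not in _MOVED_KEYS}
    new_kwargs["child_attributes"] = {
        k: old_kwargs[k] for k in _MOVED_KEYS if k in old_kwargs
    }
    return new_kwargs


# Usage: flow = FlowDetails(**to_v1_kwargs(legacy_kwargs))
```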
Custom DeploymentFinder Implementation¶
Create a custom `DeploymentFinder` subclass to discover deployments in your environment:
```python
# .acme_portal_sdk/deployment_finder.py
from typing import List

from acme_portal_sdk.deployment_finder import DeploymentFinder, DeploymentDetails


class CustomDeploymentFinder(DeploymentFinder):
    """Custom deployment finder that discovers deployments with resource and region metadata."""

    def get_deployments(self, project_name: str, branch_name: str, env: str) -> List[DeploymentDetails]:
        # Sample implementation that returns deployments with custom metadata
        deployments = []

        # Example: production deployment with high resources
        deployments.append(DeploymentDetails(
            name=f"{project_name}--{branch_name}--data_ingestion--{env}",
            project_name=project_name,
            branch=branch_name,
            flow_name="data_ingestion",
            env=env,
            commit_hash="abc123def456",
            package_version="1.2.3",
            tags=["production", "critical"],
            id="deploy_001",
            created_at="2024-01-15T10:30:00Z",
            updated_at="2024-01-15T14:20:00Z",
            flow_id="flow_001",
            url="https://deployment-system.com/deployments/deploy_001",
            child_attributes={
                # Custom metadata for your specific needs
                "region": "us-west-2",  # Custom region
                "cpu_limit": "4000m",   # High CPU limit for production
                "memory_limit": "8Gi",
                "health_check_enabled": True,
            }
        ))

        # Example: development deployment with standard resources
        if env == "dev":
            deployments.append(DeploymentDetails(
                name=f"{project_name}--{branch_name}--data_transformation--{env}",
                project_name=project_name,
                branch=branch_name,
                flow_name="data_transformation",
                env=env,
                commit_hash="def456ghi789",
                package_version="1.2.4-dev",
                tags=["development", "testing"],
                id="deploy_002",
                created_at="2024-01-16T08:15:00Z",
                updated_at="2024-01-16T09:45:00Z",
                flow_id="flow_002",
                url="https://deployment-system.com/deployments/deploy_002",
                child_attributes={
                    # Custom metadata for your specific needs
                    "region": "us-east-1",  # Standard region for dev
                    "cpu_limit": "1000m",   # Lower CPU limit for dev
                    "memory_limit": "2Gi",
                    "health_check_enabled": False,
                }
            ))

        return deployments


# Create the instance
deployment_finder = CustomDeploymentFinder()
```
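A quick way to exercise the finder is to call it directly and read the custom metadata back. A usage sketch (the project and branch names are illustrative):

```python
deployments = deployment_finder.get_deployments("analytics-platform", "main", "dev")

for d in deployments:
    # Custom metadata round-trips through child_attributes
    print(d.name, d.env, d.child_attributes.get("region"), d.child_attributes.get("cpu_limit"))
```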
Custom DeployWorkflow Implementation¶
Both `DeployWorkflow` and `PromoteWorkflow` use flexible signatures (`*args, **kwargs`), allowing custom implementations to accept additional parameters beyond the standard ones.
```python
# .acme_portal_sdk/flow_deploy.py
from typing import Optional, Dict, Any, List

from acme_portal_sdk.flow_deploy import DeployWorkflow


class CustomDeployWorkflow(DeployWorkflow):
    """Custom deployment workflow with environment-specific configuration and notifications."""

    def __init__(self, notification_webhook: Optional[str] = None):
        self.notification_webhook = notification_webhook

    def run(self, *args, **kwargs) -> Optional[str]:
        # Extract standard parameters with flexible argument handling
        flows = kwargs.get('flows_to_deploy', args[0] if args else [])
        ref = kwargs.get('ref', args[1] if len(args) > 1 else 'main')

        # Accept custom parameters
        environment = kwargs.get('environment', 'dev')
        config = kwargs.get('deployment_config', {})
        dry_run = kwargs.get('dry_run', False)
        notification_channels = kwargs.get('notification_channels', [])

        # Sample deployment logic with custom functionality
        deployment_id = f"deploy_{hash(f'{flows}_{ref}_{environment}')}"
        print(f"Starting deployment {deployment_id}")
        print(f"  Flows: {flows}")
        print(f"  Reference: {ref}")
        print(f"  Environment: {environment}")
        print(f"  Configuration: {config}")

        if dry_run:
            print("  DRY RUN - No actual deployment performed")
            return f"dry_run_{deployment_id}"

        # Simulate environment-specific deployment steps
        if environment == "prod":
            print("  Performing production deployment with safety checks...")
            # Add production-specific logic like approval workflows
            self._validate_production_deployment(flows, config)
        else:
            print(f"  Performing {environment} deployment...")

        # Send notifications if configured
        if notification_channels:
            self._send_notifications(notification_channels, deployment_id, flows, environment)

        print(f"  Deployment {deployment_id} completed successfully")
        return deployment_id

    def _validate_production_deployment(self, flows: List[str], config: Dict[str, Any]):
        """Additional validation for production deployments."""
        required_config = ["resource_limits", "health_checks", "rollback_strategy"]
        missing = [key for key in required_config if key not in config]
        if missing:
            raise ValueError(f"Production deployment requires: {missing}")

    def _send_notifications(self, channels: List[str], deployment_id: str, flows: List[str], environment: str):
        """Send deployment notifications to specified channels."""
        message = f"Deployment {deployment_id} completed for flows {flows} in {environment}"
        for channel in channels:
            print(f"  Notification sent to {channel}: {message}")


# Create the instance
deploy = CustomDeployWorkflow(notification_webhook="https://hooks.slack.com/...")
```
Custom PromoteWorkflow Implementation¶
```python
# .acme_portal_sdk/deployment_promote.py
import copy
from typing import Optional, List, Dict, Any

from acme_portal_sdk.deployment_promote import PromoteWorkflow


class CustomPromoteWorkflow(PromoteWorkflow):
    """Custom promotion workflow with approval gates and validation."""

    def __init__(self, require_approval: bool = True):
        self.require_approval = require_approval

    def run(self, *args, **kwargs) -> Optional[str]:
        # Extract standard parameters with flexible argument handling
        flows = kwargs.get('flows_to_deploy', args[0] if args else [])
        source_env = kwargs.get('source_env', args[1] if len(args) > 1 else 'dev')
        target_env = kwargs.get('target_env', args[2] if len(args) > 2 else 'prod')
        ref = kwargs.get('ref', args[3] if len(args) > 3 else 'main')

        # Accept custom parameters
        project_name = kwargs.get('project_name', 'default-project')
        branch_name = kwargs.get('branch_name', 'main')
        auto_approve = kwargs.get('auto_approve', False)
        validation_rules = kwargs.get('validation_rules', [])

        # Sample promotion logic with custom functionality
        promotion_id = f"promote_{hash(f'{flows}_{source_env}_{target_env}_{ref}')}"
        print(f"Starting promotion {promotion_id}")
        print(f"  Flows: {flows}")
        print(f"  Source Environment: {source_env}")
        print(f"  Target Environment: {target_env}")
        print(f"  Reference: {ref}")
        print(f"  Project: {project_name}")

        # Run custom validation rules
        if validation_rules:
            print("  Running validation rules...")
            for rule in validation_rules:
                self._run_validation_rule(rule, flows, source_env, target_env)

        # Handle approval requirements
        if self.require_approval and not auto_approve:
            approval_status = self._request_approval(promotion_id, flows, source_env, target_env)
            if not approval_status:
                print(f"  Promotion {promotion_id} rejected or timed out")
                return None

        # Simulate promotion steps
        print("  Extracting source deployment configuration...")
        source_config = self._get_source_deployment_config(flows, source_env)

        print("  Applying target environment settings...")
        target_config = self._apply_target_environment_settings(source_config, target_env)

        print("  Creating target deployments...")
        # Simulate deployment creation
        for flow in flows:
            print(f"    Creating deployment for {flow} in {target_env}")

        print(f"  Promotion {promotion_id} completed successfully")
        return promotion_id

    def _run_validation_rule(self, rule: str, flows: List[str], source_env: str, target_env: str):
        """Run a custom validation rule."""
        print(f"    Validating rule: {rule}")
        # Example validation logic
        if rule == "security_scan" and target_env == "prod":
            print("    Security scan passed")
        elif rule == "performance_test":
            print("    Performance test passed")

    def _request_approval(self, promotion_id: str, flows: List[str], source_env: str, target_env: str) -> bool:
        """Request approval for the promotion (simulated)."""
        print(f"  Approval requested for promotion from {source_env} to {target_env}")
        print(f"  Flows requiring approval: {flows}")
        # Simulate approval (in real implementation, this would integrate with approval systems)
        return True

    def _get_source_deployment_config(self, flows: List[str], source_env: str) -> Dict[str, Any]:
        """Extract configuration from source deployments."""
        return {
            "version": "1.2.3",
            "commit_hash": "abc123def456",
            "resource_limits": {"cpu": "1000m", "memory": "2Gi"},
            "environment_vars": {"ENV": source_env}
        }

    def _apply_target_environment_settings(self, source_config: Dict[str, Any], target_env: str) -> Dict[str, Any]:
        """Apply target environment specific settings."""
        # Deep copy so the nested dicts in the source config are not mutated
        config = copy.deepcopy(source_config)
        config["environment_vars"]["ENV"] = target_env
        # Apply environment-specific resource scaling
        if target_env == "prod":
            config["resource_limits"]["cpu"] = "4000m"
            config["resource_limits"]["memory"] = "8Gi"
        return config


# Create the instance
promote = CustomPromoteWorkflow(require_approval=True)
```
Usage Examples¶
Basic Usage¶
```python
# Standard calls work with any implementation
deploy.run(["flow1", "flow2"], "main")
promote.run(["flow1"], "dev", "prod", "main")
```
Extended Usage with Custom Parameters¶
```python
# Use additional parameters as needed
deploy.run(
    flows_to_deploy=["data_ingestion", "data_transformation"],
    ref="main",
    environment="staging",
    deployment_config={
        "resource_limits": {"cpu": "2000m", "memory": "4Gi"},
        "health_checks": {"enabled": True, "timeout": 30},
        "rollback_strategy": "automatic"
    },
    dry_run=False,
    notification_channels=["slack", "email"]
)

promote.run(
    flows_to_deploy=["data_ingestion"],
    source_env="staging",
    target_env="prod",
    ref="v1.2.3",
    project_name="analytics-platform",
    branch_name="main",
    auto_approve=False,
    validation_rules=["security_scan", "performance_test"]
)
```
Using GitHub Workflows for Deployment and Promotion¶
For detailed instructions on using GitHub Actions as a provider for `DeployWorkflow` and `PromoteWorkflow`, refer to the GitHub Workflows Guide.