#!/usr/bin/env python3 """ STACK – Local Repository Agent with nomic-embed-text Auto‑bootstraps venv + dependencies on first run. """ import os import sys import json import math import time import threading import hashlib import subprocess import venv from pathlib import Path from typing import Dict, List, Optional, Tuple, Any # ============================================================================ # 0. VENV BOOTSTRAP – prevent “externally-managed” errors # ============================================================================ STACK_DIR = Path(__file__).resolve().parent VENV_DIR = STACK_DIR / ".stack_venv" def is_venv() -> bool: """Check if we are already running inside a virtual environment.""" return ( hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix) ) def create_venv(): """Create the virtual environment and return the path to its Python binary.""" print("[*] Creating virtual environment...") venv.create(str(VENV_DIR), with_pip=True) if os.name == 'nt': python = VENV_DIR / 'Scripts' / 'python.exe' else: python = VENV_DIR / 'bin' / 'python' return str(python) def install_dependencies(python_exe: str): """Install required packages inside the virtual environment.""" packages = ["ollama"] print(f"[*] Installing dependencies inside venv: {packages}") subprocess.check_call([python_exe, "-m", "pip", "install", "--upgrade", "pip"]) subprocess.check_call([python_exe, "-m", "pip", "install"] + packages) def relaunch_in_venv(python_exe: str): """Replace the current process with one running inside the venv.""" print("[*] Relaunching inside virtual environment...") os.execv(python_exe, [python_exe, __file__] + sys.argv[1:]) # Bootstrap logic if not is_venv(): print("[!] Not running in a virtual environment.") if VENV_DIR.exists(): # Venv exists – use it if os.name == 'nt': python_exe = str(VENV_DIR / 'Scripts' / 'python.exe') else: python_exe = str(VENV_DIR / 'bin' / 'python') if not Path(python_exe).exists(): print("[!] Venv appears corrupt, recreating...") python_exe = create_venv() install_dependencies(python_exe) relaunch_in_venv(python_exe) else: # First run – create venv, install deps, relaunch python_exe = create_venv() install_dependencies(python_exe) relaunch_in_venv(python_exe) # ============================================================================ # Now safely inside the venv – import ollama # ============================================================================ import ollama # ============================================================================ # 2. CONFIGURATION & PATHS # ============================================================================ STACK_ROOT = os.path.abspath("./stack_system") WORKSPACE = os.path.join(STACK_ROOT, "current_workspace") TEMPLATES_DIR = os.path.join(STACK_ROOT, "templates") MANIFEST_PATH = os.path.join(TEMPLATES_DIR, "manifest.json") MEMORY_FILE = os.path.join(STACK_ROOT, ".stack_memory.json") MAX_TOOL_DEPTH = 10 os.makedirs(WORKSPACE, exist_ok=True) os.makedirs(TEMPLATES_DIR, exist_ok=True) _manifest_lock = threading.RLock() # ============================================================================ # 3. OLLAMA CONNECTION & MODEL MANAGEMENT # ============================================================================ def check_ollama_running() -> bool: try: ollama.list() return True except Exception: return False def start_ollama() -> bool: try: subprocess.Popen(["ollama", "serve"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) time.sleep(3) return check_ollama_running() except Exception: return False def ensure_models() -> None: print("[*] Verifying Ollama models...") if not check_ollama_running(): print("[!] Ollama not running. Attempting to start...") if not start_ollama(): print("[!] Could not start Ollama. Please run 'ollama serve' manually.") sys.exit(1) required_models = ["nomic-embed-text"] try: installed = ollama.list() installed_names = [m.model for m in installed.models] if hasattr(installed, 'models') else [] except Exception: installed_names = [] for model in required_models: if model not in installed_names: print(f"[*] Pulling {model}...") try: ollama.pull(model) print(f"[✓] {model} ready") except Exception as e: print(f"[!] Failed to pull {model}: {e}") sys.exit(1) def get_embedding(text: str) -> Optional[List[float]]: try: response = ollama.embed(model="nomic-embed-text", input=text) if hasattr(response, 'embeddings'): embeddings = response.embeddings elif isinstance(response, dict): embeddings = response.get('embeddings', []) else: return None if embeddings and len(embeddings) > 0: return embeddings[0] if isinstance(embeddings[0], list) else embeddings[0] return None except Exception as e: print(f"⚠️ Embedding error: {e}") return None # ============================================================================ # 4. VECTOR SIMILARITY & MEMORY SYSTEM # ============================================================================ def cosine_similarity(v1: List[float], v2: List[float]) -> float: if not v1 or not v2 or len(v1) != len(v2): return 0.0 dot = sum(a * b for a, b in zip(v1, v2)) mag1 = math.sqrt(sum(a * a for a in v1)) mag2 = math.sqrt(sum(b * b for b in v2)) if mag1 == 0 or mag2 == 0: return 0.0 return dot / (mag1 * mag2) def remember_action(action_summary: str, metadata: Dict = None) -> None: memory = [] if os.path.exists(MEMORY_FILE): try: with open(MEMORY_FILE, 'r') as f: memory = json.load(f) except json.JSONDecodeError: memory = [] vector = get_embedding(action_summary) if vector: entry = { "text": action_summary, "vector": vector, "timestamp": time.time(), "metadata": metadata or {} } memory.append(entry) if len(memory) > 1000: memory = memory[-1000:] with open(MEMORY_FILE, 'w') as f: json.dump(memory, f) def query_memory(query: str, top_k: int = 3) -> List[str]: if not os.path.exists(MEMORY_FILE): return [] try: with open(MEMORY_FILE, 'r') as f: memory = json.load(f) except (json.JSONDecodeError, IOError): return [] query_vector = get_embedding(query) if not query_vector: return [] scored = [] for item in memory: if "vector" in item and item["vector"]: score = cosine_similarity(query_vector, item["vector"]) scored.append((score, item["text"])) scored.sort(key=lambda x: x[0], reverse=True) return [text for score, text in scored[:top_k] if score > 0.3] # ============================================================================ # 5. TEMPLATE MANAGEMENT & HOT RELOAD # ============================================================================ DEFAULT_TEMPLATES = { "python_web_server": { "description": "minimal fastapi python web server with api routes", "files": { "app.py": "from fastapi import FastAPI\n\napp = FastAPI()\n\n@app.get('/')\ndef root():\n return {'status': 'STACK running'}\n", "requirements.txt": "fastapi\nuvicorn" } }, "html_login_component": { "description": "modern tailwind css login form ui component", "files": { "login.html": """

Login

""" } } } def seed_initial_templates() -> None: if os.path.exists(MANIFEST_PATH): return print("[*] Seeding initial templates...") manifest = {} for name, data in DEFAULT_TEMPLATES.items(): vector = get_embedding(data["description"]) if vector: manifest[name] = { "description": data["description"], "vector": vector, "files": data["files"] } print(f" ✓ {name}") with _manifest_lock: with open(MANIFEST_PATH, 'w') as f: json.dump(manifest, f, indent=2) def scan_templates_folder() -> Dict: discovered = {} if not os.path.exists(TEMPLATES_DIR): return discovered for item in os.listdir(TEMPLATES_DIR): item_path = os.path.join(TEMPLATES_DIR, item) if not os.path.isdir(item_path): continue files_dict = {} description = f"code template for {item.replace('_', ' ')}" for root, _, files in os.walk(item_path): for filename in files: if filename == "description.txt": try: with open(os.path.join(root, filename), 'r') as f: description = f.read().strip() except Exception: pass continue file_path = os.path.join(root, filename) rel_path = os.path.relpath(file_path, item_path) try: with open(file_path, 'r', encoding='utf-8') as f: files_dict[rel_path] = f.read() except Exception: pass if files_dict: discovered[item] = { "description": description, "files": files_dict } return discovered def reload_manifest() -> None: raw_templates = scan_templates_folder() with _manifest_lock: manifest = {} if os.path.exists(MANIFEST_PATH): try: with open(MANIFEST_PATH, 'r') as f: manifest = json.load(f) except json.JSONDecodeError: manifest = {} updated = False for name, data in raw_templates.items(): if name not in manifest or manifest[name]["description"] != data["description"]: vector = get_embedding(data["description"]) if vector: manifest[name] = { "description": data["description"], "vector": vector, "files": data["files"] } updated = True print(f" ✓ Updated template: {name}") if updated: with open(MANIFEST_PATH, 'w') as f: json.dump(manifest, f, indent=2) def start_template_watcher() -> threading.Thread: def watcher(): last_mtime = {} while True: time.sleep(2) try: current = {} if os.path.exists(TEMPLATES_DIR): for root, _, files in os.walk(TEMPLATES_DIR): for f in files: path = os.path.join(root, f) current[path] = os.path.getmtime(path) if current != last_mtime: reload_manifest() last_mtime = current except Exception: pass thread = threading.Thread(target=watcher, daemon=True) thread.start() return thread # ============================================================================ # 6. SANDBOXED GIT & FILE OPERATIONS # ============================================================================ def validate_workspace_path(target_path: str) -> bool: abs_target = os.path.abspath(os.path.join(WORKSPACE, target_path)) abs_workspace = os.path.abspath(WORKSPACE) return abs_target.startswith(abs_workspace) def run_git_safe(args: List[str], error_ok: bool = False) -> Tuple[bool, str]: try: result = subprocess.run( ["git"] + args, cwd=WORKSPACE, capture_output=True, text=True, timeout=30 ) if result.returncode == 0: return True, result.stdout or "OK" elif error_ok: return False, result.stderr else: return False, result.stderr except subprocess.TimeoutExpired: return False, "Git operation timed out" except Exception as e: return False, str(e) def cmd_build(repo_name: str) -> str: global WORKSPACE new_workspace = os.path.join(STACK_ROOT, repo_name) try: os.makedirs(new_workspace, exist_ok=True) WORKSPACE = new_workspace success, msg = run_git_safe(["init"]) if not success: return f"Git init failed: {msg}" success, msg = run_git_safe(["checkout", "-b", "main"]) config_path = os.path.join(WORKSPACE, "STACK.json") with open(config_path, 'w') as f: json.dump({ "name": repo_name, "created": time.time(), "managed_by": "STACK" }, f) run_git_safe(["add", "."]) run_git_safe(["commit", "-m", "chore: initial STACK bootstrap"]) remember_action(f"Created repository '{repo_name}'", {"action": "build", "repo": repo_name}) return f"✓ Repository '{repo_name}' created at {WORKSPACE}" except Exception as e: return f"✗ Build failed: {e}" def cmd_add(search_query: str) -> str: workspace_git = os.path.join(WORKSPACE, ".git") if not os.path.exists(workspace_git): return "✗ No active repository. Use '/build ' first" with _manifest_lock: if not os.path.exists(MANIFEST_PATH): return "✗ No templates available. Run '/import' or add to templates folder" try: with open(MANIFEST_PATH, 'r') as f: manifest = json.load(f) except Exception: return "✗ Corrupted manifest file" if not manifest: return "✗ Manifest is empty. Seed templates first" query_vector = get_embedding(search_query) if not query_vector: return "✗ Could not generate embedding for query" best_match = None best_score = -1.0 for name, data in manifest.items(): if "vector" in data: score = cosine_similarity(query_vector, data["vector"]) if score > best_score: best_score = score best_match = (name, data["files"], data["description"]) if best_score < 0.35: return f"✗ No good match (confidence: {best_score:.2f}). Try different phrasing" template_name, files, description = best_match branch_name = f"stack/{template_name}" success, msg = run_git_safe(["checkout", "-b", branch_name], error_ok=True) files_written = [] for filename, content in files.items(): target = os.path.join(WORKSPACE, filename) if not validate_workspace_path(filename): continue os.makedirs(os.path.dirname(target), exist_ok=True) with open(target, 'w', encoding='utf-8') as f: f.write(content) files_written.append(filename) run_git_safe(["add", "."]) commit_msg = f"feat: add {template_name} via STACK\n\n{description}" run_git_safe(["commit", "-m", commit_msg]) run_git_safe(["checkout", "main"]) run_git_safe(["merge", branch_name]) remember_action(f"Added template '{template_name}' to repository", {"action": "add", "template": template_name, "files": files_written}) return f"✓ Added '{template_name}' (confidence: {best_score:.2f})\n Files: {', '.join(files_written)}" def cmd_import(json_path: str) -> str: if not os.path.exists(json_path): return f"✗ File not found: {json_path}" try: with open(json_path, 'r', encoding='utf-8') as f: new_templates = json.load(f) except Exception as e: return f"✗ Invalid JSON: {e}" with _manifest_lock: manifest = {} if os.path.exists(MANIFEST_PATH): try: with open(MANIFEST_PATH, 'r') as f: manifest = json.load(f) except Exception: pass imported = 0 for name, data in new_templates.items(): if "description" not in data or "files" not in data: continue vector = get_embedding(data["description"]) if vector: manifest[name] = { "description": data["description"], "vector": vector, "files": data["files"] } imported += 1 with open(MANIFEST_PATH, 'w') as f: json.dump(manifest, f, indent=2) return f"✓ Imported {imported} templates from {json_path}" def cmd_status() -> str: with _manifest_lock: manifest_count = 0 if os.path.exists(MANIFEST_PATH): try: with open(MANIFEST_PATH, 'r') as f: manifest_count = len(json.load(f)) except Exception: pass has_git = os.path.exists(os.path.join(WORKSPACE, ".git")) lines = [ f"STACK Status", f" Workspace: {WORKSPACE}", f" Git initialized: {'✓' if has_git else '✗'}", f" Templates in manifest: {manifest_count}", f" Ollama: {'✓' if check_ollama_running() else '✗'}" ] return "\n".join(lines) def cmd_list_templates() -> str: with _manifest_lock: if not os.path.exists(MANIFEST_PATH): return "No templates found" try: with open(MANIFEST_PATH, 'r') as f: manifest = json.load(f) except Exception: return "Error reading manifest" if not manifest: return "No templates in manifest" lines = ["Available templates:"] for name, data in manifest.items(): desc = data.get("description", "No description")[:60] lines.append(f" • {name}: {desc}") return "\n".join(lines) # ============================================================================ # 7. CLI INTERFACE # ============================================================================ def print_banner() -> None: banner = """ ███████╗████████╗ █████╗ ██████╗██╗ ██╗ ██╔════╝╚══██╔══╝██╔══██╗██╔════╝██║ ██╔╝ ███████╗ ██║ ███████║██║ █████╔╝ ╚════██║ ██║ ██╔══██║██║ ██╔═██╗ ███████║ ██║ ██║ ██║╚██████╗██║ ██╗ ╚══════╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝╚═╝ ╚═╝ Embed-Driven Repository Architect """ print(banner) def print_commands() -> None: print(""" Commands: /build - Create new git repository workspace /add - Find matching template, inject into workspace /import - Import external template bundle /list - List available templates /status - Show system status /help - Show this help /quit - Exit STACK Templates folder: ./stack_system/templates/ - Add folders with code + description.txt for hot-reload - JSON imports also supported """) def main() -> None: print("[*] Initializing STACK...") ensure_models() seed_initial_templates() start_template_watcher() print_banner() print_commands() while True: try: user_input = input("\nSTACK > ").strip() if not user_input: continue if user_input.lower() in ["/quit", "/exit", "quit", "exit"]: print("[*] Goodbye!") break if user_input == "/help": print_commands() continue if user_input == "/status": print(cmd_status()) continue if user_input == "/list": print(cmd_list_templates()) continue if user_input.startswith("/build "): repo_name = user_input[7:].strip() if repo_name: print(cmd_build(repo_name)) else: print("Usage: /build ") continue if user_input.startswith("/add "): query = user_input[5:].strip() if query: print(cmd_add(query)) else: print("Usage: /add ") continue if user_input.startswith("/import "): path = user_input[8:].strip() if path: print(cmd_import(path)) else: print("Usage: /import ") continue print(f"Unknown command. Type /help for available commands.") except KeyboardInterrupt: print("\n[*] Interrupted. Goodbye!") break except EOFError: break except Exception as e: print(f"Error: {e}") if __name__ == "__main__": main()