RepoManager¶
The RepoManager
class is the main interface for managing multiple Git repositories, performing high-performance clone operations, running blame analysis, and extracting commit history. It is designed for asynchronous use, allowing you to efficiently manage and analyze many repositories in parallel.
The RepoManager
is implemented in Rust for maximum performance, with Python bindings via PyO3.
Note: All major methods of
RepoManager
are asynchronous and should be awaited. You can use Python'sasyncio
or any compatible event loop to run these methods.
Overview¶
- Manage a set of repositories (clone, track status, cleanup)
- Perform bulk blame operations on files
- Extract commit history from repositories
- All operations are designed to be non-blocking and scalable
Initialization¶
from GitFleet import RepoManager
# List of repository URLs to manage
urls = [
"https://github.com/owner/repo1.git",
"https://github.com/owner/repo2.git",
]
# Create a RepoManager instance
manager = RepoManager(urls, github_username="your-username", github_token="your-token")
Methods¶
clone_all()¶
Clones all repositories configured in this manager instance asynchronously.
fetch_clone_tasks()¶
Fetches the current status of all cloning tasks asynchronously. Returns a dictionary mapping repository URLs to RustCloneTask
objects, which include a RustCloneStatus
.
Return Type: Dict[str, RustCloneTask]
# Get raw Rust objects
rust_tasks = await manager.fetch_clone_tasks()
for url, task in rust_tasks.items():
print(f"{url}: {task.status.status_type}")
# Or convert to Pydantic models for additional features
from GitFleet import convert_clone_tasks
pydantic_tasks = convert_clone_tasks(rust_tasks)
for url, task in pydantic_tasks.items():
print(f"Task: {url}")
print(f"Status: {task.status.status_type}")
print(f"JSON: {task.model_dump_json()}")
clone(url)¶
Clones a single repository specified by URL asynchronously.
bulk_blame(repo_path, file_paths)¶
Performs 'git blame' on multiple files within a cloned repository asynchronously. Returns a dictionary mapping file paths to blame results.
blame_results = await manager.bulk_blame("/path/to/repo", ["file1.py", "file2.py"])
for file, lines in blame_results.items():
print(f"Blame for {file}:", lines)
extract_commits(repo_path)¶
Extracts commit data from a cloned repository asynchronously. Returns a list of commit dictionaries.
cleanup()¶
Cleans up all temporary directories created for cloned repositories. Returns a dictionary with repository URLs as keys and cleanup results as values (True for success, or an error message).
cleanup_results = manager.cleanup()
for url, result in cleanup_results.items():
print(f"Cleanup for {url}: {result}")
Simple Example¶
import asyncio
from GitFleet import RepoManager
async def main():
urls = ["https://github.com/owner/repo1.git"]
manager = RepoManager(urls, github_username="username", github_token="token")
await manager.clone_all()
tasks = await manager.fetch_clone_tasks()
print(tasks)
await manager.bulk_blame("/path/to/repo", ["file1.py"])
await manager.extract_commits("/path/to/repo")
manager.cleanup()
asyncio.run(main())
Advanced Example: Managing Multiple Repositories¶
import asyncio
from GitFleet import RepoManager
async def main():
urls = [
"https://github.com/owner/repo1.git",
"https://github.com/owner/repo2.git",
"https://github.com/owner/repo3.git",
]
manager = RepoManager(urls, github_username="username", github_token="token")
await manager.clone_all()
clone_tasks = await manager.fetch_clone_tasks()
for url, task in clone_tasks.items():
print(f"{url}: {task.status.status_type}")
# Run blame on all files in all repos (example)
for url, task in clone_tasks.items():
if task.temp_dir:
blame = await manager.bulk_blame(task.temp_dir, ["main.py", "utils.py"])
print(f"Blame for {url}: {blame}")
# Extract commits
for url, task in clone_tasks.items():
if task.temp_dir:
commits = await manager.extract_commits(task.temp_dir)
print(f"Commits for {url}: {len(commits)} found.")
# Cleanup
results = manager.cleanup()
print("Cleanup results:", results)
asyncio.run(main())
Working with Pydantic Models¶
GitFleet provides Pydantic models that mirror the Rust objects returned by RepoManager
. These models add serialization, validation, and other Pydantic features:
import asyncio
from GitFleet import RepoManager
from GitFleet import to_pydantic_task, to_pydantic_status, convert_clone_tasks
async def main():
# Create a repo manager
manager = RepoManager(urls=["https://github.com/owner/repo.git"],
github_username="username", github_token="token")
# Get tasks from the manager (returns Rust objects)
rust_tasks = await manager.fetch_clone_tasks()
# Convert individual tasks to Pydantic models
for url, task in rust_tasks.items():
pydantic_task = to_pydantic_task(task)
# Now you can use Pydantic features
task_json = pydantic_task.model_dump_json(indent=2)
print(task_json)
# Convert a status to Pydantic
pydantic_status = to_pydantic_status(task.status)
print(pydantic_status.model_dump())
# Or convert all tasks at once
pydantic_tasks = convert_clone_tasks(rust_tasks)
print(f"Converted {len(pydantic_tasks)} tasks to Pydantic models")
asyncio.run(main())
For details on the Rust objects RustCloneTask
and RustCloneStatus
, and their Pydantic counterparts PydanticCloneTask
and PydanticCloneStatus
, see the CloneTask and CloneStatus documentation pages.