-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmanager.py
More file actions
239 lines (191 loc) · 7.77 KB
/
manager.py
File metadata and controls
239 lines (191 loc) · 7.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
#!/usr/bin/env python
import os
from git import Repo
from git.repo.fun import is_git_dir
from git.repo.fun import read_gitfile
import csv
import tempfile
import shutil
from ConfigParser import SafeConfigParser
class Manager(object):
    """Manage git repositories listed in an INI-style repository database.

    The database (read with ``SafeConfigParser``) must contain a ``[setting]``
    section with the options ``worktree_root`` and ``git_dirs_root``, and one
    ``[repo <path>]`` section per repository carrying a ``url`` option.
    Repositories are cloned with a separate git dir: the git dir is placed
    under ``git_dirs_root`` and the working tree under ``worktree_root``.
    """

    # Config sections whose name starts with this prefix describe a repository.
    SECTION_PREFIX = 'repo '

    def __init__(self, repo_db):
        """Remember the database path *repo_db* and load it immediately."""
        self.repo_db = repo_db
        self.load_repo_db()

    def load_repo_db(self):
        """Load settings and the saved repository list from ``self.repo_db``.

        Sets ``self.worktree_root`` and ``self.repo_root`` (user-expanded and
        resolved with ``os.path.realpath``) and ``self.saved_repos``, a list of
        ``(path, url)`` tuples in the order the sections appear.
        """
        parser = SafeConfigParser()
        # Use a context manager so the file handle is closed deterministically.
        with open(self.repo_db) as db_file:
            parser.readfp(db_file)

        worktree_setting = parser.get('setting', 'worktree_root')
        self.worktree_root = os.path.realpath(
            os.path.expanduser(worktree_setting))
        git_dirs_setting = parser.get('setting', 'git_dirs_root')
        self.repo_root = os.path.realpath(
            os.path.expanduser(git_dirs_setting))

        prefix = self.SECTION_PREFIX
        self.saved_repos = [
            (section[len(prefix):], parser.get(section, 'url'))
            for section in parser.sections()
            if section.startswith(prefix)
        ]

    def _stored_git_dir(self, path):
        """Return the git dir of the repository at *path*, or None.

        None means ``Repo(path)`` could not be opened. ``Exception`` is caught
        rather than using a bare ``except`` so that KeyboardInterrupt and
        SystemExit still propagate.
        """
        try:
            return Repo(path).git_dir
        except Exception:
            return None

    def list_repos(self, verbose=False):
        """Print every saved repository as ``<path> <url>``, paths padded.

        With *verbose* true, delegate to :meth:`list_repos_verbose` instead.
        Prints nothing when no repository is saved.
        """
        if verbose:
            return self.list_repos_verbose()
        if not self.saved_repos:
            # max() of an empty sequence would raise ValueError.
            return
        max_length = max(len(path) for path, _ in self.saved_repos)
        for path, url in self.saved_repos:
            print(" ".join((path.ljust(max_length), url)))

    def list_repos_verbose(self):
        """Print path, remote url and git dir for every saved repository."""
        for path, url in self.saved_repos:
            print(path)
            print('\t[remote url]: %s' % (url,))
            # Open the repository once instead of twice.
            git_dir = self._stored_git_dir(path)
            if git_dir is None:
                print('\t [git_dir]: not stored')
            else:
                print('\t [git_dir]: %s' % (git_dir,))

    def check_stored(self):
        """Print a line for every saved repository that cannot be opened."""
        for path, url in self.saved_repos:
            if not self._stored_git_dir(path):
                print('repo path %s is not stored.' % (path,))

    def iter_unstored_repos(self):
        """Yield ``(path, url)`` for saved repositories that cannot be opened."""
        for path, url in self.saved_repos:
            if self._stored_git_dir(path) is None:
                yield (path, url)

    def fix_unstored(self):
        """Clone every saved repository that cannot currently be opened.

        Raises:
            Exception: if the directories between the git-dir root and the
                new git dir fail :meth:`check_dir_structure`.
        """
        for path, url in self.iter_unstored_repos():
            print('%s is not stored.' % (path,))
            repos_dir = os.path.join(self.repo_root, path)
            parent_dir = os.path.dirname(repos_dir)
            if not os.path.exists(parent_dir):
                os.makedirs(parent_dir)
            if not self.check_dir_structure(parent_dir, self.repo_root):
                raise Exception('Invalid Directory Structure!')
            self.clone_unstored_repo(path, repos_dir, url)

    def check_dir_structure(self, path, until):
        """Validate every directory from *path* up to (excluding) *until*.

        Each existing component must be a directory and must not itself
        contain a ``.git`` entry. Components that do not exist yet are
        accepted, because callers create them afterwards with ``os.makedirs``.

        Returns:
            True when *until* is reached without a violation; False on a
            violation, or when *path* is not located below *until*.
        """
        start = path
        while path != until:
            if os.path.exists(path):
                if not os.path.isdir(path):
                    print('%s is not a directory!' % (path,))
                    return False
                if is_git_repo(path):
                    print("Wrong Repositories Structure! %s is a git repository." % (path,))
                    return False
            parent = os.path.dirname(path)
            if parent == path:
                # Reached the top of the path without meeting `until`;
                # stop here instead of looping forever.
                print('%s is not inside %s!' % (start, until))
                return False
            path = parent
        return True

    def clone_unstored_repo(self, worktree_path, git_dir, url):
        """Clone *url* so that its git dir is created at *git_dir*.

        The checkout goes into a temporary directory that is always removed;
        the repository's ``core.worktree`` is set to *worktree_path* resolved
        below ``self.worktree_root``. Failures are reported on stdout and not
        re-raised, so a batch run can continue with the next repository.
        """
        print('clone repository %s from %s' % (worktree_path, url))
        tmp_dir = tempfile.mkdtemp()
        try:
            # The key is not a valid identifier, hence the dict expansion.
            kargs = {'separate-git-dir': git_dir}
            repo = Repo.clone_from(url, tmp_dir, None, **kargs)
            workingdir = os.path.join(self.worktree_root, worktree_path)
            # NOTE(review): the config writer is not released explicitly;
            # presumably it flushes when garbage-collected - confirm for the
            # GitPython version in use.
            repo.config_writer().set('core', 'worktree', workingdir)
        except Exception as err:
            print('clone is failed!')
            # Show the reason instead of hiding it completely.
            print('\t%s' % (err,))
        finally:
            shutil.rmtree(tmp_dir)

    def clone_new_repo(self, path, url):
        """Clone *url* into ``<worktree_root>/<path>`` and record it.

        The git dir is placed at ``<git_dirs_root>/<path>``. Nothing is done
        (a message is printed) when *path* is already saved, when the
        directory structure is invalid, or when a target already exists.
        """
        if any(saved_path == path for saved_path, _ in self.saved_repos):
            print('You already cloned %s' % (path,))
            return

        worktree_path = os.path.join(self.worktree_root, path)
        git_dir = os.path.join(self.repo_root, path)
        worktree_parent = os.path.dirname(worktree_path)
        git_dir_parent = os.path.dirname(git_dir)

        if not self.check_dir_structure(worktree_parent, self.worktree_root):
            return
        if not self.check_dir_structure(git_dir_parent, self.repo_root):
            return

        for target in (worktree_path, git_dir):
            if os.path.exists(target):
                print('%s is already exist' % (target,))
                return

        for parent in (worktree_parent, git_dir_parent):
            if not os.path.exists(parent):
                os.makedirs(parent)

        print('clone %s to %s' % (url, path))
        kargs = {'separate-git-dir': git_dir}
        Repo.clone_from(url, worktree_path, None, **kargs)
        # save_repo() takes (path, url); passing them swapped stored the url
        # as the section name.
        self.save_repo(path, url)

    def save_repo(self, path, url):
        """Persist repository *path* with remote *url* into the database.

        The entry is written as a ``[repo <path>]`` section, which is the form
        :meth:`load_repo_db` reads back, and ``self.saved_repos`` is updated
        to match when that attribute is already loaded.
        """
        parser = SafeConfigParser()
        with open(self.repo_db) as db_file:
            parser.readfp(db_file)

        section = self.SECTION_PREFIX + path
        if not parser.has_section(section):
            parser.add_section(section)
        parser.set(section, 'url', url)

        with open(self.repo_db, 'w') as db_file:
            parser.write(db_file)

        # Keep the in-memory list in step with the file.
        saved = getattr(self, 'saved_repos', None)
        if saved is not None and (path, url) not in saved:
            saved.append((path, url))
def is_git_repo(path):
    """Return True when *path* contains an entry named ``.git``."""
    return os.path.exists(os.path.join(path, '.git'))
def list_repos_old():
    """Yield ``(dirname, url)`` for git repositories below the current directory.

    Directories are visited breadth-first starting from ``'.'``. A directory
    for which ``is_git_repo`` is true is opened with ``Repo`` and one tuple is
    yielded per remote named ``origin``; such a directory is not descended
    into. Any other directory has its entries queued for inspection.
    """
    # Local import: the deque gives O(1) pops from the front of the queue.
    from collections import deque

    pending = deque(os.listdir('.'))
    while pending:
        dirname = pending.popleft()
        if not os.path.isdir(dirname):
            continue
        # is_git_repo() appends '.git' itself, so it must be given the
        # directory; passing dirname/.git made it test dirname/.git/.git.
        if not is_git_repo(dirname):
            pending.extend(
                os.path.join(dirname, entry) for entry in os.listdir(dirname))
            continue
        for remote in Repo(dirname).remotes:
            if remote.name == 'origin':
                yield (dirname, remote.url)
def list_repos():
    """Yield ``(dirname, url)`` for repositories whose ``.git`` is a file.

    Walks breadth-first from the current directory. A directory without a
    ``.git`` *file* has its entries queued for inspection. A directory with
    one is skipped when ``read_gitfile`` returns None for it; otherwise it is
    opened with ``Repo`` and one tuple is yielded per remote named ``origin``.
    """
    queue = os.listdir('.')
    while queue:
        candidate = queue.pop(0)
        if not os.path.isdir(candidate):
            continue

        gitfile = os.path.join(candidate, '.git')
        if os.path.isfile(gitfile):
            if read_gitfile(gitfile, '.') is None:
                continue
            for remote in Repo(candidate).remotes:
                if remote.name != 'origin':
                    continue
                yield (candidate, remote.url)
        else:
            # No gitfile here: look one level deeper.
            for child in os.listdir(candidate):
                queue.append(os.path.join(candidate, child))
def update_repos_list():
    """Rewrite ``repositories.csv`` with the union of saved and found repos.

    The saved rows come from ``load_repos_list('repositories.csv')`` and the
    found rows from ``list_repos()``; both are collected before the file is
    reopened for writing.
    """
    # NOTE(review): load_repos_list is not defined in this file - confirm
    # where it is expected to come from.
    saved_repos = set(load_repos_list('repositories.csv'))
    current_repos = set(list_repos())
    # The context manager guarantees the rows are flushed and the handle is
    # closed; sorting makes the file content deterministic.
    with open('repositories.csv', 'w') as csv_file:
        writer = csv.writer(csv_file)
        for row in sorted(current_repos | saved_repos):
            writer.writerow(row)
def remove():
    """Delete repositories found on disk that are not in ``repositories.csv``.

    For every ``(path, url)`` returned by ``list_repos()`` but absent from
    ``load_repos_list('repositories.csv')``, the repository's git dir and then
    the directory at ``path`` are removed with ``shutil.rmtree``.
    """
    # NOTE(review): load_repos_list is not defined in this file - confirm
    # where it is expected to come from.
    known = set(load_repos_list('repositories.csv'))
    on_disk = set(list_repos())
    unknown = on_disk - known
    for entry in unknown:
        path = entry[0]
        git_dir = Repo(path).git_dir
        print('remove: %s %s' % (path, git_dir))
        shutil.rmtree(git_dir)
        shutil.rmtree(path)
if __name__ == '__main__':
    # Command-line entry point: fix | list [-v] | check | convert | clone.
    import argparse

    parser = argparse.ArgumentParser(description='Git Repository Manager')
    subparsers = parser.add_subparsers()

    # The handlers look up `manager` when they are called; it is created only
    # after the arguments were parsed (see below).
    sub_parser = subparsers.add_parser('fix')
    sub_parser.set_defaults(func=lambda args: manager.fix_unstored())

    sub_parser = subparsers.add_parser('list')
    sub_parser.add_argument('-v', '--verbose', action='store_true')
    sub_parser.set_defaults(func=lambda args: manager.list_repos(args.verbose))

    sub_parser = subparsers.add_parser('check')
    sub_parser.set_defaults(func=lambda args: manager.check_stored())

    def run_convert(args):
        # Manager defines no convert_csv2conf() in this file; report that as
        # a usage error instead of dying with an AttributeError traceback.
        convert = getattr(manager, 'convert_csv2conf', None)
        if convert is None:
            parser.error('convert is not supported: '
                         'Manager has no convert_csv2conf()')
        convert()

    sub_parser = subparsers.add_parser('convert')
    sub_parser.set_defaults(func=run_convert)

    sub_parser = subparsers.add_parser('clone')
    sub_parser.add_argument('path')
    sub_parser.add_argument('url')
    sub_parser.set_defaults(
        func=lambda args: manager.clone_new_repo(args.path, args.url))

    args = parser.parse_args()
    handler = getattr(args, 'func', None)
    if handler is None:
        # Reached only where argparse treats the subcommand as optional.
        parser.error('a subcommand is required')

    # Load the database only now, so that --help and usage errors work even
    # when repos.conf is missing or broken.
    manager = Manager('repos.conf')
    handler(args)