Coverage for gpc/helpers/gitlab_helper.py: 55%

149 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-02 09:39 +0000

1# Standard Library 

2import re 

3 

4from typing import Iterator 

5from typing import Optional 

6from typing import Union # pylint: disable=unused-import 

7 

8# Third Party Libraries 

9import boltons.cacheutils 

10import gitlab.const 

11 

12from boltons.urlutils import URL 

13from boltons.urlutils import parse_url 

14from gitlab import Gitlab # pylint: disable=unused-import 

15from gitlab.exceptions import GitlabGetError 

16from gitlab.v4.objects import Group as GitlabGroup 

17from gitlab.v4.objects import GroupProject as GitlabGroupProject 

18from gitlab.v4.objects import Project as GitlabProject 

19from wcmatch.fnmatch import fnmatch 

20 

21# Gitlab-Project-Configurator Modules 

22from gpc.helpers.exceptions import GpcUserError 

23from gpc.helpers.types import ProjectName 

24from gpc.helpers.types import Url 

25 

26 

27cache_users = boltons.cacheutils.LRI(10000) 

28cache_users_id = boltons.cacheutils.LRI(10000) 

29cache_groups = boltons.cacheutils.LRI(10000) 

30cache_subgroups = boltons.cacheutils.LRI(10000) 

31cache_allgroups = boltons.cacheutils.LRI(10000) 

32 

33VISIBILITY_VALUES = ["internal", "private", "public"] 

34MERGE_METHODS = ["merge", "rebase_merge", "ff"] 

35SQUASH_OPTIONS = { 

36 "do not allow": "never", 

37 "allow": "default_off", 

38 "encourage": "default_on", 

39 "require": "always", 

40} 

41INV_SQUASH_OPTIONS = { 

42 "never": "do not allow", 

43 "default_off": "allow", 

44 "default_on": "encourage", 

45 "always": "require", 

46} 

47 

48MAP_ACCESS = { 

49 "no one": 0, 

50 "none": 0, 

51 "maintainers": gitlab.const.MAINTAINER_ACCESS, 

52 "guests": gitlab.const.GUEST_ACCESS, 

53 "reporters": gitlab.const.REPORTER_ACCESS, 

54 "owners": gitlab.const.OWNER_ACCESS, 

55 "developers": gitlab.const.DEVELOPER_ACCESS, 

56 "admins": 60, 

57} 

58 

59MAP_ACCESS_REVERT = { 

60 0: "no one", 

61 gitlab.const.MAINTAINER_ACCESS: "maintainers", 

62 gitlab.const.GUEST_ACCESS: "guests", 

63 gitlab.const.REPORTER_ACCESS: "reporters", 

64 gitlab.const.OWNER_ACCESS: "owners", 

65 gitlab.const.DEVELOPER_ACCESS: "developers", 

66 60: "admins", 

67} 

68 

69 

70@boltons.cacheutils.cached(cache_users) 

71def get_user_by_username(gl: Gitlab, username): 

72 # In some cases Name and Username are different 

73 # jhon.smith may have jhon.slith1 as username, 

74 # we may use gl.users.list(search=...) in this case 

75 users = gl.users.list(username=username, retry_transient_errors=True) or gl.users.list( 

76 search=username, retry_transient_errors=True 

77 ) 

78 if users: 78 ↛ 82line 78 didn't jump to line 82 because the condition on line 78 was always true

79 # The username is an unique field 

80 return users[0] # type: ignore 

81 

82 raise GpcUserError(f"User {username} does not exist") 

83 

84 

85@boltons.cacheutils.cached(cache_users_id) 

86def get_user_by_id(gl: Gitlab, user_id): 

87 return gl.users.get(user_id, retry_transient_errors=True) 

88 

89 

90@boltons.cacheutils.cached(cache_groups) 

91def get_group(gl: Gitlab, group_path): 

92 return gl.groups.get(group_path, retry_transient_errors=True) 

93 

94 

95@boltons.cacheutils.cached(cache_subgroups) 

96def _get_subgroups(gl: Gitlab, group_path): 

97 group = get_group(gl, group_path) 

98 subgroups = [] 

99 if group.shared_with_groups: 

100 subgroups = [x.get("group_full_path") for x in group.shared_with_groups] 

101 return subgroups 

102 

103 

104@boltons.cacheutils.cached(cache_allgroups) 

105def get_subgroups(gl: Gitlab, group_path): 

106 all_groups = [] 

107 subgroups = _get_subgroups(gl, group_path) 

108 if not subgroups: 

109 return [] 

110 all_groups.extend(subgroups) 

111 for subgroup in subgroups: 

112 all_groups.extend(_get_subgroups(gl, subgroup)) 

113 return all_groups 

114 

115 

116def clean_gitlab_project_name(project_name_or_url: Union[ProjectName, Url]) -> ProjectName: 

117 if project_name_or_url.startswith("https://"): 

118 o = parse_url(project_name_or_url) 

119 project_name = o["path"] 

120 else: 

121 project_name = project_name_or_url 

122 project_name = project_name.strip("/").lower() 

123 if project_name.endswith(".git"): 

124 project_name = project_name[:-4] 

125 return project_name 

126 

127 

128def is_archived_project(gl: Gitlab, project_path): 

129 gl_project = gl.projects.get(project_path) 

130 return gl_project.archived 

131 

132 

133def is_shared_project(project, group): 

134 return group.full_path in (sg["group_full_path"] for sg in project.shared_with_groups) 

135 

136 

137def is_existing_project(gl: Gitlab, project_path): 

138 try: 

139 gl.projects.get(project_path) 

140 return True 

141 except GitlabGetError: 

142 return False 

143 

144 

145def is_existing_group(gl: Gitlab, group_path): 

146 try: 

147 gl.groups.get(group_path) 

148 return True 

149 except GitlabGetError: 

150 return False 

151 

152 

153def is_bot_user_for_project_member(name): 

154 """ 

155 Check if a member name has the format: project_{project_id}_bot_{random_string} 

156 

157 Parameters: 

158 name (str): The member name to check. 

159 

160 Returns: 

161 bool: True if the name matches the pattern, False otherwise. 

162 """ 

163 # See format here 

164 # https://docs.gitlab.com/ee/user/project/settings/project_access_tokens.html#bot-users-for-projects 

165 pattern = r"^project_\d+_bot_[a-zA-Z0-9]+$" 

166 return bool(re.match(pattern, name)) 

167 

168 

169def remove_creds_in_url(url: str) -> str: 

170 """Remove the credential section of a URL.""" 

171 uurl = URL(url) 

172 uurl.username = None 

173 uurl.password = None 

174 return uurl.to_text() 

175 

176 

177def url_path_relative_to(url: str, url_base: str) -> str: 

178 """Return the path of a URL relative to another one. 

179 

180 It automatically ignores user credentials in the urls, and also abstract the 

181 schemes (http/https). 

182 

183 Arguments: 

184 url: the candidate URL to compute the relative path from 

185 url_base: base URL (with a path section if wanted) to compute against. 

186 

187 Returns: 

188 The relative path of the candiate URL (so without any leading '/') 

189 if the base url belong to the candidate URL. 

190 If both URL are on different host, it will return the untouched candidate URL. 

191 """ 

192 orig_url = url 

193 url = remove_creds_in_url(url) 

194 url_base = remove_creds_in_url(url_base) 

195 url = url.replace("https://", "http://").strip("/") 

196 url_base = url_base.replace("https://", "http://").strip("/") 

197 if not url.startswith(url_base): 

198 return orig_url 

199 return url[len(url_base) :].strip("/") 

200 

201 

202def maybe_get_project( 

203 gl: Gitlab, 

204 path: str, 

205 lazy=False, 

206) -> Optional[GitlabProject]: 

207 """ 

208 Retrieve a project by its name or id or return None without failing. 

209 """ 

210 try: 

211 return gl.projects.get( 

212 id=path, 

213 lazy=lazy, 

214 ) 

215 except GitlabGetError: 

216 pass 

217 return None 

218 

219 

220def maybe_get_group( 

221 gl: Gitlab, 

222 path: str, 

223 lazy=False, 

224) -> Optional[GitlabGroup]: 

225 """ 

226 Retrieve a group by its name or id or return None without failing. 

227 """ 

228 try: 

229 return gl.groups.get( 

230 id=path, 

231 lazy=lazy, 

232 ) 

233 except GitlabGetError: 

234 pass 

235 return None 

236 

237 

238def walk_gitlab_projects( 

239 gl: Gitlab, 

240 full_path: str, 

241) -> Iterator[GitlabProject]: 

242 

243 base_path = URL(full_path).path.strip("/") 

244 before_globbing, _, pattern_match = base_path.partition("*") 

245 pattern_match = "*" + pattern_match 

246 

247 fixed_base, _, project_or_group_chunk = before_globbing.rpartition("/") 

248 if not fixed_base: 

249 raise ValueError("Cannot search from root with globbing, you need at minimum a group !") 

250 if project_or_group_chunk: 

251 pattern_match = project_or_group_chunk + pattern_match 

252 

253 yield from _walk_gitlab_projects( 

254 gl=gl, 

255 fixed_base=fixed_base, 

256 pattern_match=pattern_match, 

257 base_path=fixed_base, 

258 ) 

259 

260 

261def _walk_gitlab_projects( 

262 gl: Gitlab, 

263 fixed_base: str, 

264 pattern_match: str, 

265 base_path: str, 

266) -> Iterator[GitlabProject]: 

267 base_path = base_path.strip() 

268 project = maybe_get_project(gl, fixed_base, lazy=False) 

269 if project: 

270 yield project.path_with_namespace 

271 return 

272 

273 group_base = maybe_get_group(gl, fixed_base, lazy=False) 

274 if not group_base: 

275 raise ValueError(f"Find base group: {fixed_base}") 

276 

277 project_candidate: "GitlabGroupProject" 

278 for project_candidate in group_base.projects.list( # type: ignore 

279 all=True, 

280 as_list=False, 

281 lazy=False, 

282 ): 

283 if pattern_match: 

284 project_path_rel_to_base_path = url_path_relative_to( 

285 project_candidate.path_with_namespace, 

286 base_path, 

287 ) 

288 if not fnmatch(project_path_rel_to_base_path, pattern_match): 

289 continue 

290 # The shared projects of a group are excluded 

291 if is_shared_project(project_candidate, group_base): 

292 continue 

293 if project_candidate.archived: 

294 continue 

295 

296 gitlab_project_path = project_candidate.path_with_namespace 

297 if gitlab_project_path: 

298 yield gitlab_project_path 

299 

300 for subgrp in group_base.subgroups.list(all=True, lazy=True, as_list=False): 

301 if pattern_match: 

302 subgroup_path_rel_to_base_path = url_path_relative_to( 

303 subgrp.full_path, 

304 base_path, 

305 ) 

306 if not fnmatch(subgroup_path_rel_to_base_path, pattern_match.partition("/")[0]): 

307 continue 

308 yield from _walk_gitlab_projects( 

309 gl=gl, 

310 fixed_base=subgrp.full_path, 

311 pattern_match=pattern_match, 

312 base_path=base_path, 

313 )