一、编写Python脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# [root@lidabai ~]# vim harbor_clearimage.py
# -*- coding:utf-8 -*-
"""Prune old image tags from a Harbor registry and optionally reclaim disk space.

The script walks every project that is not excluded, and for each repository
holding more tags than the configured limit deletes the oldest tags through
the Harbor REST API ("soft" delete).  ``Harbor.hard_del_repo`` can afterwards
run the registry garbage collector so the blobs are really removed from disk.
"""
import logging
import os
import subprocess
import time
from logging.handlers import RotatingFileHandler

import requests
from requests.adapters import HTTPAdapter
from requests.auth import HTTPBasicAuth
from requests.packages.urllib3.util.retry import Retry


class Harbor(object):
    """Small client for the Harbor ``/projects`` and ``/repositories`` API.

    Args:
        api_url: Base URL of the Harbor API, e.g. ``http://host/api``.
        api_user: Account used for HTTP basic authentication.
        api_passwd: Password of ``api_user``.
        tag_num: Number of most recent tags to keep per repository.
        proj_exclude: Collection of project names that must never be touched.
    """

    # Seconds to wait for a single HTTP request before giving up, so a hung
    # registry cannot block the clean-up job forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, api_url, api_user, api_passwd, tag_num, proj_exclude):
        self.api_url = api_url
        self.api_user = api_user
        self.api_passwd = api_passwd
        self.api_auth = HTTPBasicAuth(self.api_user, self.api_passwd)
        self.tag_num = int(tag_num)
        self.proj_exclude = proj_exclude
        self.proj_url = self.api_url + "/projects"
        self.repos_url = self.api_url + "/repositories"
        self.header_dict = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.80 Safari/537.36',
            'Content-Type': 'application/x-www-form-urlencoded',
        }
        # Every successfully deleted tag is recorded here; the list is shared
        # by all calls to soft_del_repos() on this instance.
        self.deldata = []

        self.session = requests.Session()
        self.session.auth = self.api_auth
        retry = Retry(connect=3, backoff_factor=1)
        adapter = HTTPAdapter(max_retries=retry)
        # Mount the retrying adapter for both schemes so the retry policy
        # also applies when api_url is an https:// address.
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)
        # NOTE(review): requests.Session documents no ``keep_alive`` switch;
        # the assignment is kept only so existing readers of it keep working.
        self.session.keep_alive = False

    def soft_del_repos(self):
        """Delete the oldest tags of every repository that exceeds ``tag_num``.

        Projects listed in ``proj_exclude`` are skipped.  Failures are logged
        and the run continues with the next project/repository/tag.

        Returns:
            ``self.deldata``: a list of dicts with the keys ``project_id``,
            ``project_name``, ``repo_name`` and ``tag_name``, one per tag that
            the server confirmed as deleted.
        """
        try:
            projects = self._get_json(self.proj_url)
        except Exception:
            # Best-effort job: report with traceback instead of crashing.
            logging.exception("apilogin fail:%s", self.api_url)
            return self.deldata
        if projects is None:
            return self.deldata

        for proj in projects:
            try:
                if proj['name'] in self.proj_exclude:
                    continue
                self._prune_project(proj)
            except Exception:
                logging.exception("httpget fail:%s", self.repos_url)
        return self.deldata

    def _get_json(self, url, params=None):
        """GET ``url`` and return the decoded JSON body, or None on non-200."""
        resp = self.session.get(url, params=params, headers=self.header_dict,
                                timeout=self.REQUEST_TIMEOUT)
        if resp.status_code != 200:
            logging.error("httpget fail:%s", url)
            return None
        return resp.json()

    def _prune_project(self, proj):
        """Prune every over-sized repository that belongs to ``proj``."""
        repos = self._get_json(self.repos_url,
                               params={"project_id": proj['project_id']})
        if repos is None:
            return
        for repo in repos:
            if repo["tags_count"] <= self.tag_num:
                continue
            tag_url = self.repos_url + "/" + repo['name'] + "/tags"
            try:
                tags = self._get_json(tag_url)
            except Exception:
                # One broken repository must not abort the rest of the project.
                logging.exception("httpget fail:%s", tag_url)
                continue
            for tag in self._stale_tags(tags, self.tag_num):
                self._delete_tag(proj, repo, tag_url, tag)

    @staticmethod
    def _stale_tags(tags, keep):
        """Return the oldest tags beyond the ``keep`` most recent ones.

        Tags are ordered by their ``created`` value.  When the list holds
        ``keep`` tags or fewer nothing is returned; without this guard a
        negative slice end would select tags that must be kept.
        """
        tags = tags or []
        excess = len(tags) - keep
        if excess <= 0:
            return []
        return sorted(tags, key=lambda item: item["created"])[:excess]

    def _delete_tag(self, proj, repo, tag_url, tag):
        """Delete one tag over HTTP and record it; return True on success."""
        del_repo_tag_url = tag_url + "/" + tag['name']
        try:
            # Sent outside the shared session so no cookies from earlier GETs
            # accompany it, mirroring the stateless curl call it replaces.
            # Unlike the shell command, the password never appears in the
            # process list or in the log file.
            resp = requests.delete(del_repo_tag_url, auth=self.api_auth,
                                   headers={"accept": "application/json"},
                                   timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException:
            logging.exception("httpdel fail:%s", del_repo_tag_url)
            return False
        if not 200 <= resp.status_code < 300:
            # curl exits 0 even when the server answers 4xx/5xx, so the HTTP
            # status is what decides whether the tag is really gone.
            logging.error("httpdel fail:%s status=%s",
                          del_repo_tag_url, resp.status_code)
            return False

        logging.info("httpdel:%s", del_repo_tag_url)
        self.deldata.append({
            "project_id": proj['project_id'],
            "project_name": proj['name'],
            "repo_name": repo['name'],
            "tag_name": tag['name'],
        })
        logging.info("httpdel project_id=%s,project_name=%s,repo_name=%s,tag_name=%s",
                     proj['project_id'], proj['name'], repo['name'], tag['name'])
        return True

    @staticmethod
    def _run(cmd, cwd=None):
        """Run ``cmd`` (an argument list) in ``cwd``; return True on exit 0."""
        try:
            return subprocess.call(cmd, cwd=cwd) == 0
        except OSError:
            # Raised e.g. when the executable is not installed.
            logging.exception("exec_cmd fail:%s", " ".join(cmd))
            return False

    def hard_del_repo(self, harbor_dir="/dcos/app/harbor/",
                      gc_image="goharbor/registry-photon:v2.7.1-patch-2819-v1.8.6",
                      settle_seconds=10):
        """Stop Harbor, run the registry garbage collector, start Harbor again.

        Args:
            harbor_dir: Harbor installation directory containing the
                docker-compose file.
            gc_image: Registry image used to run ``garbage-collect``.
            settle_seconds: Pause between stopping Harbor and starting the
                garbage collector.

        Returns:
            True when both the garbage collection and the restart succeeded,
            False otherwise.  Harbor is restarted even if the collection fails.
        """
        if not os.path.isdir(harbor_dir):
            logging.error("hard_del_repo fail: no such directory %s", harbor_dir)
            return False

        # Each command runs with cwd=harbor_dir: a separate "cd" executed in
        # its own shell would not affect the commands that follow it.
        stop_cmd = ["docker-compose", "stop"]
        start_cmd = ["docker-compose", "start"]
        # No "-it": allocating a TTY fails when the script runs unattended
        # (for example from cron), and the collector needs no input.
        gc_cmd = ["docker", "run", "--name", "gc", "--rm",
                  "--volumes-from", "registry", gc_image,
                  "garbage-collect", "/etc/registryctl/config.yml"]

        if not self._run(stop_cmd, cwd=harbor_dir):
            logging.error("hard_del_repo fail: could not stop Harbor")
            return False
        try:
            time.sleep(settle_seconds)
            gc_ok = self._run(gc_cmd, cwd=harbor_dir)
        finally:
            # Never leave the registry down, whatever happened above.
            started = self._run(start_cmd, cwd=harbor_dir)

        if gc_ok and started:
            logging.info("hard_del_repo ok:")
            return True
        logging.error("hard_del_repo fail:")
        return False


def main():
    """Configure logging, prune old tags and report the outcome."""
    rt_handler = RotatingFileHandler('harbor_repo_clear.log',
                                     maxBytes=10 * 1024 * 1024, backupCount=5)
    logging.basicConfig(level=logging.INFO)
    formatter = logging.Formatter('%(levelname)s %(asctime)s %(process)d %(thread)d %(pathname)s %(filename)s %(funcName)s[line:%(lineno)d] %(message)s')
    rt_handler.setFormatter(formatter)
    logging.getLogger('').addHandler(rt_handler)

    api_url = "http://192.168.2.66:443/api"  # API URL of the Harbor service
    api_user = "admin"                       # super administrator account
    api_passwd = "Harbor12345"               # password of that account
    tag_num = 20                             # number of tags to keep per repository
    proj_exclude = ["library"]               # projects that are never cleaned

    harbor_client = Harbor(api_url, api_user, api_passwd, tag_num, proj_exclude)
    data = harbor_client.soft_del_repos()
    if data:
        # The garbage collection is deliberately left disabled because it
        # stops Harbor while it runs.  To reclaim disk space as well, call
        # harbor_client.hard_del_repo() at this point.
        logging.info("hard_del_repo:")


if __name__ == "__main__":
    main()
二、执行Python脚本
[root@lidabai ~]# python harbor_clearimage.py
到此，这篇关于 Python 实现 Harbor 私有镜像仓库垃圾自动化清理详情的文章就介绍到这了。更多相关 Harbor 垃圾清理内容，请搜索服务器之家以前的文章或继续浏览下面的相关文章，希望大家以后多多支持服务器之家！
原文链接:https://blog.51cto.com/lidabai/5320660