前言
kylin是一个开源的OLAP分析引擎,具有亚秒级查询大表的能力
通过kylin提供的cube预构建功能,省去了不断写sql查询hive的麻烦,强化了任务统一管理和结果快速呈现的效果
kylin官网: https://kylin.apache.org/cn/
任务
当kylin集群比较大,和有多个kylin集群时,说明cube也越来越多,几百上千个cube便是常用便饭了
这些任务的运行就成了难题,人工去界面上点点点完全不实现了。此时就需要做成自动化周期性的任务
因为官方没有提供Go的客户端,只提供了http的api请求。下列例子使用Go中的http包来实现自动化任务
自动化实现
初始化
使用第三方http包(HttpRequest)来做http相关的请求,该包支持GET,POST,DELETE,PUT等四种请求方法,正好完全满足请求kylin的要求
1
2
3
4
5
6
7
8
9
10
11
12
|
var ( url = "http://ip:7070/kylin/" username = "ADMIN" password = "Password" req *HttpRequest.Request ) func init() { req = HttpRequest.NewRequest().Debug( false ).SetTimeout(time.Second* 5 ). SetHeaders( map [ string ] string { "Content-Type" : "application/json;charset=utf-8" , }).SetBasicAuth(username, password) } |
cube提交build
该方法接收三个参数,需要构建的cube名称,以及开始时间戳和结束时间戳
调用示例:
cubeBuild("dwd_jd_order","1637193600000","1637280000000")
时间戳获取方法,在第6小节
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func cubeBuild(cube,startTime,endTime string ) { m := map [ string ] string { "startTime" : startTime, "endTime" : endTime, "buildType" : "BUILD" , } resp, err := req.JSON().Put(url+ "api/cubes/" +cube+ "/build" , m) if err != nil { fmt. Println ( "cube构建请求错误: " , err) } if resp.StatusCode() != 200 { fmt. Println ( "cube构建状态码不符期望: " ,resp.StatusCode()) } } |
cube运行结果检查
检查cube运行结果,是成功还是失败了,还提供一个重新构建开关,如果cube失败,调用重构
kylin job检查接口属性说明
jobSearchMode
搜索模式(检查点和cubeing两种) ALL所有模式的数据
limit
限制返回条数
offset
位置(0是从第一条开始)
status
状态类型(8是错误类型,0是new,1是pending,2是running,32是stopped,4是finished,16是discarded)
timeFilter
时间范围过滤(1是一天,2是一周,3是一月,4是一年,5是全部)
调用示例: jobCheck(false)
为什么要在检查里面调重构方法,是因为重构cube需要拿到uuid,但uuid只能在这个接口中获取到,且uuid不是固定的,需要运cube运行后才可得到
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func jobCheck(resumeSwitch bool ) { resp, err := req.Get(url+ "api/jobs?jobSearchMode=ALL&limit=15&offset=0&status=8&timeFilter=1" ) if err != nil { fmt. Println ( "job检查请求错误: " , err) } if resp.StatusCode() != 200 { fmt. Println ( "job检查状态码不符期望: " ,resp.StatusCode()) } body, _ := resp.Body() var i interface {} json.Unmarshal(body,&i) uuid, err := jmespath.Search( "[0].uuid" , i) if err != nil { fmt. Println ( "search err: " ,err) } fmt. Println (uuid) if resumeSwitch { cubeResume( "uuid" ) } } |
重构cube
重构cube在job失败后,自动构建非常有用,避免人工频繁介入到这些工作中,是自动化中关键一步
调用示例: cubeResume("uuid")
1
2
3
4
5
6
7
8
9
|
func cubeResume(uuid string ) { resp, err := req.Put(url+ "api/jobs/" +uuid+ "/resume" ) if err != nil { fmt. Println ( "cube重新build请求错误: " , err) } if resp.StatusCode() != 200 { fmt. Println ( "cube重新build状态码不符期望: " ,resp.StatusCode()) } } |
历史job清理
kylin在运行一段时间后,就会产生很多冗余,且时需要周期性的清理这些历史job
调用示例: jobHistoryDelete("uuid")
需要先检查job,获取uuid,然后再删除历史job
1
2
3
4
5
6
7
8
9
|
func jobHistoryDelete(uuid string ) { resp, err := req. Delete (url+ "api/jobs/" +uuid+ "/drop" ) if err != nil { fmt. Println ( "历史job清理请求错误: " , err) } if resp.StatusCode() != 200 { fmt. Println ( "历史job清理状态码不符期望: " ,resp.StatusCode()) } } |
时间戳
kylin要求的时间毫秒,这里使用纳秒时间戳方法除一下就得到了毫秒
1
2
3
4
5
6
7
8
9
10
11
12
|
func timestamp() { year := time.Now().Year() month := time.Now().Month() day := time.Now().Day() //今天的时间戳 today := time.Date(year, month, day, 8 , 0 , 0 , 0 , time.Local).UnixNano() / 1e6 fmt. Println (today) //昨天的时间戳 iDay := time.Now().AddDate( 0 , 0 , - 1 ).Day() yesterday := time.Date(year, month, iDay, 8 , 0 , 0 , 0 , time.Local).UnixNano() / 1e6 fmt. Println (yesterday) } |
小结
以上方法配合定时任务,就可以实现kylin自动化运维工作了,当然kylin官网还提供了更多接口,有需求的同学可以看看
传送门: https://kylin.apache.org/cn/docs31/howto/howto_use_restapi.html
更多关于Go语言kylin任务自动化的资料请关注服务器之家其它相关文章!
原文链接:https://juejin.cn/post/7035129079816781861