获取k8s中pod日志上传oss输出到企业微信 //目的: 一键获取Pod日志,方便给其他人查看 //方式: //1.通过client-go获取pod日志 //2.将pod日志上传值oss //3.将返回的链接输出到企业微信 //注意点: //1.参考kubectl log源码去写的 //2.上传值Oss信息PutOSSFile函数我涂抹了,需要自己写
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"io"
"io/ioutil"
"k8s.io/client-go/util/homedir"
"log"
"math"
"net/http"
"os"
"path/filepath"
"strings"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"time"
)
//企业微信数据结构定义markdown
type MessageMD struct {
MsgType string `json:"msgtype"`
Markdown struct {
Content string `json:"content"`
Mentioned_list []string `json:"mentioned_list"`
} `json:"markdown"`
}
//参数列表,目的是为了根据需求进行排除,例如不监控特定namespace和特定ns空间的信息
var (
url = flag.String("url", "", "请输入企业微信webhook地址")
ns = flag.String("ns", "", "请输入命名空间名称,n1,n2")
n = flag.String("n", "", "请输入Pod名称,可以只写deployment名称,n1,n2")
follow = flag.Bool("f", false, "是否需要指定日志为流模式")
timestamps = flag.Bool("t", false, "是否输出时间戳")
tail = flag.Int64("l", -1, "请输入想显示最后多少行数据")
sincetime = flag.String("st", "", "请输入日志起始时间,时间格式为2021-12-31T11:10:19")
since = flag.String("s", "", "请指定某段时间内的日志, 5s, 2m, or 3h")
container = flag.String("c", "", "请指定pod中容器名称")
kubeconfig = flag.String("k", "", "请输入kubeconfig路径")
)
//设置logOption
var logOptions = &apiv1.PodLogOptions{}
func main() {
//默认参数解析
flag.Parse()
//设置默认logOption
SetlogOption()
//获取日志主函数
GetPodLog()
}
//资源监控主函数
func GetPodLog() {
var err error
var config *rest.Config
var pod apiv1.Pod
var podIsExist bool
// 初始化 rest.Config 对象
if config, err = rest.InClusterConfig(); err != nil {
if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
CheckError("kubeconfig配置文件出错: ", err)
}
}
// 创建 Clientset 对象
clientset, err := kubernetes.NewForConfig(config)
CheckError("获取clientset出错:", err)
ctx := context.Background()
list, err := clientset.CoreV1().Pods(*ns).List(ctx, metav1.ListOptions{})
CheckError("获取Podlist出错:", err)
for _, v := range list.Items {
//暂时返回第一个匹配的进行处理,还没想好怎么解决
if strings.Contains(v.Name, *n) {
pod = v
nsn := v.Namespace + "/" + v.Name
fmt.Println("该pod名称:", nsn)
podIsExist = true
break
}
}
if !podIsExist {
s := "命名空间:" + *ns
CheckError(s, errors.New("没有这个Pod,或者Pod名输入错误!"))
}
//设置默认获取第一个容器地址
if len(*container) == 0 {
// [CONTAINER] (container as arg not flag) is supported as legacy behavior. See PR #10519 for more details.
if len(pod.Spec.Containers) != 1 {
podContainersNames := []string{}
for _, container := range pod.Spec.Containers {
podContainersNames = append(podContainersNames, container.Name)
}
} else {
container = &pod.Spec.Containers[0].Name
}
}
//这里container获取contanier的值
logOptions.Container = *container
//真正获取pod的日志了
stream, err := clientset.CoreV1().Pods(*ns).GetLogs(pod.Name, logOptions).Stream(context.TODO())
CheckError("打开pod日志出错", err)
defer stream.Close()
buf := new(bytes.Buffer)
_, err = io.Copy(buf, stream)
CheckError("拷贝pod日志信息出错", err)
str := buf.String()
//fmt.Println(str)
//上传至oss返回地址
logOss, err := PutOSSFile(pod.Namespace, pod.Name, str)
CheckError("上传Oss地址出错", err)
timenow := time.Now().Format("2006-01-02 15:04:05")
urladdr := "[链接地址](" + logOss + ")"
context := "Pod日志获取信息:" + "\nPod名称: " + "**" + pod.Name + "**" + "\n命名空间: " + pod.Namespace + "\n现在时间: " + timenow + "\n日志地址: " + urladdr
//设置默认webhoo地址
if len(*url) == 0 {
CheckError("请输入webhook地址",errors.New(""))
//url1 := "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxx"
//url = &url1
}
SendMessage(*url, context)
}
//上传pod文件
func PutOSSFile(namespace, podname, body string) (string, error) {
timenow := time.Now().Format("2006-01-02_15-04-05")
endpoint := "xxx"
accessKeyId := "yyy"
accessKeySecret := "zzz"
bucketName := "ddd"
client1 := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
},
}
client, err := oss.New(endpoint, accessKeyId, accessKeySecret, oss.HTTPClient(client1))
bucket, err := client.Bucket(bucketName)
if err != nil {
return "", err
}
objectName := "eee/" + namespace + "-" + podname + "_" + timenow + ".log"
err = bucket.PutObject(objectName, bytes.NewReader([]byte(body)))
if err != nil {
return "", err
}
downloadurl := `https://ffff/` + objectName
return downloadurl, nil
}
//错误检查,以及退出
func CheckError(s string, err error) {
if err == nil {
return
}
fmt.Println(s, err.Error())
os.Exit(1)
}
//k8s源码摘抄
//https://github.com/kubernetes/Kubernetes/blob/v1.7.0-alpha.4/pkg/api/helper/helpers.go#L366
func ParseRFC3339(s string, nowFn func() metav1.Time) (metav1.Time, error) {
if t, timeErr := time.Parse(time.RFC3339Nano, s); timeErr == nil {
return metav1.Time{Time: t}, nil
}
t, err := time.Parse(time.RFC3339, s)
if err != nil {
return metav1.Time{}, err
}
return metav1.Time{Time: t}, nil
}
func TimeNow() metav1.Time {
return metav1.Time{time.Now()}
}
//logOption 默认设置
func SetlogOption() {
var err error
//正则匹配pod名称
if *n == "" {
CheckError("请通过-n podname指定pod", errors.New(""))
}
if *ns == "" {
*ns = "default"
}
//为了方便自己测试添加的
var configfile string
//configfile = "D:\\config"
if len(*kubeconfig) != 0 {
_, err = os.Stat(*kubeconfig)
CheckError("指定的kubeconfig不存在", err)
*kubeconfig = configfile
} else if len(configfile) != 0 {
*kubeconfig = configfile
} else {
home := homedir.HomeDir()
if home != "" {
*kubeconfig = filepath.Join(home, ".kube", "config")
}
}
//设置日志参数
logOptions.Follow = *follow //是否开启持续输出
logOptions.Timestamps = *timestamps //是否打开时间戳
//时间相关参数设置
var sinceSeconds time.Duration
//同时设置就会报错
if len(*sincetime) > 0 && len(*since) > 0 {
CheckError("只能指定-s,-st其中之一", errors.New(""))
}
//如果都没有设置,就设置默认5分钟
if len(*sincetime) == 0 && len(*since) == 0 {
sinceSeconds, err = time.ParseDuration("5m")
CheckError("since默认解析5m出错", err)
}
//如果since设置,则解析
if len(*since) != 0 {
sinceSeconds, err = time.ParseDuration(*since)
CheckError("since指定时间格式不正确", err)
}
//如果sincetime设置,则默认+8小时
if len(*sincetime) != 0 {
if !strings.Contains(*sincetime, "+") {
*sincetime = *sincetime + "+08:00"
}
}
if sinceSeconds > 0 {
// round up to the nearest second
sec := int64(math.Ceil(float64(sinceSeconds) / float64(time.Second)))
logOptions.SinceSeconds = &sec
}
//解析起始时间
if ts, err := ParseRFC3339(*sincetime, TimeNow); err == nil {
logOptions.SinceTime = &ts
}
//赋值获取行数,默认是全部
if *tail > 0 {
logOptions.TailLines = tail
}
}
//发送信息到企业微信webhook+开机缓起作用
func SendMessage(url, msg string) {
var m MessageMD
m.MsgType = "markdown"
m.Markdown.Content = msg
jsons, err := json.Marshal(m)
if err != nil {
log.Println(err)
return
}
resp := string(jsons)
client := &http.Client{
Timeout: 10 * time.Second,
}
req, err := http.NewRequest("POST", url, strings.NewReader(resp))
if err != nil {
log.Println(err)
return
}
req.Header.Set("Content-Type", "application/json")
r, err := client.Do(req)
if err != nil {
log.Println(err)
return
}
defer r.Body.Close()
_, err = ioutil.ReadAll(r.Body)
if err != nil {
log.Println(err)
return
}
}