获取k8s中pod日志上传oss输出到企业微信


获取k8s中pod日志上传oss输出到企业微信 //目的: 一键获取Pod日志,方便给其他人查看 //方式: //1.通过client-go获取pod日志 //2.将pod日志上传值oss //3.将返回的链接输出到企业微信 //注意点: //1.参考kubectl log源码去写的 //2.上传值Oss信息PutOSSFile函数我涂抹了,需要自己写

package main

import (
    "bytes"
    "context"
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "github.com/aliyun/aliyun-oss-go-sdk/oss"
    "io"
    "io/ioutil"
    "k8s.io/client-go/util/homedir"
    "log"
    "math"
    "net/http"
    "os"
    "path/filepath"
    "strings"

    apiv1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "time"
)

//企业微信数据结构定义markdown
type MessageMD struct {
    MsgType  string `json:"msgtype"`
    Markdown struct {
        Content        string   `json:"content"`
        Mentioned_list []string `json:"mentioned_list"`
    } `json:"markdown"`
}

//参数列表,目的是为了根据需求进行排除,例如不监控特定namespace和特定ns空间的信息
var (
    url        = flag.String("url", "", "请输入企业微信webhook地址")
    ns         = flag.String("ns", "", "请输入命名空间名称,n1,n2")
    n          = flag.String("n", "", "请输入Pod名称,可以只写deployment名称,n1,n2")
    follow     = flag.Bool("f", false, "是否需要指定日志为流模式")
    timestamps = flag.Bool("t", false, "是否输出时间戳")
    tail       = flag.Int64("l", -1, "请输入想显示最后多少行数据")
    sincetime  = flag.String("st", "", "请输入日志起始时间,时间格式为2021-12-31T11:10:19")
    since      = flag.String("s", "", "请指定某段时间内的日志, 5s, 2m, or 3h")
    container  = flag.String("c", "", "请指定pod中容器名称")
    kubeconfig = flag.String("k", "", "请输入kubeconfig路径")
)

//设置logOption
var logOptions = &apiv1.PodLogOptions{}

func main() {
    //默认参数解析
    flag.Parse()
    //设置默认logOption
    SetlogOption()
    //获取日志主函数
    GetPodLog()

}

//资源监控主函数
func GetPodLog() {
    var err error
    var config *rest.Config
    var pod apiv1.Pod
    var podIsExist bool


    // 初始化 rest.Config 对象
    if config, err = rest.InClusterConfig(); err != nil {
        if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
            CheckError("kubeconfig配置文件出错: ", err)
        }
    }
    // 创建 Clientset 对象
    clientset, err := kubernetes.NewForConfig(config)
    CheckError("获取clientset出错:", err)

    ctx := context.Background()
    list, err := clientset.CoreV1().Pods(*ns).List(ctx, metav1.ListOptions{})
    CheckError("获取Podlist出错:", err)
    for _, v := range list.Items {
        //暂时返回第一个匹配的进行处理,还没想好怎么解决
        if strings.Contains(v.Name, *n) {
            pod = v
            nsn := v.Namespace + "/" + v.Name
            fmt.Println("该pod名称:", nsn)
            podIsExist = true
            break
        }
    }
    if !podIsExist {
        s := "命名空间:" + *ns
        CheckError(s, errors.New("没有这个Pod,或者Pod名输入错误!"))
    }

    //设置默认获取第一个容器地址
    if len(*container) == 0 {
        // [CONTAINER] (container as arg not flag) is supported as legacy behavior. See PR #10519 for more details.
        if len(pod.Spec.Containers) != 1 {
            podContainersNames := []string{}
            for _, container := range pod.Spec.Containers {
                podContainersNames = append(podContainersNames, container.Name)
            }
        } else {
            container = &pod.Spec.Containers[0].Name
        }
    }
    //这里container获取contanier的值
    logOptions.Container = *container

    //真正获取pod的日志了
    stream, err := clientset.CoreV1().Pods(*ns).GetLogs(pod.Name, logOptions).Stream(context.TODO())
    CheckError("打开pod日志出错", err)

    defer stream.Close()
    buf := new(bytes.Buffer)
    _, err = io.Copy(buf, stream)
    CheckError("拷贝pod日志信息出错", err)
    str := buf.String()
    //fmt.Println(str)

    //上传至oss返回地址
    logOss, err := PutOSSFile(pod.Namespace, pod.Name, str)
    CheckError("上传Oss地址出错", err)

    timenow := time.Now().Format("2006-01-02 15:04:05")
    urladdr := "[链接地址](" + logOss + ")"
    context := "Pod日志获取信息:" + "\nPod名称:  " + "**" + pod.Name + "**" + "\n命名空间:  " + pod.Namespace + "\n现在时间:  " + timenow + "\n日志地址: " + urladdr

    //设置默认webhoo地址
    if len(*url) == 0 {
        CheckError("请输入webhook地址",errors.New(""))
        //url1 := "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxx"
        //url = &url1
    }
    SendMessage(*url, context)
}

//上传pod文件
func PutOSSFile(namespace, podname, body string) (string, error) {
    timenow := time.Now().Format("2006-01-02_15-04-05")
    endpoint := "xxx"
    accessKeyId := "yyy"
    accessKeySecret := "zzz"
    bucketName := "ddd"
    client1 := &http.Client{
        Transport: &http.Transport{
            DisableKeepAlives: true,
        },
    }
    client, err := oss.New(endpoint, accessKeyId, accessKeySecret, oss.HTTPClient(client1))
    bucket, err := client.Bucket(bucketName)
    if err != nil {
        return "", err
    }
    objectName := "eee/" + namespace + "-" + podname + "_" + timenow + ".log"
    err = bucket.PutObject(objectName, bytes.NewReader([]byte(body)))
    if err != nil {
        return "", err
    }
    downloadurl := `https://ffff/` + objectName
    return downloadurl, nil
}

//错误检查,以及退出
func CheckError(s string, err error) {
    if err == nil {
        return
    }
    fmt.Println(s, err.Error())
    os.Exit(1)
}

//k8s源码摘抄
//https://github.com/kubernetes/Kubernetes/blob/v1.7.0-alpha.4/pkg/api/helper/helpers.go#L366
func ParseRFC3339(s string, nowFn func() metav1.Time) (metav1.Time, error) {
    if t, timeErr := time.Parse(time.RFC3339Nano, s); timeErr == nil {
        return metav1.Time{Time: t}, nil
    }
    t, err := time.Parse(time.RFC3339, s)
    if err != nil {
        return metav1.Time{}, err
    }
    return metav1.Time{Time: t}, nil
}
func TimeNow() metav1.Time {
    return metav1.Time{time.Now()}
}

//logOption 默认设置
func SetlogOption() {
    var err error
    //正则匹配pod名称
    if *n == "" {
        CheckError("请通过-n podname指定pod", errors.New(""))
    }

    if *ns == "" {
        *ns = "default"
    }

    //为了方便自己测试添加的
    var configfile string
    //configfile = "D:\\config"
    if len(*kubeconfig) != 0 {
        _, err = os.Stat(*kubeconfig)
        CheckError("指定的kubeconfig不存在", err)
        *kubeconfig = configfile
    } else if len(configfile) != 0 {
        *kubeconfig = configfile
    } else {
        home := homedir.HomeDir()
        if home != "" {
            *kubeconfig = filepath.Join(home, ".kube", "config")
        }
    }

    //设置日志参数
    logOptions.Follow = *follow   //是否开启持续输出
    logOptions.Timestamps = *timestamps //是否打开时间戳

    //时间相关参数设置
    var sinceSeconds time.Duration
    //同时设置就会报错
    if len(*sincetime) > 0 && len(*since) > 0 {
        CheckError("只能指定-s,-st其中之一", errors.New(""))
    }
    //如果都没有设置,就设置默认5分钟
    if len(*sincetime) == 0 && len(*since) == 0 {
        sinceSeconds, err = time.ParseDuration("5m")
        CheckError("since默认解析5m出错", err)
    }

    //如果since设置,则解析
    if len(*since) != 0 {
        sinceSeconds, err = time.ParseDuration(*since)
        CheckError("since指定时间格式不正确", err)
    }
    //如果sincetime设置,则默认+8小时
    if len(*sincetime) != 0 {
        if !strings.Contains(*sincetime, "+") {
            *sincetime = *sincetime + "+08:00"
        }
    }
    if sinceSeconds > 0 {
        // round up to the nearest second
        sec := int64(math.Ceil(float64(sinceSeconds) / float64(time.Second)))
        logOptions.SinceSeconds = &sec
    }
    //解析起始时间
    if ts, err := ParseRFC3339(*sincetime, TimeNow); err == nil {
        logOptions.SinceTime = &ts
    }

    //赋值获取行数,默认是全部
    if *tail > 0 {
        logOptions.TailLines = tail
    }

}

//发送信息到企业微信webhook+开机缓起作用
func SendMessage(url, msg string) {
    var m MessageMD
    m.MsgType = "markdown"
    m.Markdown.Content = msg
    jsons, err := json.Marshal(m)
    if err != nil {
        log.Println(err)
        return
    }
    resp := string(jsons)
    client := &http.Client{
        Timeout: 10 * time.Second,
    }
    req, err := http.NewRequest("POST", url, strings.NewReader(resp))
    if err != nil {
        log.Println(err)
        return
    }
    req.Header.Set("Content-Type", "application/json")
    r, err := client.Do(req)
    if err != nil {
        log.Println(err)
        return
    }
    defer r.Body.Close()
    _, err = ioutil.ReadAll(r.Body)
    if err != nil {
        log.Println(err)
        return
    }

}