//k8s集成企业微信通知第一版 //目的: 想将k8s的更新信息通知到企业微信里面 //现状: 目前是第一版,可以正常运行,可能存在bug,也需要进一步调优 //设计思路: //1.通过informer去获取k8s状态 //2.将service,pod,deployment,statefulset,job,crontab,sa,configmap,secret,ingress进行工厂实例化 //3.通过AddEventHandler去获取add,update,delete状态 //4.通过反射将判断资源类型,然后发送到企业微信通知 //5.update有这一块存在很多问题,一个资源更新可能不止更新一次,所以我通过map[key],占坑资源20s,然后释放。期间所有重复的key都不会重复添加了,但资源弹出是循序性的,所以使用goroutine //6.程序启动会同步所有信息,所以我的想法是hold住sendmessage 10s,此期间不发送任何数据 //7.pod新增的时候,延迟31秒,大于同步状态去获取pod是否正常启动。同样是也是hold10s不发信息。
package main
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"k8s.io/client-go/util/homedir"
"log"
"net/http"
"os"
"path/filepath"
"reflect"
"strings"
"sync"
"time"
v1 "k8s.io/api/apps/v1"
v13 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
v12 "k8s.io/api/networking/v1"
"k8s.io/client-go/informers"
vv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
//Debounce bookkeeping for resource update events.
//key = namespace + "-" + resource name (see the Update*Status helpers).
//One kubectl apply fans out into several update events within roughly 1s;
//a key is held in the map for ~20s so only the first event in that window
//produces a notification (TriggerResource releases the key afterwards).
//(The two types could be collapsed into map[string]func(...) — kept as-is.)
type UpdateResouceMap map[string] AcountTime
type AcountTime struct {
FirstTime time.Time //currently unused
Trigger func(string,string, string,time.Duration) //debounce callback: (namespace, kind, name, hold duration)
}
//global debounce state; must only be touched while holding mx
var ResourceMap = make(UpdateResouceMap)
//guards ResourceMap
var mx sync.Mutex
//企业微信数据结构定义text
type Message struct {
MsgType string `json:"msgtype"`
Text struct {
Content string `json:"content"`
Mentioned_list []string `json:"mentioned_list"`
} `json:"text"`
}
//企业微信数据结构定义markdown
type MessageMD struct {
MsgType string `json:"msgtype"`
Markdown struct {
Content string `json:"content"`
Mentioned_list []string `json:"mentioned_list"`
} `json:"markdown"`
}
//Startup suppression state.
//The informer's initial sync replays every existing object in the cluster;
//to avoid spamming the webhook, nothing is sent during the first 10s after
//StartTime. A nicer mechanism may replace this later.
var StartTime time.Time
var Resourcethreshold bool //set once the 10s quiet period has passed (checked in SendMessage)
var Podthreshold bool //same idea, for pod health checks (checked in TriggerPodcheck)
//pod informer kept globally so TriggerPodcheck can query its lister later
var PodInformer vv1.PodInformer
//参数列表,目的是为了根据需求进行排除,例如不监控特定namespace和特定ns空间的信息
var (
url = flag.String("url", "", "请输入要发送的webhook地址")
ns = flag.String("ns", "", "输入需要排除监控的命名空间,n1,n2")
n = flag.String("n", "", "输入需要排除监控的资源名称,n1,n2")
kubeconfig = flag.String("k", "", "输入需要排除监控的资源名称,path")
NSArray []string
NArray []string
)
//Entry point: parse flags, build the exclusion lists, then block forever
//inside the informer loop.
func main() {
flag.Parse()
//split the -ns / -n flag values into NSArray / NArray
GetArray()
//runs the informers and pushes WeChat Work notifications; never returns
ResouceInformer()
}
//资源监控主函数
func ResouceInformer() {
var err error
var config *rest.Config
StartTime = time.Now()
var configfile string
//configfile = "G:\\xrp\\NT\\once\\config"
//configfile = "D:\\gitlab\\k8s\\config"
configfile = "G:\\xrp\\NT\\once\\config"
if len(*kubeconfig) != 0 {
_, err = os.Stat(*kubeconfig)
CheckError("指定的kubeconfig不存在", err)
*kubeconfig = configfile
} else if len(configfile) != 0 {
*kubeconfig = configfile
} else {
home := homedir.HomeDir()
if home != "" {
*kubeconfig = filepath.Join(home, ".kube", "config")
}
}
// 初始化 rest.Config 对象
if config, err = rest.InClusterConfig(); err != nil {
if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
CheckError("读取kubeconfig配置出错",err)
}
}
// 创建 Clientset 对象
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
CheckError("设置clinetset出错",err)
}
// 初始化 informer factory(为了测试方便这里设置每30s重新 List 一次)
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
// 对 Deployment 监听
deployInformer := informerFactory.Apps().V1().Deployments()
// 创建 Informer(相当于注册到工厂中去,这样下面启动的时候就会去 List & Watch 对应的资源)
informer := deployInformer.Informer()
// 创建 Lister
//deployLister := deployInformer.Lister()
// 注册事件处理程序
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
// 直接注册pod处理时间
PodInformer = informerFactory.Core().V1().Pods()
podinformer := PodInformer.Informer()
podinformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
svcinformer := informerFactory.Core().V1().Services().Informer()
svcinformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
cminformer := informerFactory.Core().V1().ConfigMaps().Informer()
cminformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
secretinformer := informerFactory.Core().V1().Secrets().Informer()
secretinformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
sainformer := informerFactory.Core().V1().ServiceAccounts().Informer()
sainformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
stsinformer := informerFactory.Apps().V1().StatefulSets().Informer()
stsinformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
ingressinformer := informerFactory.Networking().V1().Ingresses().Informer()
ingressinformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
jobinformer := informerFactory.Batch().V1().Jobs().Informer()
jobinformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
cronjobinformer := informerFactory.Batch().V1().CronJobs().Informer()
cronjobinformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
stopper := make(chan struct{})
defer close(stopper)
// 启动 informer,List & Watch
informerFactory.Start(stopper)
// 等待所有启动的 Informer 的缓存被同步
informerFactory.WaitForCacheSync(stopper)
//从本地缓存中获取 default 中的所有 deployment 列表
//这个地方可以不写,直接到stopper结束也行
//deployments, err := deployLister.Deployments("default").List(labels.Everything())
//if err != nil {
// panic(err)
//}
//for idx, deploy := range deployments {
// fmt.Printf("%d -> %s\n", idx+1, deploy.Name)
//}
<-stopper
}
//onAdd dispatches informer "add" events by concrete type and pushes a
//"正在新增" notification. Pods are handled asynchronously: they get a delayed
//health check instead of an immediate status (see AddPodStatus).
//Idiom fix: `switch res := obj.(type)` binds the typed value once, replacing
//the original switch + redundant per-case type re-assertion.
func onAdd(obj interface{}) {
	switch res := obj.(type) {
	case *v1.Deployment:
		k8sSendMessage(res.Namespace, "Deployment", res.Name, "正在新增")
	case *v1.DaemonSet:
		k8sSendMessage(res.Namespace, "DaemonSet", res.Name, "正在新增")
	case *apiv1.Pod:
		// announce, then re-check the pod's container state ~35s later
		go AddPodStatus(res)
	case *apiv1.ConfigMap:
		k8sSendMessage(res.Namespace, "ConfigMap", res.Name, "正在新增")
	case *apiv1.Service:
		k8sSendMessage(res.Namespace, "Service", res.Name, "正在新增")
	case *apiv1.Secret:
		k8sSendMessage(res.Namespace, "Secret", res.Name, "正在新增")
	case *apiv1.ServiceAccount:
		k8sSendMessage(res.Namespace, "ServiceAccount", res.Name, "正在新增")
	case *v1.StatefulSet:
		k8sSendMessage(res.Namespace, "StatefulSet", res.Name, "正在新增")
	case *v12.Ingress:
		k8sSendMessage(res.Namespace, "Ingress", res.Name, "正在新增")
	case *v13.Job:
		k8sSendMessage(res.Namespace, "Job", res.Name, "正在新增")
	case *v13.CronJob:
		k8sSendMessage(res.Namespace, "CronJob", res.Name, "正在新增")
	default:
		fmt.Println("onadd 其他类型")
	}
}
//onUpdate dispatches informer "update" events by concrete type.
//Two caveats (see https://github.com/kubernetes/client-go/issues/529):
// 1. UpdateFunc also fires on periodic re-lists with no real change, so the
//    ResourceVersion is compared first. DeepEqual on two strings is just ==;
//    it is kept so the package-level reflect import stays in use.
// 2. A single apply can trigger several update events; the Update*Status
//    helpers debounce them via ResourceMap, and they run in goroutines so
//    this handler never blocks event delivery.
func onUpdate(old, new interface{}) {
	switch newRes := new.(type) {
	case *v1.Deployment:
		if !reflect.DeepEqual(old.(*v1.Deployment).ResourceVersion, newRes.ResourceVersion) {
			go UpdateDeployStatus(newRes)
		}
	case *v1.DaemonSet:
		if !reflect.DeepEqual(old.(*v1.DaemonSet).ResourceVersion, newRes.ResourceVersion) {
			go UpdateDaemonSetStatus(newRes)
		}
	case *apiv1.Pod:
		// pod updates are deliberately ignored; pod health is reported once
		// after creation by TriggerPodcheck
	case *apiv1.ConfigMap:
		if !reflect.DeepEqual(old.(*apiv1.ConfigMap).ResourceVersion, newRes.ResourceVersion) {
			go UpdateConfigMapStatus(newRes)
		}
	case *apiv1.Secret:
		if !reflect.DeepEqual(old.(*apiv1.Secret).ResourceVersion, newRes.ResourceVersion) {
			go UpdateSecretStatus(newRes)
		}
	case *apiv1.Service:
		if !reflect.DeepEqual(old.(*apiv1.Service).ResourceVersion, newRes.ResourceVersion) {
			go UpdateServiceStatus(newRes)
		}
	case *apiv1.ServiceAccount:
		if !reflect.DeepEqual(old.(*apiv1.ServiceAccount).ResourceVersion, newRes.ResourceVersion) {
			go UpdateSAStatus(newRes)
		}
	case *v1.StatefulSet:
		if !reflect.DeepEqual(old.(*v1.StatefulSet).ResourceVersion, newRes.ResourceVersion) {
			go UpdateStatefulsetStatus(newRes)
		}
	case *v12.Ingress:
		if !reflect.DeepEqual(old.(*v12.Ingress).ResourceVersion, newRes.ResourceVersion) {
			go UpdateInressStatus(newRes)
		}
	case *v13.Job:
		if !reflect.DeepEqual(old.(*v13.Job).ResourceVersion, newRes.ResourceVersion) {
			go UpdateJobStatus(newRes)
		}
	case *v13.CronJob:
		if !reflect.DeepEqual(old.(*v13.CronJob).ResourceVersion, newRes.ResourceVersion) {
			go UpdateCronJobStatus(newRes)
		}
	default:
		fmt.Println("update 其他类型")
	}
}
func onDelete(obj interface{}) {
switch obj.(type) {
case *v1.Deployment:
deploy := obj.(*v1.Deployment)
k8sSendMessage(deploy.Namespace,"Deployment",deploy.Name,"已经删除")
//pod不存在更新的概念,就不更新了
case *apiv1.Pod:
pod := obj.(*apiv1.Pod)
k8sSendMessage(pod.Namespace,"Pod",pod.Name,"已经删除")
case *apiv1.ConfigMap:
cm := obj.(*apiv1.ConfigMap)
k8sSendMessage(cm.Namespace,"ConfigMap",cm.Name,"已经删除")
case *v1.DaemonSet:
deploy := obj.(*v1.DaemonSet)
k8sSendMessage(deploy.Namespace,"DaemonSet",deploy.Name,"已经删除")
case *apiv1.Service:
svc := obj.(*apiv1.Service)
k8sSendMessage(svc.Namespace,"Service",svc.Name,"已经删除")
case *apiv1.Secret:
secret := obj.(*apiv1.Secret)
k8sSendMessage(secret.Namespace,"Secret",secret.Name,"已经删除")
case *apiv1.ServiceAccount:
sa := obj.(*apiv1.ServiceAccount)
k8sSendMessage(sa.Namespace,"ServiceAccount",sa.Name,"已经删除")
case *v1.StatefulSet:
sts := obj.(*v1.StatefulSet)
k8sSendMessage(sts.Namespace,"StatefulSet",sts.Name,"已经删除")
case *v12.Ingress:
ingress := obj.(*v12.Ingress)
k8sSendMessage(ingress.Namespace,"Ingress",ingress.Name,"已经删除")
case *v13.Job:
job := obj.(*v13.Job)
k8sSendMessage(job.Namespace,"Job",job.Name,"已经删除")
case *v13.CronJob:
cronjob := obj.(*v13.CronJob)
k8sSendMessage(cronjob.Namespace,"CronJob",cronjob.Name,"已经删除")
default:
fmt.Println("delete 其他类型")
}
}
//AddPodStatus announces a freshly-added pod, then schedules a one-shot health
//check 35s later — longer than the 30s re-list period, so the cached pod
//status is current by the time it is read.
func AddPodStatus(resouce *apiv1.Pod) {
	ns, name := resouce.Namespace, resouce.Name
	k8sSendMessage(ns, "Pod", name, "正在新增")
	go TriggerPodcheck(ns, "Pod", name, 35*time.Second)
}
//UpdateDeployStatus debounces Deployment update events: the first event for a
//namespace-name key claims the key in ResourceMap and fires TriggerResource
//(which notifies once and frees the key after 20s); later events for the same
//key inside that window are dropped.
//Fix: lock BEFORE the existence check — the original read the map first and
//locked afterwards, so two concurrent events could both miss the key and both
//notify (check-then-act race, and an unlocked concurrent map read).
func UpdateDeployStatus(resouce *v1.Deployment) {
	key := resouce.Namespace + "-" + resouce.Name
	mx.Lock()
	defer mx.Unlock()
	if _, ok := ResourceMap[key]; ok {
		return
	}
	ResourceMap[key] = AcountTime{Trigger: TriggerResource}
	go TriggerResource(resouce.Namespace, "Deployment", resouce.Name, time.Second*20)
}
//UpdateDaemonSetStatus debounces DaemonSet update events via ResourceMap
//(first event per key notifies; the key is released by TriggerResource after
//20s). Fix: lock before the existence check to close the check-then-act race.
func UpdateDaemonSetStatus(resouce *v1.DaemonSet) {
	key := resouce.Namespace + "-" + resouce.Name
	mx.Lock()
	defer mx.Unlock()
	if _, ok := ResourceMap[key]; ok {
		return
	}
	ResourceMap[key] = AcountTime{Trigger: TriggerResource}
	go TriggerResource(resouce.Namespace, "DaemonSet", resouce.Name, time.Second*20)
}
//UpdateConfigMapStatus debounces ConfigMap update events via ResourceMap
//(first event per key notifies; key released by TriggerResource after 20s).
//Fix: lock before the existence check to close the check-then-act race.
func UpdateConfigMapStatus(resouce *apiv1.ConfigMap) {
	key := resouce.Namespace + "-" + resouce.Name
	mx.Lock()
	defer mx.Unlock()
	if _, ok := ResourceMap[key]; ok {
		return
	}
	ResourceMap[key] = AcountTime{Trigger: TriggerResource}
	go TriggerResource(resouce.Namespace, "ConfigMap", resouce.Name, time.Second*20)
}
//UpdatePodStatus debounces Pod update events via ResourceMap. Currently
//unused: the pod branch of onUpdate is commented out.
//Fixes: lock before the existence check (check-then-act race), and the
//notification kind was "ConfigMap" — a copy-paste error — corrected to "Pod".
func UpdatePodStatus(resouce *apiv1.Pod) {
	key := resouce.Namespace + "-" + resouce.Name
	mx.Lock()
	defer mx.Unlock()
	if _, ok := ResourceMap[key]; ok {
		return
	}
	ResourceMap[key] = AcountTime{Trigger: TriggerResource}
	go TriggerResource(resouce.Namespace, "Pod", resouce.Name, time.Second*20)
}
//UpdateSecretStatus debounces Secret update events via ResourceMap (first
//event per key notifies; key released by TriggerResource after 20s).
//Fix: lock before the existence check to close the check-then-act race.
func UpdateSecretStatus(resouce *apiv1.Secret) {
	key := resouce.Namespace + "-" + resouce.Name
	mx.Lock()
	defer mx.Unlock()
	if _, ok := ResourceMap[key]; ok {
		return
	}
	ResourceMap[key] = AcountTime{Trigger: TriggerResource}
	go TriggerResource(resouce.Namespace, "Secret", resouce.Name, time.Second*20)
}
//UpdateSAStatus debounces ServiceAccount update events via ResourceMap (first
//event per key notifies; key released by TriggerResource after 20s).
//Fix: lock before the existence check to close the check-then-act race.
func UpdateSAStatus(resouce *apiv1.ServiceAccount) {
	key := resouce.Namespace + "-" + resouce.Name
	mx.Lock()
	defer mx.Unlock()
	if _, ok := ResourceMap[key]; ok {
		return
	}
	ResourceMap[key] = AcountTime{Trigger: TriggerResource}
	go TriggerResource(resouce.Namespace, "ServiceAccount", resouce.Name, time.Second*20)
}
//UpdateServiceStatus debounces Service update events via ResourceMap (first
//event per key notifies; key released by TriggerResource after 20s).
//Fix: lock before the existence check to close the check-then-act race.
func UpdateServiceStatus(resouce *apiv1.Service) {
	key := resouce.Namespace + "-" + resouce.Name
	mx.Lock()
	defer mx.Unlock()
	if _, ok := ResourceMap[key]; ok {
		return
	}
	ResourceMap[key] = AcountTime{Trigger: TriggerResource}
	go TriggerResource(resouce.Namespace, "Service", resouce.Name, time.Second*20)
}
//UpdateStatefulsetStatus debounces StatefulSet update events via ResourceMap
//(first event per key notifies; key released by TriggerResource after 20s).
//Fix: lock before the existence check to close the check-then-act race.
func UpdateStatefulsetStatus(resouce *v1.StatefulSet) {
	key := resouce.Namespace + "-" + resouce.Name
	mx.Lock()
	defer mx.Unlock()
	if _, ok := ResourceMap[key]; ok {
		return
	}
	ResourceMap[key] = AcountTime{Trigger: TriggerResource}
	go TriggerResource(resouce.Namespace, "StatefulSet", resouce.Name, time.Second*20)
}
//UpdateInressStatus debounces Ingress update events via ResourceMap (first
//event per key notifies; key released by TriggerResource after 20s).
//Fix: lock before the existence check to close the check-then-act race.
//(Name keeps the original "Inress" spelling: callers depend on it.)
func UpdateInressStatus(resouce *v12.Ingress) {
	key := resouce.Namespace + "-" + resouce.Name
	mx.Lock()
	defer mx.Unlock()
	if _, ok := ResourceMap[key]; ok {
		return
	}
	ResourceMap[key] = AcountTime{Trigger: TriggerResource}
	go TriggerResource(resouce.Namespace, "Ingress", resouce.Name, time.Second*20)
}
//UpdateJobStatus debounces Job update events via ResourceMap (first event per
//key notifies; key released by TriggerResource after 20s).
//Fix: lock before the existence check to close the check-then-act race.
func UpdateJobStatus(resouce *v13.Job) {
	key := resouce.Namespace + "-" + resouce.Name
	mx.Lock()
	defer mx.Unlock()
	if _, ok := ResourceMap[key]; ok {
		return
	}
	ResourceMap[key] = AcountTime{Trigger: TriggerResource}
	go TriggerResource(resouce.Namespace, "Job", resouce.Name, time.Second*20)
}
//UpdateCronJobStatus debounces CronJob update events via ResourceMap (first
//event per key notifies; key released by TriggerResource after 20s).
//Fix: lock before the existence check to close the check-then-act race.
func UpdateCronJobStatus(resouce *v13.CronJob) {
	key := resouce.Namespace + "-" + resouce.Name
	mx.Lock()
	defer mx.Unlock()
	if _, ok := ResourceMap[key]; ok {
		return
	}
	ResourceMap[key] = AcountTime{Trigger: TriggerResource}
	go TriggerResource(resouce.Namespace, "CronJob", resouce.Name, time.Second*20)
}
//TriggerPodcheck sleeps timeafter, then looks the pod up in the informer
//cache and reports the state of its first non-running container ("Waiting" /
//"terminated"), or "runing" when every container is fine. Nothing is
//reported during the first 10s after process start (initial cache sync).
//Fix: the terminated branch dropped the container's Message and only kept
//the Reason; both are now reported.
//The kind parameter is currently unused but kept for signature compatibility.
func TriggerPodcheck(namespace, kind, name string, timeafter time.Duration) {
	// suppress the flood produced by the initial cache sync
	// NOTE(review): Podthreshold is a plain bool written from multiple
	// goroutines — benign in practice but technically racy; confirm whether
	// it should become atomic.
	if !Podthreshold {
		if time.Since(StartTime) < time.Second*10 {
			return
		}
		Podthreshold = true
	}
	time.Sleep(timeafter)
	pod, err := PodInformer.Lister().Pods(namespace).Get(name)
	if err != nil {
		fmt.Println(err)
		return
	}
	//https://kubernetes.io/zh-cn/docs/concepts/workloads/pods/pod-lifecycle/
	var ty, msg, reason string
	// the first container that is not running decides the reported state
	for _, cs := range pod.Status.ContainerStatuses {
		if cs.State.Waiting != nil {
			ty = "Waiting"
			msg = cs.State.Waiting.Message
			reason = cs.State.Waiting.Reason
			break
		}
		if cs.State.Terminated != nil {
			ty = "terminated"
			msg = cs.State.Terminated.Message
			reason = cs.State.Terminated.Reason
			break
		}
	}
	// neither waiting nor terminated => running. The "runing" spelling is
	// load-bearing: k8sSendMessagePod matches this exact string.
	if len(ty) == 0 {
		ty = "runing"
		msg = ""
		reason = ""
	}
	go k8sSendMessagePod(namespace, name, ty, reason, msg)
}
//实际出发函数
//##### 这里面有很大优化空间
//#####
func TriggerResource(namespace,kind,name string,timeafter time.Duration) {
//可能会造成内存泄露,不再使用
//<- time.After(timeafter)
key := namespace+"-"+name
//我自己的监控排除,
NSArray = append(NSArray,"rook-ceph")
NArray = append(NArray,"ingress-controller-leader","operator-lock","ingress-controller-leader-nginx")
//排除
if ExcludeNSS(namespace,name){
}else {
go k8sSendMessage(namespace,kind,name,"正在更新")
}
time.Sleep(timeafter)
delete(ResourceMap,key)
}
//k8sSendMessagePod formats a pod health report as WeChat Work markdown and
//sends it; a "runing" pod renders green (info), anything else orange
//(warning). The "runing" spelling matches the status string produced by
//TriggerPodcheck.
//Fix: the original REPLACED a user-supplied -url with the hard-coded
//placeholder webhook (the condition was inverted), and mutated the global
//flag pointer while doing so; the flag now wins and the placeholder is only
//a fallback.
func k8sSendMessagePod(namespace, name, ty, reason, msg string) {
	timenow := GetNowTimeFormat()
	color := "warning"
	if ty == "runing" {
		color = "info"
	}
	context := "Pod状态确认:" + "\nPod名称: " + "**" + name + "**" + "\n命名空间: " + "<font color=\"comment\">" + namespace + "</font>" + "\nPod状态: " + "<font color=\"" + color + "\">" + ty + "</font>" + "\n信息描述: " + "<font color=\"warning\">" + reason + ": " + msg + "</font>" + "\n现在时间: " + "<font color=\"comment\">" + timenow + "</font>"
	target := *url
	if len(target) == 0 {
		target = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxx"
	}
	SendMessage(target, context)
}
//k8sSendMessage formats a generic resource event as WeChat Work markdown and
//sends it; "正在新增" renders the state green (info), every other event
//orange (warning).
//Fix: the original REPLACED a user-supplied -url with the hard-coded
//placeholder webhook (the condition was inverted), and mutated the global
//flag pointer while doing so; the flag now wins and the placeholder is only
//a fallback.
func k8sSendMessage(namespace, resouretype, name, event string) {
	timenow := GetNowTimeFormat()
	color := "warning"
	if event == "正在新增" {
		color = "info"
	}
	context := "K8S消息提醒:" + "\n资源类型: " + "<font color=\"comment\">" + resouretype + "</font>" + "\n资源名称: " + "**" + name + "**" + "\n命名空间: " + "<font color=\"comment\">" + namespace + "</font>" + "\n发生状态: " + "<font color=\"" + color + "\">" + event + "</font>" + "\n现在时间: " + "<font color=\"comment\">" + timenow + "</font>"
	target := *url
	if len(target) == 0 {
		target = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxx"
	}
	SendMessage(target, context)
}
//SendMessage posts msg as a WeChat Work markdown message to the webhook url.
//Messages within the first 10s after process start are dropped: the informer
//initial sync replays the whole cluster state and we don't want that flood.
//Fix: the original repeated the 10-second startup check a second time after
//the Resourcethreshold block — dead code that was always false when reached;
//removed along with the commented-out plain-text variant.
func SendMessage(url, msg string) {
	// NOTE(review): Resourcethreshold is a plain bool written from multiple
	// goroutines — benign in practice but technically racy.
	if !Resourcethreshold {
		if time.Since(StartTime) < time.Second*10 {
			return
		}
		Resourcethreshold = true
	}
	var m MessageMD
	m.MsgType = "markdown"
	m.Markdown.Content = msg
	jsons, err := json.Marshal(m)
	if err != nil {
		log.Println(err)
		return
	}
	client := &http.Client{
		Timeout: 10 * time.Second,
	}
	req, err := http.NewRequest("POST", url, strings.NewReader(string(jsons)))
	if err != nil {
		log.Println(err)
		return
	}
	req.Header.Set("Content-Type", "application/json")
	r, err := client.Do(req)
	if err != nil {
		log.Println(err)
		return
	}
	defer r.Body.Close()
	// drain the body so the underlying connection can be reused
	if _, err = ioutil.ReadAll(r.Body); err != nil {
		log.Println(err)
		return
	}
}
//调整时间格式
func GetNowTimeFormat() string {
t := time.Now()
format := t.Format("2006-01-02 15:04:05")
return format
}
//通过参数传入的字符拿来判断
func GetArray() {
//namespace字符串分割
trimns := strings.Trim(*ns, `"`)
trimns = strings.Trim(trimns, `'`)
splitns := strings.Split(trimns, ",")
for _, v := range splitns {
NSArray = append(NSArray, v)
}
//name字符分割
trimn := strings.Trim(*n, `"`)
trimn = strings.Trim(trimn, `'`)
splitn := strings.Split(trimn, ",")
for _, v := range splitn {
NArray = append(NArray, v)
}
}
//ExcludeNSS reports whether the namespace or the resource name appears in
//the configured exclusion lists (NSArray / NArray).
func ExcludeNSS(namespace, name string) bool {
	for _, blockedNS := range NSArray {
		if blockedNS == namespace {
			return true
		}
	}
	for _, blockedName := range NArray {
		if blockedName == name {
			return true
		}
	}
	return false
}
//CheckError prints s together with the error text and terminates the process
//with exit code 1; a nil error is a no-op.
func CheckError(s string, err error) {
	if err != nil {
		fmt.Println(s, err.Error())
		os.Exit(1)
	}
}