目录

Opentelemetry 6-续3-告警和通知

在上一篇文章中,成功将JVM的监控集成到整个系统当中,最后只剩下告警和通知两块没做了,在我的这套系统中,常规监控告警可以直接通过Prometheus-kube的Alertmanager集中监控,而日志监控也可以通过Loki的Ruler接入Alertmanager,将告警转发给Alertmanager,统一告警通知手段。但是我这次没这么做,主要原因是因为我部署的Loki不在Prometheus-kube的RBAC中,接入Alertmanager时出现问题,而我又着急落地项目,有些赶时间,便通过其他手段来做,本篇简单介绍下我的实现。

告警

告警在我这里主要分为两块,一块是监控的告警,包括系统监控、集群、流量、资源的监控进行告警;另一块是日志的告警。

监控告警

监控告警已经是非常成熟了的,在我们安装Prometheus时就带了很多例子,这里不再过多赘述。

日志告警

日志告警,在我们的需求中,主要是在日志中过滤带有特定标识的进行分类告警,将过滤出来的告警发送至我自己编写的webhook,对告警进行一定的策略,然后发出钉钉通知。因为我们日志存储用的是Loki,故首推Loki自带的告警手段以降低损耗,通过翻阅官方文档,配置一下Ruler,示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
auth_enabled: false
server:
  http_listen_port: 3100
schema_config:
  configs:
    - from: 2021-08-01
      store: boltdb-shipper
      object_store: s3
      schema: v11
      index:
        prefix: index_
        period: 24h
common:
  path_prefix: /loki
  replication_factor: 1
  storage:
    s3:
      endpoint: oss-cn-hangzhou-internal.aliyuncs.com
      insecure: true
      bucketnames: BUCKET
      access_key_id: KEY
      secret_access_key: SECRET
      s3forcepathstyle: true
  ring:
    kvstore:
      store: memberlist
ruler:
  alertmanager_url: https://WEBHOOK
  enable_alertmanager_v2: true
  enable_api: true
  enable_sharding: true
  rule_path: /tmp/rules
  flush_period: 1m
  ring:
    kvstore:
      store: memberlist
  storage:
    s3:
      bucketnames: BUCKET
table_manager:
  retention_deletes_enabled: true
  retention_period: 1h

配置之后需要创建告警规则,告警规则与Alertmanager兼容,可模仿监控告警来写。而配置的地方也有很多,如HTTP API,容器内文件写入,我使用grafana作为webUI,刚好也提供配置告警的页面,所以直接使用webUI来新建告警规则了。写完之后yaml如下:

1
2
3
4
5
6
7
8
alert: Log
annotations:
  summary: '订单中心出现level为ERROR的日志'
labels:
  belong: order
expr: >-
    sum by(body,job, traceid)(count_over_time({level="ERROR", job="order-service"} | json [10s])) > 0

这段配置通过json的方式将我需要的日志内容转接到了label上,在后续的告警中,就会将我需要内容从labels中提取组合为通知内容文字。

有一点需要注意,这段配置文件中与监控告警不同的是少了一个参数for,for的作用是降噪,让告警持续一段时间再发出告警,而对于日志来说不是这样的,只要有一条ERROR告警就要告警,不存在降噪一说。

通知

通知手段也是与监控告警一致,如果是通过Alertmanager统一管控的话,则共用告警媒介,我们使用的是钉钉,而且日志告警和监控告警是分开的,在不同的群,而且在上面配置中可以看到,我模仿alertmanager的接口写了一个webhook,Loki直接将告警发送至我的webhook,所以没有统一管控。我写的webhook比较简单,仅仅能看到效果。

原本写的代码比较简单,下面是不断增加需求后的代码。

示例代码:

loki.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package loki

import (
	"time"
)

var (
	AlertChan = make(chan []LokiRuleAlertStruct)
)

func Server() {
	for {
		select {
		case alerts := <-AlertChan:
			for _, alert := range alerts {
				traceid := alert.Labels.Traceid
				if Channel.processed.Contains(traceid + alert.EndsAt.String()) {
					continue
				}
				Channel.processed.Enqueue(traceid + alert.EndsAt.String())
				Channel.c <- alert
			}
		}
	}
}

type LokiRuleAlertStruct struct {
	Annotations struct {
		Summary string `json:"summary"`
	} `json:"annotations"`
	EndsAt       time.Time `json:"endsAt"`
	StartsAt     time.Time `json:"startsAt"`
	GeneratorURL string    `json:"generatorURL"`
	Labels       struct {
		Alertname string `json:"alertname"`
		Belong    string `json:"belong"`
		Body      string `json:"body"`
		Job       string `json:"job"`
		Traceid   string `json:"traceid"`
		Stack     string `json:"attributes_exception_stacktrace"`
	} `json:"labels"`
}

factory.go

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package loki

import (
	"context"
	"fmt"
	"golang.org/x/time/rate"
	"operation-webapi/models/utils"
	"sync"
	"time"
)

const (
	logMaxLength            = 400
	notificationChannelSize = 10000
)

var (
	webHooks = map[string]string{
		"payment": "DINGDINGWEBHOOK",
		"user":    "DINGDINGWEBHOOK",
		"message": "DINGDINGWEBHOOK",
		"order":   "DINGDINGWEBHOOK",
		"hm":      "DINGDINGWEBHOOK",
		"hc":      "DINGDINGWEBHOOK",
		"ark":     "DINGDINGWEBHOOK",
		"my":      "DINGDINGWEBHOOK",
	}

	Channel      = NewNotificationChannel(notificationChannelSize) 
    // 初始化告警通道
	notifiers    = make(map[string]Notifier)                       
    // 存储已创建的notifier
	notifierLock sync.Mutex                                        
    // 保证并发安全
)

// 运行程序先自动初始化钉钉通知通道
func init() {
	for name := range webHooks {
		notifiers[name] = createNotifier(name)
	}
	go Channel.StartNotifier()
}

// 告警媒介接口,这里只实现了钉钉的
type Notifier interface {
	Notify(ctx context.Context, msg interface{}) error
	Close()
}

type DingDingNotifier struct {
	WebHook string
	stopCh  chan struct{} // 用于停止通知goroutine
	limiter *rate.Limiter // 添加Limiter成员变量
}

func (cn *DingDingNotifier) Notify(ctx context.Context, msg interface{}) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-cn.stopCh:
			return nil
		default:
			if err := cn.limiter.Wait(context.Background()); err != nil {
				fmt.Printf("Error waiting for limiter: %v\n", err)
			}
			err := utils.SendDingTalkResolvedWithWebHook(cn.WebHook, "告警", msg)
			if err != nil {
				fmt.Printf("Error sending notification: %v\n", err)
			} else {
				return nil
			}
		}
	}
}

func (cn *DingDingNotifier) Close() {
	close(cn.stopCh)
}

type NotifierFactory struct{}

func (nf NotifierFactory) GetNotifier(belong string) Notifier {
	if notifier, ok := notifiers[belong]; ok {
		return notifier
	}
	return nil
}

func createNotifier(key string) Notifier {
	notifierLock.Lock()
	defer notifierLock.Unlock()

	n := &DingDingNotifier{
		WebHook: webHooks[key],
		stopCh:  make(chan struct{}),
		// 每秒钟生成0.3个令牌,最多能存一个令牌,约等于 4秒钟一个令牌,钉钉api限流为1分钟20次
		limiter: rate.NewLimiter(0.3, 1),
	}
	notifiers[key] = n
	return n

}

type NotificationChannel struct {
	c         chan LokiRuleAlertStruct
	wg        sync.WaitGroup
	processed *FixedSizeQueue
	closed    bool
	mutex     sync.Mutex
}

func NewNotificationChannel(fixedSize int) *NotificationChannel {
	return &NotificationChannel{
		c:         make(chan LokiRuleAlertStruct),
		closed:    false,
		processed: NewFixedSizeQueue(fixedSize),
		mutex:     sync.Mutex{},
	}
}

func (nc *NotificationChannel) StartNotifier() {
    // 重复的告警,一小时只通知一次
	cache := NewCache(time.Hour)

	for {
		select {
		case notification, ok := <-nc.c:
			if !ok {
				return
			}

			// 避免接受大量无效重复告警
			if ok, _ := cache.getValue(notification); ok {
				continue
			}
			cache.setValue(notification)

			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()

			go func() {
				nf := NotifierFactory{}
				notifier := nf.GetNotifier(notification.Labels.Belong)
				// 如果在Loki的告警规则中没有配置belong这个label,或webHooks里没有定义的,均不进行告警
				if notifier == nil {
					return
				}
				// 防止钉钉接口异常报错
				if len(notification.Labels.Stack) > logMaxLength {
					notification.Labels.Stack = notification.Labels.Stack[:logMaxLength] + "..."
				}
				if len(notification.Labels.Body) > logMaxLength {
					notification.Labels.Body = notification.Labels.Body[:logMaxLength] + "..."
				}

				err := notifier.Notify(ctx, map[string]interface{}{
					"msgtype": "text",
					"text": map[string]string{
						"content": "日志告警\n" + fmt.Sprintf("告警\n应用: %s\ntraceid: %s\n日志内容: %s\n日志描述: %s\n堆栈信息: %s\n", notification.Labels.Job, notification.Labels.Traceid, notification.Labels.Body, notification.Annotations.Summary, notification.Labels.Stack),
					},
				},
				)
				if err != nil {
					fmt.Printf("Error sending notification: %v\n", err)
				}
			}()
		}
	}
}

func (nc *NotificationChannel) Close() {
	if !nc.closed {
		close(nc.c)
		nc.closed = true
		nc.wg.Wait()
	}
}

cache.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package loki

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"sync"
	"time"
)

type Cache struct {
	values sync.Map
	expiry time.Duration
}

func NewCache(expiry time.Duration) *Cache {
	return &Cache{
		expiry: expiry,
	}
}

func (c *Cache) setValue(obj LokiRuleAlertStruct) {
	key := getObjectKey(obj)
	c.values.Store(key, time.Now().Add(c.expiry))
}

func (c *Cache) getValue(obj LokiRuleAlertStruct) (bool, error) {
	key := getObjectKey(obj)
	value, ok := c.values.Load(key)
	if !ok {
		return false, nil
	}

	expiryTime, ok := value.(time.Time)
	if !ok {
		return false, fmt.Errorf("value has incorrect type")
	}
	// 过期删除,返回false
	if time.Now().After(expiryTime) {
		c.values.Delete(key)
		return false, nil
	}

	return true, nil
}

func getObjectKey(obj LokiRuleAlertStruct) string {
	h := md5.New()
	h.Write([]byte(obj.Labels.Stack))
	return hex.EncodeToString(h.Sum(nil))
}

queue.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package loki

type FixedSizeQueue struct {
	size  int
	queue []string
	head  int
	tail  int
}

func NewFixedSizeQueue(size int) *FixedSizeQueue {
	return &FixedSizeQueue{
		size:  size,
		queue: make([]string, size),
		head:  0,
		tail:  0,
	}
}

func (q *FixedSizeQueue) Enqueue(value string) {
	if q.head == (q.tail+1)%q.size {
		q.Dequeue()
	}
	q.queue[q.tail] = value
	q.tail = (q.tail + 1) % q.size
}

func (q *FixedSizeQueue) Dequeue() string {
	value := q.queue[q.head]
	q.head = (q.head + 1) % q.size
	return value
}

func (q *FixedSizeQueue) Contains(value string) bool {
	for i := q.head; i != q.tail; i = (i + 1) % q.size {
		if q.queue[i] == value {
			return true
		}
	}
	return false
}