merge out.github_fanxin into out.github go-sdk日志消费 Created-by: gongdinghui lts-go-sdkAuthor-id: lts MR-id: lts Commit-by: lts-go-sdkMerged-by: fanxin lts-go-sdkEltsE-issues: WIlts Description: feature go-sdk日志消费 See merge request: applicationplatform/APM/ICAgent/lts-go-sdk!lts Match-id-cc7bad0bd025dd5b7e2f595601b67348812c5495
merge out.github_fanxin into out.github
go-sdk日志消费
Created-by: gongdinghui lts-go-sdkAuthor-id: lts MR-id: lts Commit-by: lts-go-sdkMerged-by: fanxin lts-go-sdkEltsE-issues: WIlts Description: feature go-sdk日志消费
See merge request: applicationplatform/APM/ICAgent/lts-go-sdk!lts
Match-id-cc7bad0bd025dd5b7e2f595601b67348812c5495
go get github.com/huaweicloud/huaweicloud-lts-sdk-go
import github.com/huaweicloud/huaweicloud-lts-sdk-go
import ( "fmt" "github.com/huaweicloud/huaweicloud-lts-sdk-go/producer" "sync" "time" ) func ProduceLog(endpoint, ak, sk, region, projectId, logGroup, logStream string) { producerConfig := producer.GetConfig() producerConfig.Endpoint = endpoint producerConfig.AccessKeyID = ak producerConfig.AccessKeySecret = sk producerConfig.RegionId = region producerConfig.ProjectId = projectId producerInstance := producer.InitProducer(producerConfig) producerInstance.Start() wg := sync.WaitGroup{} for i := 0; i < 10; i++ { wg.Add(1) go func() { for j := 0; j < 1000; j++ { labels := make(map[string]string) labels["keyA"] = "valueA" labels["keyB"] = "valueB" labels["keyC"] = "valueC" customLogs := producer.CustomLog{ LogTimeNs: time.Now().UnixNano(), Log: fmt.Sprintf("content for this test [%d]", j), } log := producer.GenerateLogWithCustomTime([]producer.CustomLog{customLogs}, labels) err := producerInstance.SendLog(logGroup, logStream, log) err = producerInstance.SendLog(logGroup, logStream, log) // send log with callback,user can deal error by self handle := ErrorHandle{} err = producerInstance.SendLogWithCallBack(logGroup, logStream, log, handle) err = producerInstance.SendLogWithCallBack(logGroup, logStream, log, handle) if err != nil { fmt.Println(err) } var sLog producer.StructLog logContent := make(map[string]string) logContent["keyA"] = "valueA" logContent["keyB"] = "valueB" logContent1 := make(map[string]string) logContent["keyA1"] = "valueA1" logContent["keyB1"] = "valueB1" sLog.Contents = append(sLog.Contents, logContent1) err = producerInstance.SendLogStruct(logGroup, logStream, &sLog) if nil != err { continue } time.Sleep(100 * time.Microsecond) } wg.Done() fmt.Printf("test func finished\n") }() } wg.Wait() fmt.Printf("send all complate ...") time.Sleep(10 * 60 * time.Second) } type ErrorHandle struct{} func (ErrorHandle) Success(result *producer.Result) { fmt.Printf("send log to lts success, success flag: %v\n", result.IsSuccessful()) } func (ErrorHandle) Fail(result *producer.Result) { fmt.Printf("send log to lts error, success flag: %v requestId: %s, httpcode: %d, errorCode: %s, errorMsg: %s\n", result.IsSuccessful(), result.GetRequestId(), result.GetHttpCode(), result.GetErrorCode(), result.GetErrorMessage()) }
lts-go-sdk提供了日志发送失败时的回调方法,只需要实现CallBack接口的两个方法,就可以自行处理发送失败后的错误,使用方法参见sample下样例
func ConsumeLog(regionName, projectId, logGroupId, logStreamId, ak, sk, consumerGroupName, logLevel, logDest string, consumeCount, batchSize int, startTime, endTime int64) { // 消费开始时间 括号中填毫秒值 var StartTime time.Time if startTime != 0 { StartTime = time.UnixMilli(startTime) } // 消费结束时间 var EndTime time.Time if endTime != 0 { EndTime = time.UnixMilli(endTime) } var logConfig producer.LogConf if logDest == "file" { logConfig = producer.LogConf{ Dir: "/opt/clouds", Name: "lts-go-sdk.log", Level: logLevel, MaxSize: 100, } producer.InitLoggerFile(logConfig) } else { logConfig = producer.LogConf{ Dir: "", Name: "", Level: logLevel, MaxSize: 100, } producer.InitLoggerStd(logConfig) } slog.Info("region is: ", "region", regionName) slog.Info("projectId is: ", "region", projectId) slog.Info("logGroupId is: ", "logGroupId", logGroupId) slog.Info("ak is: ", "ak", ak) slog.Info("sk is: ", "sk", sk) slog.Info("consumerGroupName is: ", "consumerGroupName", consumerGroupName) slog.Info("consumerCount is: ", "consumerCount", consumeCount) slog.Info("batchSize is: ", "batchSize", batchSize) slog.Info("start time is:", "startTime", StartTime) slog.Info("end time:", "endTime", EndTime, "endTime is Zero", EndTime.IsZero()) workers := make([]*consumer.ClientConsumerWorker, 0) for i := 0; i < consumeCount; i++ { config := consumer.GetConsumerConfig() // 构建消费者配置, 参数有必填的:regionName, projectId, logGroupId, logStreamId, ak, sk, consumerGroupName, startTime config.ProjectId = projectId config.LogGroupId = logGroupId config.LogStreamId = logStreamId config.AccessKeyId = ak config.AccessKeySecret = sk config.BatchSize = batchSize //BatchSize默认值1000 config.StartTimeNs = StartTime config.EndTimeNs = EndTime config.ConsumerGroupName = consumerGroupName config.RegionName = regionName // 构建消费者的工作者 worker := consumer.GetClientConsumerWorker(new(DemoLogConsumerProcessorFactory), config) workers = append(workers, worker) } for _, work := range workers { // 启动消费者, ClientConsumerWorker启动后, 内置的消费任务会自动运行 work.Run() } time.Sleep(30 * time.Minute) for _, work := range workers { // 调用ClientConsumerWorker的shutdown方法, 安全的关闭消费者, 消费者中启动的内置线程也会自动停止 work.Shutdown() } // 调用ClientConsumerWorker的shutdown方法后, 由于消费者内置多个异步任务, 建议停止1分钟在关闭整个服务, 目的就是让消费者完成后台的异步任务, 安全的退出 // 如果消费者突然关闭, 没有调用shutdown方法; 或者调用shutdown方法之后, 没有等待一定的时间. 那么可能造成下次消费时, 会有一定的重复数据, 因为消费者后台的异步任务没有保存checkPoint点 time.Sleep(time.Minute) } type DemoLogConsumerProcessor struct { LogCount int } // Initialize 这个方法给您回调返回的ShardId, 是告诉您当前这个shard-consumer在消费那个shard func (processor *DemoLogConsumerProcessor) Initialize(shardId string) { } // Process 数据处理方法, logGroups为拉取到的日志 func (processor *DemoLogConsumerProcessor) Process(logGroups []consumer.LogData, checkPointTracker consumer.ILogConsumerCheckPointTracker) string { atomic.AddInt64(&allShardLogCount, int64(len(logGroups))) processor.LogCount = processor.LogCount + len(logGroups) slog.Info("this time process log", "consume log", len(logGroups), "total log num", processor.LogCount) slog.Info("after this consume", "consume log", len(logGroups), "all shard consume total log num", allShardLogCount) logrus.WithField("consume log", len(logGroups)).WithField("total log num", processor.LogCount).Info("this time process log") logrus.WithField("consume log", len(logGroups)).WithField("all shard consume total log num", allShardLogCount).Info("after this consume") //for _, logData := range logGroups { // // logData为您的一条日志,日志内容在Labels属性中。 // // Labels为一个JSON,存放您的这个条日志的内容,比如: "log_content": "日志内容" // fmt.Println(fmt.Sprintf("日志内容:%v", logData.Labels)) //} // 方法的返回值为一个checkPoint // 如果您在处理这批数据的时候, 遇到什么异常或者说想重新获取这一次的数据, 那么 return checkPointTracker.GetCurrentCursor(); return "" } // Shutdown 当调用ClientConsumerWorker的shutdown方法, 会调用此函数, 您可以在此处写一些关闭流程 func (processor *DemoLogConsumerProcessor) Shutdown(checkPointTracker consumer.ILogConsumerCheckPointTracker) error { // 关闭前, 立即保存checkPoint return checkPointTracker.SaveCheckPoint(true) } type DemoLogConsumerProcessorFactory struct { } func (processor *DemoLogConsumerProcessorFactory) GeneratorProcessor() consumer.ILogConsumerProcessor { demoProcessor := new(DemoLogConsumerProcessor) demoProcessor.LogCount = 0 return demoProcessor }
版权所有:中国计算机学会技术支持:开源发展技术委员会 京ICP备13000930号-9 京公网安备 11010802032778号
lts-go-sdk
1. 使用前提
2、使用方法
go get github.com/huaweicloud/huaweicloud-lts-sdk-go
import github.com/huaweicloud/huaweicloud-lts-sdk-go
3、参数配置以及代码样例
1.日志发送场景
参数配置
代码样例
错误处理
lts-go-sdk提供了日志发送失败时的回调方法,只需要实现CallBack接口的两个方法,就可以自行处理发送失败后的错误,使用方法参见sample下样例
2.日志消费场景
参数配置
代码样例
使用约束