diff --git a/backend/kitex_gen/coze/loop/observability/domain/common/common.go b/backend/kitex_gen/coze/loop/observability/domain/common/common.go index 48e80a3af..51650efc3 100644 --- a/backend/kitex_gen/coze/loop/observability/domain/common/common.go +++ b/backend/kitex_gen/coze/loop/observability/domain/common/common.go @@ -1431,3 +1431,184 @@ func (p *BaseInfo) Field4DeepEqual(src *int64) bool { } return true } + +type Session struct { + UserID *string `thrift:"user_id,1,optional" frugal:"1,optional,string" form:"user_id" json:"user_id,omitempty" query:"user_id"` +} + +func NewSession() *Session { + return &Session{} +} + +func (p *Session) InitDefault() { +} + +var Session_UserID_DEFAULT string + +func (p *Session) GetUserID() (v string) { + if p == nil { + return + } + if !p.IsSetUserID() { + return Session_UserID_DEFAULT + } + return *p.UserID +} +func (p *Session) SetUserID(val *string) { + p.UserID = val +} + +var fieldIDToName_Session = map[int16]string{ + 1: "user_id", +} + +func (p *Session) IsSetUserID() bool { + return p.UserID != nil +} + +func (p *Session) Read(iprot thrift.TProtocol) (err error) { + var fieldTypeId thrift.TType + var fieldId int16 + + if _, err = iprot.ReadStructBegin(); err != nil { + goto ReadStructBeginError + } + + for { + _, fieldTypeId, fieldId, err = iprot.ReadFieldBegin() + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + + switch fieldId { + case 1: + if fieldTypeId == thrift.STRING { + if err = p.ReadField1(iprot); err != nil { + goto ReadFieldError + } + } else if err = iprot.Skip(fieldTypeId); err != nil { + goto SkipFieldError + } + default: + if err = iprot.Skip(fieldTypeId); err != nil { + goto SkipFieldError + } + } + if err = iprot.ReadFieldEnd(); err != nil { + goto ReadFieldEndError + } + } + if err = iprot.ReadStructEnd(); err != nil { + goto ReadStructEndError + } + + return nil +ReadStructBeginError: + return thrift.PrependError(fmt.Sprintf("%T read struct begin error: ", p), err) +ReadFieldBeginError: + return thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_Session[fieldId]), err) +SkipFieldError: + return thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) + +ReadFieldEndError: + return thrift.PrependError(fmt.Sprintf("%T read field end error", p), err) +ReadStructEndError: + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) +} + +func (p *Session) ReadField1(iprot thrift.TProtocol) error { + + var _field *string + if v, err := iprot.ReadString(); err != nil { + return err + } else { + _field = &v + } + p.UserID = _field + return nil +} + +func (p *Session) Write(oprot thrift.TProtocol) (err error) { + var fieldId int16 + if err = oprot.WriteStructBegin("Session"); err != nil { + goto WriteStructBeginError + } + if p != nil { + if err = p.writeField1(oprot); err != nil { + fieldId = 1 + goto WriteFieldError + } + } + if err = oprot.WriteFieldStop(); err != nil { + goto WriteFieldStopError + } + if err = oprot.WriteStructEnd(); err != nil { + goto WriteStructEndError + } + return nil +WriteStructBeginError: + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) +WriteFieldError: + return thrift.PrependError(fmt.Sprintf("%T write field %d error: ", p, fieldId), err) +WriteFieldStopError: + return thrift.PrependError(fmt.Sprintf("%T write field stop error: ", p), err) +WriteStructEndError: + return thrift.PrependError(fmt.Sprintf("%T write struct end error: ", p), err) +} + +func (p *Session) writeField1(oprot thrift.TProtocol) (err error) { + if p.IsSetUserID() { + if err = oprot.WriteFieldBegin("user_id", thrift.STRING, 1); err != nil { + goto WriteFieldBeginError + } + if err := oprot.WriteString(*p.UserID); err != nil { + return err + } + if err = oprot.WriteFieldEnd(); err != nil { + goto WriteFieldEndError + } + } + return nil +WriteFieldBeginError: + return thrift.PrependError(fmt.Sprintf("%T write field 1 begin error: ", p), err) +WriteFieldEndError: + return thrift.PrependError(fmt.Sprintf("%T write field 1 end error: ", p), err) +} + +func (p *Session) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("Session(%+v)", *p) + +} + +func (p *Session) DeepEqual(ano *Session) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field1DeepEqual(ano.UserID) { + return false + } + return true +} + +func (p *Session) Field1DeepEqual(src *string) bool { + + if p.UserID == src { + return true + } else if p.UserID == nil || src == nil { + return false + } + if strings.Compare(*p.UserID, *src) != 0 { + return false + } + return true +} diff --git a/backend/kitex_gen/coze/loop/observability/domain/common/common_validator.go b/backend/kitex_gen/coze/loop/observability/domain/common/common_validator.go index 948f7a9b5..ba42d258b 100644 --- a/backend/kitex_gen/coze/loop/observability/domain/common/common_validator.go +++ b/backend/kitex_gen/coze/loop/observability/domain/common/common_validator.go @@ -40,3 +40,6 @@ func (p *BaseInfo) IsValid() error { } return nil } +func (p *Session) IsValid() error { + return nil +} diff --git a/backend/kitex_gen/coze/loop/observability/domain/common/k-common.go b/backend/kitex_gen/coze/loop/observability/domain/common/k-common.go index 47de5bad4..a11bc0cbd 100644 --- a/backend/kitex_gen/coze/loop/observability/domain/common/k-common.go +++ b/backend/kitex_gen/coze/loop/observability/domain/common/k-common.go @@ -985,3 +985,123 @@ func (p *BaseInfo) DeepCopy(s interface{}) error { return nil } + +func (p *Session) FastRead(buf []byte) (int, error) { + + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if fieldTypeId == thrift.STRING { + l, err = p.FastReadField1(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_Session[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +} + +func (p *Session) FastReadField1(buf []byte) (int, error) { + offset := 0 + + var _field *string + if v, l, err := thrift.Binary.ReadString(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + _field = &v + } + p.UserID = _field + return offset, nil +} + +func (p *Session) FastWrite(buf []byte) int { + return p.FastWriteNocopy(buf, nil) +} + +func (p *Session) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField1(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *Session) BLength() int { + l := 0 + if p != nil { + l += p.field1Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *Session) fastWriteField1(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p.IsSetUserID() { + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRING, 1) + offset += thrift.Binary.WriteStringNocopy(buf[offset:], w, *p.UserID) + } + return offset +} + +func (p *Session) field1Length() int { + l := 0 + if p.IsSetUserID() { + l += thrift.Binary.FieldBeginLength() + l += thrift.Binary.StringLengthNocopy(*p.UserID) + } + return l +} + +func (p *Session) DeepCopy(s interface{}) error { + src, ok := s.(*Session) + if !ok { + return fmt.Errorf("%T's type not matched %T", s, p) + } + + if src.UserID != nil { + var tmp string + if *src.UserID != "" { + tmp = kutils.StringDeepCopy(*src.UserID) + } + p.UserID = &tmp + } + + return nil +} diff --git a/backend/kitex_gen/coze/loop/observability/domain/task/k-task.go b/backend/kitex_gen/coze/loop/observability/domain/task/k-task.go index 93b314663..e7001c375 100644 --- a/backend/kitex_gen/coze/loop/observability/domain/task/k-task.go +++ b/backend/kitex_gen/coze/loop/observability/domain/task/k-task.go @@ -192,6 +192,20 @@ func (p *Task) FastRead(buf []byte) (int, error) { goto SkipFieldError } } + case 11: + if fieldTypeId == thrift.STRING { + l, err = p.FastReadField11(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } case 100: if fieldTypeId == thrift.STRUCT { l, err = p.FastReadField100(buf[offset:]) @@ -367,6 +381,20 @@ func (p *Task) FastReadField10(buf []byte) (int, error) { return offset, nil } +func (p *Task) FastReadField11(buf []byte) (int, error) { + offset := 0 + + var _field *TaskSource + if v, l, err := thrift.Binary.ReadString(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + _field = &v + } + p.TaskSource = _field + return offset, nil +} + func (p *Task) FastReadField100(buf []byte) (int, error) { offset := 0 _field := common.NewBaseInfo() @@ -396,6 +424,7 @@ func (p *Task) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { offset += p.fastWriteField8(buf[offset:], w) offset += p.fastWriteField9(buf[offset:], w) offset += p.fastWriteField10(buf[offset:], w) + offset += p.fastWriteField11(buf[offset:], w) offset += p.fastWriteField100(buf[offset:], w) } offset += thrift.Binary.WriteFieldStop(buf[offset:]) @@ -415,6 +444,7 @@ func (p *Task) BLength() int { l += p.field8Length() l += p.field9Length() l += p.field10Length() + l += p.field11Length() l += p.field100Length() } l += thrift.Binary.FieldStopLength() @@ -507,6 +537,15 @@ func (p *Task) fastWriteField10(buf []byte, w thrift.NocopyWriter) int { return offset } +func (p *Task) fastWriteField11(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p.IsSetTaskSource() { + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRING, 11) + offset += thrift.Binary.WriteStringNocopy(buf[offset:], w, *p.TaskSource) + } + return offset +} + func (p *Task) fastWriteField100(buf []byte, w thrift.NocopyWriter) int { offset := 0 if p.IsSetBaseInfo() { @@ -602,6 +641,15 @@ func (p *Task) field10Length() int { return l } +func (p *Task) field11Length() int { + l := 0 + if p.IsSetTaskSource() { + l += thrift.Binary.FieldBeginLength() + l += thrift.Binary.StringLengthNocopy(*p.TaskSource) + } + return l +} + func (p *Task) field100Length() int { l := 0 if p.IsSetBaseInfo() { @@ -682,6 +730,11 @@ func (p *Task) DeepCopy(s interface{}) error { } p.BackfillTaskDetail = _backfillTaskDetail + if src.TaskSource != nil { + tmp := *src.TaskSource + p.TaskSource = &tmp + } + var _baseInfo *common.BaseInfo if src.BaseInfo != nil { _baseInfo = &common.BaseInfo{} diff --git a/backend/kitex_gen/coze/loop/observability/domain/task/task.go b/backend/kitex_gen/coze/loop/observability/domain/task/task.go index 8f82e0063..44dad94e2 100644 --- a/backend/kitex_gen/coze/loop/observability/domain/task/task.go +++ b/backend/kitex_gen/coze/loop/observability/domain/task/task.go @@ -41,6 +41,10 @@ const ( RunStatusRunning = "running" RunStatusDone = "done" + + TaskSourceUser = "user" + + TaskSourceWorkflow = "workflow" ) type TimeUnit = string @@ -53,6 +57,8 @@ type TaskStatus = string type RunStatus = string +type TaskSource = string + // Task type Task struct { // 任务 id @@ -75,6 +81,8 @@ type Task struct { TaskDetail *RunDetail `thrift:"task_detail,9,optional" frugal:"9,optional,RunDetail" form:"task_detail" json:"task_detail,omitempty" query:"task_detail"` // 任务历史数据执行详情 BackfillTaskDetail *RunDetail `thrift:"backfill_task_detail,10,optional" frugal:"10,optional,RunDetail" form:"backfill_task_detail" json:"backfill_task_detail,omitempty" query:"backfill_task_detail"` + // 创建来源 + TaskSource *TaskSource `thrift:"task_source,11,optional" frugal:"11,optional,string" form:"task_source" json:"task_source,omitempty" query:"task_source"` // 基础信息 BaseInfo *common.BaseInfo `thrift:"base_info,100,optional" frugal:"100,optional,common.BaseInfo" form:"base_info" json:"base_info,omitempty" query:"base_info"` } @@ -196,6 +204,18 @@ func (p *Task) GetBackfillTaskDetail() (v *RunDetail) { return p.BackfillTaskDetail } +var Task_TaskSource_DEFAULT TaskSource + +func (p *Task) GetTaskSource() (v TaskSource) { + if p == nil { + return + } + if !p.IsSetTaskSource() { + return Task_TaskSource_DEFAULT + } + return *p.TaskSource +} + var Task_BaseInfo_DEFAULT *common.BaseInfo func (p *Task) GetBaseInfo() (v *common.BaseInfo) { @@ -237,6 +257,9 @@ func (p *Task) SetTaskDetail(val *RunDetail) { func (p *Task) SetBackfillTaskDetail(val *RunDetail) { p.BackfillTaskDetail = val } +func (p *Task) SetTaskSource(val *TaskSource) { + p.TaskSource = val +} func (p *Task) SetBaseInfo(val *common.BaseInfo) { p.BaseInfo = val } @@ -252,6 +275,7 @@ var fieldIDToName_Task = map[int16]string{ 8: "task_config", 9: "task_detail", 10: "backfill_task_detail", + 11: "task_source", 100: "base_info", } @@ -287,6 +311,10 @@ func (p *Task) IsSetBackfillTaskDetail() bool { return p.BackfillTaskDetail != nil } +func (p *Task) IsSetTaskSource() bool { + return p.TaskSource != nil +} + func (p *Task) IsSetBaseInfo() bool { return p.BaseInfo != nil } @@ -393,6 +421,14 @@ func (p *Task) Read(iprot thrift.TProtocol) (err error) { } else if err = iprot.Skip(fieldTypeId); err != nil { goto SkipFieldError } + case 11: + if fieldTypeId == thrift.STRING { + if err = p.ReadField11(iprot); err != nil { + goto ReadFieldError + } + } else if err = iprot.Skip(fieldTypeId); err != nil { + goto SkipFieldError + } case 100: if fieldTypeId == thrift.STRUCT { if err = p.ReadField100(iprot); err != nil { @@ -539,6 +575,17 @@ func (p *Task) ReadField10(iprot thrift.TProtocol) error { p.BackfillTaskDetail = _field return nil } +func (p *Task) ReadField11(iprot thrift.TProtocol) error { + + var _field *TaskSource + if v, err := iprot.ReadString(); err != nil { + return err + } else { + _field = &v + } + p.TaskSource = _field + return nil +} func (p *Task) ReadField100(iprot thrift.TProtocol) error { _field := common.NewBaseInfo() if err := _field.Read(iprot); err != nil { @@ -594,6 +641,10 @@ func (p *Task) Write(oprot thrift.TProtocol) (err error) { fieldId = 10 goto WriteFieldError } + if err = p.writeField11(oprot); err != nil { + fieldId = 11 + goto WriteFieldError + } if err = p.writeField100(oprot); err != nil { fieldId = 100 goto WriteFieldError @@ -792,6 +843,24 @@ WriteFieldBeginError: WriteFieldEndError: return thrift.PrependError(fmt.Sprintf("%T write field 10 end error: ", p), err) } +func (p *Task) writeField11(oprot thrift.TProtocol) (err error) { + if p.IsSetTaskSource() { + if err = oprot.WriteFieldBegin("task_source", thrift.STRING, 11); err != nil { + goto WriteFieldBeginError + } + if err := oprot.WriteString(*p.TaskSource); err != nil { + return err + } + if err = oprot.WriteFieldEnd(); err != nil { + goto WriteFieldEndError + } + } + return nil +WriteFieldBeginError: + return thrift.PrependError(fmt.Sprintf("%T write field 11 begin error: ", p), err) +WriteFieldEndError: + return thrift.PrependError(fmt.Sprintf("%T write field 11 end error: ", p), err) +} func (p *Task) writeField100(oprot thrift.TProtocol) (err error) { if p.IsSetBaseInfo() { if err = oprot.WriteFieldBegin("base_info", thrift.STRUCT, 100); err != nil { @@ -855,6 +924,9 @@ func (p *Task) DeepEqual(ano *Task) bool { if !p.Field10DeepEqual(ano.BackfillTaskDetail) { return false } + if !p.Field11DeepEqual(ano.TaskSource) { + return false + } if !p.Field100DeepEqual(ano.BaseInfo) { return false } @@ -951,6 +1023,18 @@ func (p *Task) Field10DeepEqual(src *RunDetail) bool { } return true } +func (p *Task) Field11DeepEqual(src *TaskSource) bool { + + if p.TaskSource == src { + return true + } else if p.TaskSource == nil || src == nil { + return false + } + if strings.Compare(*p.TaskSource, *src) != 0 { + return false + } + return true +} func (p *Task) Field100DeepEqual(src *common.BaseInfo) bool { if !p.BaseInfo.DeepEqual(src) { diff --git a/backend/kitex_gen/coze/loop/observability/task/coze.loop.observability.task.go b/backend/kitex_gen/coze/loop/observability/task/coze.loop.observability.task.go index 6d21a1664..c207e5e0d 100644 --- a/backend/kitex_gen/coze/loop/observability/task/coze.loop.observability.task.go +++ b/backend/kitex_gen/coze/loop/observability/task/coze.loop.observability.task.go @@ -14,8 +14,9 @@ import ( ) type CreateTaskRequest struct { - Task *task.Task `thrift:"task,1,required" frugal:"1,required,task.Task" form:"task,required" json:"task,required"` - Base *base.Base `thrift:"base,255,optional" frugal:"255,optional,base.Base" form:"base" json:"base,omitempty" query:"base"` + Task *task.Task `thrift:"task,1,required" frugal:"1,required,task.Task" form:"task,required" json:"task,required"` + Session *common.Session `thrift:"session,2,optional" frugal:"2,optional,common.Session" form:"session" json:"session,omitempty"` + Base *base.Base `thrift:"base,255,optional" frugal:"255,optional,base.Base" form:"base" json:"base,omitempty" query:"base"` } func NewCreateTaskRequest() *CreateTaskRequest { @@ -37,6 +38,18 @@ func (p *CreateTaskRequest) GetTask() (v *task.Task) { return p.Task } +var CreateTaskRequest_Session_DEFAULT *common.Session + +func (p *CreateTaskRequest) GetSession() (v *common.Session) { + if p == nil { + return + } + if !p.IsSetSession() { + return CreateTaskRequest_Session_DEFAULT + } + return p.Session +} + var CreateTaskRequest_Base_DEFAULT *base.Base func (p *CreateTaskRequest) GetBase() (v *base.Base) { @@ -51,12 +64,16 @@ func (p *CreateTaskRequest) GetBase() (v *base.Base) { func (p *CreateTaskRequest) SetTask(val *task.Task) { p.Task = val } +func (p *CreateTaskRequest) SetSession(val *common.Session) { + p.Session = val +} func (p *CreateTaskRequest) SetBase(val *base.Base) { p.Base = val } var fieldIDToName_CreateTaskRequest = map[int16]string{ 1: "task", + 2: "session", 255: "base", } @@ -64,6 +81,10 @@ func (p *CreateTaskRequest) IsSetTask() bool { return p.Task != nil } +func (p *CreateTaskRequest) IsSetSession() bool { + return p.Session != nil +} + func (p *CreateTaskRequest) IsSetBase() bool { return p.Base != nil } @@ -96,6 +117,14 @@ func (p *CreateTaskRequest) Read(iprot thrift.TProtocol) (err error) { } else if err = iprot.Skip(fieldTypeId); err != nil { goto SkipFieldError } + case 2: + if fieldTypeId == thrift.STRUCT { + if err = p.ReadField2(iprot); err != nil { + goto ReadFieldError + } + } else if err = iprot.Skip(fieldTypeId); err != nil { + goto SkipFieldError + } case 255: if fieldTypeId == thrift.STRUCT { if err = p.ReadField255(iprot); err != nil { @@ -147,6 +176,14 @@ func (p *CreateTaskRequest) ReadField1(iprot thrift.TProtocol) error { p.Task = _field return nil } +func (p *CreateTaskRequest) ReadField2(iprot thrift.TProtocol) error { + _field := common.NewSession() + if err := _field.Read(iprot); err != nil { + return err + } + p.Session = _field + return nil +} func (p *CreateTaskRequest) ReadField255(iprot thrift.TProtocol) error { _field := base.NewBase() if err := _field.Read(iprot); err != nil { @@ -166,6 +203,10 @@ func (p *CreateTaskRequest) Write(oprot thrift.TProtocol) (err error) { fieldId = 1 goto WriteFieldError } + if err = p.writeField2(oprot); err != nil { + fieldId = 2 + goto WriteFieldError + } if err = p.writeField255(oprot); err != nil { fieldId = 255 goto WriteFieldError @@ -204,6 +245,24 @@ WriteFieldBeginError: WriteFieldEndError: return thrift.PrependError(fmt.Sprintf("%T write field 1 end error: ", p), err) } +func (p *CreateTaskRequest) writeField2(oprot thrift.TProtocol) (err error) { + if p.IsSetSession() { + if err = oprot.WriteFieldBegin("session", thrift.STRUCT, 2); err != nil { + goto WriteFieldBeginError + } + if err := p.Session.Write(oprot); err != nil { + return err + } + if err = oprot.WriteFieldEnd(); err != nil { + goto WriteFieldEndError + } + } + return nil +WriteFieldBeginError: + return thrift.PrependError(fmt.Sprintf("%T write field 2 begin error: ", p), err) +WriteFieldEndError: + return thrift.PrependError(fmt.Sprintf("%T write field 2 end error: ", p), err) +} func (p *CreateTaskRequest) writeField255(oprot thrift.TProtocol) (err error) { if p.IsSetBase() { if err = oprot.WriteFieldBegin("base", thrift.STRUCT, 255); err != nil { @@ -240,6 +299,9 @@ func (p *CreateTaskRequest) DeepEqual(ano *CreateTaskRequest) bool { if !p.Field1DeepEqual(ano.Task) { return false } + if !p.Field2DeepEqual(ano.Session) { + return false + } if !p.Field255DeepEqual(ano.Base) { return false } @@ -253,6 +315,13 @@ func (p *CreateTaskRequest) Field1DeepEqual(src *task.Task) bool { } return true } +func (p *CreateTaskRequest) Field2DeepEqual(src *common.Session) bool { + + if !p.Session.DeepEqual(src) { + return false + } + return true +} func (p *CreateTaskRequest) Field255DeepEqual(src *base.Base) bool { if !p.Base.DeepEqual(src) { @@ -518,6 +587,7 @@ type UpdateTaskRequest struct { Description *string `thrift:"description,4,optional" frugal:"4,optional,string" form:"description" json:"description,omitempty"` EffectiveTime *task.EffectiveTime `thrift:"effective_time,5,optional" frugal:"5,optional,task.EffectiveTime" form:"effective_time" json:"effective_time,omitempty"` SampleRate *float64 `thrift:"sample_rate,6,optional" frugal:"6,optional,double" form:"sample_rate" json:"sample_rate,omitempty"` + Session *common.Session `thrift:"session,7,optional" frugal:"7,optional,common.Session" form:"session" json:"session,omitempty"` Base *base.Base `thrift:"base,255,optional" frugal:"255,optional,base.Base" form:"base" json:"base,omitempty" query:"base"` } @@ -590,6 +660,18 @@ func (p *UpdateTaskRequest) GetSampleRate() (v float64) { return *p.SampleRate } +var UpdateTaskRequest_Session_DEFAULT *common.Session + +func (p *UpdateTaskRequest) GetSession() (v *common.Session) { + if p == nil { + return + } + if !p.IsSetSession() { + return UpdateTaskRequest_Session_DEFAULT + } + return p.Session +} + var UpdateTaskRequest_Base_DEFAULT *base.Base func (p *UpdateTaskRequest) GetBase() (v *base.Base) { @@ -619,6 +701,9 @@ func (p *UpdateTaskRequest) SetEffectiveTime(val *task.EffectiveTime) { func (p *UpdateTaskRequest) SetSampleRate(val *float64) { p.SampleRate = val } +func (p *UpdateTaskRequest) SetSession(val *common.Session) { + p.Session = val +} func (p *UpdateTaskRequest) SetBase(val *base.Base) { p.Base = val } @@ -630,6 +715,7 @@ var fieldIDToName_UpdateTaskRequest = map[int16]string{ 4: "description", 5: "effective_time", 6: "sample_rate", + 7: "session", 255: "base", } @@ -649,6 +735,10 @@ func (p *UpdateTaskRequest) IsSetSampleRate() bool { return p.SampleRate != nil } +func (p *UpdateTaskRequest) IsSetSession() bool { + return p.Session != nil +} + func (p *UpdateTaskRequest) IsSetBase() bool { return p.Base != nil } @@ -723,6 +813,14 @@ func (p *UpdateTaskRequest) Read(iprot thrift.TProtocol) (err error) { } else if err = iprot.Skip(fieldTypeId); err != nil { goto SkipFieldError } + case 7: + if fieldTypeId == thrift.STRUCT { + if err = p.ReadField7(iprot); err != nil { + goto ReadFieldError + } + } else if err = iprot.Skip(fieldTypeId); err != nil { + goto SkipFieldError + } case 255: if fieldTypeId == thrift.STRUCT { if err = p.ReadField255(iprot); err != nil { @@ -834,6 +932,14 @@ func (p *UpdateTaskRequest) ReadField6(iprot thrift.TProtocol) error { p.SampleRate = _field return nil } +func (p *UpdateTaskRequest) ReadField7(iprot thrift.TProtocol) error { + _field := common.NewSession() + if err := _field.Read(iprot); err != nil { + return err + } + p.Session = _field + return nil +} func (p *UpdateTaskRequest) ReadField255(iprot thrift.TProtocol) error { _field := base.NewBase() if err := _field.Read(iprot); err != nil { @@ -873,6 +979,10 @@ func (p *UpdateTaskRequest) Write(oprot thrift.TProtocol) (err error) { fieldId = 6 goto WriteFieldError } + if err = p.writeField7(oprot); err != nil { + fieldId = 7 + goto WriteFieldError + } if err = p.writeField255(oprot); err != nil { fieldId = 255 goto WriteFieldError @@ -999,6 +1109,24 @@ WriteFieldBeginError: WriteFieldEndError: return thrift.PrependError(fmt.Sprintf("%T write field 6 end error: ", p), err) } +func (p *UpdateTaskRequest) writeField7(oprot thrift.TProtocol) (err error) { + if p.IsSetSession() { + if err = oprot.WriteFieldBegin("session", thrift.STRUCT, 7); err != nil { + goto WriteFieldBeginError + } + if err := p.Session.Write(oprot); err != nil { + return err + } + if err = oprot.WriteFieldEnd(); err != nil { + goto WriteFieldEndError + } + } + return nil +WriteFieldBeginError: + return thrift.PrependError(fmt.Sprintf("%T write field 7 begin error: ", p), err) +WriteFieldEndError: + return thrift.PrependError(fmt.Sprintf("%T write field 7 end error: ", p), err) +} func (p *UpdateTaskRequest) writeField255(oprot thrift.TProtocol) (err error) { if p.IsSetBase() { if err = oprot.WriteFieldBegin("base", thrift.STRUCT, 255); err != nil { @@ -1050,6 +1178,9 @@ func (p *UpdateTaskRequest) DeepEqual(ano *UpdateTaskRequest) bool { if !p.Field6DeepEqual(ano.SampleRate) { return false } + if !p.Field7DeepEqual(ano.Session) { + return false + } if !p.Field255DeepEqual(ano.Base) { return false } @@ -1113,6 +1244,13 @@ func (p *UpdateTaskRequest) Field6DeepEqual(src *float64) bool { } return true } +func (p *UpdateTaskRequest) Field7DeepEqual(src *common.Session) bool { + + if !p.Session.DeepEqual(src) { + return false + } + return true +} func (p *UpdateTaskRequest) Field255DeepEqual(src *base.Base) bool { if !p.Base.DeepEqual(src) { diff --git a/backend/kitex_gen/coze/loop/observability/task/coze.loop.observability.task_validator.go b/backend/kitex_gen/coze/loop/observability/task/coze.loop.observability.task_validator.go index 94ddf6901..8e025bfa9 100644 --- a/backend/kitex_gen/coze/loop/observability/task/coze.loop.observability.task_validator.go +++ b/backend/kitex_gen/coze/loop/observability/task/coze.loop.observability.task_validator.go @@ -27,6 +27,11 @@ func (p *CreateTaskRequest) IsValid() error { return fmt.Errorf("field Task not valid, %w", err) } } + if p.Session != nil { + if err := p.Session.IsValid(); err != nil { + return fmt.Errorf("field Session not valid, %w", err) + } + } if p.Base != nil { if err := p.Base.IsValid(); err != nil { return fmt.Errorf("field Base not valid, %w", err) @@ -51,6 +56,11 @@ func (p *UpdateTaskRequest) IsValid() error { return fmt.Errorf("field EffectiveTime not valid, %w", err) } } + if p.Session != nil { + if err := p.Session.IsValid(); err != nil { + return fmt.Errorf("field Session not valid, %w", err) + } + } if p.Base != nil { if err := p.Base.IsValid(); err != nil { return fmt.Errorf("field Base not valid, %w", err) diff --git a/backend/kitex_gen/coze/loop/observability/task/k-coze.loop.observability.task.go b/backend/kitex_gen/coze/loop/observability/task/k-coze.loop.observability.task.go index 3732777b3..17a7d2dd6 100644 --- a/backend/kitex_gen/coze/loop/observability/task/k-coze.loop.observability.task.go +++ b/backend/kitex_gen/coze/loop/observability/task/k-coze.loop.observability.task.go @@ -66,6 +66,20 @@ func (p *CreateTaskRequest) FastRead(buf []byte) (int, error) { goto SkipFieldError } } + case 2: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField2(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } case 255: if fieldTypeId == thrift.STRUCT { l, err = p.FastReadField255(buf[offset:]) @@ -116,6 +130,18 @@ func (p *CreateTaskRequest) FastReadField1(buf []byte) (int, error) { return offset, nil } +func (p *CreateTaskRequest) FastReadField2(buf []byte) (int, error) { + offset := 0 + _field := common.NewSession() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Session = _field + return offset, nil +} + func (p *CreateTaskRequest) FastReadField255(buf []byte) (int, error) { offset := 0 _field := base.NewBase() @@ -136,6 +162,7 @@ func (p *CreateTaskRequest) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) i offset := 0 if p != nil { offset += p.fastWriteField1(buf[offset:], w) + offset += p.fastWriteField2(buf[offset:], w) offset += p.fastWriteField255(buf[offset:], w) } offset += thrift.Binary.WriteFieldStop(buf[offset:]) @@ -146,6 +173,7 @@ func (p *CreateTaskRequest) BLength() int { l := 0 if p != nil { l += p.field1Length() + l += p.field2Length() l += p.field255Length() } l += thrift.Binary.FieldStopLength() @@ -159,6 +187,15 @@ func (p *CreateTaskRequest) fastWriteField1(buf []byte, w thrift.NocopyWriter) i return offset } +func (p *CreateTaskRequest) fastWriteField2(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p.IsSetSession() { + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 2) + offset += p.Session.FastWriteNocopy(buf[offset:], w) + } + return offset +} + func (p *CreateTaskRequest) fastWriteField255(buf []byte, w thrift.NocopyWriter) int { offset := 0 if p.IsSetBase() { @@ -175,6 +212,15 @@ func (p *CreateTaskRequest) field1Length() int { return l } +func (p *CreateTaskRequest) field2Length() int { + l := 0 + if p.IsSetSession() { + l += thrift.Binary.FieldBeginLength() + l += p.Session.BLength() + } + return l +} + func (p *CreateTaskRequest) field255Length() int { l := 0 if p.IsSetBase() { @@ -199,6 +245,15 @@ func (p *CreateTaskRequest) DeepCopy(s interface{}) error { } p.Task = _task + var _session *common.Session + if src.Session != nil { + _session = &common.Session{} + if err := _session.DeepCopy(src.Session); err != nil { + return err + } + } + p.Session = _session + var _base *base.Base if src.Base != nil { _base = &base.Base{} @@ -488,6 +543,20 @@ func (p *UpdateTaskRequest) FastRead(buf []byte) (int, error) { goto SkipFieldError } } + case 7: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField7(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } case 255: if fieldTypeId == thrift.STRUCT { l, err = p.FastReadField255(buf[offset:]) @@ -613,6 +682,18 @@ func (p *UpdateTaskRequest) FastReadField6(buf []byte) (int, error) { return offset, nil } +func (p *UpdateTaskRequest) FastReadField7(buf []byte) (int, error) { + offset := 0 + _field := common.NewSession() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Session = _field + return offset, nil +} + func (p *UpdateTaskRequest) FastReadField255(buf []byte) (int, error) { offset := 0 _field := base.NewBase() @@ -638,6 +719,7 @@ func (p *UpdateTaskRequest) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) i offset += p.fastWriteField3(buf[offset:], w) offset += p.fastWriteField4(buf[offset:], w) offset += p.fastWriteField5(buf[offset:], w) + offset += p.fastWriteField7(buf[offset:], w) offset += p.fastWriteField255(buf[offset:], w) } offset += thrift.Binary.WriteFieldStop(buf[offset:]) @@ -653,6 +735,7 @@ func (p *UpdateTaskRequest) BLength() int { l += p.field4Length() l += p.field5Length() l += p.field6Length() + l += p.field7Length() l += p.field255Length() } l += thrift.Binary.FieldStopLength() @@ -709,6 +792,15 @@ func (p *UpdateTaskRequest) fastWriteField6(buf []byte, w thrift.NocopyWriter) i return offset } +func (p *UpdateTaskRequest) fastWriteField7(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p.IsSetSession() { + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 7) + offset += p.Session.FastWriteNocopy(buf[offset:], w) + } + return offset +} + func (p *UpdateTaskRequest) fastWriteField255(buf []byte, w thrift.NocopyWriter) int { offset := 0 if p.IsSetBase() { @@ -768,6 +860,15 @@ func (p *UpdateTaskRequest) field6Length() int { return l } +func (p *UpdateTaskRequest) field7Length() int { + l := 0 + if p.IsSetSession() { + l += thrift.Binary.FieldBeginLength() + l += p.Session.BLength() + } + return l +} + func (p *UpdateTaskRequest) field255Length() int { l := 0 if p.IsSetBase() { @@ -814,6 +915,15 @@ func (p *UpdateTaskRequest) DeepCopy(s interface{}) error { p.SampleRate = &tmp } + var _session *common.Session + if src.Session != nil { + _session = &common.Session{} + if err := _session.DeepCopy(src.Session); err != nil { + return err + } + } + p.Session = _session + var _base *base.Base if src.Base != nil { _base = &base.Base{} diff --git a/backend/modules/observability/application/convertor/task/task.go b/backend/modules/observability/application/convertor/task/task.go index 38831ee19..c969881b3 100644 --- a/backend/modules/observability/application/convertor/task/task.go +++ b/backend/modules/observability/application/convertor/task/task.go @@ -73,6 +73,11 @@ func TaskDO2DTO(ctx context.Context, v *entity.ObservabilityTask, userMap map[st UpdatedBy: UserInfoPO2DO(userMap[v.UpdatedBy], v.UpdatedBy), }, } + + if v.TaskSource != nil { + taskInfo.TaskSource = gptr.Of(*v.TaskSource) + } + return taskInfo } @@ -319,7 +324,7 @@ func TaskDTO2DO(taskDTO *task.Task) *entity.ObservabilityTask { spanFilterDO := SpanFilterDTO2DO(taskDTO.GetRule().GetSpanFilters()) - return &entity.ObservabilityTask{ + entityTask := &entity.ObservabilityTask{ ID: taskDTO.GetID(), WorkspaceID: taskDTO.GetWorkspaceID(), Name: taskDTO.GetName(), @@ -337,6 +342,12 @@ func TaskDTO2DO(taskDTO *task.Task) *entity.ObservabilityTask { UpdatedBy: updatedBy, BackfillEffectiveTime: EffectiveTimeDTO2DO(taskDTO.GetRule().GetBackfillEffectiveTime()), } + + if taskDTO.TaskSource != nil { + entityTask.TaskSource = ptr.Of(*taskDTO.TaskSource) + } + + return entityTask } func SpanFilterDTO2DO(spanFilterFields *filter.SpanFilterFields) *entity.SpanFilterFields { diff --git a/backend/modules/observability/application/task.go b/backend/modules/observability/application/task.go index 679d9b974..bbd8179f1 100644 --- a/backend/modules/observability/application/task.go +++ b/backend/modules/observability/application/task.go @@ -6,9 +6,11 @@ package application import ( "context" "strconv" + "strings" "time" "github.com/coze-dev/coze-loop/backend/infra/middleware/session" + "github.com/coze-dev/coze-loop/backend/kitex_gen/coze/loop/observability/domain/common" "github.com/coze-dev/coze-loop/backend/kitex_gen/coze/loop/observability/task" "github.com/coze-dev/coze-loop/backend/modules/data/pkg/errno" "github.com/coze-dev/coze-loop/backend/modules/observability/application/convertor" @@ -116,10 +118,9 @@ func (t *TaskApplication) CreateTask(ctx context.Context, req *task.CreateTaskRe false); err != nil { return resp, err } - - userID := session.UserIDInCtxOrEmpty(ctx) - if userID == "" { - return nil, errorx.NewByCode(obErrorx.UserParseFailedCode) + userID, err := GetUserID(ctx, req.GetSession()) + if err != nil { + return nil, err } // 创建task @@ -135,6 +136,20 @@ func (t *TaskApplication) CreateTask(ctx context.Context, req *task.CreateTaskRe return &task.CreateTaskResponse{TaskID: sResp.TaskID}, nil } +func GetUserID(ctx context.Context, sessionReq *common.Session) (string, error) { + if userID := session.UserIDInCtxOrEmpty(ctx); userID != "" { + return userID, nil + } + if sessionReq == nil { + return "", errorx.NewByCode(obErrorx.UserParseFailedCode) + } + userID := strings.TrimSpace(sessionReq.GetUserID()) + if userID == "" { + return "", errorx.NewByCode(obErrorx.UserParseFailedCode) + } + return userID, nil +} + func (t *TaskApplication) validateCreateTaskReq(ctx context.Context, req *task.CreateTaskRequest) error { // 参数验证 if req == nil || req.GetTask() == nil { @@ -177,17 +192,22 @@ func (t *TaskApplication) UpdateTask(ctx context.Context, req *task.UpdateTaskRe strconv.FormatInt(req.GetTaskID(), 10)); err != nil { return nil, err } + userID, err := GetUserID(ctx, req.GetSession()) + if err != nil { + return nil, err + } var taskStatus *entity.TaskStatus if req.TaskStatus != nil { taskStatus = lo.ToPtr(entity.TaskStatus(req.GetTaskStatus())) } - err := t.taskSvc.UpdateTask(ctx, &service.UpdateTaskReq{ + err = t.taskSvc.UpdateTask(ctx, &service.UpdateTaskReq{ TaskID: req.GetTaskID(), WorkspaceID: req.GetWorkspaceID(), TaskStatus: taskStatus, Description: req.Description, EffectiveTime: tconv.EffectiveTimeDTO2DO(req.EffectiveTime), SampleRate: req.SampleRate, + UserID: userID, }) if err != nil { return resp, err diff --git a/backend/modules/observability/application/task_test.go b/backend/modules/observability/application/task_test.go index 2c8208312..b83b00051 100755 --- a/backend/modules/observability/application/task_test.go +++ b/backend/modules/observability/application/task_test.go @@ -198,6 +198,7 @@ func TestTaskApplication_CreateTask(t *testing.T) { EndAt: gptr.Of(time.Now().Add(2 * time.Hour).UnixMilli()), }, }, + TaskStatus: gptr.Of(taskdto.TaskStatusPending), } } @@ -262,6 +263,19 @@ func TestTaskApplication_CreateTask(t *testing.T) { return svcMock, auth }, }, + { + name: "error with invalid user id", + ctx: context.Background(), + req: &taskapi.CreateTaskRequest{Task: taskForSuccess}, + expectResp: nil, + expectErrCode: obErrorx.UserParseFailedCode, + fieldsBuilder: func(ctrl *gomock.Controller) (svc.ITaskService, rpc.IAuthProvider) { + auth := rpcmock.NewMockIAuthProvider(ctrl) + auth.EXPECT().CheckWorkspacePermission(gomock.Any(), rpc.AuthActionTraceTaskCreate, strconv.FormatInt(123, 10), false).Return(nil) + svcMock := svcmock.NewMockITaskService(ctrl) + return svcMock, auth + }, + }, { name: "success with trace app", ctx: ctxWithAppID(717152), @@ -275,6 +289,19 @@ func TestTaskApplication_CreateTask(t *testing.T) { return svcMock, auth }, }, + { + name: "success with user id", + ctx: context.Background(), + req: &taskapi.CreateTaskRequest{Task: taskForSuccess, Session: &commondomain.Session{UserID: gptr.Of("1")}}, + expectResp: &taskapi.CreateTaskResponse{TaskID: gptr.Of(int64(1000))}, + fieldsBuilder: func(ctrl *gomock.Controller) (svc.ITaskService, rpc.IAuthProvider) { + auth := rpcmock.NewMockIAuthProvider(ctrl) + auth.EXPECT().CheckWorkspacePermission(gomock.Any(), rpc.AuthActionTraceTaskCreate, strconv.FormatInt(123, 10), false).Return(nil) + svcMock := svcmock.NewMockITaskService(ctrl) + svcMock.EXPECT().CreateTask(gomock.Any(), gomock.AssignableToTypeOf(&svc.CreateTaskReq{})).Return(&svc.CreateTaskResp{TaskID: gptr.Of(int64(1000))}, nil) + return svcMock, auth + }, + }, } for _, tt := range tests { @@ -357,7 +384,7 @@ func TestTaskApplication_UpdateTask(t *testing.T) { { name: "service error", ctx: context.Background(), - req: &taskapi.UpdateTaskRequest{TaskID: 33, WorkspaceID: 44}, + req: &taskapi.UpdateTaskRequest{TaskID: 33, WorkspaceID: 44, Session: &commondomain.Session{UserID: gptr.Of("1")}}, expectResp: taskapi.NewUpdateTaskResponse(), expectErr: errors.New("svc error"), fieldsBuilder: func(ctrl *gomock.Controller) (svc.ITaskService, rpc.IAuthProvider) { @@ -369,6 +396,7 @@ func TestTaskApplication_UpdateTask(t *testing.T) { WorkspaceID: 44, TaskStatus: nil, Description: nil, + UserID: "1", }).Return(errors.New("svc error")) return s, auth }, @@ -376,7 +404,7 @@ func TestTaskApplication_UpdateTask(t *testing.T) { { name: "success", ctx: context.Background(), - req: &taskapi.UpdateTaskRequest{TaskID: 55, WorkspaceID: 66}, + req: &taskapi.UpdateTaskRequest{TaskID: 55, WorkspaceID: 66, Session: &commondomain.Session{UserID: gptr.Of("1")}}, expectResp: taskapi.NewUpdateTaskResponse(), fieldsBuilder: func(ctrl *gomock.Controller) (svc.ITaskService, rpc.IAuthProvider) { auth := rpcmock.NewMockIAuthProvider(ctrl) @@ -387,6 +415,7 @@ func TestTaskApplication_UpdateTask(t *testing.T) { WorkspaceID: 66, TaskStatus: nil, Description: nil, + UserID: "1", }).Return(nil) return s, auth }, diff --git a/backend/modules/observability/domain/task/entity/filter.go b/backend/modules/observability/domain/task/entity/filter.go index 23f515597..8f5d7306d 100755 --- a/backend/modules/observability/domain/task/entity/filter.go +++ b/backend/modules/observability/domain/task/entity/filter.go @@ -15,6 +15,8 @@ type FieldType string // TaskFieldName defines the supported task field names for filtering. type TaskFieldName string +type TaskSourceValue string + const ( QueryTypeMatch QueryType = "match" QueryTypeEq QueryType = "eq" @@ -42,6 +44,9 @@ const ( TaskFieldNameTaskType TaskFieldName = "task_type" TaskFieldNameSampleRate TaskFieldName = "sample_rate" TaskFieldNameCreatedBy TaskFieldName = "created_by" + TaskFieldNameTaskSource TaskFieldName = "task_source" + + TaskSourceUser TaskSourceValue = "user" ) // TaskFilterFields aggregates multiple TaskFilterField expressions. diff --git a/backend/modules/observability/domain/task/entity/task.go b/backend/modules/observability/domain/task/entity/task.go index fa39b7445..f6e27a841 100644 --- a/backend/modules/observability/domain/task/entity/task.go +++ b/backend/modules/observability/domain/task/entity/task.go @@ -78,6 +78,7 @@ type ObservabilityTask struct { UpdatedAt time.Time // 更新时间 CreatedBy string // 创建人 UpdatedBy string // 更新人 + TaskSource *string // 创建来源 TaskRuns []*TaskRun } @@ -150,6 +151,7 @@ type BackfillDetail struct { BackfillStatus *string `json:"backfill_status"` LastSpanPageToken *string `json:"last_span_page_token"` } + type TaskRunConfig struct { AutoEvaluateRunConfig *AutoEvaluateRunConfig `json:"auto_evaluate_run_config"` DataReflowRunConfig *DataReflowRunConfig `json:"data_reflow_run_config"` diff --git a/backend/modules/observability/domain/task/service/task_service.go b/backend/modules/observability/domain/task/service/task_service.go index 74bc33ae1..f743a392e 100644 --- a/backend/modules/observability/domain/task/service/task_service.go +++ b/backend/modules/observability/domain/task/service/task_service.go @@ -39,6 +39,7 @@ type UpdateTaskReq struct { Description *string EffectiveTime *entity.EffectiveTime SampleRate *float64 + UserID string } type ListTasksReq struct { WorkspaceID int64 @@ -152,7 +153,7 @@ func (t *TaskServiceImpl) CreateTask(ctx context.Context, req *CreateTaskReq) (r TaskID: id, } - if err := t.SendBackfillMessage(context.Background(), backfillEvent); err != nil { + if err := t.SendBackfillMessage(ctx, backfillEvent); err != nil { // 失败了会有定时任务进行补偿 logs.CtxWarn(ctx, "send backfill message failed, task_id=%d, err=%v", id, err) } @@ -217,7 +218,7 @@ func (t *TaskServiceImpl) UpdateTask(ctx context.Context, req *UpdateTaskReq) (e } } } - taskDO.UpdatedBy = userID + taskDO.UpdatedBy = req.UserID taskDO.UpdatedAt = time.Now() if err = t.TaskRepo.UpdateTask(ctx, taskDO); err != nil { return err @@ -226,9 +227,19 @@ func (t *TaskServiceImpl) UpdateTask(ctx context.Context, req *UpdateTaskReq) (e } func (t *TaskServiceImpl) ListTasks(ctx context.Context, req *ListTasksReq) (resp *ListTasksResp, err error) { + taskFilters := &entity.TaskFilterFields{} + if req.TaskFilters != nil { + taskFilters = req.TaskFilters + } + taskFilters.FilterFields = append(taskFilters.FilterFields, &entity.TaskFilterField{ + FieldName: gptr.Of(entity.TaskFieldNameTaskSource), + FieldType: gptr.Of(entity.FieldTypeString), + Values: []string{string(entity.TaskSourceUser)}, + QueryType: gptr.Of(entity.QueryTypeIn), + }) taskDOs, total, err := t.TaskRepo.ListTasks(ctx, repo.ListTaskParam{ WorkspaceIDs: []int64{req.WorkspaceID}, - TaskFilters: req.TaskFilters, + TaskFilters: taskFilters, ReqLimit: req.Limit, ReqOffset: req.Offset, OrderBy: req.OrderBy, diff --git a/backend/modules/observability/domain/task/service/task_service_test.go b/backend/modules/observability/domain/task/service/task_service_test.go index 26cbb4819..36cb002d4 100755 --- a/backend/modules/observability/domain/task/service/task_service_test.go +++ b/backend/modules/observability/domain/task/service/task_service_test.go @@ -310,7 +310,7 @@ func TestTaskServiceImpl_UpdateTask(t *testing.T) { Sampler: &entity.Sampler{SampleRate: 0.1}, TaskRuns: []*entity.TaskRun{{RunStatus: entity.TaskRunStatusRunning}}, UpdatedAt: now, - UpdatedBy: "", + UpdatedBy: "user1", } repoMock.EXPECT().GetTask(gomock.Any(), int64(1), gomock.Any(), gomock.Nil()).Return(taskDO, nil) @@ -332,6 +332,7 @@ func TestTaskServiceImpl_UpdateTask(t *testing.T) { EffectiveTime: &entity.EffectiveTime{StartAt: newStart, EndAt: newEnd}, SampleRate: &sampleRate, TaskStatus: gptr.Of(entity.TaskStatusDisabled), + UserID: "user1", }) assert.NoError(t, err) assert.True(t, proc.onFinishRunCalled) @@ -373,6 +374,7 @@ func TestTaskServiceImpl_UpdateTask(t *testing.T) { WorkspaceID: 2, SampleRate: &sampleRate, TaskStatus: gptr.Of(entity.TaskStatusDisabled), + UserID: "user", }) assert.NoError(t, err) assert.True(t, proc.onFinishRunCalled) @@ -409,6 +411,7 @@ func TestTaskServiceImpl_UpdateTask(t *testing.T) { EffectiveTime: &entity.EffectiveTime{StartAt: newStart, EndAt: newEnd}, SampleRate: &sampleRate, TaskStatus: gptr.Of(entity.TaskStatusDisabled), + UserID: "user", }) assert.EqualError(t, err, "finish fail") }) diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go index 881e6a8b3..26d60fa75 100644 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go @@ -24,7 +24,7 @@ import ( ) const ( - pageSize = 500 + pageSize = 100 backfillLockKeyTemplate = "observability:tracehub:backfill:%d" backfillLockMaxHold = 24 * time.Hour backfillLockTTL = 3 * time.Minute @@ -174,6 +174,15 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub totalCount += int64(len(spans)) logs.CtxInfo(ctx, "Processed %d spans completed, total=%d, task_id=%d", len(spans), totalCount, sub.t.ID) + // todo 不应该这里直接写po字段 + err = h.taskRepo.UpdateTaskRunWithOCC(ctx, sub.tr.ID, sub.tr.WorkspaceID, map[string]interface{}{ + "backfill_detail": ToJSONString(ctx, sub.tr.BackfillDetail), + }) + if err != nil { + logs.CtxError(ctx, "update task run failed, task_id=%d, err=%v", sub.t.ID, err) + return err + } + if pageToken == "" || shouldFinish { logs.CtxInfo(ctx, "no more spans to process, task_id=%d", sub.t.ID) if err = sub.processor.OnTaskFinished(ctx, taskexe.OnTaskFinishedReq{ @@ -186,6 +195,7 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub return nil } listParam.PageToken = pageToken + sub.tr.BackfillDetail.LastSpanPageToken = &pageToken } } @@ -346,15 +356,6 @@ func (h *TraceHubServiceImpl) flushSpans(ctx context.Context, spans []*loop_span return } - // todo 不应该这里直接写po字段 - err = h.taskRepo.UpdateTaskRunWithOCC(ctx, sub.tr.ID, sub.tr.WorkspaceID, map[string]interface{}{ - "backfill_detail": ToJSONString(ctx, sub.tr.BackfillDetail), - }) - if err != nil { - logs.CtxError(ctx, "update task run failed, task_id=%d, err=%v", sub.t.ID, err) - return - } - logs.CtxInfo(ctx, "successfully processed %d spans (sampled from %d), task_id=%d", len(sampledSpans), len(spans), sub.t.ID) return @@ -392,7 +393,7 @@ func (h *TraceHubServiceImpl) applySampling(spans []*loop_span.Span, sub *spanSu // processSpansForBackfill handles spans for backfill func (h *TraceHubServiceImpl) processSpansForBackfill(ctx context.Context, spans []*loop_span.Span, sub *spanSubscriber) (err error, shouldFinish bool) { // Batch processing spans for efficiency - const batchSize = 50 + const batchSize = 10 for i := 0; i < len(spans); i += batchSize { end := i + batchSize @@ -407,11 +408,13 @@ func (h *TraceHubServiceImpl) processSpansForBackfill(ctx context.Context, spans sub.t.ID, i, err) return } + if shouldFinish { return } + // ml_flow rate-limited: 50/5s - time.Sleep(5 * time.Second) + time.Sleep(1 * time.Second) } return err, shouldFinish @@ -452,7 +455,7 @@ func (h *TraceHubServiceImpl) onHandleDone(ctx context.Context, err error, sub * } if time.Now().UnixMilli()-(sub.tr.RunEndAt.UnixMilli()-sub.tr.RunStartAt.UnixMilli()) < sub.tr.RunEndAt.UnixMilli() { - if sendErr := h.sendBackfillMessage(context.Background(), backfillEvent); sendErr != nil { + if sendErr := h.sendBackfillMessage(ctx, backfillEvent); sendErr != nil { logs.CtxWarn(ctx, "send backfill message failed, task_id=%d, err=%v", sub.t.ID, sendErr) return sendErr } diff --git a/backend/modules/observability/infra/mq/producer/backfill_producer.go b/backend/modules/observability/infra/mq/producer/backfill_producer.go index 2724df6a3..6ae02f3e4 100644 --- a/backend/modules/observability/infra/mq/producer/backfill_producer.go +++ b/backend/modules/observability/infra/mq/producer/backfill_producer.go @@ -42,7 +42,7 @@ func (b *BackfillProducerImpl) SendBackfill(ctx context.Context, message *entity logs.CtxWarn(ctx, "send annotation msg err: %v", err) return errorx.WrapByCode(err, obErrorx.CommercialCommonRPCErrorCodeCode) } - logs.CtxInfo(ctx, "send annotation msg %s successfully, msgId: %s", string(bytes), sendMsg.MessageID) + logs.CtxInfo(ctx, "send backfill msg %s successfully, msgId: %s", string(bytes), sendMsg.MessageID) return nil } diff --git a/backend/modules/observability/infra/repo/mysql/convertor/task.go b/backend/modules/observability/infra/repo/mysql/convertor/task.go index a788a3719..7c735e108 100644 --- a/backend/modules/observability/infra/repo/mysql/convertor/task.go +++ b/backend/modules/observability/infra/repo/mysql/convertor/task.go @@ -29,6 +29,7 @@ func TaskDO2PO(task *entity.ObservabilityTask) *model.ObservabilityTask { UpdatedAt: task.UpdatedAt, CreatedBy: task.CreatedBy, UpdatedBy: task.UpdatedBy, + TaskSource: task.TaskSource, } } @@ -50,6 +51,7 @@ func TaskPO2DO(task *model.ObservabilityTask) *entity.ObservabilityTask { UpdatedAt: task.UpdatedAt, CreatedBy: task.CreatedBy, UpdatedBy: task.UpdatedBy, + TaskSource: task.TaskSource, } } diff --git a/backend/modules/observability/infra/repo/mysql/gorm_gen/model/task.gen.go b/backend/modules/observability/infra/repo/mysql/gorm_gen/model/task.gen.go index 89a228b1b..5906633a5 100644 --- a/backend/modules/observability/infra/repo/mysql/gorm_gen/model/task.gen.go +++ b/backend/modules/observability/infra/repo/mysql/gorm_gen/model/task.gen.go @@ -28,6 +28,7 @@ type ObservabilityTask struct { UpdatedAt time.Time `gorm:"column:updated_at;type:timestamp;not null;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间 CreatedBy string `gorm:"column:created_by;type:varchar(128);not null;comment:创建人" json:"created_by"` // 创建人 UpdatedBy string `gorm:"column:updated_by;type:varchar(128);not null;comment:更新人" json:"updated_by"` // 更新人 + TaskSource *string `gorm:"column:task_source;type:varchar(50);default:user;comment:任务来源" json:"task_source"` // 任务来源 } // TableName ObservabilityTask's table name diff --git a/backend/modules/observability/infra/repo/mysql/gorm_gen/query/task.gen.go b/backend/modules/observability/infra/repo/mysql/gorm_gen/query/task.gen.go index 26e5002f6..4ce7251e8 100644 --- a/backend/modules/observability/infra/repo/mysql/gorm_gen/query/task.gen.go +++ b/backend/modules/observability/infra/repo/mysql/gorm_gen/query/task.gen.go @@ -43,6 +43,7 @@ func newObservabilityTask(db *gorm.DB, opts ...gen.DOOption) observabilityTask { _observabilityTask.UpdatedAt = field.NewTime(tableName, "updated_at") _observabilityTask.CreatedBy = field.NewString(tableName, "created_by") _observabilityTask.UpdatedBy = field.NewString(tableName, "updated_by") + _observabilityTask.TaskSource = field.NewString(tableName, "task_source") _observabilityTask.fillFieldMap() @@ -70,6 +71,7 @@ type observabilityTask struct { UpdatedAt field.Time // 更新时间 CreatedBy field.String // 创建人 UpdatedBy field.String // 更新人 + TaskSource field.String // 任务来源 fieldMap map[string]field.Expr } @@ -102,6 +104,7 @@ func (o *observabilityTask) updateTableName(table string) *observabilityTask { o.UpdatedAt = field.NewTime(table, "updated_at") o.CreatedBy = field.NewString(table, "created_by") o.UpdatedBy = field.NewString(table, "updated_by") + o.TaskSource = field.NewString(table, "task_source") o.fillFieldMap() @@ -130,7 +133,7 @@ func (o *observabilityTask) GetFieldByName(fieldName string) (field.OrderExpr, b } func (o *observabilityTask) fillFieldMap() { - o.fieldMap = make(map[string]field.Expr, 16) + o.fieldMap = make(map[string]field.Expr, 17) o.fieldMap["id"] = o.ID o.fieldMap["workspace_id"] = o.WorkspaceID o.fieldMap["name"] = o.Name @@ -147,6 +150,7 @@ func (o *observabilityTask) fillFieldMap() { o.fieldMap["updated_at"] = o.UpdatedAt o.fieldMap["created_by"] = o.CreatedBy o.fieldMap["updated_by"] = o.UpdatedBy + o.fieldMap["task_source"] = o.TaskSource } func (o observabilityTask) clone(db *gorm.DB) observabilityTask { diff --git a/backend/modules/observability/infra/repo/mysql/task.go b/backend/modules/observability/infra/repo/mysql/task.go index efa8ce8b8..837f585a2 100644 --- a/backend/modules/observability/infra/repo/mysql/task.go +++ b/backend/modules/observability/infra/repo/mysql/task.go @@ -199,6 +199,8 @@ func (v *TaskDaoImpl) buildSingleFilterExpr(q *genquery.Query, f *entity.TaskFil return v.buildTaskIDFilter(q, f) case "updated_at": return v.buildUpdateAtFilter(q, f) + case "task_source": + return v.buildTaskSourceFilter(q, f) default: return nil, errorx.NewByCode(obErrorx.CommonInvalidParamCode, errorx.WithMsgParam("invalid filter field name: %s", string(*f.FieldName))) } @@ -345,6 +347,14 @@ func (v *TaskDaoImpl) buildUpdateAtFilter(q *genquery.Query, f *entity.TaskFilte } } +func (v *TaskDaoImpl) buildTaskSourceFilter(q *genquery.Query, f *entity.TaskFilterField) (field.Expr, error) { + if len(f.Values) == 0 { + return nil, errorx.NewByCode(obErrorx.CommonInvalidParamCode, errorx.WithExtraMsg("no value provided for task source")) + } + + return q.ObservabilityTask.TaskSource.In(f.Values...), nil +} + // 计算分页参数 func calculatePagination(reqLimit, reqOffset int32) (int, int) { limit := DefaultLimit diff --git a/idl/thrift/coze/loop/observability/coze.loop.observability.task.thrift b/idl/thrift/coze/loop/observability/coze.loop.observability.task.thrift index 4c887775b..fe78ffbfa 100644 --- a/idl/thrift/coze/loop/observability/coze.loop.observability.task.thrift +++ b/idl/thrift/coze/loop/observability/coze.loop.observability.task.thrift @@ -7,6 +7,7 @@ include "./domain/common.thrift" struct CreateTaskRequest { 1: required task.Task task (api.body = "task"), + 2: optional common.Session session (api.body="session"), 255: optional base.Base base, } @@ -24,6 +25,7 @@ struct UpdateTaskRequest { 4: optional string description (api.body = "description"), 5: optional task.EffectiveTime effective_time (api.body = "effective_time"), 6: optional double sample_rate (api.body = "sample_rate"), + 7: optional common.Session session (api.body="session"), 255: optional base.Base base, } diff --git a/idl/thrift/coze/loop/observability/domain/common.thrift b/idl/thrift/coze/loop/observability/domain/common.thrift index 1ca2d3e49..13aacfa34 100644 --- a/idl/thrift/coze/loop/observability/domain/common.thrift +++ b/idl/thrift/coze/loop/observability/domain/common.thrift @@ -49,4 +49,8 @@ typedef string ContentType(ts.enum="true") const ContentType ContentType_Text = "Text" // 空间 const ContentType ContentType_Image = "Image" const ContentType ContentType_Audio = "Audio" -const ContentType ContentType_MultiPart = "MultiPart" \ No newline at end of file +const ContentType ContentType_MultiPart = "MultiPart" + +struct Session { + 1: optional string user_id +} \ No newline at end of file diff --git a/idl/thrift/coze/loop/observability/domain/task.thrift b/idl/thrift/coze/loop/observability/domain/task.thrift index ff9df13cd..67f2cb2e9 100644 --- a/idl/thrift/coze/loop/observability/domain/task.thrift +++ b/idl/thrift/coze/loop/observability/domain/task.thrift @@ -29,6 +29,10 @@ typedef string RunStatus (ts.enum="true") const RunStatus RunStatus_Running = "running" // 正在运行 const RunStatus RunStatus_Done = "done" // 完成运行 +typedef string TaskSource (ts.enum="true") +const TaskSource TaskSource_User = "user" // 用户创建 +const TaskSource TaskSource_Workflow = "workflow" // 工作流创建 + // Task struct Task { 1: optional i64 id (api.js_conv="true", go.tag='json:"id"') // 任务 id @@ -41,6 +45,7 @@ struct Task { 8: optional TaskConfig task_config // 配置 9: optional RunDetail task_detail // 任务状态详情 10: optional RunDetail backfill_task_detail // 任务历史数据执行详情 + 11: optional TaskSource task_source // 创建来源 100: optional common.BaseInfo base_info // 基础信息 } diff --git a/release/deployment/docker-compose/bootstrap/mysql-init/init-sql/task.sql b/release/deployment/docker-compose/bootstrap/mysql-init/init-sql/task.sql index 491a24414..e68741fa0 100644 --- a/release/deployment/docker-compose/bootstrap/mysql-init/init-sql/task.sql +++ b/release/deployment/docker-compose/bootstrap/mysql-init/init-sql/task.sql @@ -15,6 +15,7 @@ CREATE TABLE IF NOT EXISTS `task` ( `updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间', `created_by` varchar(128) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '创建人', `updated_by` varchar(128) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '更新人', + `task_source` VARCHAR(50) DEFAULT 'user' COMMENT '任务来源', PRIMARY KEY (`id`), KEY `idx_space_id_status` (`workspace_id`,`task_status`), KEY `idx_space_id_type` (`workspace_id`,`task_type`) diff --git a/release/deployment/docker-compose/bootstrap/rmq-init/init-subscription/subscriptions.cfg b/release/deployment/docker-compose/bootstrap/rmq-init/init-subscription/subscriptions.cfg index 42e47a0f0..d536d92e4 100644 --- a/release/deployment/docker-compose/bootstrap/rmq-init/init-subscription/subscriptions.cfg +++ b/release/deployment/docker-compose/bootstrap/rmq-init/init-subscription/subscriptions.cfg @@ -11,4 +11,5 @@ expt_export_csv_event=expt_export_csv_event_cg cozeloop_evaluation_correction_evaluator_result=cozeloop_evaluation_correction_evaluator_result_evaluation_cg cozeloop_async_tasks=cozeloop_async_tasks_backfill_cg cozeloop_evaluation_online_expt_eval_result=cozeloop_evaluation_online_expt_eval_result_cg -trace_to_task=trace_to_task_cg \ No newline at end of file +trace_to_task=trace_to_task_cg +observability_span_queue=observability_span_queue_cg \ No newline at end of file diff --git a/release/deployment/docker-compose/conf/observability.yaml b/release/deployment/docker-compose/conf/observability.yaml index 51bfd44a0..b186c0ed4 100644 --- a/release/deployment/docker-compose/conf/observability.yaml +++ b/release/deployment/docker-compose/conf/observability.yaml @@ -25,6 +25,22 @@ annotation_mq_consumer_config: consumer_group: "trace_annotation_event_cg" worker_num: 4 +span_with_annotation_mq_producer_config: + addr: + - "cozeloop-namesrv:9876" + timeout: 200 + retry_times: 3 + topic: "observability_span_queue" + producer_group: "observability_span_queue_pg" + +span_with_annotation_mq_consumer_config: + addr: + - "cozeloop-namesrv:9876" + timeout: 180000 + topic: "observability_span_queue" + consumer_group: "observability_span_queue_cg" + worker_num: 4 + trace_system_view_cfg: - id: -1 view_name: "Exceptions" diff --git a/release/deployment/helm-chart/charts/app/bootstrap/init/mysql/init-sql/task.sql b/release/deployment/helm-chart/charts/app/bootstrap/init/mysql/init-sql/task.sql index 491a24414..e68741fa0 100644 --- a/release/deployment/helm-chart/charts/app/bootstrap/init/mysql/init-sql/task.sql +++ b/release/deployment/helm-chart/charts/app/bootstrap/init/mysql/init-sql/task.sql @@ -15,6 +15,7 @@ CREATE TABLE IF NOT EXISTS `task` ( `updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间', `created_by` varchar(128) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '创建人', `updated_by` varchar(128) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '更新人', + `task_source` VARCHAR(50) DEFAULT 'user' COMMENT '任务来源', PRIMARY KEY (`id`), KEY `idx_space_id_status` (`workspace_id`,`task_status`), KEY `idx_space_id_type` (`workspace_id`,`task_type`) diff --git a/release/deployment/helm-chart/charts/app/bootstrap/init/rmq/init-subscription/subscriptions.cfg b/release/deployment/helm-chart/charts/app/bootstrap/init/rmq/init-subscription/subscriptions.cfg index 42e47a0f0..d536d92e4 100644 --- a/release/deployment/helm-chart/charts/app/bootstrap/init/rmq/init-subscription/subscriptions.cfg +++ b/release/deployment/helm-chart/charts/app/bootstrap/init/rmq/init-subscription/subscriptions.cfg @@ -11,4 +11,5 @@ expt_export_csv_event=expt_export_csv_event_cg cozeloop_evaluation_correction_evaluator_result=cozeloop_evaluation_correction_evaluator_result_evaluation_cg cozeloop_async_tasks=cozeloop_async_tasks_backfill_cg cozeloop_evaluation_online_expt_eval_result=cozeloop_evaluation_online_expt_eval_result_cg -trace_to_task=trace_to_task_cg \ No newline at end of file +trace_to_task=trace_to_task_cg +observability_span_queue=observability_span_queue_cg \ No newline at end of file diff --git a/release/deployment/helm-chart/umbrella/conf/observability.yaml b/release/deployment/helm-chart/umbrella/conf/observability.yaml index 4cb2dbd43..6ea63fb4c 100644 --- a/release/deployment/helm-chart/umbrella/conf/observability.yaml +++ b/release/deployment/helm-chart/umbrella/conf/observability.yaml @@ -25,6 +25,22 @@ annotation_mq_consumer_config: consumer_group: "trace_annotation_event_cg" worker_num: 4 +span_with_annotation_mq_producer_config: + addr: + - "cozeloop-namesrv:9876" + timeout: 200 + retry_times: 3 + topic: "observability_span_queue" + producer_group: "observability_span_queue_pg" + +span_with_annotation_mq_consumer_config: + addr: + - "cozeloop-namesrv:9876" + timeout: 180000 + topic: "observability_span_queue" + consumer_group: "observability_span_queue_cg" + worker_num: 4 + trace_system_view_cfg: - id: -1 view_name: "Exceptions"