@@ -7,152 +7,114 @@ import { MACHINE_METADATA } from "./constants.js";
77import  {  EventCache  }  from  "./eventCache.js" ; 
88import  nodeMachineId  from  "node-machine-id" ; 
99import  {  getDeviceId  }  from  "@mongodb-js/device-id" ; 
10- import  fs  from  "fs/promises" ; 
11- 
12- async  function  fileExists ( filePath : string ) : Promise < boolean >  { 
13-     try  { 
14-         await  fs . access ( filePath ,  fs . constants . F_OK ) ; 
15-         return  true ;  // File exists 
16-     }  catch  ( e : unknown )  { 
17-         if  ( 
18-             e  instanceof  Error  && 
19-             ( 
20-                 e  as  Error  &  { 
21-                     code : string ; 
22-                 } 
23-             ) . code  ===  "ENOENT" 
24-         )  { 
25-             return  false ;  // File does not exist 
26-         } 
27-         throw  e ;  // Re-throw unexpected errors 
28-     } 
29- } 
3010
31- async  function  isContainerized ( ) : Promise < boolean >  { 
32-     if  ( process . env . container )  { 
33-         return  true ; 
34-     } 
35- 
36-     const  exists  =  await  Promise . all ( [ "/.dockerenv" ,  "/run/.containerenv" ,  "/var/run/.containerenv" ] . map ( fileExists ) ) ; 
11+ type  EventResult  =  { 
12+     success : boolean ; 
13+     error ?: Error ; 
14+ } ; 
3715
38-     return  exists . includes ( true ) ; 
39- } 
16+ export  const  DEVICE_ID_TIMEOUT  =  3000 ; 
4017
4118export  class  Telemetry  { 
19+     private  isBufferingEvents : boolean  =  true ; 
20+     /** Resolves when the device ID is retrieved or timeout occurs */ 
21+     public  deviceIdPromise : Promise < string >  |  undefined ; 
4222    private  deviceIdAbortController  =  new  AbortController ( ) ; 
4323    private  eventCache : EventCache ; 
4424    private  getRawMachineId : ( )  =>  Promise < string > ; 
45-     private  getContainerEnv : ( )  =>  Promise < boolean > ; 
46-     private  cachedCommonProperties ?: CommonProperties ; 
47-     private  flushing : boolean  =  false ; 
4825
4926    private  constructor ( 
5027        private  readonly  session : Session , 
5128        private  readonly  userConfig : UserConfig , 
52-         { 
53-             eventCache, 
54-             getRawMachineId, 
55-             getContainerEnv, 
56-         } : { 
57-             eventCache : EventCache ; 
58-             getRawMachineId : ( )  =>  Promise < string > ; 
59-             getContainerEnv : ( )  =>  Promise < boolean > ; 
60-         } 
29+         private  readonly  commonProperties : CommonProperties , 
30+         {  eventCache,  getRawMachineId } : {  eventCache : EventCache ;  getRawMachineId : ( )  =>  Promise < string >  } 
6131    )  { 
6232        this . eventCache  =  eventCache ; 
6333        this . getRawMachineId  =  getRawMachineId ; 
64-         this . getContainerEnv  =  getContainerEnv ; 
6534    } 
6635
6736    static  create ( 
6837        session : Session , 
6938        userConfig : UserConfig , 
7039        { 
40+             commonProperties =  {  ...MACHINE_METADATA  } , 
7141            eventCache =  EventCache . getInstance ( ) , 
7242            getRawMachineId =  ( )  =>  nodeMachineId . machineId ( true ) , 
73-             getContainerEnv =  isContainerized , 
7443        } : { 
7544            eventCache ?: EventCache ; 
7645            getRawMachineId ?: ( )  =>  Promise < string > ; 
77-             getContainerEnv ?: ( )   =>   Promise < boolean > ; 
46+             commonProperties ?: CommonProperties ; 
7847        }  =  { } 
7948    ) : Telemetry  { 
80-         const  instance  =  new  Telemetry ( session ,  userConfig ,  { 
81-             eventCache, 
82-             getRawMachineId, 
83-             getContainerEnv, 
84-         } ) ; 
49+         const  instance  =  new  Telemetry ( session ,  userConfig ,  commonProperties ,  {  eventCache,  getRawMachineId } ) ; 
8550
51+         void  instance . start ( ) ; 
8652        return  instance ; 
8753    } 
8854
55+     private  async  start ( ) : Promise < void >  { 
56+         if  ( ! this . isTelemetryEnabled ( ) )  { 
57+             return ; 
58+         } 
59+         this . deviceIdPromise  =  getDeviceId ( { 
60+             getMachineId : ( )  =>  this . getRawMachineId ( ) , 
61+             onError : ( reason ,  error )  =>  { 
62+                 switch  ( reason )  { 
63+                     case  "resolutionError" :
64+                         logger . debug ( LogId . telemetryDeviceIdFailure ,  "telemetry" ,  String ( error ) ) ; 
65+                         break ; 
66+                     case  "timeout" :
67+                         logger . debug ( LogId . telemetryDeviceIdTimeout ,  "telemetry" ,  "Device ID retrieval timed out" ) ; 
68+                         break ; 
69+                     case  "abort" :
70+                         // No need to log in the case of aborts 
71+                         break ; 
72+                 } 
73+             } , 
74+             abortSignal : this . deviceIdAbortController . signal , 
75+         } ) ; 
76+ 
77+         this . commonProperties . device_id  =  await  this . deviceIdPromise ; 
78+ 
79+         this . isBufferingEvents  =  false ; 
80+     } 
81+ 
8982    public  async  close ( ) : Promise < void >  { 
9083        this . deviceIdAbortController . abort ( ) ; 
91-         await  this . flush ( ) ; 
84+         this . isBufferingEvents  =  false ; 
85+         await  this . emitEvents ( this . eventCache . getEvents ( ) ) ; 
9286    } 
9387
9488    /** 
9589     * Emits events through the telemetry pipeline 
9690     * @param  events - The events to emit 
9791     */ 
98-     public  emitEvents ( events : BaseEvent [ ] ) : void { 
99-         void  this . flush ( events ) ; 
92+     public  async  emitEvents ( events : BaseEvent [ ] ) : Promise < void >  { 
93+         try  { 
94+             if  ( ! this . isTelemetryEnabled ( ) )  { 
95+                 logger . info ( LogId . telemetryEmitFailure ,  "telemetry" ,  `Telemetry is disabled.` ) ; 
96+                 return ; 
97+             } 
98+ 
99+             await  this . emit ( events ) ; 
100+         }  catch  { 
101+             logger . debug ( LogId . telemetryEmitFailure ,  "telemetry" ,  `Error emitting telemetry events.` ) ; 
102+         } 
100103    } 
101104
102105    /** 
103106     * Gets the common properties for events 
104107     * @returns  Object containing common properties for all events 
105108     */ 
106-     private  async  getCommonProperties ( ) : Promise < CommonProperties >  { 
107-         if  ( ! this . cachedCommonProperties )  { 
108-             let  deviceId : string  |  undefined ; 
109-             let  containerEnv : boolean  |  undefined ; 
110-             try  { 
111-                 await  Promise . all ( [ 
112-                     getDeviceId ( { 
113-                         getMachineId : ( )  =>  this . getRawMachineId ( ) , 
114-                         onError : ( reason ,  error )  =>  { 
115-                             switch  ( reason )  { 
116-                                 case  "resolutionError" :
117-                                     logger . debug ( LogId . telemetryDeviceIdFailure ,  "telemetry" ,  String ( error ) ) ; 
118-                                     break ; 
119-                                 case  "timeout" :
120-                                     logger . debug ( 
121-                                         LogId . telemetryDeviceIdTimeout , 
122-                                         "telemetry" , 
123-                                         "Device ID retrieval timed out" 
124-                                     ) ; 
125-                                     break ; 
126-                                 case  "abort" :
127-                                     // No need to log in the case of aborts 
128-                                     break ; 
129-                             } 
130-                         } , 
131-                         abortSignal : this . deviceIdAbortController . signal , 
132-                     } ) . then ( ( id )  =>  { 
133-                         deviceId  =  id ; 
134-                     } ) , 
135-                     this . getContainerEnv ( ) . then ( ( env )  =>  { 
136-                         containerEnv  =  env ; 
137-                     } ) , 
138-                 ] ) ; 
139-             }  catch  ( error : unknown )  { 
140-                 const  err  =  error  instanceof  Error  ? error  : new  Error ( String ( error ) ) ; 
141-                 logger . debug ( LogId . telemetryDeviceIdFailure ,  "telemetry" ,  err . message ) ; 
142-             } 
143-             this . cachedCommonProperties  =  { 
144-                 ...MACHINE_METADATA , 
145-                 mcp_client_version : this . session . agentRunner ?. version , 
146-                 mcp_client_name : this . session . agentRunner ?. name , 
147-                 session_id : this . session . sessionId , 
148-                 config_atlas_auth : this . session . apiClient . hasCredentials ( )  ? "true"  : "false" , 
149-                 config_connection_string : this . userConfig . connectionString  ? "true"  : "false" , 
150-                 is_container_env : containerEnv  ? "true"  : "false" , 
151-                 device_id : deviceId , 
152-             } ; 
153-         } 
154- 
155-         return  this . cachedCommonProperties ; 
109+     public  getCommonProperties ( ) : CommonProperties  { 
110+         return  { 
111+             ...this . commonProperties , 
112+             mcp_client_version : this . session . agentRunner ?. version , 
113+             mcp_client_name : this . session . agentRunner ?. name , 
114+             session_id : this . session . sessionId , 
115+             config_atlas_auth : this . session . apiClient . hasCredentials ( )  ? "true"  : "false" , 
116+             config_connection_string : this . userConfig . connectionString  ? "true"  : "false" , 
117+         } ; 
156118    } 
157119
158120    /** 
@@ -173,74 +135,60 @@ export class Telemetry {
173135    } 
174136
175137    /** 
176-      * Attempts to flush  events through authenticated and unauthenticated clients 
138+      * Attempts to emit  events through authenticated and unauthenticated clients 
177139     * Falls back to caching if both attempts fail 
178140     */ 
179-     public  async  flush ( events ?: BaseEvent [ ] ) : Promise < void >  { 
180-         if  ( ! this . isTelemetryEnabled ( ) )  { 
181-             logger . info ( LogId . telemetryEmitFailure ,  "telemetry" ,  `Telemetry is disabled.` ) ; 
182-             return ; 
183-         } 
184- 
185-         if  ( this . flushing )  { 
186-             this . eventCache . appendEvents ( events  ??  [ ] ) ; 
187-             process . nextTick ( async  ( )  =>  { 
188-                 // try again if in the middle of a flush 
189-                 await  this . flush ( ) ; 
190-             } ) ; 
141+     private  async  emit ( events : BaseEvent [ ] ) : Promise < void >  { 
142+         if  ( this . isBufferingEvents )  { 
143+             this . eventCache . appendEvents ( events ) ; 
191144            return ; 
192145        } 
193146
194-         this . flushing  =  true ; 
147+         const  cachedEvents  =  this . eventCache . getEvents ( ) ; 
148+         const  allEvents  =  [ ...cachedEvents ,  ...events ] ; 
195149
196-         try  { 
197-             const  cachedEvents  =  this . eventCache . getEvents ( ) ; 
198-             const  allEvents  =  [ ...cachedEvents ,  ...( events  ??  [ ] ) ] ; 
199-             if  ( allEvents . length  <=  0 )  { 
200-                 this . flushing  =  false ; 
201-                 return ; 
202-             } 
203- 
204-             logger . debug ( 
205-                 LogId . telemetryEmitStart , 
206-                 "telemetry" , 
207-                 `Attempting to send ${ allEvents . length } ${ cachedEvents . length }  
208-             ) ; 
150+         logger . debug ( 
151+             LogId . telemetryEmitStart , 
152+             "telemetry" , 
153+             `Attempting to send ${ allEvents . length } ${ cachedEvents . length }  
154+         ) ; 
209155
210-             await  this . sendEvents ( this . session . apiClient ,  allEvents ) ; 
156+         const  result  =  await  this . sendEvents ( this . session . apiClient ,  allEvents ) ; 
157+         if  ( result . success )  { 
211158            this . eventCache . clearEvents ( ) ; 
212159            logger . debug ( 
213160                LogId . telemetryEmitSuccess , 
214161                "telemetry" , 
215162                `Sent ${ allEvents . length } ${ JSON . stringify ( allEvents ,  null ,  2 ) }  
216163            ) ; 
217-         }  catch  ( error : unknown )  { 
218-             logger . debug ( 
219-                 LogId . telemetryEmitFailure , 
220-                 "telemetry" , 
221-                 `Error sending event to client: ${ error  instanceof  Error  ? error . message  : String ( error ) }  
222-             ) ; 
223-             this . eventCache . appendEvents ( events  ??  [ ] ) ; 
224-             process . nextTick ( async  ( )  =>  { 
225-                 // try again 
226-                 await  this . flush ( ) ; 
227-             } ) ; 
164+             return ; 
228165        } 
229166
230-         this . flushing  =  false ; 
167+         logger . debug ( 
168+             LogId . telemetryEmitFailure , 
169+             "telemetry" , 
170+             `Error sending event to client: ${ result . error  instanceof  Error  ? result . error . message  : String ( result . error ) }  
171+         ) ; 
172+         this . eventCache . appendEvents ( events ) ; 
231173    } 
232174
233175    /** 
234176     * Attempts to send events through the provided API client 
235177     */ 
236-     private  async  sendEvents ( client : ApiClient ,  events : BaseEvent [ ] ) : Promise < void >  { 
237-         const  commonProperties  =  await  this . getCommonProperties ( ) ; 
238- 
239-         await  client . sendEvents ( 
240-             events . map ( ( event )  =>  ( { 
241-                 ...event , 
242-                 properties : {  ...commonProperties ,  ...event . properties  } , 
243-             } ) ) 
244-         ) ; 
178+     private  async  sendEvents ( client : ApiClient ,  events : BaseEvent [ ] ) : Promise < EventResult >  { 
179+         try  { 
180+             await  client . sendEvents ( 
181+                 events . map ( ( event )  =>  ( { 
182+                     ...event , 
183+                     properties : {  ...this . getCommonProperties ( ) ,  ...event . properties  } , 
184+                 } ) ) 
185+             ) ; 
186+             return  {  success : true  } ; 
187+         }  catch  ( error )  { 
188+             return  { 
189+                 success : false , 
190+                 error : error  instanceof  Error  ? error  : new  Error ( String ( error ) ) , 
191+             } ; 
192+         } 
245193    } 
246194} 
0 commit comments