@@ -16,6 +16,7 @@ import (
1616 "os/signal"
1717 "path"
1818 "strconv"
19+ "sync"
1920 "syscall"
2021 "time"
2122)
@@ -40,7 +41,7 @@ func main() {
4041 debugLog ("Firetail extension starting in debug mode" )
4142 }
4243
43- // Get API url & API token from env vars
44+ // Get API url, API token & buffer size from env vars
4445 firetailApiToken := os .Getenv ("FIRETAIL_API_TOKEN" )
4546 if firetailApiToken == "" {
4647 log .Fatal ("FIRETAIL_API_TOKEN not set" )
@@ -50,9 +51,27 @@ func main() {
5051 firetailApiUrl = "https://api.logging.eu-west-1.sandbox.firetail.app/logs/bulk"
5152 debugLog ("FIRETAIL_API_URL not set, defaulting to %s" , firetailApiUrl )
5253 }
54+ var bufferSize int
55+ bufferSizeStr := os .Getenv ("FIRETAIL_LOG_BUFFER_SIZE" )
56+ if bufferSizeStr != "" {
57+ bufferSize , err := strconv .Atoi (bufferSizeStr )
58+ if err != nil {
59+ log .Fatalf ("FIRETAIL_LOG_BUFFER_SIZE value invalid, err: %s" , err .Error ())
60+ }
61+ if bufferSize < 1 {
62+ log .Fatalf ("FIRETAIL_LOG_BUFFER_SIZE is %d but must be >= 0" , bufferSize )
63+ }
64+ } else {
65+ bufferSize = 1000
66+ debugLog ("FIRETAIL_LOG_BUFFER_SIZE not set; defaulting to %d" , bufferSize )
67+ }
5368
54- // Create a context with which we'll perform all our actions & make a channel to receive
55- // SIGTERM and SIGINT events & spawn a goroutine to call cancel() when we get one
69+ // Create a channel down which the logsApiAgent will send events from the log API as []bytes
70+ logQueue := make (chan []byte , bufferSize )
71+
72+ // Create a context with which we'll perform all our requests to the extensions API
73+ // & make a channel to receive SIGTERM and SIGINT events & spawn a goroutine to call
74+ // cancel() when we get one
5675 ctx , cancel := context .WithCancel (context .Background ())
5776 sigs := make (chan os.Signal , 1 )
5877 signal .Notify (sigs , syscall .SIGTERM , syscall .SIGINT )
@@ -69,73 +88,86 @@ func main() {
6988 panic (err )
7089 }
7190
72- // Create a channel down which the logsApiAgent will send events from the log API as []bytes
73- logQueue := make (chan []byte )
91+ // Create a Logs API agent
92+ logsApiAgent , err := agent .NewHttpAgent (logQueue )
93+ if err != nil {
94+ log .Fatal (err .Error ())
95+ }
7496
75- // Start a receiver routine for logQueue that'll run until logQueue is closed or a logsapi.RuntimeDone event is received
97+ // Subscribe to the logs API. Logs start being delivered only after the subscription happens.
98+ agentID := extensionClient .ExtensionID
99+ err = logsApiAgent .Init (agentID )
100+ if err != nil {
101+ log .Fatal (err .Error ())
102+ }
103+
104+ // Start a receiver routine for logQueue that'll run until logQueue is closed
105+ wg := sync.WaitGroup {}
106+ wg .Add (1 )
107+ defer wg .Wait ()
76108 go func () {
109+ defer wg .Done ()
110+ recordsBatch := []firetail.Record {}
77111 for {
78- select {
79- case logBytes , open := <- logQueue :
80- if ! open {
81- debugLog ("Queue channel closed, logQueue recevier routine exiting..." )
82- return
83- }
112+ // Receive from the queue until there's nothing currently left in it
113+ logQueueClosed := false
114+ ReceiveLoop:
115+ for {
116+ select {
117+ case logBytes , open := <- logQueue :
118+ if ! open {
119+ debugLog ("Queue channel closed & empty" )
120+ logQueueClosed = true
121+ break ReceiveLoop
122+ }
84123
85- // Unmarshal the bytes into a LogMessages
86- var logMessages logsapi. LogMessages
87- err := json . Unmarshal ([] byte ( logBytes ), & logMessages )
88- if err != nil {
89- debugLog ( "Err unmarshalling logBytes into logsapi.LogMessages, err: %s" , err . Error ())
90- }
124+ var logMessages logsapi. LogMessages
125+ err := json . Unmarshal ([] byte ( logBytes ), & logMessages )
126+ if err != nil {
127+ debugLog ( "Err unmarshalling logBytes into logsapi.LogMessages, err: %s" , err . Error ())
128+ continue
129+ }
91130
92- // Extract any firetail records from the log messages
93- firetailRecords , errs := firetail .ExtractFiretailRecords (logMessages )
94- // Log any errs, but still continue as it's possible not all failed
95- if errs != nil {
96- debugLog ("Errs extracting firetail records, errs: %s" , errs .Error ())
97- }
98- // If there's no firetail records, then all failed or there were none, so there's nothing to do
99- if len (firetailRecords ) == 0 {
100- debugLog ("No firetail records extracted. Continuing..." )
101- continue
131+ newFiretailRecords , errs := firetail .ExtractFiretailRecords (logMessages )
132+ // If there are errs, proceed as normal as it's possible not all failed to extract
133+ if errs != nil {
134+ debugLog ("Errs extracting firetail records, errs: %s" , errs .Error ())
135+ }
136+ recordsBatch = append (recordsBatch , newFiretailRecords ... )
137+ debugLog ("Extracted %d record(s) from logBytes; batch is now of size %d" , len (newFiretailRecords ), len (recordsBatch ))
138+ break
139+
140+ default :
141+ // Give the other routines some to do their thang
142+ time .Sleep (time .Nanosecond )
143+ break ReceiveLoop
102144 }
145+ }
103146
104- // Send the Firetail records to Firetail SaaS
105- debugLog ("Sending %d record(s) to Firetail..." , len (firetailRecords ))
106- recordsSent , err := firetail .SendRecordsToSaaS (firetailRecords , firetailApiUrl , firetailApiToken )
147+ // If the batch isn't empty, we can attempt to send it
148+ if len (recordsBatch ) > 0 {
149+ // Debug log before & after the request, as it takes some time & it's helpful to know if execution was frozen at this time
150+ debugLog ("Sending %d record(s) to Firetail..." , len (recordsBatch ))
151+ recordsSent , err := firetail .SendRecordsToSaaS (recordsBatch , firetailApiUrl , firetailApiToken )
107152 debugLog ("Sent %d record(s) to Firetail." , recordsSent )
108153 if err != nil {
109154 debugLog ("Err sending record(s) to Firetail SaaS, err: %s" , err .Error ())
155+ continue
110156 }
157+ // If sending the batch to Firetail was a success, we can clear out the batch!
158+ debugLog ("Clearing records batch..." )
159+ recordsBatch = []firetail.Record {}
160+ }
111161
112- // Check if logMessages contains a message of type logsapi.RuntimeDone - if it does, this routine needs to exit.
113- for _ , logMessage := range logMessages {
114- if logMessage .Type == string (logsapi .RuntimeDone ) {
115- debugLog ("Found log message of type logsapi.RuntimeDone, logQueue receiver routine exiting..." )
116- return
117- }
118- }
119- default :
120- time .Sleep (time .Nanosecond )
162+ // If the logQueue is closed and the recordsBatch is now empty, then we can return
163+ if logQueueClosed && len (recordsBatch ) == 0 {
164+ debugLog ("logQueue closed & batch empty, logQueue receiver routine returning..." )
165+ return
121166 }
122167 }
123168 }()
124169
125- // Create a Logs API agent
126- logsApiAgent , err := agent .NewHttpAgent (logQueue )
127- if err != nil {
128- log .Fatal (err .Error ())
129- }
130-
131- // Subscribe to the logs API. Logs start being delivered only after the subscription happens.
132- agentID := extensionClient .ExtensionID
133- err = logsApiAgent .Init (agentID )
134- if err != nil {
135- log .Fatal (err .Error ())
136- }
137-
138- // This for loop will block until invoke or shutdown event is received or cancelled via the context
170+ // This for loop will block until an invoke or shutdown event is received, or the context is cancelled
139171 for {
140172 select {
141173 case <- ctx .Done ():
@@ -151,7 +183,9 @@ func main() {
151183
152184 // Exit if we receive a SHUTDOWN event
153185 if res .EventType == extension .Shutdown {
154- debugLog ("Received extension shutdown event, exiting..." )
186+ debugLog ("Received extension shutdown event, sleeping for 500ms to allow final logs to arrive..." )
187+ time .Sleep (500 )
188+ debugLog ("Exiting..." )
155189 logsApiAgent .Shutdown ()
156190 close (logQueue )
157191 return
0 commit comments