-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
309 lines (270 loc) · 7.27 KB
/
main.go
File metadata and controls
309 lines (270 loc) · 7.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
/*
amqp2gelf
A simple tool to read from an AMQP queue and forward the messages as GELF/UDP packets.
For JSON messages it tries to not change any fields.
Other messages are put inside the GELF "message" field.
2014, DECK36 GmbH & Co. KG, <martin.schuette@deck36.de>
*/
package main
import (
"bytes"
"flag"
"fmt"
"github.com/DECK36/go-gelf/gelf"
"github.com/streadway/amqp"
"log"
"os"
"os/signal"
"syscall"
"encoding/json"
"strconv"
"time"
)
// Program identity: sent to the AMQP broker as client properties and printed
// in the verbose start-up banner.
const (
	thisVersion = "0.3"
	thisProgram = "amqp2gelf"
)
// CommandLineOptions groups all command line options.
// Every field holds the pointer returned by the flag package when the
// corresponding flag is registered in init().
type CommandLineOptions struct {
// uri is the AMQP URI to connect to (flag -uri).
uri *string
// queueName is the name of the durable AMQP queue to consume (flag -queue).
queueName *string
// gelf_server is the Graylog2 server (flag -server).
gelf_server *string
// gelf_port is the Graylog2 GELF/UDP port (flag -port).
gelf_port *int
// verbose enables verbose log output (flag -v).
verbose *bool
}
// options holds the command line options; it is filled in by init().
var options CommandLineOptions
// init registers the command line flags, stores the pointers handed out by the
// flag package in the package-level options variable and parses the command
// line.
func init() {
	options = CommandLineOptions{
		uri:         flag.String("uri", "amqp://user:password@broker.example.com:5672/vhost", "AMQP URI"),
		queueName:   flag.String("queue", "logging_queue", "Durable AMQP queue name"),
		gelf_server: flag.String("server", "localhost", "Graylog2 server"),
		gelf_port:   flag.Int("port", 12201, "Graylog2 GELF/UDP port"),
		verbose:     flag.Bool("v", false, "Verbose output"),
	}

	flag.Parse()
}
// Consumer bundles the state of one AMQP consumer created by amqpConsumer.
type Consumer struct {
// conn is the AMQP connection to the broker.
conn *amqp.Connection
// channel is the AMQP channel opened on conn.
channel *amqp.Channel
// tag is the consumer tag passed to the broker when consuming.
tag string
// done receives an error when delivery handling stops or fails, or when
// the AMQP channel is closed.
done chan error
}
// amqpConsumer connects to the AMQP broker at amqpURI, makes sure the durable
// queue amqpQueue exists and starts consuming from it.
//
// Deliveries are handed to writeLogsToGelf in a background goroutine. A second
// goroutine waits for the AMQP channel to be closed. Both report through the
// done channel of the returned Consumer.
//
// The shutdown parameter is not used by this function; it is kept so existing
// callers keep compiling.
//
// On failure the returned Consumer is nil and a connection that was already
// opened is closed again.
func amqpConsumer(amqpURI string, amqpQueue string, shutdown chan<- string) (c *Consumer, err error) {
	amqpConfig := amqp.Config{
		Properties: amqp.Table{
			"product": thisProgram,
			"version": thisVersion,
		},
	}
	tag := fmt.Sprintf("amqp2gelf-%d", os.Getpid())

	if *options.verbose {
		log.Println("connecting to ", amqpURI, "...")
	}
	conn, err := amqp.DialConfig(amqpURI, amqpConfig)
	if err != nil {
		return nil, fmt.Errorf("AMQP Dial: %s", err)
	}
	// Every later error path must release the connection, otherwise it
	// leaks. err is the named result, so it is already set when this runs.
	defer func() {
		if err != nil {
			// Best effort: the error being returned is the one that matters.
			conn.Close()
		}
	}()
	if *options.verbose {
		log.Println("got connection")
	}

	channel, err := conn.Channel()
	if err != nil {
		return nil, fmt.Errorf("AMQP Channel: %s", err)
	}
	if *options.verbose {
		log.Println("got channel")
	}

	// here we only ensure the AMQP queue exists
	q, err := channel.QueueDeclare(
		amqpQueue, // name
		true,      // durable
		false,     // auto-deleted
		false,     // exclusive
		false,     // noWait
		nil,       // arguments
	)
	if err != nil {
		return nil, fmt.Errorf("Queue Declare: %v", err)
	}

	// init the consumer
	deliveries, err := channel.Consume(
		q.Name, // name
		tag,    // consumerTag
		false,  // noAck
		false,  // exclusive
		false,  // noLocal
		false,  // noWait
		nil,    // arguments
	)
	if err != nil {
		return nil, fmt.Errorf("Queue Consume: %s", err)
	}
	if *options.verbose {
		log.Println("got consumer")
	}

	c = &Consumer{
		conn:    conn,
		channel: channel,
		tag:     tag,
		done:    make(chan error),
	}

	go writeLogsToGelf(deliveries, c.done)

	// Report a closed channel through done.
	go func() {
		notification := c.channel.NotifyClose(make(chan *amqp.Error))
		n := <-notification
		c.done <- fmt.Errorf("AMQP server closed connection: %v", n)
	}()

	return c, nil
}
// buildGelfMessage_json converts a JSON object (possibly already GELF) into a
// gelf.Message.
//
// Keys that collide with field names reserved by Graylog2 (with or without the
// leading '_') are renamed with a "renamed" prefix. The GELF standard fields
// version, host, short_message, timestamp and level are taken from the object
// when present with a usable type; otherwise defaults are used. The complete
// (renamed) object is passed on as Extra.
//
// An error is returned when message cannot be parsed as a JSON object.
func buildGelfMessage_json(message []byte) (gm gelf.Message, err error) {
	// list of "reserved" field names
	// cf. https://github.com/Graylog2/graylog2-server/blob/0.20/graylog2-plugin-interfaces/src/main/java/org/graylog2/plugin/Message.java#L61 and #L81
	// Go does not allow const maps :-/
	gelfReservedField := map[string]bool{
		"_id":     true,
		"_ttl":    true,
		"_source": true,
		"_all":    true,
		"_index":  true,
		"_type":   true,
		"_score":  true,
	}

	// Decoding straight into a map makes non-object JSON (arrays, strings,
	// numbers) a regular decode error instead of a failed type assertion.
	var jm map[string]interface{}
	if err = json.Unmarshal(message, &jm); err != nil {
		if *options.verbose {
			log.Printf("Cannot parse JSON, err: %v, msg: '%s'", err, message)
		}
		return gm, err
	}
	if jm == nil {
		// JSON "null" decodes without error but is not an object.
		return gm, fmt.Errorf("JSON message is not an object")
	}

	// rename reserved field names (with and w/o '_')
	// note: we do not double check if 'renamed_xyz' is already present
	for k, v := range jm {
		if gelfReservedField[k] {
			jm["renamed"+k] = v
			delete(jm, k)
		} else if gelfReservedField["_"+k] {
			jm["renamed_"+k] = v
			delete(jm, k)
		}
	}

	// Ensure the required fields are set, using defaults when a field is
	// missing or has an unexpected type. Checked assertions are used so that
	// unexpected input cannot panic the consumer goroutine.
	gelfHostname := "unknown_amqp"
	if s, ok := jm["host"].(string); ok {
		gelfHostname = s
	}

	gelfShortmsg := ""
	if s, ok := jm["short_message"].(string); ok {
		gelfShortmsg = s
	}

	gelfTimestamp := 0.0
	switch ts := jm["timestamp"].(type) {
	case float64:
		gelfTimestamp = ts
	case string:
		// An unparsable timestamp keeps the default.
		if f, perr := strconv.ParseFloat(ts, 64); perr == nil {
			gelfTimestamp = f
		}
	}

	// encoding/json decodes every JSON number into float64, so the level can
	// never arrive as int32.
	var gelfLevel int32 = 6 // info
	switch lvl := jm["level"].(type) {
	case float64:
		gelfLevel = int32(lvl)
	case string:
		if n, perr := strconv.ParseInt(lvl, 10, 32); perr == nil {
			gelfLevel = int32(n)
		}
	}

	gelfVersion := "1.1"
	if s, ok := jm["version"].(string); ok {
		gelfVersion = s
	}

	gm = gelf.Message{
		Version:  gelfVersion,
		Host:     gelfHostname,
		Short:    gelfShortmsg,
		TimeUnix: gelfTimestamp,
		Level:    gelfLevel,
		Extra:    jm,
	}
	return gm, nil
}
// buildGelfMessage_text wraps a plain text payload in a GELF message: the
// whole payload becomes the short message and all other fields get fixed
// defaults. The returned error is always nil.
func buildGelfMessage_text(message []byte) (gm gelf.Message, err error) {
	gm.Version = "1.1"
	gm.Host = "unknown_amqp"
	gm.Short = string(message)
	gm.TimeUnix = 0.0
	gm.Level = 6 // info
	gm.Extra = map[string]interface{}{}
	return gm, nil
}
// buildGelfMessage turns an AMQP message body into a GELF message.
//
// Surrounding whitespace is trimmed first. The body is handled as JSON only
// when ctype is "application/json" or "text/json" and the trimmed body starts
// with '{' and ends with '}'. Everything else, including an empty body, is
// packaged as plain text.
func buildGelfMessage(message []byte, ctype string) (gm gelf.Message, err error) {
	message = bytes.TrimSpace(message)

	isJSONType := ctype == "application/json" || ctype == "text/json"
	// The length check guards the index expressions: an empty or
	// whitespace-only body must not panic.
	looksLikeObject := len(message) >= 2 &&
		message[0] == '{' && message[len(message)-1] == '}'

	if isJSONType && looksLikeObject {
		return buildGelfMessage_json(message)
	}
	return buildGelfMessage_text(message)
}
// writeLogsToGelf consumes AMQP deliveries and forwards each one as a GELF
// message to the Graylog2 server given on the command line.
//
// Deliveries that cannot be converted or sent are rejected without requeueing;
// successfully sent deliveries are acknowledged one at a time. Failures to
// create the writer or to send a message, and the end of the delivery stream,
// are reported on done. Sends on done block until a receiver is ready.
func writeLogsToGelf(deliveries <-chan amqp.Delivery, done chan error) {
	graylogAddr := fmt.Sprintf("%s:%d", *options.gelf_server, *options.gelf_port)
	gelfWriter, err := gelf.NewWriter(graylogAddr)
	if err != nil {
		done <- fmt.Errorf("Cannot create gelf writer: %v", err)
		return
	}

	for d := range deliveries {
		gm, err := buildGelfMessage(d.Body, d.ContentType)
		if err != nil {
			if rejectErr := d.Reject(false); rejectErr != nil { // do not requeue
				log.Printf("Cannot reject msg: %v", rejectErr)
			}
			if *options.verbose {
				log.Printf("Rejected msg: %#v\n", d.Body)
			}
			continue
		}

		if err = gelfWriter.WriteMessage(&gm); err != nil {
			// Reject before reporting: the send on done blocks until
			// somebody receives, and the receiver may shut the program
			// down right away.
			if rejectErr := d.Reject(false); rejectErr != nil { // do not requeue
				log.Printf("Cannot reject msg: %v", rejectErr)
			}
			done <- fmt.Errorf("Cannot send gelf msg: %v", err)
			continue
		}

		// Only claim success once the write went through.
		if *options.verbose {
			log.Printf("sent msg: %f %s\n", gm.TimeUnix, gm.Short)
		}
		if ackErr := d.Ack(false); ackErr != nil { // don't ack multiple
			log.Printf("Cannot ack msg: %v", ackErr)
		}
	}

	done <- fmt.Errorf("done")
}
// osSignalHandler blocks until the process receives SIGINT or SIGTERM and then
// requests a shutdown by sending a description of the signal on shutdown.
// Once a signal has arrived, a watchdog goroutine ends the process through
// log.Fatalf if it is still running two seconds later.
func osSignalHandler(shutdown chan<- string) {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	// Blocks here until one of the registered signals arrives.
	received := <-signals

	watchdog := func() {
		time.Sleep(2 * time.Second)
		log.Fatalf("shutdown was ignored, bailing out now.\n")
	}
	go watchdog()

	shutdown <- fmt.Sprintf("received signal %v", received)
}
// main starts the OS signal handler and the AMQP consumer, then waits for the
// first shutdown request and logs it before returning.
func main() {
	if *options.verbose {
		log.Printf("Start %s %s", thisProgram, thisVersion)
	}

	// Shutdown requests arrive here, either from the signal handler or from
	// a goroutine that ran into an error.
	shutdown := make(chan string)

	go osSignalHandler(shutdown)

	// start input
	consumer, err := amqpConsumer(*options.uri, *options.queueName, shutdown)
	if err != nil {
		// Nobody is receiving from shutdown yet, so bail out directly.
		log.Fatalln("Fatal Error: ", err.Error())
	}

	// Turn the first report on the consumer's done channel into a shutdown
	// request.
	go func() {
		cause := <-consumer.done
		shutdown <- fmt.Sprintln(cause)
	}()

	reason := <-shutdown
	log.Println("The End.", reason)
}