Skip to content

Commit 08fb120

Browse files
committed
Merge pull request #10 from kron4eg/custom_channels
New options to define custom errors and blocking channels on *Client.
2 parents 87aa456 + da90349 commit 08fb120

4 files changed

Lines changed: 38 additions & 5 deletions

File tree

.travis.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ language: go
22

33
go:
44
- 1.4
5-
- tip
5+
- 1.5
6+
- 1.6
67

78
services:
89
- rabbitmq

backoff.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"time"
66
)
77

8-
// See: http://blog.gopheracademy.com/advent-2014/backoff/
8+
// DefaultBackoff See: http://blog.gopheracademy.com/advent-2014/backoff/
99
var DefaultBackoff Backoffer = BackoffPolicy{
1010
[]int{0, 10, 100, 200, 500, 1000, 2000, 3000, 5000},
1111
}

client.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,14 @@ func (c *Client) deletePublisher(pub *Publisher) {
7272
delete(c.publishers, pub)
7373
}
7474

75-
// Errors returns AMQP connection level errors
75+
// Errors returns AMQP connection level errors. Default buffer size is 100.
76+
// Messages will be dropped in case if receiver can't keep up
7677
func (c *Client) Errors() <-chan error {
7778
return c.errs
7879
}
7980

80-
// Blocking notifies the server's TCP flow control of the Connection
81+
// Blocking notifies the server's TCP flow control of the Connection. Default
82+
// buffer size is 10. Messages will be dropped in case if receiver can't keep up
8183
func (c *Client) Blocking() <-chan amqp.Blocking {
8284
return c.blocking
8385
}
@@ -231,9 +233,29 @@ func URL(addr string) ClientOpt {
231233
}
232234
}
233235

234-
// Backoff is a functional option, used to define backoff policy
236+
// Backoff is a functional option, used to define backoff policy, used in
237+
// `NewClient` constructor
235238
func Backoff(bo Backoffer) ClientOpt {
236239
return func(c *Client) {
237240
c.bo = bo
238241
}
239242
}
243+
244+
// ErrorsChan is a functional option, used to initialize error reporting channel
245+
// in client code, maintaining control over buffer size. Default buffer size is
246+
// 100. Messages will be dropped in case if receiver can't keep up, used in
247+
// `NewClient` constructor
248+
func ErrorsChan(errChan chan error) ClientOpt {
249+
return func(c *Client) {
250+
c.errs = errChan
251+
}
252+
}
253+
254+
// BlockingChan is a functional option, used to initialize blocking reporting
255+
// channel in client code, maintaining control over buffering, used in
256+
// `NewClient` constructor
257+
func BlockingChan(blockingChan chan amqp.Blocking) ClientOpt {
258+
return func(c *Client) {
259+
c.blocking = blockingChan
260+
}
261+
}

doc_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,16 @@ func ExampleURL() {
8585
cony.NewClient(cony.URL("amqp://guest:guest@localhost/"))
8686
}
8787

88+
func ExampleErrorsChan() {
89+
errors := make(chan error, 100) // define custom buffer size
90+
cony.NewClient(cony.ErrorsChan(errors))
91+
}
92+
93+
func ExampleBlockingChan() {
94+
blockings := make(chan amqp.Blocking, 100) // define custom buffer size
95+
cony.NewClient(cony.BlockingChan(blockings))
96+
}
97+
8898
func ExampleClient_Loop() {
8999
client := cony.NewClient(cony.URL("amqp://guest:guest@localhost/"))
90100

0 commit comments

Comments
 (0)