Skip to content

Commit 3d728bc

Browse files
committed
Handle connectivity issues while reading the body (#1343)
1 parent 2d51ef4 commit 3d728bc

File tree

8 files changed

+267
-229
lines changed

8 files changed

+267
-229
lines changed

.github/workflows/nodejs.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ jobs:
99

1010
strategy:
1111
matrix:
12-
node-version: [10.x, 12.x, 14.x]
12+
node-version: [10.x, 12.x, 14.x, 15.x]
1313
os: [ubuntu-latest, windows-latest, macOS-latest]
1414

1515
steps:

lib/Connection.js

Lines changed: 38 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ const hpagent = require('hpagent')
2525
const http = require('http')
2626
const https = require('https')
2727
const debug = require('debug')('elasticsearch')
28-
const decompressResponse = require('decompress-response')
2928
const pump = require('pump')
3029
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/
3130
const {
@@ -83,7 +82,6 @@ class Connection {
8382

8483
request (params, callback) {
8584
this._openRequests++
86-
var ended = false
8785

8886
const requestParams = this.buildRequestObject(params)
8987
// https://github.com/nodejs/node/commit/b961d9fd83
@@ -96,53 +94,38 @@ class Connection {
9694
debug('Starting a new request', params)
9795
const request = this.makeRequest(requestParams)
9896

99-
// listen for the response event
100-
// TODO: handle redirects?
101-
request.on('response', response => {
102-
/* istanbul ignore else */
103-
if (ended === false) {
104-
ended = true
105-
this._openRequests--
97+
const onResponse = response => {
98+
cleanListeners()
99+
this._openRequests--
100+
callback(null, response)
101+
}
106102

107-
if (params.asStream === true) {
108-
callback(null, response)
109-
} else {
110-
callback(null, decompressResponse(response))
111-
}
112-
}
113-
})
114-
115-
// handles request timeout
116-
request.on('timeout', () => {
117-
/* istanbul ignore else */
118-
if (ended === false) {
119-
ended = true
120-
this._openRequests--
121-
request.abort()
122-
callback(new TimeoutError('Request timed out', params), null)
123-
}
124-
})
125-
126-
// handles request error
127-
request.on('error', err => {
128-
/* istanbul ignore else */
129-
if (ended === false) {
130-
ended = true
131-
this._openRequests--
132-
callback(new ConnectionError(err.message), null)
133-
}
134-
})
103+
const onTimeout = () => {
104+
cleanListeners()
105+
this._openRequests--
106+
request.once('error', () => {}) // we need to catch the request aborted error
107+
request.abort()
108+
callback(new TimeoutError('Request timed out', params), null)
109+
}
135110

136-
// updates the ended state
137-
request.on('abort', () => {
111+
const onError = err => {
112+
cleanListeners()
113+
this._openRequests--
114+
callback(new ConnectionError(err.message), null)
115+
}
116+
117+
const onAbort = () => {
118+
cleanListeners()
119+
request.once('error', () => {}) // we need to catch the request aborted error
138120
debug('Request aborted', params)
139-
/* istanbul ignore else */
140-
if (ended === false) {
141-
ended = true
142-
this._openRequests--
143-
callback(new RequestAbortedError(), null)
144-
}
145-
})
121+
this._openRequests--
122+
callback(new RequestAbortedError(), null)
123+
}
124+
125+
request.on('response', onResponse)
126+
request.on('timeout', onTimeout)
127+
request.on('error', onError)
128+
request.on('abort', onAbort)
146129

147130
// Disables the Nagle algorithm
148131
request.setNoDelay(true)
@@ -151,8 +134,8 @@ class Connection {
151134
if (isStream(params.body) === true) {
152135
pump(params.body, request, err => {
153136
/* istanbul ignore if */
154-
if (err != null && /* istanbul ignore next */ ended === false) {
155-
ended = true
137+
if (err != null) {
138+
cleanListeners()
156139
this._openRequests--
157140
callback(err, null)
158141
}
@@ -162,6 +145,13 @@ class Connection {
162145
}
163146

164147
return request
148+
149+
function cleanListeners () {
150+
request.removeListener('response', onResponse)
151+
request.removeListener('timeout', onTimeout)
152+
request.removeListener('error', onError)
153+
request.removeListener('abort', onAbort)
154+
}
165155
}
166156

167157
// TODO: write a better closing logic

lib/Transport.js

Lines changed: 127 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
const debug = require('debug')('elasticsearch')
2323
const os = require('os')
24-
const { gzip, createGzip } = require('zlib')
24+
const { gzip, unzip, createGzip } = require('zlib')
2525
const ms = require('ms')
2626
const {
2727
ConnectionError,
@@ -174,37 +174,40 @@ class Transport {
174174
request = meta.connection.request(params, onResponse)
175175
}
176176

177-
const onResponse = (err, response) => {
178-
if (err !== null) {
179-
if (err.name !== 'RequestAbortedError') {
180-
// if there is an error in the connection
181-
// let's mark the connection as dead
182-
this.connectionPool.markDead(meta.connection)
183-
184-
if (this.sniffOnConnectionFault === true) {
185-
this.sniff({
186-
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
187-
requestId: meta.request.id
188-
})
189-
}
177+
const onConnectionError = (err) => {
178+
if (err.name !== 'RequestAbortedError') {
179+
// if there is an error in the connection
180+
// let's mark the connection as dead
181+
this.connectionPool.markDead(meta.connection)
182+
183+
if (this.sniffOnConnectionFault === true) {
184+
this.sniff({
185+
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
186+
requestId: meta.request.id
187+
})
188+
}
190189

191-
// retry logic
192-
if (meta.attempts < maxRetries) {
193-
meta.attempts++
194-
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
195-
makeRequest()
196-
return
197-
}
190+
// retry logic
191+
if (meta.attempts < maxRetries) {
192+
meta.attempts++
193+
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
194+
makeRequest()
195+
return
198196
}
197+
}
199198

200-
err.meta = result
201-
this.emit('response', err, result)
202-
return callback(err, result)
199+
err.meta = result
200+
this.emit('response', err, result)
201+
return callback(err, result)
202+
}
203+
204+
const onResponse = (err, response) => {
205+
if (err !== null) {
206+
return onConnectionError(err)
203207
}
204208

205-
const { statusCode, headers } = response
206-
result.statusCode = statusCode
207-
result.headers = headers
209+
result.statusCode = response.statusCode
210+
result.headers = response.headers
208211

209212
if (options.asStream === true) {
210213
result.body = response
@@ -213,74 +216,109 @@ class Transport {
213216
return
214217
}
215218

216-
var payload = ''
217-
// collect the payload
218-
response.setEncoding('utf8')
219-
response.on('data', chunk => { payload += chunk })
220-
/* istanbul ignore next */
221-
response.on('error', err => {
222-
const error = new ConnectionError(err.message, result)
223-
this.emit('response', error, result)
224-
callback(error, result)
225-
})
226-
response.on('end', () => {
227-
const isHead = params.method === 'HEAD'
228-
// we should attempt the payload deserialization only if:
229-
// - a `content-type` is defined and is equal to `application/json`
230-
// - the request is not a HEAD request
231-
// - the payload is not an empty string
232-
if (headers['content-type'] !== undefined &&
233-
headers['content-type'].indexOf('application/json') > -1 &&
234-
isHead === false &&
235-
payload !== ''
236-
) {
237-
try {
238-
result.body = this.serializer.deserialize(payload)
239-
} catch (err) {
240-
this.emit('response', err, result)
241-
return callback(err, result)
242-
}
243-
} else {
244-
// cast to boolean if the request method was HEAD
245-
result.body = isHead === true ? true : payload
219+
const contentEncoding = (result.headers['content-encoding'] || '').toLowerCase()
220+
const isCompressed = contentEncoding.indexOf('gzip') > -1 || contentEncoding.indexOf('deflate') > -1
221+
// if the response is compressed, we must handle it
222+
// as buffer for allowing decompression later
223+
let payload = isCompressed ? [] : ''
224+
const onData = isCompressed
225+
? chunk => { payload.push(chunk) }
226+
: chunk => { payload += chunk }
227+
const onEnd = err => {
228+
response.removeListener('data', onData)
229+
response.removeListener('end', onEnd)
230+
response.removeListener('error', onEnd)
231+
response.removeListener('aborted', onAbort)
232+
233+
if (err) {
234+
return onConnectionError(new ConnectionError(err.message))
246235
}
247236

248-
// we should ignore the statusCode if the user has configured the `ignore` field with
249-
// the statusCode we just got or if the request method is HEAD and the statusCode is 404
250-
const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.indexOf(statusCode) > -1) ||
251-
(isHead === true && statusCode === 404)
252-
253-
if (ignoreStatusCode === false &&
254-
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
255-
// if the statusCode is 502/3/4 we should run our retry strategy
256-
// and mark the connection as dead
257-
this.connectionPool.markDead(meta.connection)
258-
// retry logic (we shoukd not retry on "429 - Too Many Requests")
259-
if (meta.attempts < maxRetries && statusCode !== 429) {
260-
meta.attempts++
261-
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
262-
makeRequest()
263-
return
264-
}
237+
if (isCompressed) {
238+
unzip(Buffer.concat(payload), onBody)
265239
} else {
266-
// everything has worked as expected, let's mark
267-
// the connection as alive (or confirm it)
268-
this.connectionPool.markAlive(meta.connection)
240+
onBody(null, payload)
269241
}
242+
}
270243

271-
if (ignoreStatusCode === false && statusCode >= 400) {
272-
const error = new ResponseError(result)
273-
this.emit('response', error, result)
274-
callback(error, result)
275-
} else {
276-
// cast to boolean if the request method was HEAD
277-
if (isHead === true && statusCode === 404) {
278-
result.body = false
279-
}
280-
this.emit('response', null, result)
281-
callback(null, result)
244+
const onAbort = () => {
245+
response.destroy()
246+
onEnd(new Error('Response aborted while reading the body'))
247+
}
248+
249+
if (!isCompressed) {
250+
response.setEncoding('utf8')
251+
}
252+
response.on('data', onData)
253+
response.on('error', onEnd)
254+
response.on('end', onEnd)
255+
response.on('aborted', onAbort)
256+
}
257+
258+
const onBody = (err, payload) => {
259+
if (err) {
260+
this.emit('response', err, result)
261+
return callback(err, result)
262+
}
263+
if (Buffer.isBuffer(payload)) {
264+
payload = payload.toString()
265+
}
266+
const isHead = params.method === 'HEAD'
267+
// we should attempt the payload deserialization only if:
268+
// - a `content-type` is defined and is equal to `application/json`
269+
// - the request is not a HEAD request
270+
// - the payload is not an empty string
271+
if (result.headers['content-type'] !== undefined &&
272+
result.headers['content-type'].indexOf('application/json') > -1 &&
273+
isHead === false &&
274+
payload !== ''
275+
) {
276+
try {
277+
result.body = this.serializer.deserialize(payload)
278+
} catch (err) {
279+
this.emit('response', err, result)
280+
return callback(err, result)
282281
}
283-
})
282+
} else {
283+
// cast to boolean if the request method was HEAD
284+
result.body = isHead === true ? true : payload
285+
}
286+
287+
// we should ignore the statusCode if the user has configured the `ignore` field with
288+
// the statusCode we just got or if the request method is HEAD and the statusCode is 404
289+
const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.indexOf(result.statusCode) > -1) ||
290+
(isHead === true && result.statusCode === 404)
291+
292+
if (ignoreStatusCode === false &&
293+
(result.statusCode === 502 || result.statusCode === 503 || result.statusCode === 504)) {
294+
// if the statusCode is 502/3/4 we should run our retry strategy
295+
// and mark the connection as dead
296+
this.connectionPool.markDead(meta.connection)
297+
// retry logic (we shoukd not retry on "429 - Too Many Requests")
298+
if (meta.attempts < maxRetries && result.statusCode !== 429) {
299+
meta.attempts++
300+
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
301+
makeRequest()
302+
return
303+
}
304+
} else {
305+
// everything has worked as expected, let's mark
306+
// the connection as alive (or confirm it)
307+
this.connectionPool.markAlive(meta.connection)
308+
}
309+
310+
if (ignoreStatusCode === false && result.statusCode >= 400) {
311+
const error = new ResponseError(result)
312+
this.emit('response', error, result)
313+
callback(error, result)
314+
} else {
315+
// cast to boolean if the request method was HEAD
316+
if (isHead === true && result.statusCode === 404) {
317+
result.body = false
318+
}
319+
this.emit('response', null, result)
320+
callback(null, result)
321+
}
284322
}
285323

286324
const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers))

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
},
7676
"dependencies": {
7777
"debug": "^4.1.1",
78-
"decompress-response": "^4.2.0",
7978
"hpagent": "^0.1.1",
8079
"ms": "^2.1.1",
8180
"pump": "^3.0.0",

0 commit comments

Comments
 (0)