@@ -20,6 +20,12 @@ function Agent(backend, stream) {
2020 this . stream = stream ;
2121
2222 this . clientId = hat ( ) ;
23+ // src is a client-configurable "id" which the client will set in its handshake,
24+ // and attach to its ops. This should take precedence over clientId if set.
25+ // Only legacy clients, or new clients connecting for the first time will use the
26+ // Agent-provided clientId. Ideally we'll deprecate clientId in favour of src
27+ // in the next breaking change.
28+ this . src = null ;
2329 this . connectTime = Date . now ( ) ;
2430
2531 // We need to track which documents are subscribed by the client. This is a
@@ -38,20 +44,15 @@ function Agent(backend, stream) {
3844 // active, and it is passed to each middleware call
3945 this . custom = { } ;
4046
41- // Initialize the remote client by sending it its agent Id.
42- this . send ( {
43- a : 'init' ,
44- protocol : 1 ,
45- id : this . clientId ,
46- type : types . defaultType . uri
47- } ) ;
47+ // Send the legacy message to initialize old clients with the random agent Id
48+ this . send ( this . _initMessage ( 'init' ) ) ;
4849}
4950module . exports = Agent ;
5051
5152// Close the agent with the client.
5253Agent . prototype . close = function ( err ) {
5354 if ( err ) {
54- logger . warn ( 'Agent closed due to error' , this . clientId , err . stack || err ) ;
55+ logger . warn ( 'Agent closed due to error' , this . _src ( ) , err . stack || err ) ;
5556 }
5657 if ( this . closed ) return ;
5758 // This will end the writable stream and emit 'finish'
@@ -190,7 +191,7 @@ Agent.prototype._isOwnOp = function(collection, op) {
190191 // Detect ops from this client on the same projection. Since the client sent
191192 // these in, the submit reply will be sufficient and we can silently ignore
192193 // them in the streams for subscribed documents or queries
193- return ( this . clientId === op . src ) && ( collection === ( op . i || op . c ) ) ;
194+ return ( this . _src ( ) === op . src ) && ( collection === ( op . i || op . c ) ) ;
194195} ;
195196
196197Agent . prototype . send = function ( message ) {
@@ -332,6 +333,9 @@ Agent.prototype._handleMessage = function(request, callback) {
332333 if ( errMessage ) return callback ( new ShareDBError ( ERROR_CODE . ERR_MESSAGE_BADLY_FORMED , errMessage ) ) ;
333334
334335 switch ( request . a ) {
336+ case 'hs' :
337+ if ( request . id ) this . src = request . id ;
338+ return callback ( null , this . _initMessage ( 'hs' ) ) ;
335339 case 'qf' :
336340 return this . _queryFetch ( request . id , request . c , request . q , getQueryOptions ( request ) , callback ) ;
337341 case 'qs' :
@@ -352,7 +356,13 @@ Agent.prototype._handleMessage = function(request, callback) {
352356 return this . _unsubscribe ( request . c , request . d , callback ) ;
353357 case 'op' :
354358 // Normalize the properties submitted
355- var op = createClientOp ( request , this . clientId ) ;
359+ var op = createClientOp ( request , this . _src ( ) ) ;
360+ if ( op . seq >= util . MAX_SAFE_INTEGER ) {
361+ return callback ( new ShareDBError (
362+ ERROR_CODE . ERR_CONNECTION_SEQ_INTEGER_OVERFLOW ,
363+ 'Connection seq has exceeded the max safe integer, maybe from being open for too long'
364+ ) ) ;
365+ }
356366 if ( ! op ) return callback ( new ShareDBError ( ERROR_CODE . ERR_MESSAGE_BADLY_FORMED , 'Invalid op message' ) ) ;
357367 return this . _submit ( request . c , request . d , op , callback ) ;
358368 case 'nf' :
@@ -645,6 +655,20 @@ Agent.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp,
645655 this . backend . fetchSnapshotByTimestamp ( this , collection , id , timestamp , callback ) ;
646656} ;
647657
658+ Agent . prototype . _initMessage = function ( action ) {
659+ return {
660+ a : action ,
661+ protocol : 1 ,
662+ protocolMinor : 1 ,
663+ id : this . _src ( ) ,
664+ type : types . defaultType . uri
665+ } ;
666+ } ;
667+
668+ Agent . prototype . _src = function ( ) {
669+ return this . src || this . clientId ;
670+ } ;
671+
648672
649673function createClientOp ( request , clientId ) {
650674 // src can be provided if it is not the same as the current agent,
0 commit comments