Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
G
Grpc
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container registry
Model registry
Operate
Environments
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
tci-gateway-module
Grpc
Commits
c0b1785a
Commit
c0b1785a
authored
Jan 22, 2015
by
Tim Emiola
Browse files
Options
Downloads
Plain Diff
Merge pull request #155 from murgatroid99/node_remove_byte_streams
Node remove byte streams
parents
0a6f0a56
cdc2a738
No related branches found
No related tags found
No related merge requests found
Changes
3
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
src/node/client.js
+48
-5
48 additions, 5 deletions
src/node/client.js
src/node/server.js
+55
-10
55 additions, 10 deletions
src/node/server.js
src/node/test/interop_sanity_test.js
+1
-1
1 addition, 1 deletion
src/node/test/interop_sanity_test.js
with
104 additions
and
16 deletions
src/node/client.js
+
48
−
5
View file @
c0b1785a
...
@@ -45,10 +45,22 @@ util.inherits(GrpcClientStream, Duplex);
...
@@ -45,10 +45,22 @@ util.inherits(GrpcClientStream, Duplex);
* from stream.Duplex.
* from stream.Duplex.
* @constructor
* @constructor
* @param {grpc.Call} call Call object to proxy
* @param {grpc.Call} call Call object to proxy
* @param {object} options Stream options
* @param {function(*):Buffer=} serialize Serialization function for requests
* @param {function(Buffer):*=} deserialize Deserialization function for
* responses
*/
*/
function
GrpcClientStream
(
call
,
options
)
{
function
GrpcClientStream
(
call
,
serialize
,
deserialize
)
{
Duplex
.
call
(
this
,
options
);
Duplex
.
call
(
this
,
{
objectMode
:
true
});
if
(
!
serialize
)
{
serialize
=
function
(
value
)
{
return
value
;
};
}
if
(
!
deserialize
)
{
deserialize
=
function
(
value
)
{
return
value
;
};
}
var
self
=
this
;
var
self
=
this
;
// Indicates that we can start reading and have not received a null read
// Indicates that we can start reading and have not received a null read
var
can_read
=
false
;
var
can_read
=
false
;
...
@@ -59,6 +71,32 @@ function GrpcClientStream(call, options) {
...
@@ -59,6 +71,32 @@ function GrpcClientStream(call, options) {
// Indicates that a write is currently pending
// Indicates that a write is currently pending
var
writing
=
false
;
var
writing
=
false
;
this
.
_call
=
call
;
this
.
_call
=
call
;
/**
* Serialize a request value to a buffer. Always maps null to null. Otherwise
* uses the provided serialize function
* @param {*} value The value to serialize
* @return {Buffer} The serialized value
*/
this
.
serialize
=
function
(
value
)
{
if
(
value
===
null
||
value
===
undefined
)
{
return
null
;
}
return
serialize
(
value
);
};
/**
* Deserialize a response buffer to a value. Always maps null to null.
* Otherwise uses the provided deserialize function.
* @param {Buffer} buffer The buffer to deserialize
* @return {*} The deserialized value
*/
this
.
deserialize
=
function
(
buffer
)
{
if
(
buffer
===
null
)
{
return
null
;
}
return
deserialize
(
buffer
);
};
/**
/**
* Callback to handle receiving a READ event. Pushes the data from that event
* Callback to handle receiving a READ event. Pushes the data from that event
* onto the read queue and starts reading again if applicable.
* onto the read queue and starts reading again if applicable.
...
@@ -66,7 +104,7 @@ function GrpcClientStream(call, options) {
...
@@ -66,7 +104,7 @@ function GrpcClientStream(call, options) {
*/
*/
function
readCallback
(
event
)
{
function
readCallback
(
event
)
{
var
data
=
event
.
data
;
var
data
=
event
.
data
;
if
(
self
.
push
(
data
))
{
if
(
self
.
push
(
self
.
deserialize
(
data
))
)
{
if
(
data
==
null
)
{
if
(
data
==
null
)
{
// Disable starting to read after null read was received
// Disable starting to read after null read was received
can_read
=
false
;
can_read
=
false
;
...
@@ -102,7 +140,7 @@ function GrpcClientStream(call, options) {
...
@@ -102,7 +140,7 @@ function GrpcClientStream(call, options) {
next
.
callback
();
next
.
callback
();
writeNext
();
writeNext
();
};
};
call
.
startWrite
(
next
.
chunk
,
writeCallback
,
0
);
call
.
startWrite
(
self
.
serialize
(
next
.
chunk
)
,
writeCallback
,
0
);
}
else
{
}
else
{
writing
=
false
;
writing
=
false
;
}
}
...
@@ -171,6 +209,9 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
...
@@ -171,6 +209,9 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
* Make a request on the channel to the given method with the given arguments
* Make a request on the channel to the given method with the given arguments
* @param {grpc.Channel} channel The channel on which to make the request
* @param {grpc.Channel} channel The channel on which to make the request
* @param {string} method The method to request
* @param {string} method The method to request
* @param {function(*):Buffer} serialize Serialization function for requests
* @param {function(Buffer):*} deserialize Deserialization function for
* responses
* @param {array=} metadata Array of metadata key/value pairs to add to the call
* @param {array=} metadata Array of metadata key/value pairs to add to the call
* @param {(number|Date)=} deadline The deadline for processing this request.
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future.
* Defaults to infinite future.
...
@@ -178,6 +219,8 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
...
@@ -178,6 +219,8 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
*/
*/
function
makeRequest
(
channel
,
function
makeRequest
(
channel
,
method
,
method
,
serialize
,
deserialize
,
metadata
,
metadata
,
deadline
)
{
deadline
)
{
if
(
deadline
===
undefined
)
{
if
(
deadline
===
undefined
)
{
...
...
This diff is collapsed.
Click to expand it.
src/node/server.js
+
55
−
10
View file @
c0b1785a
...
@@ -47,10 +47,22 @@ util.inherits(GrpcServerStream, Duplex);
...
@@ -47,10 +47,22 @@ util.inherits(GrpcServerStream, Duplex);
* from stream.Duplex.
* from stream.Duplex.
* @constructor
* @constructor
* @param {grpc.Call} call Call object to proxy
* @param {grpc.Call} call Call object to proxy
* @param {object} options Stream options
* @param {function(*):Buffer=} serialize Serialization function for responses
* @param {function(Buffer):*=} deserialize Deserialization function for
* requests
*/
*/
function
GrpcServerStream
(
call
,
options
)
{
function
GrpcServerStream
(
call
,
serialize
,
deserialize
)
{
Duplex
.
call
(
this
,
options
);
Duplex
.
call
(
this
,
{
objectMode
:
true
});
if
(
!
serialize
)
{
serialize
=
function
(
value
)
{
return
value
;
};
}
if
(
!
deserialize
)
{
deserialize
=
function
(
value
)
{
return
value
;
};
}
this
.
_call
=
call
;
this
.
_call
=
call
;
// Indicate that a status has been sent
// Indicate that a status has been sent
var
finished
=
false
;
var
finished
=
false
;
...
@@ -59,6 +71,33 @@ function GrpcServerStream(call, options) {
...
@@ -59,6 +71,33 @@ function GrpcServerStream(call, options) {
'
code
'
:
grpc
.
status
.
OK
,
'
code
'
:
grpc
.
status
.
OK
,
'
details
'
:
'
OK
'
'
details
'
:
'
OK
'
};
};
/**
* Serialize a response value to a buffer. Always maps null to null. Otherwise
* uses the provided serialize function
* @param {*} value The value to serialize
* @return {Buffer} The serialized value
*/
this
.
serialize
=
function
(
value
)
{
if
(
value
===
null
||
value
===
undefined
)
{
return
null
;
}
return
serialize
(
value
);
};
/**
* Deserialize a request buffer to a value. Always maps null to null.
* Otherwise uses the provided deserialize function.
* @param {Buffer} buffer The buffer to deserialize
* @return {*} The deserialized value
*/
this
.
deserialize
=
function
(
buffer
)
{
if
(
buffer
===
null
)
{
return
null
;
}
return
deserialize
(
buffer
);
};
/**
/**
* Send the pending status
* Send the pending status
*/
*/
...
@@ -75,7 +114,6 @@ function GrpcServerStream(call, options) {
...
@@ -75,7 +114,6 @@ function GrpcServerStream(call, options) {
* @param {Error} err The error object
* @param {Error} err The error object
*/
*/
function
setStatus
(
err
)
{
function
setStatus
(
err
)
{
console
.
log
(
'
Server setting status to
'
,
err
);
var
code
=
grpc
.
status
.
INTERNAL
;
var
code
=
grpc
.
status
.
INTERNAL
;
var
details
=
'
Unknown Error
'
;
var
details
=
'
Unknown Error
'
;
...
@@ -113,7 +151,7 @@ function GrpcServerStream(call, options) {
...
@@ -113,7 +151,7 @@ function GrpcServerStream(call, options) {
return
;
return
;
}
}
var
data
=
event
.
data
;
var
data
=
event
.
data
;
if
(
self
.
push
(
data
)
&&
data
!=
null
)
{
if
(
self
.
push
(
deserialize
(
data
)
)
&&
data
!=
null
)
{
self
.
_call
.
startRead
(
readCallback
);
self
.
_call
.
startRead
(
readCallback
);
}
else
{
}
else
{
reading
=
false
;
reading
=
false
;
...
@@ -155,7 +193,7 @@ GrpcServerStream.prototype._read = function(size) {
...
@@ -155,7 +193,7 @@ GrpcServerStream.prototype._read = function(size) {
*/
*/
GrpcServerStream
.
prototype
.
_write
=
function
(
chunk
,
encoding
,
callback
)
{
GrpcServerStream
.
prototype
.
_write
=
function
(
chunk
,
encoding
,
callback
)
{
var
self
=
this
;
var
self
=
this
;
self
.
_call
.
startWrite
(
chunk
,
function
(
event
)
{
self
.
_call
.
startWrite
(
self
.
serialize
(
chunk
)
,
function
(
event
)
{
callback
();
callback
();
},
0
);
},
0
);
};
};
...
@@ -211,12 +249,13 @@ function Server(options) {
...
@@ -211,12 +249,13 @@ function Server(options) {
}
}
},
0
);
},
0
);
call
.
serverEndInitialMetadata
(
0
);
call
.
serverEndInitialMetadata
(
0
);
var
stream
=
new
GrpcServerStream
(
call
);
var
stream
=
new
GrpcServerStream
(
call
,
handler
.
serialize
,
handler
.
deserialize
);
Object
.
defineProperty
(
stream
,
'
cancelled
'
,
{
Object
.
defineProperty
(
stream
,
'
cancelled
'
,
{
get
:
function
()
{
return
cancelled
;}
get
:
function
()
{
return
cancelled
;}
});
});
try
{
try
{
handler
(
stream
,
data
.
metadata
);
handler
.
func
(
stream
,
data
.
metadata
);
}
catch
(
e
)
{
}
catch
(
e
)
{
stream
.
emit
(
'
error
'
,
e
);
stream
.
emit
(
'
error
'
,
e
);
}
}
...
@@ -237,14 +276,20 @@ function Server(options) {
...
@@ -237,14 +276,20 @@ function Server(options) {
* handle/respond to.
* handle/respond to.
* @param {function} handler Function that takes a stream of request values and
* @param {function} handler Function that takes a stream of request values and
* returns a stream of response values
* returns a stream of response values
* @param {function(*):Buffer} serialize Serialization function for responses
* @param {function(Buffer):*} deserialize Deserialization function for requests
* @return {boolean} True if the handler was set. False if a handler was already
* @return {boolean} True if the handler was set. False if a handler was already
* set for that name.
* set for that name.
*/
*/
Server
.
prototype
.
register
=
function
(
name
,
handler
)
{
Server
.
prototype
.
register
=
function
(
name
,
handler
,
serialize
,
deserialize
)
{
if
(
this
.
handlers
.
hasOwnProperty
(
name
))
{
if
(
this
.
handlers
.
hasOwnProperty
(
name
))
{
return
false
;
return
false
;
}
}
this
.
handlers
[
name
]
=
handler
;
this
.
handlers
[
name
]
=
{
func
:
handler
,
serialize
:
serialize
,
deserialize
:
deserialize
};
return
true
;
return
true
;
};
};
...
...
This diff is collapsed.
Click to expand it.
src/node/test/interop_sanity_test.js
+
1
−
1
View file @
c0b1785a
...
@@ -49,7 +49,7 @@ describe('Interop tests', function() {
...
@@ -49,7 +49,7 @@ describe('Interop tests', function() {
done
();
done
();
});
});
// This depends on not using a binary stream
// This depends on not using a binary stream
it
.
skip
(
'
should pass empty_unary
'
,
function
(
done
)
{
it
(
'
should pass empty_unary
'
,
function
(
done
)
{
interop_client
.
runTest
(
port
,
name_override
,
'
empty_unary
'
,
true
,
done
);
interop_client
.
runTest
(
port
,
name_override
,
'
empty_unary
'
,
true
,
done
);
});
});
it
(
'
should pass large_unary
'
,
function
(
done
)
{
it
(
'
should pass large_unary
'
,
function
(
done
)
{
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
sign in
to comment