Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions lib/tripod/streaming.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,37 @@
module Tripod
module Streaming

def self.create_http_client(uri, opts)
client = Net::HTTP.new(uri.host, uri.port)
client.use_ssl = uri.scheme == 'https'
client.read_timeout = opts[:timeout_seconds] || 10
client
end

def self.create_request(uri, opts)
headers = opts[:extra_headers] || {}
a = opts[:accept] || headers['Accept'] || '*/*'
headers['Accept'] = a

req = Net::HTTP::Post.new(uri.request_uri, headers)

if uri.user
req.basic_auth(uri.user, uri.password)
end
req
end

# stream data from a url
# opts
#  :accept => "*/*"
# :timeout_seconds = 10
# :response_limit_bytes = nil
def self.get_data(request_url, payload, opts={})

accept = opts[:accept]
timeout_in_seconds = opts[:timeout_seconds] || 10
limit_in_bytes = opts[:response_limit_bytes]

# set request headers
headers = opts[:extra_headers] || {}

# if explicit accept option is given, set it in the headers (and overwrite any existing value in the extra_headers map)
# if none is given accept */*
headers['Accept'] = accept || headers['Accept'] || '*/*'

uri = URI(request_url)

http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = true if uri.port.to_s == "443"
http.read_timeout = timeout_in_seconds
http = self.create_http_client(uri, opts)
post_request = self.create_request(uri, opts)

total_bytes = 0

Expand All @@ -35,7 +43,7 @@ def self.get_data(request_url, payload, opts={})
response = StringIO.new

begin
http.request_post(uri.request_uri, payload, headers) do |res|
http.request(post_request, payload) do |res|

response_duration = Time.now - request_start_time if Tripod.logger.debug?

Expand Down
6 changes: 5 additions & 1 deletion spec/tripod/memcached_cache_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@
@number_of_memcache_get_calls = Tripod.cache_store.stats[0]["localhost:11211"]["cmd_get"]
@number_of_memcache_set_calls = Tripod.cache_store.stats[0]["localhost:11211"]["cmd_set"]

100.times do
threads = (1 .. 100).map do
Thread.new do
Tripod::SparqlClient::Query.query(query, accept_header)
end
end

threads.each do |t|
t.join
end
end

it "should increase number of memcache get calls" do
Expand Down
4 changes: 3 additions & 1 deletion spec/tripod/sparql_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ module Tripod::SparqlClient
@query_endpoint = Tripod.query_endpoint
Tripod.query_endpoint = "http://localhost:#{port}/sparql/query"
@server_thread = Thread.new do
Timeout::timeout(5) do
Timeout::timeout(20) do
listener = TCPServer.new port
client = listener.accept

Expand All @@ -45,6 +45,8 @@ module Tripod::SparqlClient
client.puts
end
end

sleep 1
end

after do
Expand Down
100 changes: 100 additions & 0 deletions spec/tripod/streaming_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,106 @@
end
end

describe '.create_http_client' do
context 'http URI with no options' do
let(:client) { Tripod::Streaming.create_http_client(URI('http://localhost:8080/sparql/query'), {}) }

it 'should set host' do
expect(client.address).to eq('localhost')
end

it 'should set port' do
expect(client.port).to eq(8080)
end

it 'should set default read timeout' do
expect(client.read_timeout).to eq(10)
end

it 'should not use ssl' do
expect(client.use_ssl?).to eq(false)
end
end

context 'https URI with default port' do
let(:client) { Tripod::Streaming.create_http_client(URI('https://localhost/sparql/query'), {}) }
it 'should use ssl' do
expect(client.use_ssl?).to eq(true)
end
end

context 'https with non-default port' do
let(:client) { Tripod::Streaming.create_http_client(URI('https://localhost:4433/sparql/query'), {}) }
it 'should use ssl' do
expect(client.use_ssl?).to eq(true)
end

it 'should use specified port' do
expect(client.port).to eq(4433)
end
end

context 'with read timeout option' do
let(:read_timeout) { 5 }
let(:client) { Tripod::Streaming.create_http_client(URI('http://localhost/sparql/query'), {:timeout_seconds => read_timeout}) }

it 'should set read timeout' do
expect(client.read_timeout).to eq(read_timeout)
end
end
end

describe '.create_request' do
context 'with no user or extra headers' do
let(:req) { Tripod::Streaming.create_request(URI('http://localhost/sparql/query'), {}) }
it 'should set accept header' do
expect(req['accept']).to eq('*/*')
end
end

context 'with no user and extra headers' do
let!(:extra_headers) { {'content-type' => 'application/sparql-query', 'connection' => 'close'} }
let!(:req) { Tripod::Streaming.create_request(URI('http://localhost/sparql/query'), {:extra_headers => extra_headers}) }

it 'should set headers' do
extra_headers.each do |name, value|
expect(req[name]).to eq(value)
end
end
end

context 'with no user and extra headers and accept options' do
let(:extra_headers) { { 'accept' => 'text/trig' } }
let(:accept) { 'application/n-triples' }
let(:req) { Tripod::Streaming.create_request(URI('http://localhost/sparql/query'), {:extra_headers => extra_headers, :accept => accept}) }

it 'should set accept option as accept header' do
expect(req['accept']).to eq(accept)
end
end

context 'with user and no headers' do
let(:req) { Tripod::Streaming.create_request(URI('http://user:password@localhost/sparql/query'), {}) }
it 'should use basic auth' do
expect(req['authorization']).to match(/Basic [a-zA-Z0-9+\/=]+/)
end
end

context 'with user and extra Authorization header' do
let(:extra_headers) { {'content-type' => 'application/sparql-query', 'authorization' => 'Token abcdef'} }
let(:uri) { URI('http://user:password@localhost/sparql/query') }
let(:req) { Tripod::Streaming.create_request(uri, {:extra_headers => extra_headers}) }

it 'should use basic auth' do
expect(req['authorization']).to match(/Basic [a-zA-Z0-9+\/=]+/)
end

it 'should set other extra header' do
expect(req['content-type']).to eq('application/sparql-query')
end
end
end

# these tests actually download remote resources (from jQuery's CDN) to test the streaming bits
# TODO: move this out so it doesn't run with the normal rake task??
context "streaming" do
Expand Down
4 changes: 2 additions & 2 deletions travis/bootstrap_fuseki.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ mkdir -p /opt/fuseki
mkdir /opt/fuseki/config
sudo apt-get update
sudo apt-get -y install tar wget
wget http://apache.mirror.anlx.net/jena/binaries/jena-fuseki1-1.5.0-distribution.tar.gz -O /opt/fuseki/jena-fuseki-1.5.0.tar.gz
wget http://archive.apache.org/dist/jena/binaries/jena-fuseki1-1.5.0-distribution.tar.gz -O /opt/fuseki/jena-fuseki-1.5.0.tar.gz
tar -xvzf /opt/fuseki/jena-fuseki-1.5.0.tar.gz -C /opt/fuseki
mv ./travis/config_tripod.ttl /opt/fuseki/config/config.ttl
mv ./travis/config_tripod.ttl /opt/fuseki/config/config.ttl