From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- .../thrift/lib/rb/spec/nonblocking_server_spec.rb | 263 +++++++++++++++++++++ 1 file changed, 263 insertions(+) create mode 100644 src/jaegertracing/thrift/lib/rb/spec/nonblocking_server_spec.rb (limited to 'src/jaegertracing/thrift/lib/rb/spec/nonblocking_server_spec.rb') diff --git a/src/jaegertracing/thrift/lib/rb/spec/nonblocking_server_spec.rb b/src/jaegertracing/thrift/lib/rb/spec/nonblocking_server_spec.rb new file mode 100644 index 000000000..613d88390 --- /dev/null +++ b/src/jaegertracing/thrift/lib/rb/spec/nonblocking_server_spec.rb @@ -0,0 +1,263 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'spec_helper' + +describe 'NonblockingServer' do + + class Handler + def initialize + @queue = Queue.new + end + + attr_accessor :server + + def greeting(english) + if english + SpecNamespace::Hello.new + else + SpecNamespace::Hello.new(:greeting => "Aloha!") + end + end + + def block + @queue.pop + end + + def unblock(n) + n.times { @queue.push true } + end + + def sleep(time) + Kernel.sleep time + end + + def shutdown + @server.shutdown(0, false) + end + end + + class SpecTransport < Thrift::BaseTransport + def initialize(transport, queue) + @transport = transport + @queue = queue + @flushed = false + end + + def open? + @transport.open? + end + + def open + @transport.open + end + + def close + @transport.close + end + + def read(sz) + @transport.read(sz) + end + + def write(buf,sz=nil) + @transport.write(buf, sz) + end + + def flush + @queue.push :flushed unless @flushed or @queue.nil? + @flushed = true + @transport.flush + end + end + + class SpecServerSocket < Thrift::ServerSocket + def initialize(host, port, queue) + super(host, port) + @queue = queue + end + + def listen + super + @queue.push :listen + end + end + + describe Thrift::NonblockingServer do + before(:each) do + @port = 43251 + handler = Handler.new + processor = SpecNamespace::NonblockingService::Processor.new(handler) + queue = Queue.new + @transport = SpecServerSocket.new('localhost', @port, queue) + transport_factory = Thrift::FramedTransportFactory.new + logger = Logger.new(STDERR) + logger.level = Logger::WARN + @server = Thrift::NonblockingServer.new(processor, @transport, transport_factory, nil, 5, logger) + handler.server = @server + @server_thread = Thread.new(Thread.current) do |master_thread| + begin + @server.serve + rescue => e + p e + puts e.backtrace * "\n" + master_thread.raise e + end + end + queue.pop + + @clients = [] + @catch_exceptions = false + end + + after(:each) do + @clients.each { |client, trans| trans.close } + # @server.shutdown(1) + @server_thread.kill + @transport.close + end + + def setup_client(queue = nil) + transport = SpecTransport.new(Thrift::FramedTransport.new(Thrift::Socket.new('localhost', @port)), queue) + protocol = Thrift::BinaryProtocol.new(transport) + client = SpecNamespace::NonblockingService::Client.new(protocol) + transport.open + @clients << [client, transport] + client + end + + def setup_client_thread(result) + queue = Queue.new + Thread.new do + begin + client = setup_client + while (cmd = queue.pop) + msg, *args = cmd + case msg + when :block + result << client.block + when :unblock + client.unblock(args.first) + when :hello + result << client.greeting(true) # ignore result + when :sleep + client.sleep(args[0] || 0.5) + result << :slept + when :shutdown + client.shutdown + when :exit + result << :done + break + end + end + @clients.each { |c,t| t.close and break if c == client } #close the transport + rescue => e + raise e unless @catch_exceptions + end + end + queue + end + + it "should handle basic message passing" do + client = setup_client + expect(client.greeting(true)).to eq(SpecNamespace::Hello.new) + expect(client.greeting(false)).to eq(SpecNamespace::Hello.new(:greeting => 'Aloha!')) + @server.shutdown + end + + it "should handle concurrent clients" do + queue = Queue.new + trans_queue = Queue.new + 4.times do + Thread.new(Thread.current) do |main_thread| + begin + queue.push setup_client(trans_queue).block + rescue => e + main_thread.raise e + end + end + end + 4.times { trans_queue.pop } + setup_client.unblock(4) + 4.times { expect(queue.pop).to be_truthy } + @server.shutdown + end + + it "should handle messages from more than 5 long-lived connections" do + queues = [] + result = Queue.new + 7.times do |i| + queues << setup_client_thread(result) + Thread.pass if i == 4 # give the server time to accept connections + end + client = setup_client + # block 4 connections + 4.times { |i| queues[i] << :block } + queues[4] << :hello + queues[5] << :hello + queues[6] << :hello + 3.times { expect(result.pop).to eq(SpecNamespace::Hello.new) } + expect(client.greeting(true)).to eq(SpecNamespace::Hello.new) + queues[5] << [:unblock, 4] + 4.times { expect(result.pop).to be_truthy } + queues[2] << :hello + expect(result.pop).to eq(SpecNamespace::Hello.new) + expect(client.greeting(false)).to eq(SpecNamespace::Hello.new(:greeting => 'Aloha!')) + 7.times { queues.shift << :exit } + expect(client.greeting(true)).to eq(SpecNamespace::Hello.new) + @server.shutdown + end + + it "should shut down when asked" do + # connect first to ensure it's running + client = setup_client + client.greeting(false) # force a message pass + @server.shutdown + expect(@server_thread.join(2)).to be_an_instance_of(Thread) + end + + it "should continue processing active messages when shutting down" do + result = Queue.new + client = setup_client_thread(result) + client << :sleep + sleep 0.1 # give the server time to start processing the client's message + @server.shutdown + expect(@server_thread.join(2)).to be_an_instance_of(Thread) + expect(result.pop).to eq(:slept) + end + + it "should kill active messages when they don't expire while shutting down" do + result = Queue.new + client = setup_client_thread(result) + client << [:sleep, 10] + sleep 0.1 # start processing the client's message + @server.shutdown(1) + @catch_exceptions = true + expect(@server_thread.join(3)).not_to be_nil + expect(result).to be_empty + end + + it "should allow shutting down in response to a message" do + client = setup_client + expect(client.greeting(true)).to eq(SpecNamespace::Hello.new) + client.shutdown + expect(@server_thread.join(2)).not_to be_nil + end + end +end -- cgit v1.2.3