diff options
Diffstat (limited to 'src/jaegertracing/thrift/lib/erl/src/thrift_reconnecting_client.erl')
-rw-r--r-- | src/jaegertracing/thrift/lib/erl/src/thrift_reconnecting_client.erl | 258 |
1 files changed, 258 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/erl/src/thrift_reconnecting_client.erl b/src/jaegertracing/thrift/lib/erl/src/thrift_reconnecting_client.erl new file mode 100644 index 000000000..538fd3ad1 --- /dev/null +++ b/src/jaegertracing/thrift/lib/erl/src/thrift_reconnecting_client.erl @@ -0,0 +1,258 @@ +%% +%% Licensed to the Apache Software Foundation (ASF) under one +%% or more contributor license agreements. See the NOTICE file +%% distributed with this work for additional information +%% regarding copyright ownership. The ASF licenses this file +%% to you under the Apache License, Version 2.0 (the +%% "License"); you may not use this file except in compliance +%% with the License. You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% + +-module(thrift_reconnecting_client). + +-behaviour(gen_server). + +%% API +-export([ call/3, + get_stats/1, + get_and_reset_stats/1 ]). + +-export([ start_link/6 ]). + +%% gen_server callbacks +-export([ init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 ]). + +-record( state, { client = nil, + host, + port, + thrift_svc, + thrift_opts, + reconn_min, + reconn_max, + reconn_time = 0, + op_cnt_dict, + op_time_dict } ). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- +start_link( Host, Port, + ThriftSvc, ThriftOpts, + ReconnMin, ReconnMax ) -> + gen_server:start_link( ?MODULE, + [ Host, Port, + ThriftSvc, ThriftOpts, + ReconnMin, ReconnMax ], + [] ). + +call( Pid, Op, Args ) -> + gen_server:call( Pid, { call, Op, Args } ). + +get_stats( Pid ) -> + gen_server:call( Pid, get_stats ). + +get_and_reset_stats( Pid ) -> + gen_server:call( Pid, get_and_reset_stats ). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Start the server. +%%-------------------------------------------------------------------- +init( [ Host, Port, TSvc, TOpts, ReconnMin, ReconnMax ] ) -> + process_flag( trap_exit, true ), + + State = #state{ host = Host, + port = Port, + thrift_svc = TSvc, + thrift_opts = TOpts, + reconn_min = ReconnMin, + reconn_max = ReconnMax, + op_cnt_dict = dict:new(), + op_time_dict = dict:new() }, + + { ok, try_connect( State ) }. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call( { call, Op, _ }, + _From, + State = #state{ client = nil } ) -> + { reply, { error, noconn }, incr_stats( Op, "failfast", 1, State ) }; + +handle_call( { call, Op, Args }, + _From, + State=#state{ client = Client } ) -> + + Timer = timer_fun(), + Result = ( catch thrift_client:call( Client, Op, Args) ), + Time = Timer(), + + case Result of + { C, { ok, Reply } } -> + S = incr_stats( Op, "success", Time, State#state{ client = C } ), + { reply, {ok, Reply }, S }; + { _, { E, Msg } } when E == error; E == exception -> + S = incr_stats( Op, "error", Time, try_connect( State ) ), + { reply, { E, Msg }, S }; + Other -> + S = incr_stats( Op, "error", Time, try_connect( State ) ), + { reply, Other, S } + end; + +handle_call( get_stats, + _From, + State = #state{} ) -> + { reply, stats( State ), State }; + +handle_call( get_and_reset_stats, + _From, + State = #state{} ) -> + { reply, stats( State ), reset_stats( State ) }. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast( _Msg, State ) -> + { noreply, State }. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info( try_connect, State ) -> + { noreply, try_connect( State ) }; + +handle_info( _Info, State ) -> + { noreply, State }. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate( _Reason, #state{ client = Client } ) -> + thrift_client:close( Client ), + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change( _OldVsn, State, _Extra ) -> + { ok, State }. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- +try_connect( State = #state{ client = OldClient, + host = Host, + port = Port, + thrift_svc = TSvc, + thrift_opts = TOpts } ) -> + + case OldClient of + nil -> ok; + _ -> ( catch thrift_client:close( OldClient ) ) + end, + + case catch thrift_client_util:new( Host, Port, TSvc, TOpts ) of + { ok, Client } -> + State#state{ client = Client, reconn_time = 0 }; + { E, Msg } when E == error; E == exception -> + ReconnTime = reconn_time( State ), + error_logger:error_msg( "[~w] ~w connect failed (~w), trying again in ~w ms~n", + [ self(), TSvc, Msg, ReconnTime ] ), + erlang:send_after( ReconnTime, self(), try_connect ), + State#state{ client = nil, reconn_time = ReconnTime } + end. + + +reconn_time( #state{ reconn_min = ReconnMin, reconn_time = 0 } ) -> + ReconnMin; +reconn_time( #state{ reconn_max = ReconnMax, reconn_time = ReconnMax } ) -> + ReconnMax; +reconn_time( #state{ reconn_max = ReconnMax, reconn_time = R } ) -> + Backoff = 2 * R, + case Backoff > ReconnMax of + true -> ReconnMax; + false -> Backoff + end. + +-ifdef(time_correction). +timer_fun() -> + T1 = erlang:monotonic_time(), + fun() -> + T2 = erlang:monotonic_time(), + erlang:convert_time_unit(T2 - T1, native, micro_seconds) + end. +-else. +timer_fun() -> + T1 = erlang:timestamp(), + fun() -> + T2 = erlang:timestamp(), + timer:now_diff(T2, T1) + end. +-endif. + +incr_stats( Op, Result, Time, + State = #state{ op_cnt_dict = OpCntDict, + op_time_dict = OpTimeDict } ) -> + Key = lists:flatten( [ atom_to_list( Op ), [ "_" | Result ] ] ), + State#state{ op_cnt_dict = dict:update_counter( Key, 1, OpCntDict ), + op_time_dict = dict:update_counter( Key, Time, OpTimeDict ) }. + + +stats( #state{ thrift_svc = TSvc, + op_cnt_dict = OpCntDict, + op_time_dict = OpTimeDict } ) -> + Svc = atom_to_list(TSvc), + + F = fun( Key, Count, Stats ) -> + Name = lists:flatten( [ Svc, [ "_" | Key ] ] ), + Micros = dict:fetch( Key, OpTimeDict ), + [ { Name, Count, Micros } | Stats ] + end, + + dict:fold( F, [], OpCntDict ). + +reset_stats( State = #state{} ) -> + State#state{ op_cnt_dict = dict:new(), op_time_dict = dict:new() }. |