diff options
Diffstat (limited to '')
-rw-r--r-- | src/jaegertracing/thrift/lib/erl/src/thrift_socket_transport.erl | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/erl/src/thrift_socket_transport.erl b/src/jaegertracing/thrift/lib/erl/src/thrift_socket_transport.erl new file mode 100644 index 000000000..fa10ed0c6 --- /dev/null +++ b/src/jaegertracing/thrift/lib/erl/src/thrift_socket_transport.erl @@ -0,0 +1,176 @@ +%% +%% Licensed to the Apache Software Foundation (ASF) under one +%% or more contributor license agreements. See the NOTICE file +%% distributed with this work for additional information +%% regarding copyright ownership. The ASF licenses this file +%% to you under the Apache License, Version 2.0 (the +%% "License"); you may not use this file except in compliance +%% with the License. You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% + +-module(thrift_socket_transport). + +-behaviour(thrift_transport). + +%% constructors +-export([new/1, new/2]). +%% transport callbacks +-export([read/2, read_exact/2, write/2, flush/1, close/1]). +%% legacy api +-export([new_transport_factory/3]). + + +-record(t_socket, { + socket, + recv_timeout=60000, + buffer = [] +}). + +-type state() :: #t_socket{}. + + +-spec new(Socket::any()) -> + thrift_transport:t_transport(). + +new(Socket) -> new(Socket, []). + +-spec new(Socket::any(), Opts::list()) -> + thrift_transport:t_transport(). + +new(Socket, Opts) when is_list(Opts) -> + State = parse_opts(Opts, #t_socket{socket = Socket}), + thrift_transport:new(?MODULE, State). + + +parse_opts([{recv_timeout, Timeout}|Rest], State) +when is_integer(Timeout), Timeout > 0 -> + parse_opts(Rest, State#t_socket{recv_timeout = Timeout}); +parse_opts([{recv_timeout, infinity}|Rest], State) -> + parse_opts(Rest, State#t_socket{recv_timeout = infinity}); +parse_opts([], State) -> + State. + + +-include("thrift_transport_behaviour.hrl"). + + +read(State = #t_socket{buffer = Buf}, Len) +when is_integer(Len), Len >= 0 -> + Binary = iolist_to_binary(Buf), + case iolist_size(Binary) of + X when X >= Len -> + {Result, Remaining} = split_binary(Binary, Len), + {State#t_socket{buffer = Remaining}, {ok, Result}}; + _ -> recv(State, Len) + end. + +recv(State = #t_socket{socket = Socket, buffer = Buf}, Len) -> + case gen_tcp:recv(Socket, 0, State#t_socket.recv_timeout) of + {error, Error} -> + gen_tcp:close(Socket), + {State, {error, Error}}; + {ok, Data} -> + Binary = iolist_to_binary([Buf, Data]), + Give = min(iolist_size(Binary), Len), + {Result, Remaining} = split_binary(Binary, Give), + {State#t_socket{buffer = Remaining}, {ok, Result}} + end. + + +read_exact(State = #t_socket{buffer = Buf}, Len) +when is_integer(Len), Len >= 0 -> + Binary = iolist_to_binary(Buf), + case iolist_size(Binary) of + X when X >= Len -> read(State, Len); + X -> + case gen_tcp:recv(State#t_socket.socket, Len - X, State#t_socket.recv_timeout) of + {error, Error} -> + gen_tcp:close(State#t_socket.socket), + {State, {error, Error}}; + {ok, Data} -> + {State#t_socket{buffer = []}, {ok, <<Binary/binary, Data/binary>>}} + end + end. + + +write(State = #t_socket{socket = Socket}, Data) -> + case gen_tcp:send(Socket, Data) of + {error, Error} -> + gen_tcp:close(Socket), + {State, {error, Error}}; + ok -> {State, ok} + end. + + +flush(State) -> + {State#t_socket{buffer = []}, ok}. + + +close(State = #t_socket{socket = Socket}) -> + {State, gen_tcp:close(Socket)}. + + +%% legacy api. left for compatibility + +%% The following "local" record is filled in by parse_factory_options/2 +%% below. These options can be passed to new_protocol_factory/3 in a +%% proplists-style option list. They're parsed like this so it is an O(n) +%% operation instead of O(n^2) +-record(factory_opts, { + connect_timeout = infinity, + sockopts = [], + framed = false +}). + +parse_factory_options([], FactoryOpts, TransOpts) -> {FactoryOpts, TransOpts}; +parse_factory_options([{framed, Bool}|Rest], FactoryOpts, TransOpts) +when is_boolean(Bool) -> + parse_factory_options(Rest, FactoryOpts#factory_opts{framed = Bool}, TransOpts); +parse_factory_options([{sockopts, OptList}|Rest], FactoryOpts, TransOpts) +when is_list(OptList) -> + parse_factory_options(Rest, FactoryOpts#factory_opts{sockopts = OptList}, TransOpts); +parse_factory_options([{connect_timeout, TO}|Rest], FactoryOpts, TransOpts) +when TO =:= infinity; is_integer(TO) -> + parse_factory_options(Rest, FactoryOpts#factory_opts{connect_timeout = TO}, TransOpts); +parse_factory_options([{recv_timeout, TO}|Rest], FactoryOpts, TransOpts) +when TO =:= infinity; is_integer(TO) -> + parse_factory_options(Rest, FactoryOpts, [{recv_timeout, TO}] ++ TransOpts). + + +%% Generates a "transport factory" function - a fun which returns a thrift_transport() +%% instance. +%% State can be passed into a protocol factory to generate a connection to a +%% thrift server over a socket. +new_transport_factory(Host, Port, Options) -> + {FactoryOpts, TransOpts} = parse_factory_options(Options, #factory_opts{}, []), + {ok, fun() -> SockOpts = [binary, + {packet, 0}, + {active, false}, + {nodelay, true}|FactoryOpts#factory_opts.sockopts + ], + case catch gen_tcp:connect( + Host, + Port, + SockOpts, + FactoryOpts#factory_opts.connect_timeout + ) of + {ok, Sock} -> + {ok, Transport} = thrift_socket_transport:new(Sock, TransOpts), + {ok, BufTransport} = case FactoryOpts#factory_opts.framed of + true -> thrift_framed_transport:new(Transport); + false -> thrift_buffered_transport:new(Transport) + end, + {ok, BufTransport}; + Error -> Error + end + end}. + |