summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/erl/src/thrift_socket_transport.erl
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/jaegertracing/thrift/lib/erl/src/thrift_socket_transport.erl176
1 files changed, 176 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/erl/src/thrift_socket_transport.erl b/src/jaegertracing/thrift/lib/erl/src/thrift_socket_transport.erl
new file mode 100644
index 000000000..fa10ed0c6
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/erl/src/thrift_socket_transport.erl
@@ -0,0 +1,176 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_socket_transport).
+
+-behaviour(thrift_transport).
+
+%% constructors
+-export([new/1, new/2]).
+%% transport callbacks
+-export([read/2, read_exact/2, write/2, flush/1, close/1]).
+%% legacy api
+-export([new_transport_factory/3]).
+
+
+-record(t_socket, {
+ socket,
+ recv_timeout=60000,
+ buffer = []
+}).
+
+-type state() :: #t_socket{}.
+
+
+-spec new(Socket::any()) ->
+ thrift_transport:t_transport().
+
+new(Socket) -> new(Socket, []).
+
+-spec new(Socket::any(), Opts::list()) ->
+ thrift_transport:t_transport().
+
+new(Socket, Opts) when is_list(Opts) ->
+ State = parse_opts(Opts, #t_socket{socket = Socket}),
+ thrift_transport:new(?MODULE, State).
+
+
+parse_opts([{recv_timeout, Timeout}|Rest], State)
+when is_integer(Timeout), Timeout > 0 ->
+ parse_opts(Rest, State#t_socket{recv_timeout = Timeout});
+parse_opts([{recv_timeout, infinity}|Rest], State) ->
+ parse_opts(Rest, State#t_socket{recv_timeout = infinity});
+parse_opts([], State) ->
+ State.
+
+
+-include("thrift_transport_behaviour.hrl").
+
+
+read(State = #t_socket{buffer = Buf}, Len)
+when is_integer(Len), Len >= 0 ->
+ Binary = iolist_to_binary(Buf),
+ case iolist_size(Binary) of
+ X when X >= Len ->
+ {Result, Remaining} = split_binary(Binary, Len),
+ {State#t_socket{buffer = Remaining}, {ok, Result}};
+ _ -> recv(State, Len)
+ end.
+
+recv(State = #t_socket{socket = Socket, buffer = Buf}, Len) ->
+ case gen_tcp:recv(Socket, 0, State#t_socket.recv_timeout) of
+ {error, Error} ->
+ gen_tcp:close(Socket),
+ {State, {error, Error}};
+ {ok, Data} ->
+ Binary = iolist_to_binary([Buf, Data]),
+ Give = min(iolist_size(Binary), Len),
+ {Result, Remaining} = split_binary(Binary, Give),
+ {State#t_socket{buffer = Remaining}, {ok, Result}}
+ end.
+
+
+read_exact(State = #t_socket{buffer = Buf}, Len)
+when is_integer(Len), Len >= 0 ->
+ Binary = iolist_to_binary(Buf),
+ case iolist_size(Binary) of
+ X when X >= Len -> read(State, Len);
+ X ->
+ case gen_tcp:recv(State#t_socket.socket, Len - X, State#t_socket.recv_timeout) of
+ {error, Error} ->
+ gen_tcp:close(State#t_socket.socket),
+ {State, {error, Error}};
+ {ok, Data} ->
+ {State#t_socket{buffer = []}, {ok, <<Binary/binary, Data/binary>>}}
+ end
+ end.
+
+
+write(State = #t_socket{socket = Socket}, Data) ->
+ case gen_tcp:send(Socket, Data) of
+ {error, Error} ->
+ gen_tcp:close(Socket),
+ {State, {error, Error}};
+ ok -> {State, ok}
+ end.
+
+
+flush(State) ->
+ {State#t_socket{buffer = []}, ok}.
+
+
+close(State = #t_socket{socket = Socket}) ->
+ {State, gen_tcp:close(Socket)}.
+
+
+%% legacy api. left for compatibility
+
+%% The following "local" record is filled in by parse_factory_options/2
+%% below. These options can be passed to new_protocol_factory/3 in a
+%% proplists-style option list. They're parsed like this so it is an O(n)
+%% operation instead of O(n^2)
+-record(factory_opts, {
+ connect_timeout = infinity,
+ sockopts = [],
+ framed = false
+}).
+
+parse_factory_options([], FactoryOpts, TransOpts) -> {FactoryOpts, TransOpts};
+parse_factory_options([{framed, Bool}|Rest], FactoryOpts, TransOpts)
+when is_boolean(Bool) ->
+ parse_factory_options(Rest, FactoryOpts#factory_opts{framed = Bool}, TransOpts);
+parse_factory_options([{sockopts, OptList}|Rest], FactoryOpts, TransOpts)
+when is_list(OptList) ->
+ parse_factory_options(Rest, FactoryOpts#factory_opts{sockopts = OptList}, TransOpts);
+parse_factory_options([{connect_timeout, TO}|Rest], FactoryOpts, TransOpts)
+when TO =:= infinity; is_integer(TO) ->
+ parse_factory_options(Rest, FactoryOpts#factory_opts{connect_timeout = TO}, TransOpts);
+parse_factory_options([{recv_timeout, TO}|Rest], FactoryOpts, TransOpts)
+when TO =:= infinity; is_integer(TO) ->
+ parse_factory_options(Rest, FactoryOpts, [{recv_timeout, TO}] ++ TransOpts).
+
+
+%% Generates a "transport factory" function - a fun which returns a thrift_transport()
+%% instance.
+%% State can be passed into a protocol factory to generate a connection to a
+%% thrift server over a socket.
+new_transport_factory(Host, Port, Options) ->
+ {FactoryOpts, TransOpts} = parse_factory_options(Options, #factory_opts{}, []),
+ {ok, fun() -> SockOpts = [binary,
+ {packet, 0},
+ {active, false},
+ {nodelay, true}|FactoryOpts#factory_opts.sockopts
+ ],
+ case catch gen_tcp:connect(
+ Host,
+ Port,
+ SockOpts,
+ FactoryOpts#factory_opts.connect_timeout
+ ) of
+ {ok, Sock} ->
+ {ok, Transport} = thrift_socket_transport:new(Sock, TransOpts),
+ {ok, BufTransport} = case FactoryOpts#factory_opts.framed of
+ true -> thrift_framed_transport:new(Transport);
+ false -> thrift_buffered_transport:new(Transport)
+ end,
+ {ok, BufTransport};
+ Error -> Error
+ end
+ end}.
+