summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/erl/src/thrift_disk_log_transport.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/jaegertracing/thrift/lib/erl/src/thrift_disk_log_transport.erl')
-rw-r--r--src/jaegertracing/thrift/lib/erl/src/thrift_disk_log_transport.erl123
1 files changed, 123 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/erl/src/thrift_disk_log_transport.erl b/src/jaegertracing/thrift/lib/erl/src/thrift_disk_log_transport.erl
new file mode 100644
index 000000000..de8ee417b
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/erl/src/thrift_disk_log_transport.erl
@@ -0,0 +1,123 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+%%% Todo: this might be better off as a gen_server type of transport
+%%% that handles stuff like group commit, similar to TFileTransport
+%%% in cpp land
+-module(thrift_disk_log_transport).
+
+-behaviour(thrift_transport).
+
+%% API
+-export([new/2, new_transport_factory/2, new_transport_factory/3]).
+
+%% thrift_transport callbacks
+-export([read/2, write/2, force_flush/1, flush/1, close/1]).
+
+%% state
+-record(dl_transport, {log,
+ close_on_close = false,
+ sync_every = infinity,
+ sync_tref}).
+-type state() :: #dl_transport{}.
+-include("thrift_transport_behaviour.hrl").
+
+
+%% Create a transport attached to an already open log.
+%% If you'd like this transport to close the disk_log using disk_log:lclose()
+%% when the transport is closed, pass a {close_on_close, true} tuple in the
+%% Opts list.
+new(LogName, Opts) when is_atom(LogName), is_list(Opts) ->
+ State = parse_opts(Opts, #dl_transport{log = LogName}),
+
+ State2 =
+ case State#dl_transport.sync_every of
+ N when is_integer(N), N > 0 ->
+ {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, [State]),
+ State#dl_transport{sync_tref = TRef};
+ _ -> State
+ end,
+
+ thrift_transport:new(?MODULE, State2).
+
+
+parse_opts([], State) ->
+ State;
+parse_opts([{close_on_close, Bool} | Rest], State) when is_boolean(Bool) ->
+ parse_opts(Rest, State#dl_transport{close_on_close = Bool});
+parse_opts([{sync_every, Int} | Rest], State) when is_integer(Int), Int > 0 ->
+ parse_opts(Rest, State#dl_transport{sync_every = Int}).
+
+
+%%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%% disk_log_transport is write-only
+read(State, _Len) ->
+ {State, {error, no_read_from_disk_log}}.
+
+write(This = #dl_transport{log = Log}, Data) ->
+ {This, disk_log:balog(Log, erlang:iolist_to_binary(Data))}.
+
+force_flush(#dl_transport{log = Log}) ->
+ error_logger:info_msg("~p syncing~n", [?MODULE]),
+ disk_log:sync(Log).
+
+flush(This = #dl_transport{log = Log, sync_every = SE}) ->
+ case SE of
+ undefined -> % no time-based sync
+ disk_log:sync(Log);
+ _Else -> % sync will happen automagically
+ ok
+ end,
+ {This, ok}.
+
+
+
+
+%% On close, close the underlying log if we're configured to do so.
+close(This = #dl_transport{close_on_close = false}) ->
+ {This, ok};
+close(This = #dl_transport{log = Log}) ->
+ {This, disk_log:lclose(Log)}.
+
+
+%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+new_transport_factory(Name, ExtraLogOpts) ->
+ new_transport_factory(Name, ExtraLogOpts, [{close_on_close, true},
+ {sync_every, 500}]).
+
+new_transport_factory(Name, ExtraLogOpts, TransportOpts) ->
+ F = fun() -> factory_impl(Name, ExtraLogOpts, TransportOpts) end,
+ {ok, F}.
+
+factory_impl(Name, ExtraLogOpts, TransportOpts) ->
+ LogOpts = [{name, Name},
+ {format, external},
+ {type, wrap} |
+ ExtraLogOpts],
+ Log =
+ case disk_log:open(LogOpts) of
+ {ok, LogS} ->
+ LogS;
+ {repaired, LogS, Info1, Info2} ->
+ error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n", [LogS, Info1, Info2]),
+ LogS
+ end,
+ new(Log, TransportOpts).