diff options
Diffstat (limited to '')
-rw-r--r-- | src/jaegertracing/thrift/lib/erl/src/thrift_framed_transport.erl | 126 |
1 files changed, 126 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/erl/src/thrift_framed_transport.erl b/src/jaegertracing/thrift/lib/erl/src/thrift_framed_transport.erl new file mode 100644 index 000000000..9a5d6af55 --- /dev/null +++ b/src/jaegertracing/thrift/lib/erl/src/thrift_framed_transport.erl @@ -0,0 +1,126 @@ +%% +%% Licensed to the Apache Software Foundation (ASF) under one +%% or more contributor license agreements. See the NOTICE file +%% distributed with this work for additional information +%% regarding copyright ownership. The ASF licenses this file +%% to you under the Apache License, Version 2.0 (the +%% "License"); you may not use this file except in compliance +%% with the License. You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% + +-module(thrift_framed_transport). + +-behaviour(thrift_transport). + +%% constructor +-export([new/1]). +%% protocol callbacks +-export([read/2, read_exact/2, write/2, flush/1, close/1]). + + +-record(t_framed, { + wrapped, + read_buffer, + write_buffer +}). + +-type state() :: #t_framed{}. + + +-spec new(Transport::thrift_transport:t_transport()) -> + thrift_transport:t_transport(). + +new(Wrapped) -> + State = #t_framed{ + wrapped = Wrapped, + read_buffer = [], + write_buffer = [] + }, + thrift_transport:new(?MODULE, State). + + +-include("thrift_transport_behaviour.hrl"). + + +read(State = #t_framed{wrapped = Wrapped, read_buffer = Buffer}, Len) +when is_integer(Len), Len >= 0 -> + Binary = iolist_to_binary(Buffer), + case Binary of + <<>> when Len > 0 -> + case next_frame(Wrapped) of + {NewState, {ok, Frame}} -> + NewBinary = iolist_to_binary([Binary, Frame]), + Give = min(iolist_size(NewBinary), Len), + {Result, Remaining} = split_binary(NewBinary, Give), + {State#t_framed{wrapped = NewState, read_buffer = Remaining}, {ok, Result}}; + {NewState, Error} -> + {State#t_framed{wrapped = NewState}, Error} + end; + %% read of zero bytes + <<>> -> {State, {ok, <<>>}}; + %% read buffer is nonempty + _ -> + Give = min(iolist_size(Binary), Len), + {Result, Remaining} = split_binary(Binary, Give), + {State#t_framed{read_buffer = Remaining}, {ok, Result}} + end. + + +read_exact(State = #t_framed{wrapped = Wrapped, read_buffer = Buffer}, Len) +when is_integer(Len), Len >= 0 -> + Binary = iolist_to_binary(Buffer), + case iolist_size(Binary) of + %% read buffer is larger than requested read size + X when X >= Len -> + {Result, Remaining} = split_binary(Binary, Len), + {State#t_framed{read_buffer = Remaining}, {ok, Result}}; + %% read buffer is insufficient for requested read size + _ -> + case next_frame(Wrapped) of + {NewState, {ok, Frame}} -> + read_exact( + State#t_framed{wrapped = NewState, read_buffer = [Buffer, Frame]}, + Len + ); + {NewState, Error} -> + {State#t_framed{wrapped = NewState}, Error} + end + end. + +next_frame(Transport) -> + case thrift_transport:read_exact(Transport, 4) of + {NewState, {ok, <<FrameLength:32/integer-signed-big>>}} -> + thrift_transport:read_exact(NewState, FrameLength); + Error -> Error + end. + + +write(State = #t_framed{write_buffer = Buffer}, Data) -> + {State#t_framed{write_buffer = [Buffer, Data]}, ok}. + + +flush(State = #t_framed{write_buffer = Buffer, wrapped = Wrapped}) -> + case iolist_size(Buffer) of + %% if write buffer is empty, do nothing + 0 -> {State, ok}; + FrameLen -> + Data = [<<FrameLen:32/integer-signed-big>>, Buffer], + {Written, Response} = thrift_transport:write(Wrapped, Data), + {Flushed, ok} = thrift_transport:flush(Written), + {State#t_framed{wrapped = Flushed, write_buffer = []}, Response} + end. + + +close(State = #t_framed{wrapped = Wrapped}) -> + {Closed, Result} = thrift_transport:close(Wrapped), + {State#t_framed{wrapped = Closed}, Result}. + |