summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/delphi/src/Thrift.Transport.pas
diff options
context:
space:
mode:
Diffstat (limited to 'src/jaegertracing/thrift/lib/delphi/src/Thrift.Transport.pas')
-rw-r--r--src/jaegertracing/thrift/lib/delphi/src/Thrift.Transport.pas1523
1 files changed, 1523 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/delphi/src/Thrift.Transport.pas b/src/jaegertracing/thrift/lib/delphi/src/Thrift.Transport.pas
new file mode 100644
index 000000000..c2071df89
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/delphi/src/Thrift.Transport.pas
@@ -0,0 +1,1523 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+unit Thrift.Transport;
+
+{$I Thrift.Defines.inc}
+{$SCOPEDENUMS ON}
+
+interface
+
+uses
+ Classes,
+ SysUtils,
+ Math,
+ Generics.Collections,
+ {$IFDEF OLD_UNIT_NAMES}
+ WinSock, Sockets,
+ {$ELSE}
+ Winapi.WinSock,
+ {$IFDEF OLD_SOCKETS}
+ Web.Win.Sockets,
+ {$ELSE}
+ Thrift.Socket,
+ {$ENDIF}
+ {$ENDIF}
+ Thrift.Collections,
+ Thrift.Exception,
+ Thrift.Utils,
+ Thrift.WinHTTP,
+ Thrift.Stream;
+
+type
+ ITransport = interface
+ ['{DB84961E-8BB3-4532-99E1-A8C7AC2300F7}']
+ function GetIsOpen: Boolean;
+ property IsOpen: Boolean read GetIsOpen;
+ function Peek: Boolean;
+ procedure Open;
+ procedure Close;
+ function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
+ function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
+ function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
+ function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
+ procedure Write( const buf: TBytes); overload;
+ procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
+ procedure Write( const pBuf : Pointer; off, len : Integer); overload;
+ procedure Write( const pBuf : Pointer; len : Integer); overload;
+ procedure Flush;
+ end;
+
+ TTransportImpl = class( TInterfacedObject, ITransport)
+ protected
+ function GetIsOpen: Boolean; virtual; abstract;
+ property IsOpen: Boolean read GetIsOpen;
+ function Peek: Boolean; virtual;
+ procedure Open(); virtual; abstract;
+ procedure Close(); virtual; abstract;
+ function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
+ function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
+ function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
+ function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual;
+ procedure Write( const buf: TBytes); overload; inline;
+ procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; inline;
+ procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
+ procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
+ procedure Flush; virtual;
+ end;
+
+ TTransportException = class( TException)
+ public
+ type
+ TExceptionType = (
+ Unknown,
+ NotOpen,
+ AlreadyOpen,
+ TimedOut,
+ EndOfFile,
+ BadArgs,
+ Interrupted
+ );
+ private
+ function GetType: TExceptionType;
+ protected
+ constructor HiddenCreate(const Msg: string);
+ public
+ class function Create( AType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
+ class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
+ class function Create( AType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
+ property Type_: TExceptionType read GetType;
+ end;
+
+ // Needed to remove deprecation warning
+ TTransportExceptionSpecialized = class abstract (TTransportException)
+ public
+ constructor Create(const Msg: string);
+ end;
+
+ TTransportExceptionUnknown = class (TTransportExceptionSpecialized);
+ TTransportExceptionNotOpen = class (TTransportExceptionSpecialized);
+ TTransportExceptionAlreadyOpen = class (TTransportExceptionSpecialized);
+ TTransportExceptionTimedOut = class (TTransportExceptionSpecialized);
+ TTransportExceptionEndOfFile = class (TTransportExceptionSpecialized);
+ TTransportExceptionBadArgs = class (TTransportExceptionSpecialized);
+ TTransportExceptionInterrupted = class (TTransportExceptionSpecialized);
+
+ TSecureProtocol = (
+ SSL_2, SSL_3, TLS_1, // outdated, for compatibilty only
+ TLS_1_1, TLS_1_2 // secure (as of today)
+ );
+
+ TSecureProtocols = set of TSecureProtocol;
+
+ IHTTPClient = interface( ITransport )
+ ['{7BF615DD-8680-4004-A5B2-88947BA3BA3D}']
+ procedure SetDnsResolveTimeout(const Value: Integer);
+ function GetDnsResolveTimeout: Integer;
+ procedure SetConnectionTimeout(const Value: Integer);
+ function GetConnectionTimeout: Integer;
+ procedure SetSendTimeout(const Value: Integer);
+ function GetSendTimeout: Integer;
+ procedure SetReadTimeout(const Value: Integer);
+ function GetReadTimeout: Integer;
+ function GetCustomHeaders: IThriftDictionary<string,string>;
+ procedure SendRequest;
+ function GetSecureProtocols : TSecureProtocols;
+ procedure SetSecureProtocols( const value : TSecureProtocols);
+
+ property DnsResolveTimeout: Integer read GetDnsResolveTimeout write SetDnsResolveTimeout;
+ property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
+ property SendTimeout: Integer read GetSendTimeout write SetSendTimeout;
+ property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
+ property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
+ property SecureProtocols : TSecureProtocols read GetSecureProtocols write SetSecureProtocols;
+ end;
+
+ IServerTransport = interface
+ ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
+ procedure Listen;
+ procedure Close;
+ function Accept( const fnAccepting: TProc): ITransport;
+ end;
+
+ TServerTransportImpl = class( TInterfacedObject, IServerTransport)
+ protected
+ procedure Listen; virtual; abstract;
+ procedure Close; virtual; abstract;
+ function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
+ end;
+
+ ITransportFactory = interface
+ ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
+ function GetTransport( const ATrans: ITransport): ITransport;
+ end;
+
+ TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
+ function GetTransport( const ATrans: ITransport): ITransport; virtual;
+ end;
+
+ TTcpSocketStreamImpl = class( TThriftStreamImpl )
+{$IFDEF OLD_SOCKETS}
+ private type
+ TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
+ private
+ FTcpClient : TCustomIpClient;
+ FTimeout : Integer;
+ function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
+ TimeOut: Integer; var wsaError : Integer): Integer;
+ function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
+ var wsaError, bytesReady : Integer): TWaitForData;
+{$ELSE}
+ FTcpClient: TSocket;
+ protected const
+ SLEEP_TIME = 200;
+{$ENDIF}
+ protected
+ procedure Write( const pBuf : Pointer; offset, count: Integer); override;
+ function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
+ procedure Open; override;
+ procedure Close; override;
+ procedure Flush; override;
+
+ function IsOpen: Boolean; override;
+ function ToArray: TBytes; override;
+ public
+{$IFDEF OLD_SOCKETS}
+ constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
+{$ELSE}
+ constructor Create( const ATcpClient: TSocket; const aTimeout : Longword = 0);
+{$ENDIF}
+ end;
+
+ IStreamTransport = interface( ITransport )
+ ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
+ function GetInputStream: IThriftStream;
+ function GetOutputStream: IThriftStream;
+ property InputStream : IThriftStream read GetInputStream;
+ property OutputStream : IThriftStream read GetOutputStream;
+ end;
+
+ TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
+ protected
+ FInputStream : IThriftStream;
+ FOutputStream : IThriftStream;
+ protected
+ function GetIsOpen: Boolean; override;
+
+ function GetInputStream: IThriftStream;
+ function GetOutputStream: IThriftStream;
+ public
+ property InputStream : IThriftStream read GetInputStream;
+ property OutputStream : IThriftStream read GetOutputStream;
+
+ procedure Open; override;
+ procedure Close; override;
+ procedure Flush; override;
+ function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
+ procedure Write( const pBuf : Pointer; off, len : Integer); override;
+ constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
+ destructor Destroy; override;
+ end;
+
+ TBufferedStreamImpl = class( TThriftStreamImpl)
+ private
+ FStream : IThriftStream;
+ FBufSize : Integer;
+ FReadBuffer : TMemoryStream;
+ FWriteBuffer : TMemoryStream;
+ protected
+ procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
+ function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
+ procedure Open; override;
+ procedure Close; override;
+ procedure Flush; override;
+ function IsOpen: Boolean; override;
+ function ToArray: TBytes; override;
+ public
+ constructor Create( const AStream: IThriftStream; ABufSize: Integer);
+ destructor Destroy; override;
+ end;
+
+ TServerSocketImpl = class( TServerTransportImpl)
+ private
+{$IFDEF OLD_SOCKETS}
+ FServer : TTcpServer;
+ FPort : Integer;
+ FClientTimeout : Integer;
+{$ELSE}
+ FServer: TServerSocket;
+{$ENDIF}
+ FUseBufferedSocket : Boolean;
+ FOwnsServer : Boolean;
+ protected
+ function Accept( const fnAccepting: TProc) : ITransport; override;
+ public
+{$IFDEF OLD_SOCKETS}
+ constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
+ constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
+{$ELSE}
+ constructor Create( const AServer: TServerSocket; AClientTimeout: Longword = 0); overload;
+ constructor Create( APort: Integer; AClientTimeout: Longword = 0; AUseBufferedSockets: Boolean = FALSE); overload;
+{$ENDIF}
+ destructor Destroy; override;
+ procedure Listen; override;
+ procedure Close; override;
+ end;
+
+ TBufferedTransportImpl = class( TTransportImpl )
+ private
+ FInputBuffer : IThriftStream;
+ FOutputBuffer : IThriftStream;
+ FTransport : IStreamTransport;
+ FBufSize : Integer;
+
+ procedure InitBuffers;
+ function GetUnderlyingTransport: ITransport;
+ protected
+ function GetIsOpen: Boolean; override;
+ procedure Flush; override;
+ public
+ procedure Open(); override;
+ procedure Close(); override;
+ function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
+ procedure Write( const pBuf : Pointer; off, len : Integer); override;
+ constructor Create( const ATransport : IStreamTransport ); overload;
+ constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
+ property UnderlyingTransport: ITransport read GetUnderlyingTransport;
+ property IsOpen: Boolean read GetIsOpen;
+ end;
+
+ TSocketImpl = class(TStreamTransportImpl)
+ private
+{$IFDEF OLD_SOCKETS}
+ FClient : TCustomIpClient;
+{$ELSE}
+ FClient: TSocket;
+{$ENDIF}
+ FOwnsClient : Boolean;
+ FHost : string;
+ FPort : Integer;
+{$IFDEF OLD_SOCKETS}
+ FTimeout : Integer;
+{$ELSE}
+ FTimeout : Longword;
+{$ENDIF}
+
+ procedure InitSocket;
+ protected
+ function GetIsOpen: Boolean; override;
+ public
+ procedure Open; override;
+{$IFDEF OLD_SOCKETS}
+ constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
+ constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
+{$ELSE}
+ constructor Create(const AClient: TSocket; aOwnsClient: Boolean); overload;
+ constructor Create( const AHost: string; APort: Integer; ATimeout: Longword = 0); overload;
+{$ENDIF}
+ destructor Destroy; override;
+ procedure Close; override;
+{$IFDEF OLD_SOCKETS}
+ property TcpClient: TCustomIpClient read FClient;
+{$ELSE}
+ property TcpClient: TSocket read FClient;
+{$ENDIF}
+ property Host : string read FHost;
+ property Port: Integer read FPort;
+ end;
+
+ TFramedTransportImpl = class( TTransportImpl)
+ private const
+ FHeaderSize : Integer = 4;
+ private class var
+ FHeader_Dummy : array of Byte;
+ protected
+ FTransport : ITransport;
+ FWriteBuffer : TMemoryStream;
+ FReadBuffer : TMemoryStream;
+
+ procedure InitWriteBuffer;
+ procedure ReadFrame;
+ public
+ type
+ TFactory = class( TTransportFactoryImpl )
+ public
+ function GetTransport( const ATrans: ITransport): ITransport; override;
+ end;
+
+ {$IFDEF HAVE_CLASS_CTOR}
+ class constructor Create;
+ {$ENDIF}
+
+ constructor Create; overload;
+ constructor Create( const ATrans: ITransport); overload;
+ destructor Destroy; override;
+
+ procedure Open(); override;
+ function GetIsOpen: Boolean; override;
+
+ procedure Close(); override;
+ function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
+ procedure Write( const pBuf : Pointer; off, len : Integer); override;
+ procedure Flush; override;
+ end;
+
+{$IFNDEF HAVE_CLASS_CTOR}
+procedure TFramedTransportImpl_Initialize;
+{$ENDIF}
+
+const
+ DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
+ DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1, TSecureProtocol.TLS_1_2];
+
+
+
+implementation
+
+{ TTransportImpl }
+
+procedure TTransportImpl.Flush;
+begin
+ // nothing to do
+end;
+
+function TTransportImpl.Peek: Boolean;
+begin
+ Result := IsOpen;
+end;
+
+function TTransportImpl.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
+begin
+ if Length(buf) > 0
+ then result := Read( @buf[0], Length(buf), off, len)
+ else result := 0;
+end;
+
+function TTransportImpl.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
+begin
+ if Length(buf) > 0
+ then result := ReadAll( @buf[0], Length(buf), off, len)
+ else result := 0;
+end;
+
+procedure TTransportImpl.Write( const buf: TBytes);
+begin
+ if Length(buf) > 0
+ then Write( @buf[0], 0, Length(buf));
+end;
+
+procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
+begin
+ if Length(buf) > 0
+ then Write( @buf[0], off, len);
+end;
+
+function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
+var ret : Integer;
+begin
+ result := 0;
+ while result < len do begin
+ ret := Read( pBuf, buflen, off + result, len - result);
+ if ret > 0
+ then Inc( result, ret)
+ else raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
+ end;
+end;
+
+procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer);
+begin
+ Self.Write( pBuf, 0, len);
+end;
+
+{ TTransportException }
+
+function TTransportException.GetType: TExceptionType;
+begin
+ if Self is TTransportExceptionNotOpen then Result := TExceptionType.NotOpen
+ else if Self is TTransportExceptionAlreadyOpen then Result := TExceptionType.AlreadyOpen
+ else if Self is TTransportExceptionTimedOut then Result := TExceptionType.TimedOut
+ else if Self is TTransportExceptionEndOfFile then Result := TExceptionType.EndOfFile
+ else if Self is TTransportExceptionBadArgs then Result := TExceptionType.BadArgs
+ else if Self is TTransportExceptionInterrupted then Result := TExceptionType.Interrupted
+ else Result := TExceptionType.Unknown;
+end;
+
+constructor TTransportException.HiddenCreate(const Msg: string);
+begin
+ inherited Create(Msg);
+end;
+
+class function TTransportException.Create(AType: TExceptionType): TTransportException;
+begin
+ //no inherited;
+{$WARN SYMBOL_DEPRECATED OFF}
+ Result := Create(AType, '')
+{$WARN SYMBOL_DEPRECATED DEFAULT}
+end;
+
+class function TTransportException.Create(AType: TExceptionType;
+ const msg: string): TTransportException;
+begin
+ case AType of
+ TExceptionType.NotOpen: Result := TTransportExceptionNotOpen.Create(msg);
+ TExceptionType.AlreadyOpen: Result := TTransportExceptionAlreadyOpen.Create(msg);
+ TExceptionType.TimedOut: Result := TTransportExceptionTimedOut.Create(msg);
+ TExceptionType.EndOfFile: Result := TTransportExceptionEndOfFile.Create(msg);
+ TExceptionType.BadArgs: Result := TTransportExceptionBadArgs.Create(msg);
+ TExceptionType.Interrupted: Result := TTransportExceptionInterrupted.Create(msg);
+ else
+ Result := TTransportExceptionUnknown.Create(msg);
+ end;
+end;
+
+class function TTransportException.Create(const msg: string): TTransportException;
+begin
+ Result := TTransportExceptionUnknown.Create(Msg);
+end;
+
+{ TTransportExceptionSpecialized }
+
+constructor TTransportExceptionSpecialized.Create(const Msg: string);
+begin
+ inherited HiddenCreate(Msg);
+end;
+
+{ TTransportFactoryImpl }
+
+function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
+begin
+ Result := ATrans;
+end;
+
+{ TServerSocket }
+
+{$IFDEF OLD_SOCKETS}
+constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
+begin
+ inherited Create;
+ FServer := AServer;
+ FClientTimeout := AClientTimeout;
+end;
+{$ELSE}
+constructor TServerSocketImpl.Create( const AServer: TServerSocket; AClientTimeout: Longword);
+begin
+ inherited Create;
+ FServer := AServer;
+ FServer.RecvTimeout := AClientTimeout;
+ FServer.SendTimeout := AClientTimeout;
+end;
+{$ENDIF}
+
+{$IFDEF OLD_SOCKETS}
+constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
+{$ELSE}
+constructor TServerSocketImpl.Create(APort: Integer; AClientTimeout: Longword; AUseBufferedSockets: Boolean);
+{$ENDIF}
+begin
+ inherited Create;
+{$IFDEF OLD_SOCKETS}
+ FPort := APort;
+ FClientTimeout := AClientTimeout;
+ FServer := TTcpServer.Create( nil );
+ FServer.BlockMode := bmBlocking;
+ {$IF CompilerVersion >= 21.0}
+ FServer.LocalPort := AnsiString( IntToStr( FPort));
+ {$ELSE}
+ FServer.LocalPort := IntToStr( FPort);
+ {$IFEND}
+{$ELSE}
+ FServer := TServerSocket.Create(APort, AClientTimeout, AClientTimeout);
+{$ENDIF}
+ FUseBufferedSocket := AUseBufferedSockets;
+ FOwnsServer := True;
+end;
+
+destructor TServerSocketImpl.Destroy;
+begin
+ if FOwnsServer then begin
+ FServer.Free;
+ FServer := nil;
+ end;
+ inherited;
+end;
+
+function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
+var
+{$IFDEF OLD_SOCKETS}
+ client : TCustomIpClient;
+{$ELSE}
+ client: TSocket;
+{$ENDIF}
+ trans : IStreamTransport;
+begin
+ if FServer = nil then begin
+ raise TTransportExceptionNotOpen.Create('No underlying server socket.');
+ end;
+
+{$IFDEF OLD_SOCKETS}
+ client := nil;
+ try
+ client := TCustomIpClient.Create(nil);
+
+ if Assigned(fnAccepting)
+ then fnAccepting();
+
+ if not FServer.Accept( client) then begin
+ client.Free;
+ Result := nil;
+ Exit;
+ end;
+
+ if client = nil then begin
+ Result := nil;
+ Exit;
+ end;
+
+ trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
+ client := nil; // trans owns it now
+
+ if FUseBufferedSocket
+ then result := TBufferedTransportImpl.Create( trans)
+ else result := trans;
+
+ except
+ on E: Exception do begin
+ client.Free;
+ raise TTransportExceptionUnknown.Create(E.ToString);
+ end;
+ end;
+{$ELSE}
+ if Assigned(fnAccepting) then
+ fnAccepting();
+
+ client := FServer.Accept;
+ try
+ trans := TSocketImpl.Create(client, True);
+ client := nil;
+
+ if FUseBufferedSocket then
+ Result := TBufferedTransportImpl.Create(trans)
+ else
+ Result := trans;
+ except
+ client.Free;
+ raise;
+ end;
+{$ENDIF}
+end;
+
+procedure TServerSocketImpl.Listen;
+begin
+ if FServer <> nil then
+ begin
+{$IFDEF OLD_SOCKETS}
+ try
+ FServer.Active := True;
+ except
+ on E: Exception
+ do raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
+ end;
+{$ELSE}
+ FServer.Listen;
+{$ENDIF}
+ end;
+end;
+
+procedure TServerSocketImpl.Close;
+begin
+ if FServer <> nil then
+{$IFDEF OLD_SOCKETS}
+ try
+ FServer.Active := False;
+ except
+ on E: Exception
+ do raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
+ end;
+{$ELSE}
+ FServer.Close;
+{$ENDIF}
+end;
+
+{ TSocket }
+
+{$IFDEF OLD_SOCKETS}
+constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
+var stream : IThriftStream;
+begin
+ FClient := AClient;
+ FTimeout := ATimeout;
+ FOwnsClient := aOwnsClient;
+ stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
+ inherited Create( stream, stream);
+end;
+{$ELSE}
+constructor TSocketImpl.Create(const AClient: TSocket; aOwnsClient: Boolean);
+var stream : IThriftStream;
+begin
+ FClient := AClient;
+ FTimeout := AClient.RecvTimeout;
+ FOwnsClient := aOwnsClient;
+ stream := TTcpSocketStreamImpl.Create(FClient, FTimeout);
+ inherited Create(stream, stream);
+end;
+{$ENDIF}
+
+{$IFDEF OLD_SOCKETS}
+constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
+{$ELSE}
+constructor TSocketImpl.Create(const AHost: string; APort: Integer; ATimeout: Longword);
+{$ENDIF}
+begin
+ inherited Create(nil,nil);
+ FHost := AHost;
+ FPort := APort;
+ FTimeout := ATimeout;
+ InitSocket;
+end;
+
+destructor TSocketImpl.Destroy;
+begin
+ if FOwnsClient
+ then FreeAndNil( FClient);
+ inherited;
+end;
+
+procedure TSocketImpl.Close;
+begin
+ inherited Close;
+
+ FInputStream := nil;
+ FOutputStream := nil;
+
+ if FOwnsClient
+ then FreeAndNil( FClient)
+ else FClient := nil;
+end;
+
+function TSocketImpl.GetIsOpen: Boolean;
+begin
+{$IFDEF OLD_SOCKETS}
+ Result := (FClient <> nil) and FClient.Connected;
+{$ELSE}
+ Result := (FClient <> nil) and FClient.IsOpen
+{$ENDIF}
+end;
+
+procedure TSocketImpl.InitSocket;
+var
+ stream : IThriftStream;
+begin
+ if FOwnsClient
+ then FreeAndNil( FClient)
+ else FClient := nil;
+
+{$IFDEF OLD_SOCKETS}
+ FClient := TTcpClient.Create( nil);
+{$ELSE}
+ FClient := TSocket.Create(FHost, FPort);
+{$ENDIF}
+ FOwnsClient := True;
+
+ stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
+ FInputStream := stream;
+ FOutputStream := stream;
+end;
+
+procedure TSocketImpl.Open;
+begin
+ if IsOpen then begin
+ raise TTransportExceptionAlreadyOpen.Create('Socket already connected');
+ end;
+
+ if FHost = '' then begin
+ raise TTransportExceptionNotOpen.Create('Cannot open null host');
+ end;
+
+ if Port <= 0 then begin
+ raise TTransportExceptionNotOpen.Create('Cannot open without port');
+ end;
+
+ if FClient = nil
+ then InitSocket;
+
+{$IFDEF OLD_SOCKETS}
+ FClient.RemoteHost := TSocketHost( Host);
+ FClient.RemotePort := TSocketPort( IntToStr( Port));
+ FClient.Connect;
+{$ELSE}
+ FClient.Open;
+{$ENDIF}
+
+ FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
+ FOutputStream := FInputStream;
+end;
+
+{ TBufferedStream }
+
+procedure TBufferedStreamImpl.Close;
+begin
+ Flush;
+ FStream := nil;
+
+ FReadBuffer.Free;
+ FReadBuffer := nil;
+
+ FWriteBuffer.Free;
+ FWriteBuffer := nil;
+end;
+
+constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
+begin
+ inherited Create;
+ FStream := AStream;
+ FBufSize := ABufSize;
+ FReadBuffer := TMemoryStream.Create;
+ FWriteBuffer := TMemoryStream.Create;
+end;
+
+destructor TBufferedStreamImpl.Destroy;
+begin
+ Close;
+ inherited;
+end;
+
+procedure TBufferedStreamImpl.Flush;
+var
+ buf : TBytes;
+ len : Integer;
+begin
+ if IsOpen then begin
+ len := FWriteBuffer.Size;
+ if len > 0 then begin
+ SetLength( buf, len );
+ FWriteBuffer.Position := 0;
+ FWriteBuffer.Read( Pointer(@buf[0])^, len );
+ FStream.Write( buf, 0, len );
+ end;
+ FWriteBuffer.Clear;
+ end;
+end;
+
+function TBufferedStreamImpl.IsOpen: Boolean;
+begin
+ Result := (FWriteBuffer <> nil)
+ and (FReadBuffer <> nil)
+ and (FStream <> nil)
+ and FStream.IsOpen;
+end;
+
+procedure TBufferedStreamImpl.Open;
+begin
+ FStream.Open;
+end;
+
+function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
+var
+ nRead : Integer;
+ tempbuf : TBytes;
+ pTmp : PByte;
+begin
+ inherited;
+ Result := 0;
+
+ if IsOpen then begin
+ while count > 0 do begin
+
+ if FReadBuffer.Position >= FReadBuffer.Size then begin
+ FReadBuffer.Clear;
+ SetLength( tempbuf, FBufSize);
+ nRead := FStream.Read( tempbuf, 0, FBufSize );
+ if nRead = 0 then Break; // avoid infinite loop
+
+ FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
+ FReadBuffer.Position := 0;
+ end;
+
+ if FReadBuffer.Position < FReadBuffer.Size then begin
+ nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
+ pTmp := pBuf;
+ Inc( pTmp, offset);
+ Inc( Result, FReadBuffer.Read( pTmp^, nRead));
+ Dec( count, nRead);
+ Inc( offset, nRead);
+ end;
+ end;
+ end;
+end;
+
+function TBufferedStreamImpl.ToArray: TBytes;
+var len : Integer;
+begin
+ len := 0;
+
+ if IsOpen then begin
+ len := FReadBuffer.Size;
+ end;
+
+ SetLength( Result, len);
+
+ if len > 0 then begin
+ FReadBuffer.Position := 0;
+ FReadBuffer.Read( Pointer(@Result[0])^, len );
+ end;
+end;
+
+procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
+var pTmp : PByte;
+begin
+ inherited;
+ if count > 0 then begin
+ if IsOpen then begin
+ pTmp := pBuf;
+ Inc( pTmp, offset);
+ FWriteBuffer.Write( pTmp^, count );
+ if FWriteBuffer.Size > FBufSize then begin
+ Flush;
+ end;
+ end;
+ end;
+end;
+
+{ TStreamTransportImpl }
+
+constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
+begin
+ inherited Create;
+ FInputStream := AInputStream;
+ FOutputStream := AOutputStream;
+end;
+
+destructor TStreamTransportImpl.Destroy;
+begin
+ FInputStream := nil;
+ FOutputStream := nil;
+ inherited;
+end;
+
+procedure TStreamTransportImpl.Close;
+begin
+ FInputStream := nil;
+ FOutputStream := nil;
+end;
+
+procedure TStreamTransportImpl.Flush;
+begin
+ if FOutputStream = nil then begin
+ raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
+ end;
+
+ FOutputStream.Flush;
+end;
+
+function TStreamTransportImpl.GetInputStream: IThriftStream;
+begin
+ Result := FInputStream;
+end;
+
+function TStreamTransportImpl.GetIsOpen: Boolean;
+begin
+ Result := True;
+end;
+
+function TStreamTransportImpl.GetOutputStream: IThriftStream;
+begin
+ Result := FOutputStream;
+end;
+
+procedure TStreamTransportImpl.Open;
+begin
+
+end;
+
+function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
+begin
+ if FInputStream = nil then begin
+ raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
+ end;
+
+ Result := FInputStream.Read( pBuf,buflen, off, len );
+end;
+
+procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
+begin
+ if FOutputStream = nil then begin
+ raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
+ end;
+
+ FOutputStream.Write( pBuf, off, len );
+end;
+
+{ TBufferedTransportImpl }
+
+constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
+begin
+ //no inherited;
+ Create( ATransport, 1024 );
+end;
+
+constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport; ABufSize: Integer);
+begin
+ inherited Create;
+ FTransport := ATransport;
+ FBufSize := ABufSize;
+ InitBuffers;
+end;
+
+procedure TBufferedTransportImpl.Close;
+begin
+ FTransport.Close;
+ FInputBuffer := nil;
+ FOutputBuffer := nil;
+end;
+
+procedure TBufferedTransportImpl.Flush;
+begin
+ if FOutputBuffer <> nil then begin
+ FOutputBuffer.Flush;
+ end;
+end;
+
+function TBufferedTransportImpl.GetIsOpen: Boolean;
+begin
+ Result := FTransport.IsOpen;
+end;
+
+function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
+begin
+ Result := FTransport;
+end;
+
+procedure TBufferedTransportImpl.InitBuffers;
+begin
+ if FTransport.InputStream <> nil then begin
+ FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
+ end;
+ if FTransport.OutputStream <> nil then begin
+ FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
+ end;
+end;
+
+procedure TBufferedTransportImpl.Open;
+begin
+ FTransport.Open;
+ InitBuffers; // we need to get the buffers to match FTransport substreams again
+end;
+
+function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
+begin
+ Result := 0;
+ if FInputBuffer <> nil then begin
+ Result := FInputBuffer.Read( pBuf,buflen, off, len );
+ end;
+end;
+
+procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
+begin
+ if FOutputBuffer <> nil then begin
+ FOutputBuffer.Write( pBuf, off, len );
+ end;
+end;
+
+{ TFramedTransportImpl }
+
+{$IFDEF HAVE_CLASS_CTOR}
+class constructor TFramedTransportImpl.Create;
+begin
+ SetLength( FHeader_Dummy, FHeaderSize);
+ FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte ), 0);
+end;
+{$ELSE}
+procedure TFramedTransportImpl_Initialize;
+begin
+ SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
+ FillChar( TFramedTransportImpl.FHeader_Dummy[0],
+ Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ), 0);
+end;
+{$ENDIF}
+
+constructor TFramedTransportImpl.Create;
+begin
+ inherited Create;
+ InitWriteBuffer;
+end;
+
+procedure TFramedTransportImpl.Close;
+begin
+ FTransport.Close;
+end;
+
+constructor TFramedTransportImpl.Create( const ATrans: ITransport);
+begin
+ inherited Create;
+ InitWriteBuffer;
+ FTransport := ATrans;
+end;
+
+destructor TFramedTransportImpl.Destroy;
+begin
+ FWriteBuffer.Free;
+ FReadBuffer.Free;
+ inherited;
+end;
+
+procedure TFramedTransportImpl.Flush;
+var
+ buf : TBytes;
+ len : Integer;
+ data_len : Integer;
+
+begin
+ len := FWriteBuffer.Size;
+ SetLength( buf, len);
+ if len > 0 then begin
+ System.Move( FWriteBuffer.Memory^, buf[0], len );
+ end;
+
+ data_len := len - FHeaderSize;
+ if (data_len < 0) then begin
+ raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' );
+ end;
+
+ InitWriteBuffer;
+
+ buf[0] := Byte($FF and (data_len shr 24));
+ buf[1] := Byte($FF and (data_len shr 16));
+ buf[2] := Byte($FF and (data_len shr 8));
+ buf[3] := Byte($FF and data_len);
+
+ FTransport.Write( buf, 0, len );
+ FTransport.Flush;
+end;
+
+function TFramedTransportImpl.GetIsOpen: Boolean;
+begin
+ Result := FTransport.IsOpen;
+end;
+
+type
+ TAccessMemoryStream = class(TMemoryStream)
+ end;
+
+procedure TFramedTransportImpl.InitWriteBuffer;
+begin
+ FWriteBuffer.Free;
+ FWriteBuffer := TMemoryStream.Create;
+ TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
+ FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
+end;
+
+procedure TFramedTransportImpl.Open;
+begin
+ FTransport.Open;
+end;
+
+function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
+var pTmp : PByte;
+begin
+ if len > (buflen-off)
+ then len := buflen-off;
+
+ pTmp := pBuf;
+ Inc( pTmp, off);
+
+ if (FReadBuffer <> nil) and (len > 0) then begin
+ result := FReadBuffer.Read( pTmp^, len);
+ if result > 0 then begin
+ Exit;
+ end;
+ end;
+
+ ReadFrame;
+ if len > 0
+ then Result := FReadBuffer.Read( pTmp^, len)
+ else Result := 0;
+end;
+
+procedure TFramedTransportImpl.ReadFrame;
+var
+ i32rd : TBytes;
+ size : Integer;
+ buff : TBytes;
+begin
+ SetLength( i32rd, FHeaderSize );
+ FTransport.ReadAll( i32rd, 0, FHeaderSize);
+ size :=
+ ((i32rd[0] and $FF) shl 24) or
+ ((i32rd[1] and $FF) shl 16) or
+ ((i32rd[2] and $FF) shl 8) or
+ (i32rd[3] and $FF);
+ SetLength( buff, size );
+ FTransport.ReadAll( buff, 0, size );
+ FReadBuffer.Free;
+ FReadBuffer := TMemoryStream.Create;
+ if Length(buff) > 0
+ then FReadBuffer.Write( Pointer(@buff[0])^, size );
+ FReadBuffer.Position := 0;
+end;
+
+procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
+var pTmp : PByte;
+begin
+ if len > 0 then begin
+ pTmp := pBuf;
+ Inc( pTmp, off);
+
+ FWriteBuffer.Write( pTmp^, len );
+ end;
+end;
+
+{ TFramedTransport.TFactory }
+
+function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
+begin
+ Result := TFramedTransportImpl.Create( ATrans );
+end;
+
+{ TTcpSocketStreamImpl }
+
+procedure TTcpSocketStreamImpl.Close;
+begin
+ FTcpClient.Close;
+end;
+
+{$IFDEF OLD_SOCKETS}
+constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
+begin
+ inherited Create;
+ FTcpClient := ATcpClient;
+ FTimeout := aTimeout;
+end;
+{$ELSE}
+constructor TTcpSocketStreamImpl.Create( const ATcpClient: TSocket; const aTimeout : Longword);
+begin
+ inherited Create;
+ FTcpClient := ATcpClient;
+ if aTimeout = 0 then
+ FTcpClient.RecvTimeout := SLEEP_TIME
+ else
+ FTcpClient.RecvTimeout := aTimeout;
+ FTcpClient.SendTimeout := aTimeout;
+end;
+{$ENDIF}
+
+procedure TTcpSocketStreamImpl.Flush;
+begin
+
+end;
+
+function TTcpSocketStreamImpl.IsOpen: Boolean;
+begin
+{$IFDEF OLD_SOCKETS}
+ Result := FTcpClient.Active;
+{$ELSE}
+ Result := FTcpClient.IsOpen;
+{$ENDIF}
+end;
+
+procedure TTcpSocketStreamImpl.Open;
+begin
+ FTcpClient.Open;
+end;
+
+
+{$IFDEF OLD_SOCKETS}
+function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
+ TimeOut: Integer; var wsaError : Integer): Integer;
+var
+ ReadFds: TFDset;
+ ReadFdsptr: PFDset;
+ WriteFds: TFDset;
+ WriteFdsptr: PFDset;
+ ExceptFds: TFDset;
+ ExceptFdsptr: PFDset;
+ tv: timeval;
+ Timeptr: PTimeval;
+ socket : TSocket;
+begin
+ if not FTcpClient.Active then begin
+ wsaError := WSAEINVAL;
+ Exit( SOCKET_ERROR);
+ end;
+
+ socket := FTcpClient.Handle;
+
+ if Assigned(ReadReady) then begin
+ ReadFdsptr := @ReadFds;
+ FD_ZERO(ReadFds);
+ FD_SET(socket, ReadFds);
+ end
+ else begin
+ ReadFdsptr := nil;
+ end;
+
+ if Assigned(WriteReady) then begin
+ WriteFdsptr := @WriteFds;
+ FD_ZERO(WriteFds);
+ FD_SET(socket, WriteFds);
+ end
+ else begin
+ WriteFdsptr := nil;
+ end;
+
+ if Assigned(ExceptFlag) then begin
+ ExceptFdsptr := @ExceptFds;
+ FD_ZERO(ExceptFds);
+ FD_SET(socket, ExceptFds);
+ end
+ else begin
+ ExceptFdsptr := nil;
+ end;
+
+ if TimeOut >= 0 then begin
+ tv.tv_sec := TimeOut div 1000;
+ tv.tv_usec := 1000 * (TimeOut mod 1000);
+ Timeptr := @tv;
+ end
+ else begin
+ Timeptr := nil; // wait forever
+ end;
+
+ wsaError := 0;
+ try
+ {$IFDEF MSWINDOWS}
+ {$IFDEF OLD_UNIT_NAMES}
+ result := WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
+ {$ELSE}
+ result := Winapi.WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
+ {$ENDIF}
+ {$ENDIF}
+ {$IFDEF LINUX}
+ result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
+ {$ENDIF}
+
+ if result = SOCKET_ERROR
+ then wsaError := WSAGetLastError;
+
+ except
+ result := SOCKET_ERROR;
+ end;
+
+ if Assigned(ReadReady) then
+ ReadReady^ := FD_ISSET(socket, ReadFds);
+
+ if Assigned(WriteReady) then
+ WriteReady^ := FD_ISSET(socket, WriteFds);
+
+ if Assigned(ExceptFlag) then
+ ExceptFlag^ := FD_ISSET(socket, ExceptFds);
+end;
+{$ENDIF}
+
+{$IFDEF OLD_SOCKETS}
+function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
+ DesiredBytes : Integer;
+ var wsaError, bytesReady : Integer): TWaitForData;
+var bCanRead, bError : Boolean;
+ retval : Integer;
+const
+ MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
+begin
+ bytesReady := 0;
+
+ // The select function returns the total number of socket handles that are ready
+ // and contained in the fd_set structures, zero if the time limit expired,
+ // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
+ // WSAGetLastError can be used to retrieve a specific error code.
+ retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
+ if retval = SOCKET_ERROR
+ then Exit( TWaitForData.wfd_Error);
+ if (retval = 0) or not bCanRead
+ then Exit( TWaitForData.wfd_Timeout);
+
+ // recv() returns the number of bytes received, or -1 if an error occurred.
+ // The return value will be 0 when the peer has performed an orderly shutdown.
+
+ retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
+ if retval <= 0
+ then Exit( TWaitForData.wfd_Error);
+
+ // at least we have some data
+ bytesReady := Min( retval, DesiredBytes);
+ result := TWaitForData.wfd_HaveData;
+end;
+{$ENDIF}
+
+{$IFDEF OLD_SOCKETS}
+function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
+// old sockets version
+var wfd : TWaitForData;
+ wsaError,
+ msecs : Integer;
+ nBytes : Integer;
+ pTmp : PByte;
+begin
+ inherited;
+
+ if FTimeout > 0
+ then msecs := FTimeout
+ else msecs := DEFAULT_THRIFT_TIMEOUT;
+
+ result := 0;
+ pTmp := pBuf;
+ Inc( pTmp, offset);
+ while count > 0 do begin
+
+ while TRUE do begin
+ wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
+ case wfd of
+ TWaitForData.wfd_Error : Exit;
+ TWaitForData.wfd_HaveData : Break;
+ TWaitForData.wfd_Timeout : begin
+ if (FTimeout = 0)
+ then Exit
+ else begin
+ raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
+
+ end;
+ end;
+ else
+ ASSERT( FALSE);
+ end;
+ end;
+
+ // reduce the timeout once we got data
+ if FTimeout > 0
+ then msecs := FTimeout div 10
+ else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
+ msecs := Max( msecs, 200);
+
+ ASSERT( nBytes <= count);
+ nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);
+ Inc( pTmp, nBytes);
+ Dec( count, nBytes);
+ Inc( result, nBytes);
+ end;
+end;
+
+function TTcpSocketStreamImpl.ToArray: TBytes;
+// old sockets version
+var len : Integer;
+begin
+ len := 0;
+ if IsOpen then begin
+ len := FTcpClient.BytesReceived;
+ end;
+
+ SetLength( Result, len );
+
+ if len > 0 then begin
+ FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
+ end;
+end;
+
+procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
+// old sockets version
+var bCanWrite, bError : Boolean;
+ retval, wsaError : Integer;
+ pTmp : PByte;
+begin
+ inherited;
+
+ if not FTcpClient.Active
+ then raise TTransportExceptionNotOpen.Create('not open');
+
+ // The select function returns the total number of socket handles that are ready
+ // and contained in the fd_set structures, zero if the time limit expired,
+ // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
+ // WSAGetLastError can be used to retrieve a specific error code.
+ retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
+ if retval = SOCKET_ERROR
+ then raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));
+
+ if (retval = 0)
+ then raise TTransportExceptionTimedOut.Create('timed out');
+
+ if bError or not bCanWrite
+ then raise TTransportExceptionUnknown.Create('unknown error');
+
+ pTmp := pBuf;
+ Inc( pTmp, offset);
+ FTcpClient.SendBuf( pTmp^, count);
+end;
+
+{$ELSE}
+
+function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
+// new sockets version
+var nBytes : Integer;
+ pTmp : PByte;
+begin
+ inherited;
+
+ result := 0;
+ pTmp := pBuf;
+ Inc( pTmp, offset);
+ while count > 0 do begin
+ nBytes := FTcpClient.Read( pTmp^, count);
+ if nBytes = 0 then Exit;
+ Inc( pTmp, nBytes);
+ Dec( count, nBytes);
+ Inc( result, nBytes);
+ end;
+end;
+
+function TTcpSocketStreamImpl.ToArray: TBytes;
+// new sockets version
+var len : Integer;
+begin
+ len := 0;
+ try
+ if FTcpClient.Peek then
+ repeat
+ SetLength(Result, Length(Result) + 1024);
+ len := FTcpClient.Read(Result[Length(Result) - 1024], 1024);
+ until len < 1024;
+ except
+ on TTransportException do begin { don't allow default exceptions } end;
+ else raise;
+ end;
+ if len > 0 then
+ SetLength(Result, Length(Result) - 1024 + len);
+end;
+
+procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
+// new sockets version
+var pTmp : PByte;
+begin
+ inherited;
+
+ if not FTcpClient.IsOpen
+ then raise TTransportExceptionNotOpen.Create('not open');
+
+ pTmp := pBuf;
+ Inc( pTmp, offset);
+ FTcpClient.Write( pTmp^, count);
+end;
+
+{$ENDIF}
+
+
+{$IF CompilerVersion < 21.0}
+initialization
+begin
+ TFramedTransportImpl_Initialize;
+end;
+{$IFEND}
+
+
+end.