diff options
Diffstat (limited to 'src/jaegertracing/thrift/contrib/Stomp/Thrift.Transport.STOMP.pas')
-rw-r--r-- | src/jaegertracing/thrift/contrib/Stomp/Thrift.Transport.STOMP.pas | 200 |
1 files changed, 200 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/contrib/Stomp/Thrift.Transport.STOMP.pas b/src/jaegertracing/thrift/contrib/Stomp/Thrift.Transport.STOMP.pas new file mode 100644 index 000000000..7dfb3763c --- /dev/null +++ b/src/jaegertracing/thrift/contrib/Stomp/Thrift.Transport.STOMP.pas @@ -0,0 +1,200 @@ +(* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *) + +unit Thrift.Transport.STOMP; + +interface + +uses + Classes,Windows, SysUtils, + Thrift, + Thrift.Transport, + Thrift.Protocol, + Thrift.Stream, + StompClient, + StompTypes; + +type + TStompTransportImpl = class( TStreamTransportImpl) + strict private + FData : TStringStream; + FServer : string; + FOutQueue : string; + FStompCli : IStompClient; + protected + function GetIsOpen: Boolean; override; + function Peek: Boolean; override; + public + constructor Create( const aServerAndPort, aOutQueue : string); + destructor Destroy; override; + + procedure Open(); override; + procedure Close(); override; + procedure Flush; override; + end; + + + TStompServerTransportImpl = class( TServerTransportImpl) + strict private + FServer : string; + FInQueue : string; + FClient : IStompClient; + protected + procedure Listen; override; + procedure Close; override; + function Accept( const fnAccepting: TProc): ITransport; override; + public + constructor Create( const aServerAndPort, aInQueue : string); + destructor Destroy; override; + end; + + +const + QUEUE_PREFIX = '/queue/'; + TOPIC_PREFIX = '/topic/'; + EXCHANGE_PREFIX = '/exchange/'; + + +implementation + + + +constructor TStompTransportImpl.Create( const aServerAndPort, aOutQueue : string); +var adapter : IThriftStream; +begin + FData := TStringStream.Create; + FServer := aServerAndPort; + FOutQueue := aOutQueue; + + adapter := TThriftStreamAdapterDelphi.Create( FData, FALSE); + inherited Create( nil, adapter); // output only +end; + + +destructor TStompTransportImpl.Destroy; +begin + inherited Destroy; + FreeAndNil( FData); + FStompCli := nil; +end; + + +function TStompTransportImpl.GetIsOpen: Boolean; +begin + result := (FStompCli <> nil); +end; + + +function TStompTransportImpl.Peek: Boolean; +begin + result := FALSE; // output only +end; + + +procedure TStompTransportImpl.Open; +begin + if FStompCli <> nil + then raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen, 'already open') + else FStompCli := StompUtils.NewStomp( FServer); +end; + + +procedure TStompTransportImpl.Close; +begin + FStompCli := nil; + FData.Clear; +end; + + +procedure TStompTransportImpl.Flush; +begin + if FStompCli = nil + then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'not open'); + + FStompCli.Send( FOutQueue, FData.DataString); + FData.Clear; +end; + + +//--- TStompServerTransportImpl -------------------------------------------- + + +constructor TStompServerTransportImpl.Create( const aServerAndPort, aInQueue : string); +begin + inherited Create; + FServer := aServerAndPort; + FInQueue := aInQueue; +end; + + +destructor TStompServerTransportImpl.Destroy; +begin + try + Close; + finally + inherited Destroy; + end; +end; + + +procedure TStompServerTransportImpl.Listen; +begin + FClient := StompUtils.NewStomp(FServer); + FClient.Subscribe( FInQueue); +end; + + +procedure TStompServerTransportImpl.Close; +begin + if FClient <> nil then begin + FClient.Unsubscribe( FInQueue); + FClient := nil; + end; +end; + + +function TStompServerTransportImpl.Accept( const fnAccepting: TProc): ITransport; +var frame : IStompFrame; + adapter : IThriftStream; + stream : TStringStream; +begin + if FClient = nil + then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, + 'Not connected.'); + + if Assigned(fnAccepting) + then fnAccepting(); + + try + frame := FClient.Receive(MAXINT); + if frame = nil then Exit(nil); + + stream := TStringStream.Create( frame.GetBody); + adapter := TThriftStreamAdapterDelphi.Create( stream, TRUE); + result := TStreamTransportImpl.Create( adapter, nil); + + except + on E: Exception + do raise TTransportException.Create( E.ToString ); + end; +end; + + +end. + |