summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/contrib/Stomp/Thrift.Transport.STOMP.pas
diff options
context:
space:
mode:
Diffstat (limited to 'src/jaegertracing/thrift/contrib/Stomp/Thrift.Transport.STOMP.pas')
-rw-r--r--src/jaegertracing/thrift/contrib/Stomp/Thrift.Transport.STOMP.pas200
1 files changed, 200 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/contrib/Stomp/Thrift.Transport.STOMP.pas b/src/jaegertracing/thrift/contrib/Stomp/Thrift.Transport.STOMP.pas
new file mode 100644
index 000000000..7dfb3763c
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/Stomp/Thrift.Transport.STOMP.pas
@@ -0,0 +1,200 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+unit Thrift.Transport.STOMP;
+
+interface
+
+uses
+ Classes,Windows, SysUtils,
+ Thrift,
+ Thrift.Transport,
+ Thrift.Protocol,
+ Thrift.Stream,
+ StompClient,
+ StompTypes;
+
+type
+ TStompTransportImpl = class( TStreamTransportImpl)
+ strict private
+ FData : TStringStream;
+ FServer : string;
+ FOutQueue : string;
+ FStompCli : IStompClient;
+ protected
+ function GetIsOpen: Boolean; override;
+ function Peek: Boolean; override;
+ public
+ constructor Create( const aServerAndPort, aOutQueue : string);
+ destructor Destroy; override;
+
+ procedure Open(); override;
+ procedure Close(); override;
+ procedure Flush; override;
+ end;
+
+
+ TStompServerTransportImpl = class( TServerTransportImpl)
+ strict private
+ FServer : string;
+ FInQueue : string;
+ FClient : IStompClient;
+ protected
+ procedure Listen; override;
+ procedure Close; override;
+ function Accept( const fnAccepting: TProc): ITransport; override;
+ public
+ constructor Create( const aServerAndPort, aInQueue : string);
+ destructor Destroy; override;
+ end;
+
+
+const
+ QUEUE_PREFIX = '/queue/';
+ TOPIC_PREFIX = '/topic/';
+ EXCHANGE_PREFIX = '/exchange/';
+
+
+implementation
+
+
+
+constructor TStompTransportImpl.Create( const aServerAndPort, aOutQueue : string);
+var adapter : IThriftStream;
+begin
+ FData := TStringStream.Create;
+ FServer := aServerAndPort;
+ FOutQueue := aOutQueue;
+
+ adapter := TThriftStreamAdapterDelphi.Create( FData, FALSE);
+ inherited Create( nil, adapter); // output only
+end;
+
+
+destructor TStompTransportImpl.Destroy;
+begin
+ inherited Destroy;
+ FreeAndNil( FData);
+ FStompCli := nil;
+end;
+
+
+function TStompTransportImpl.GetIsOpen: Boolean;
+begin
+ result := (FStompCli <> nil);
+end;
+
+
+function TStompTransportImpl.Peek: Boolean;
+begin
+ result := FALSE; // output only
+end;
+
+
+procedure TStompTransportImpl.Open;
+begin
+ if FStompCli <> nil
+ then raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen, 'already open')
+ else FStompCli := StompUtils.NewStomp( FServer);
+end;
+
+
+procedure TStompTransportImpl.Close;
+begin
+ FStompCli := nil;
+ FData.Clear;
+end;
+
+
+procedure TStompTransportImpl.Flush;
+begin
+ if FStompCli = nil
+ then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'not open');
+
+ FStompCli.Send( FOutQueue, FData.DataString);
+ FData.Clear;
+end;
+
+
+//--- TStompServerTransportImpl --------------------------------------------
+
+
+constructor TStompServerTransportImpl.Create( const aServerAndPort, aInQueue : string);
+begin
+ inherited Create;
+ FServer := aServerAndPort;
+ FInQueue := aInQueue;
+end;
+
+
+destructor TStompServerTransportImpl.Destroy;
+begin
+ try
+ Close;
+ finally
+ inherited Destroy;
+ end;
+end;
+
+
+procedure TStompServerTransportImpl.Listen;
+begin
+ FClient := StompUtils.NewStomp(FServer);
+ FClient.Subscribe( FInQueue);
+end;
+
+
+procedure TStompServerTransportImpl.Close;
+begin
+ if FClient <> nil then begin
+ FClient.Unsubscribe( FInQueue);
+ FClient := nil;
+ end;
+end;
+
+
+function TStompServerTransportImpl.Accept( const fnAccepting: TProc): ITransport;
+var frame : IStompFrame;
+ adapter : IThriftStream;
+ stream : TStringStream;
+begin
+ if FClient = nil
+ then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+ 'Not connected.');
+
+ if Assigned(fnAccepting)
+ then fnAccepting();
+
+ try
+ frame := FClient.Receive(MAXINT);
+ if frame = nil then Exit(nil);
+
+ stream := TStringStream.Create( frame.GetBody);
+ adapter := TThriftStreamAdapterDelphi.Create( stream, TRUE);
+ result := TStreamTransportImpl.Create( adapter, nil);
+
+ except
+ on E: Exception
+ do raise TTransportException.Create( E.ToString );
+ end;
+end;
+
+
+end.
+