diff options
Diffstat (limited to '')
-rw-r--r-- | src/jaegertracing/thrift/lib/delphi/src/Thrift.Processor.Multiplex.pas | 231 |
1 files changed, 231 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/delphi/src/Thrift.Processor.Multiplex.pas b/src/jaegertracing/thrift/lib/delphi/src/Thrift.Processor.Multiplex.pas new file mode 100644 index 000000000..8cf23db07 --- /dev/null +++ b/src/jaegertracing/thrift/lib/delphi/src/Thrift.Processor.Multiplex.pas @@ -0,0 +1,231 @@ +(* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *) + +unit Thrift.Processor.Multiplex; + + +interface + +uses + SysUtils, + Generics.Collections, + Thrift, + Thrift.Protocol, + Thrift.Protocol.Multiplex; + +{ TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services. + To do so, you instantiate the processor and then register additional processors with it, + as shown in the following example: + + + TMultiplexedProcessor processor = new TMultiplexedProcessor(); + + processor.registerProcessor( + "Calculator", + new Calculator.Processor(new CalculatorHandler())); + + processor.registerProcessor( + "WeatherReport", + new WeatherReport.Processor(new WeatherReportHandler())); + + TServerTransport t = new TServerSocket(9090); + TSimpleServer server = new TSimpleServer(processor, t); + + server.serve(); +} + + +type + IMultiplexedProcessor = interface( IProcessor) + ['{807F9D19-6CF4-4789-840E-93E87A12EB63}'] + // Register a service with this TMultiplexedProcessor. This allows us + // to broker requests to individual services by using the service name + // to select them at request time. + procedure RegisterProcessor( const serviceName : String; const processor : IProcessor; const asDefault : Boolean = FALSE); + end; + + + TMultiplexedProcessorImpl = class( TInterfacedObject, IMultiplexedProcessor, IProcessor) + private type + // Our goal was to work with any protocol. In order to do that, we needed + // to allow them to call readMessageBegin() and get a TMessage in exactly + // the standard format, without the service name prepended to TMessage.name. + TStoredMessageProtocol = class( TProtocolDecorator) + private + FMessageBegin : TThriftMessage; + public + constructor Create( const protocol : IProtocol; const aMsgBegin : TThriftMessage); + function ReadMessageBegin: TThriftMessage; override; + end; + + private + FServiceProcessorMap : TDictionary<String, IProcessor>; + FDefaultProcessor : IProcessor; + + procedure Error( const oprot : IProtocol; const msg : TThriftMessage; + extype : TApplicationExceptionSpecializedClass; const etxt : string); + + public + constructor Create; + destructor Destroy; override; + + // Register a service with this TMultiplexedProcessorImpl. This allows us + // to broker requests to individual services by using the service name + // to select them at request time. + procedure RegisterProcessor( const serviceName : String; const processor : IProcessor; const asDefault : Boolean = FALSE); + + { This implementation of process performs the following steps: + - Read the beginning of the message. + - Extract the service name from the message. + - Using the service name to locate the appropriate processor. + - Dispatch to the processor, with a decorated instance of TProtocol + that allows readMessageBegin() to return the original TMessage. + + An exception is thrown if the message type is not CALL or ONEWAY + or if the service is unknown (or not properly registered). + } + function Process( const iprot, oprot: IProtocol; const events : IProcessorEvents = nil): Boolean; + end; + + +implementation + +constructor TMultiplexedProcessorImpl.TStoredMessageProtocol.Create( const protocol : IProtocol; const aMsgBegin : TThriftMessage); +begin + inherited Create( protocol); + FMessageBegin := aMsgBegin; +end; + + +function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: TThriftMessage; +begin + result := FMessageBegin; +end; + + +constructor TMultiplexedProcessorImpl.Create; +begin + inherited Create; + FServiceProcessorMap := TDictionary<string,IProcessor>.Create; +end; + + +destructor TMultiplexedProcessorImpl.Destroy; +begin + try + FreeAndNil( FServiceProcessorMap); + finally + inherited Destroy; + end; +end; + + +procedure TMultiplexedProcessorImpl.RegisterProcessor( const serviceName : String; const processor : IProcessor; const asDefault : Boolean); +begin + FServiceProcessorMap.Add( serviceName, processor); + + if asDefault then begin + if FDefaultProcessor = nil + then FDefaultProcessor := processor + else raise TApplicationExceptionInternalError.Create('Only one default service allowed'); + end; +end; + + +procedure TMultiplexedProcessorImpl.Error( const oprot : IProtocol; const msg : TThriftMessage; + extype : TApplicationExceptionSpecializedClass; + const etxt : string); +var appex : TApplicationException; + newMsg : TThriftMessage; +begin + appex := extype.Create(etxt); + try + Init( newMsg, msg.Name, TMessageType.Exception, msg.SeqID); + + oprot.WriteMessageBegin(newMsg); + appex.Write(oprot); + oprot.WriteMessageEnd(); + oprot.Transport.Flush(); + + finally + appex.Free; + end; +end; + + +function TMultiplexedProcessorImpl.Process(const iprot, oprot : IProtocol; const events : IProcessorEvents = nil): Boolean; +var msg, newMsg : TThriftMessage; + idx : Integer; + sService : string; + processor : IProcessor; + protocol : IProtocol; +const + ERROR_INVALID_MSGTYPE = 'Message must be "call" or "oneway"'; + ERROR_INCOMPATIBLE_PROT = 'No service name found in "%s". Client is expected to use TMultiplexProtocol.'; + ERROR_UNKNOWN_SERVICE = 'Service "%s" is not registered with MultiplexedProcessor'; +begin + // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the message header. + // This pulls the message "off the wire", which we'll deal with at the end of this method. + msg := iprot.readMessageBegin(); + if not (msg.Type_ in [TMessageType.Call, TMessageType.Oneway]) then begin + Error( oprot, msg, + TApplicationExceptionInvalidMessageType, + ERROR_INVALID_MSGTYPE); + Exit( FALSE); + end; + + // Extract the service name + // use FDefaultProcessor as fallback if there is no separator + idx := Pos( TMultiplexedProtocol.SEPARATOR, msg.Name); + if idx > 0 then begin + + // Create a new TMessage, something that can be consumed by any TProtocol + sService := Copy( msg.Name, 1, idx-1); + if not FServiceProcessorMap.TryGetValue( sService, processor) + then begin + Error( oprot, msg, + TApplicationExceptionInternalError, + Format(ERROR_UNKNOWN_SERVICE,[sService])); + Exit( FALSE); + end; + + // Create a new TMessage, removing the service name + Inc( idx, Length(TMultiplexedProtocol.SEPARATOR)); + Init( newMsg, Copy( msg.Name, idx, MAXINT), msg.Type_, msg.SeqID); + + end + else if FDefaultProcessor <> nil then begin + processor := FDefaultProcessor; + newMsg := msg; // no need to change + + end + else begin + Error( oprot, msg, + TApplicationExceptionInvalidProtocol, + Format(ERROR_INCOMPATIBLE_PROT,[msg.Name])); + Exit( FALSE); + end; + + // Dispatch processing to the stored processor + protocol := TStoredMessageProtocol.Create( iprot, newMsg); + result := processor.process( protocol, oprot, events); +end; + + +end. |