diff options
Diffstat (limited to '')
-rw-r--r-- | src/jaegertracing/thrift/lib/perl/lib/Thrift/MultiplexedProcessor.pm | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/perl/lib/Thrift/MultiplexedProcessor.pm b/src/jaegertracing/thrift/lib/perl/lib/Thrift/MultiplexedProcessor.pm new file mode 100644 index 000000000..ae925d7cd --- /dev/null +++ b/src/jaegertracing/thrift/lib/perl/lib/Thrift/MultiplexedProcessor.pm @@ -0,0 +1,133 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +use 5.10.0; +use strict; +use warnings; + +use Thrift; +use Thrift::MessageType; +use Thrift::MultiplexedProtocol; +use Thrift::Protocol; +use Thrift::ProtocolDecorator; + +package Thrift::StoredMessageProtocol; +use base qw(Thrift::ProtocolDecorator); +use version 0.77; our $VERSION = version->declare("$Thrift::VERSION"); + +sub new { + my $classname = shift; + my $protocol = shift; + my $fname = shift; + my $mtype = shift; + my $rseqid = shift; + my $self = $classname->SUPER::new($protocol); + + $self->{fname} = $fname; + $self->{mtype} = $mtype; + $self->{rseqid} = $rseqid; + + return bless($self,$classname); +} + +sub readMessageBegin +{ + my $self = shift; + my $name = shift; + my $type = shift; + my $seqid = shift; + + $$name = $self->{fname}; + $$type = $self->{mtype}; + $$seqid = $self->{rseqid}; +} + +package Thrift::MultiplexedProcessor; +use version 0.77; our $VERSION = version->declare("$Thrift::VERSION"); + +sub new { + my $classname = shift; + my $self = {}; + + $self->{serviceProcessorMap} = {}; + $self->{defaultProcessor} = undef; + + return bless($self,$classname); +} + +sub defaultProcessor { + my $self = shift; + my $processor = shift; + + $self->{defaultProcessor} = $processor; +} + +sub registerProcessor { + my $self = shift; + my $serviceName = shift; + my $processor = shift; + + $self->{serviceProcessorMap}->{$serviceName} = $processor; +} + +sub process { + my $self = shift; + my $input = shift; + my $output = shift; + + # + # Use the actual underlying protocol (e.g. BinaryProtocol) to read the + # message header. This pulls the message "off the wire", which we'll + # deal with at the end of this method. + # + + my ($fname, $mtype, $rseqid); + $input->readMessageBegin(\$fname, \$mtype, \$rseqid); + + if ($mtype ne Thrift::TMessageType::CALL && $mtype ne Thrift::TMessageType::ONEWAY) { + die Thrift::TException->new('This should not have happened!?'); + } + + # Extract the service name and the new Message name. + if (index($fname, Thrift::MultiplexedProtocol::SEPARATOR) == -1) { + if (defined $self->{defaultProcessor}) { + return $self->{defaultProcessor}->process( + Thrift::StoredMessageProtocol->new($input, $fname, $mtype, $rseqid), $output + ); + } else { + die Thrift::TException->new("Service name not found in message name: {$fname} and no default processor defined. Did you " . + 'forget to use a MultiplexProtocol in your client?'); + } + } + + (my $serviceName, my $messageName) = split(':', $fname, 2); + + if (!exists($self->{serviceProcessorMap}->{$serviceName})) { + die Thrift::TException->new("Service name not found: {$serviceName}. Did you forget " . + 'to call registerProcessor()?'); + } + + # Dispatch processing to the stored processor + my $processor = $self->{serviceProcessorMap}->{$serviceName}; + return $processor->process( + Thrift::StoredMessageProtocol->new($input, $messageName, $mtype, $rseqid), $output + ); +} + +1; |