diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/jaegertracing/thrift/contrib/Rebus/ServiceImpl | |
parent | Initial commit. (diff) | |
download | ceph-upstream/16.2.11+ds.tar.xz ceph-upstream/16.2.11+ds.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/jaegertracing/thrift/contrib/Rebus/ServiceImpl')
3 files changed, 335 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/contrib/Rebus/ServiceImpl/Both.cs b/src/jaegertracing/thrift/contrib/Rebus/ServiceImpl/Both.cs new file mode 100644 index 000000000..fba67ec15 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/Rebus/ServiceImpl/Both.cs @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + + +namespace RebusSample +{ + // generic data container for serialized Thrift calls + public class GenericThriftServiceCall + { + public byte[] rawBytes; + } + + // specific containers (one per Thrift service) to leverage Rebus' handler routing + public class MathRequestCall : GenericThriftServiceCall { } + public class MathResponseCall : GenericThriftServiceCall { } + +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/contrib/Rebus/ServiceImpl/Client.cs b/src/jaegertracing/thrift/contrib/Rebus/ServiceImpl/Client.cs new file mode 100644 index 000000000..2408041a9 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/Rebus/ServiceImpl/Client.cs @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Rebus; +using Rebus.Configuration; +using Rebus.Messages; +using Rebus.RabbitMQ; +using System; +using System.Collections.Generic; +using System.IO; +using Thrift.Protocol; +using Thrift.Transport; + +/* + * The client emits calls to BasicMathServers + * + * The client implements the BasicMathClient service. + * If the server has processed our request, we get the results back through this service + */ + +namespace RebusSample.Client +{ + + // handler to be registered with Rebus + class MathResponseCallHandler : IHandleMessages<MathResponseCall> + { + public void Handle(MathResponseCall message) + { + // Thrift protocol/transport stack + var stm = new MemoryStream(message.rawBytes); + var trns = new TStreamTransport(stm, null); + var prot = new TBinaryProtocol(trns); + + // create a processor and let him handle the call + var hndl = new MathResponsesHandler(); + var proc = new BasicMathClient.Processor(hndl); + proc.Process(prot, null); // oneway only + } + } + + + // serves incoming responses with calculation results + internal class MathResponsesHandler : BasicMathClient.Iface + { + public void FourResults(int added, int multiplied, int subtracted, int divided) + { + Console.WriteLine("added = {0}", added); + Console.WriteLine("multiplied= {0}", multiplied); + Console.WriteLine("subtracted = {0}", subtracted); + Console.WriteLine("divided = {0}", divided); + + PingAndDoAnotherCalculation(); + } + + + public void ThreeResults(int added, int multiplied, int subtracted) + { + Console.WriteLine("added = {0}", added); + Console.WriteLine("multiplied= {0}", multiplied); + Console.WriteLine("subtracted = {0}", subtracted); + Console.WriteLine("DIV/0 error during division"); + + PingAndDoAnotherCalculation(); + } + + + public void Pong(long value) + { + var latency = DateTime.Now.Ticks - value; + Console.WriteLine("Ping took {0} ms", new DateTime(latency).Millisecond); + } + + + private void PingAndDoAnotherCalculation() + { + var random = new Random(); + var client = new MathRequestClient("localhost"); + client.Ping(DateTime.Now.Ticks); + client.DoTheMath(random.Next(), random.Next()); + } + } + + + // provides the client-side interface for calculation requests + internal class MathRequestClient : BasicMathServer.Iface + { + private BuiltinContainerAdapter MQAdapter; + + + public MathRequestClient(string server) + { + MQAdapter = new BuiltinContainerAdapter(); + Configure.With(MQAdapter) + .Transport(t => t.UseRabbitMqInOneWayMode("amqp://" + server)) // we need send only + .MessageOwnership(o => o.FromRebusConfigurationSection()) + .CreateBus().Start(); + } + + + public void SerializeThriftCall(Action<BasicMathServer.Iface> action) + { + // Thrift protocol/transport stack + var stm = new MemoryStream(); + var trns = new TStreamTransport(null, stm); + var prot = new TBinaryProtocol(trns); + + // serialize the call into a bunch of bytes + var client = new BasicMathServer.Client(prot); + if( action != null) + action(client); + else + throw new ArgumentException("action must not be null"); + + // make sure everything is written to the MemoryStream + trns.Flush(); + + // send the message + var msg = new MathRequestCall() { rawBytes = stm.ToArray() }; + MQAdapter.Bus.Send(msg); + } + + + public void Ping(long value) + { + SerializeThriftCall(client => + { + client.Ping(value); + }); + } + + + public void DoTheMath( int arg1, int arg2) + { + SerializeThriftCall(client => + { + client.DoTheMath(arg1, arg2); + }); + } + } +} + diff --git a/src/jaegertracing/thrift/contrib/Rebus/ServiceImpl/Server.cs b/src/jaegertracing/thrift/contrib/Rebus/ServiceImpl/Server.cs new file mode 100644 index 000000000..149d513c6 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/Rebus/ServiceImpl/Server.cs @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Rebus; +using Rebus.Configuration; +using Rebus.Messages; +using Rebus.RabbitMQ; +using System; +using System.Collections.Generic; +using System.IO; +using Thrift.Protocol; +using Thrift.Transport; + +/* + * The server implements the BasicMathServer service . + * All results are sent back to the client via the BasicMathClient service + */ + + +namespace RebusSample.Server +{ + // handler to be registered with Rebus + class MathRequestCallHandler : IHandleMessages<MathRequestCall> + { + public void Handle(MathRequestCall message) + { + // Thrift protocol/transport stack + var stm = new MemoryStream(message.rawBytes); + var trns = new TStreamTransport(stm, null); + var prot = new TBinaryProtocol(trns); + + // create a processor and let him handle the call + var hndl = new MathRequestsHandler(); + var proc = new BasicMathServer.Processor(hndl); + proc.Process(prot, null); // oneway only + } + } + + + // serves incoming calculation requests + internal class MathRequestsHandler : BasicMathServer.Iface + { + public void Ping(long value) + { + var client = new MathResponseClient("localhost"); + client.Pong(value); + } + + + public void DoTheMath(int arg1, int arg2) + { + var client = new MathResponseClient("localhost"); + if( arg2 != 0) + client.FourResults( arg1+arg2, arg1*arg2, arg1-arg2, arg1/arg2); + else + client.ThreeResults( arg1+arg2, arg1*arg2, arg1-arg2); + } + } + + + // provides the client-side interface for calculation responses + internal class MathResponseClient : BasicMathClient.Iface + { + private BuiltinContainerAdapter MQAdapter; + + + public MathResponseClient(string server) + { + MQAdapter = new BuiltinContainerAdapter(); + Configure.With(MQAdapter) + .Transport(t => t.UseRabbitMqInOneWayMode("amqp://" + server)) // we need send only + .MessageOwnership(o => o.FromRebusConfigurationSection()) + .CreateBus().Start(); + } + + + public void SerializeThriftCall(Action<BasicMathClient.Iface> action) + { + // Thrift protocol/transport stack + var stm = new MemoryStream(); + var trns = new TStreamTransport(null, stm); + var prot = new TBinaryProtocol(trns); + + // serialize the call into a bunch of bytes + var client = new BasicMathClient.Client(prot); + if (action != null) + action(client); + else + throw new ArgumentException("action must not be null"); + + // make sure everything is written to the MemoryStream + trns.Flush(); + + // send the message + var msg = new MathResponseCall() { rawBytes = stm.ToArray() }; + MQAdapter.Bus.Send(msg); + } + + + public void Pong(long value) + { + SerializeThriftCall(client => + { + client.Pong(value); + }); + } + + + public void ThreeResults(int added, int multiplied, int suctracted) + { + SerializeThriftCall(client => + { + client.ThreeResults(added, multiplied, suctracted); + }); + } + + + public void FourResults(int added, int multiplied, int suctracted, int divided) + { + SerializeThriftCall(client => + { + client.FourResults(added, multiplied, suctracted, divided); + }); + } + } +} + |