summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/jaegertracing/thrift/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java399
1 files changed, 399 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java b/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java
new file mode 100644
index 000000000..382d978db
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.thrift;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+
+/**
+ * This class uses a single thread to set up non-blocking sockets to a set
+ * of remote servers (hostname and port pairs), and sends a same request to
+ * all these servers. It then fetches responses from servers.
+ *
+ * Parameters:
+ * int maxRecvBufBytesPerServer - an upper limit for receive buffer size
+ * per server (in byte). If a response from a server exceeds this limit, the
+ * client will not allocate memory or read response data for it.
+ *
+ * int fetchTimeoutSeconds - time limit for fetching responses from all
+ * servers (in second). After the timeout, the fetch job is stopped and
+ * available responses are returned.
+ *
+ * ByteBuffer requestBuf - request message that is sent to all servers.
+ *
+ * Output:
+ * Responses are stored in an array of ByteBuffers. Index of elements in
+ * this array corresponds to index of servers in the server list. Content in
+ * a ByteBuffer may be in one of the following forms:
+ * 1. First 4 bytes form an integer indicating length of following data,
+ * then followed by the data.
+ * 2. First 4 bytes form an integer indicating length of following data,
+ * then followed by nothing - this happens when the response data size
+ * exceeds maxRecvBufBytesPerServer, and the client will not read any
+ * response data.
+ * 3. No data in the ByteBuffer - this happens when the server does not
+ * return any response within fetchTimeoutSeconds.
+ *
+ * In some special cases (no servers are given, fetchTimeoutSeconds less
+ * than or equal to 0, requestBuf is null), the return is null.
+ *
+ * Note:
+ * It assumes all remote servers are TNonblockingServers and use
+ * TFramedTransport.
+ *
+ */
+public class TNonblockingMultiFetchClient {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(
+ TNonblockingMultiFetchClient.class.getName()
+ );
+
+ // if the size of the response msg exceeds this limit (in byte), we will
+ // not read the msg
+ private int maxRecvBufBytesPerServer;
+
+ // time limit for fetching data from all servers (in second)
+ private int fetchTimeoutSeconds;
+
+ // store request that will be sent to servers
+ private ByteBuffer requestBuf;
+ private ByteBuffer requestBufDuplication;
+
+ // a list of remote servers
+ private List<InetSocketAddress> servers;
+
+ // store fetch results
+ private TNonblockingMultiFetchStats stats;
+ private ByteBuffer[] recvBuf;
+
+ public TNonblockingMultiFetchClient(int maxRecvBufBytesPerServer,
+ int fetchTimeoutSeconds, ByteBuffer requestBuf,
+ List<InetSocketAddress> servers) {
+ this.maxRecvBufBytesPerServer = maxRecvBufBytesPerServer;
+ this.fetchTimeoutSeconds = fetchTimeoutSeconds;
+ this.requestBuf = requestBuf;
+ this.servers = servers;
+
+ stats = new TNonblockingMultiFetchStats();
+ recvBuf = null;
+ }
+
+ public synchronized int getMaxRecvBufBytesPerServer() {
+ return maxRecvBufBytesPerServer;
+ }
+
+ public synchronized int getFetchTimeoutSeconds() {
+ return fetchTimeoutSeconds;
+ }
+
+ /**
+ * return a duplication of requestBuf, so that requestBuf will not
+ * be modified by others.
+ */
+ public synchronized ByteBuffer getRequestBuf() {
+ if (requestBuf == null) {
+ return null;
+ } else {
+ if (requestBufDuplication == null) {
+ requestBufDuplication = requestBuf.duplicate();
+ }
+ return requestBufDuplication;
+ }
+ }
+
+ public synchronized List<InetSocketAddress> getServerList() {
+ if (servers == null) {
+ return null;
+ }
+ return Collections.unmodifiableList(servers);
+ }
+
+ public synchronized TNonblockingMultiFetchStats getFetchStats() {
+ return stats;
+ }
+
+ /**
+ * main entry function for fetching from servers
+ */
+ public synchronized ByteBuffer[] fetch() {
+ // clear previous results
+ recvBuf = null;
+ stats.clear();
+
+ if (servers == null || servers.size() == 0 ||
+ requestBuf == null || fetchTimeoutSeconds <= 0) {
+ return recvBuf;
+ }
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ MultiFetch multiFetch = new MultiFetch();
+ FutureTask<?> task = new FutureTask(multiFetch, null);
+ executor.execute(task);
+ try {
+ task.get(fetchTimeoutSeconds, TimeUnit.SECONDS);
+ } catch(InterruptedException ie) {
+ // attempt to cancel execution of the task.
+ task.cancel(true);
+ LOGGER.error("interrupted during fetch: "+ie.toString());
+ } catch(ExecutionException ee) {
+ // attempt to cancel execution of the task.
+ task.cancel(true);
+ LOGGER.error("exception during fetch: "+ee.toString());
+ } catch(TimeoutException te) {
+ // attempt to cancel execution of the task.
+ task.cancel(true);
+ LOGGER.error("timeout for fetch: "+te.toString());
+ }
+
+ executor.shutdownNow();
+ multiFetch.close();
+ return recvBuf;
+ }
+
+ /**
+ * Private class that does real fetch job.
+ * Users are not allowed to directly use this class, as its run()
+ * function may run forever.
+ */
+ private class MultiFetch implements Runnable {
+ private Selector selector;
+
+ /**
+ * main entry function for fetching.
+ *
+ * Server responses are stored in TNonblocingMultiFetchClient.recvBuf,
+ * and fetch statistics is in TNonblockingMultiFetchClient.stats.
+ *
+ * Sanity check for parameters has been done in
+ * TNonblockingMultiFetchClient before calling this function.
+ */
+ public void run() {
+ long t1 = System.currentTimeMillis();
+
+ int numTotalServers = servers.size();
+ stats.setNumTotalServers(numTotalServers);
+
+ // buffer for receiving response from servers
+ recvBuf = new ByteBuffer[numTotalServers];
+ // buffer for sending request
+ ByteBuffer sendBuf[] = new ByteBuffer[numTotalServers];
+ long numBytesRead[] = new long[numTotalServers];
+ int frameSize[] = new int[numTotalServers];
+ boolean hasReadFrameSize[] = new boolean[numTotalServers];
+
+ try {
+ selector = Selector.open();
+ } catch (IOException e) {
+ LOGGER.error("selector opens error: "+e.toString());
+ return;
+ }
+
+ for (int i = 0; i < numTotalServers; i++) {
+ // create buffer to send request to server.
+ sendBuf[i] = requestBuf.duplicate();
+ // create buffer to read response's frame size from server
+ recvBuf[i] = ByteBuffer.allocate(4);
+ stats.incTotalRecvBufBytes(4);
+
+ InetSocketAddress server = servers.get(i);
+ SocketChannel s = null;
+ SelectionKey key = null;
+ try {
+ s = SocketChannel.open();
+ s.configureBlocking(false);
+ // now this method is non-blocking
+ s.connect(server);
+ key = s.register(selector, s.validOps());
+ // attach index of the key
+ key.attach(i);
+ } catch (Exception e) {
+ stats.incNumConnectErrorServers();
+ String err = String.format("set up socket to server %s error: %s",
+ server.toString(), e.toString());
+ LOGGER.error(err);
+ // free resource
+ if (s != null) {
+ try {s.close();} catch (Exception ex) {}
+ }
+ if (key != null) {
+ key.cancel();
+ }
+ }
+ }
+
+ // wait for events
+ while (stats.getNumReadCompletedServers() +
+ stats.getNumConnectErrorServers() < stats.getNumTotalServers()) {
+ // if the thread is interrupted (e.g., task is cancelled)
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+
+ try{
+ selector.select();
+ } catch (Exception e) {
+ LOGGER.error("selector selects error: "+e.toString());
+ continue;
+ }
+
+ Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+ while (it.hasNext()) {
+ SelectionKey selKey = it.next();
+ it.remove();
+
+ // get previously attached index
+ int index = (Integer)selKey.attachment();
+
+ if (selKey.isValid() && selKey.isConnectable()) {
+ // if this socket throws an exception (e.g., connection refused),
+ // print error msg and skip it.
+ try {
+ SocketChannel sChannel = (SocketChannel)selKey.channel();
+ sChannel.finishConnect();
+ } catch (Exception e) {
+ stats.incNumConnectErrorServers();
+ String err = String.format("socket %d connects to server %s " +
+ "error: %s",
+ index, servers.get(index).toString(), e.toString());
+ LOGGER.error(err);
+ }
+ }
+
+ if (selKey.isValid() && selKey.isWritable()) {
+ if (sendBuf[index].hasRemaining()) {
+ // if this socket throws an exception, print error msg and
+ // skip it.
+ try {
+ SocketChannel sChannel = (SocketChannel)selKey.channel();
+ sChannel.write(sendBuf[index]);
+ } catch (Exception e) {
+ String err = String.format("socket %d writes to server %s " +
+ "error: %s",
+ index, servers.get(index).toString(), e.toString());
+ LOGGER.error(err);
+ }
+ }
+ }
+
+ if (selKey.isValid() && selKey.isReadable()) {
+ // if this socket throws an exception, print error msg and
+ // skip it.
+ try {
+ SocketChannel sChannel = (SocketChannel)selKey.channel();
+ int bytesRead = sChannel.read(recvBuf[index]);
+
+ if (bytesRead > 0) {
+ numBytesRead[index] += bytesRead;
+
+ if (!hasReadFrameSize[index] &&
+ recvBuf[index].remaining()==0) {
+ // if the frame size has been read completely, then prepare
+ // to read the actual frame.
+ frameSize[index] = recvBuf[index].getInt(0);
+
+ if (frameSize[index] <= 0) {
+ stats.incNumInvalidFrameSize();
+ String err = String.format("Read an invalid frame size %d"
+ + " from %s. Does the server use TFramedTransport? ",
+ frameSize[index], servers.get(index).toString());
+ LOGGER.error(err);
+ sChannel.close();
+ continue;
+ }
+
+ if (frameSize[index] + 4 > stats.getMaxResponseBytes()) {
+ stats.setMaxResponseBytes(frameSize[index]+4);
+ }
+
+ if (frameSize[index] + 4 > maxRecvBufBytesPerServer) {
+ stats.incNumOverflowedRecvBuf();
+ String err = String.format("Read frame size %d from %s,"
+ + " total buffer size would exceed limit %d",
+ frameSize[index], servers.get(index).toString(),
+ maxRecvBufBytesPerServer);
+ LOGGER.error(err);
+ sChannel.close();
+ continue;
+ }
+
+ // reallocate buffer for actual frame data
+ recvBuf[index] = ByteBuffer.allocate(frameSize[index] + 4);
+ recvBuf[index].putInt(frameSize[index]);
+
+ stats.incTotalRecvBufBytes(frameSize[index]);
+ hasReadFrameSize[index] = true;
+ }
+
+ if (hasReadFrameSize[index] &&
+ numBytesRead[index] >= frameSize[index]+4) {
+ // has read all data
+ sChannel.close();
+ stats.incNumReadCompletedServers();
+ long t2 = System.currentTimeMillis();
+ stats.setReadTime(t2-t1);
+ }
+ }
+ } catch (Exception e) {
+ String err = String.format("socket %d reads from server %s " +
+ "error: %s",
+ index, servers.get(index).toString(), e.toString());
+ LOGGER.error(err);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * dispose any resource allocated
+ */
+ public void close() {
+ try {
+ if (selector.isOpen()) {
+ Iterator<SelectionKey> it = selector.keys().iterator();
+ while (it.hasNext()) {
+ SelectionKey selKey = it.next();
+ SocketChannel sChannel = (SocketChannel)selKey.channel();
+ sChannel.close();
+ }
+
+ selector.close();
+ }
+ } catch (IOException e) {
+ LOGGER.error("free resource error: "+e.toString());
+ }
+ }
+ }
+} \ No newline at end of file