summaryrefslogtreecommitdiffstats
path: root/t/helper/test-fsmonitor-client.c
blob: 54a4856c48c55af0af4e3ffe7cc3a62dce993842 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
/*
 * test-fsmonitor-client.c: client code to send commands/requests to
 * a `git fsmonitor--daemon` daemon.
 */

#include "test-tool.h"
#include "cache.h"
#include "parse-options.h"
#include "fsmonitor-ipc.h"
#include "thread-utils.h"
#include "trace2.h"

#ifndef HAVE_FSMONITOR_DAEMON_BACKEND
int cmd__fsmonitor_client(int argc, const char **argv)
{
	die("fsmonitor--daemon not available on this platform");
}
#else

/*
 * Read the `.git/index` to get the last token written to the
 * FSMonitor Index Extension.
 */
static const char *get_token_from_index(void)
{
	struct index_state *istate = the_repository->index;

	if (do_read_index(istate, the_repository->index_file, 0) < 0)
		die("unable to read index file");
	if (!istate->fsmonitor_last_update)
		die("index file does not have fsmonitor extension");

	return istate->fsmonitor_last_update;
}

/*
 * Send an IPC query to a `git-fsmonitor--daemon` daemon and
 * ask for the changes since the given token or from the last
 * token in the index extension.
 *
 * This will implicitly start a daemon process if necessary.  The
 * daemon process will persist after we exit.
 */
static int do_send_query(const char *token)
{
	struct strbuf answer = STRBUF_INIT;
	int ret;

	if (!token || !*token)
		token = get_token_from_index();

	ret = fsmonitor_ipc__send_query(token, &answer);
	if (ret < 0)
		die("could not query fsmonitor--daemon");

	write_in_full(1, answer.buf, answer.len);
	strbuf_release(&answer);

	return 0;
}

/*
 * Send a "flush" command to the `git-fsmonitor--daemon` (if running)
 * and tell it to flush its cache.
 *
 * This feature is primarily used by the test suite to simulate a loss of
 * sync with the filesystem where we miss kernel events.
 */
static int do_send_flush(void)
{
	struct strbuf answer = STRBUF_INIT;
	int ret;

	ret = fsmonitor_ipc__send_command("flush", &answer);
	if (ret)
		return ret;

	write_in_full(1, answer.buf, answer.len);
	strbuf_release(&answer);

	return 0;
}

struct hammer_thread_data
{
	pthread_t pthread_id;
	int thread_nr;

	int nr_requests;
	const char *token;

	int sum_successful;
	int sum_errors;
};

static void *hammer_thread_proc(void *_hammer_thread_data)
{
	struct hammer_thread_data *data = _hammer_thread_data;
	struct strbuf answer = STRBUF_INIT;
	int k;
	int ret;

	trace2_thread_start("hammer");

	for (k = 0; k < data->nr_requests; k++) {
		strbuf_reset(&answer);

		ret = fsmonitor_ipc__send_query(data->token, &answer);
		if (ret < 0)
			data->sum_errors++;
		else
			data->sum_successful++;
	}

	strbuf_release(&answer);
	trace2_thread_exit();
	return NULL;
}

/*
 * Start a pool of client threads that will each send a series of
 * commands to the daemon.
 *
 * The goal is to overload the daemon with a sustained series of
 * concurrent requests.
 */
static int do_hammer(const char *token, int nr_threads, int nr_requests)
{
	struct hammer_thread_data *data = NULL;
	int k;
	int sum_join_errors = 0;
	int sum_commands = 0;
	int sum_errors = 0;

	if (!token || !*token)
		token = get_token_from_index();
	if (nr_threads < 1)
		nr_threads = 1;
	if (nr_requests < 1)
		nr_requests = 1;

	CALLOC_ARRAY(data, nr_threads);

	for (k = 0; k < nr_threads; k++) {
		struct hammer_thread_data *p = &data[k];
		p->thread_nr = k;
		p->nr_requests = nr_requests;
		p->token = token;

		if (pthread_create(&p->pthread_id, NULL, hammer_thread_proc, p)) {
			warning("failed to create thread[%d] skipping remainder", k);
			nr_threads = k;
			break;
		}
	}

	for (k = 0; k < nr_threads; k++) {
		struct hammer_thread_data *p = &data[k];

		if (pthread_join(p->pthread_id, NULL))
			sum_join_errors++;
		sum_commands += p->sum_successful;
		sum_errors += p->sum_errors;
	}

	fprintf(stderr, "HAMMER: [threads %d][requests %d] [ok %d][err %d][join %d]\n",
		nr_threads, nr_requests, sum_commands, sum_errors, sum_join_errors);

	free(data);

	/*
	 * Return an error if any of the _send_query requests failed.
	 * We don't care about thread create/join errors.
	 */
	return sum_errors > 0;
}

int cmd__fsmonitor_client(int argc, const char **argv)
{
	const char *subcmd;
	const char *token = NULL;
	int nr_threads = 1;
	int nr_requests = 1;

	const char * const fsmonitor_client_usage[] = {
		"test-tool fsmonitor-client query [<token>]",
		"test-tool fsmonitor-client flush",
		"test-tool fsmonitor-client hammer [<token>] [<threads>] [<requests>]",
		NULL,
	};

	struct option options[] = {
		OPT_STRING(0, "token", &token, "token",
			   "command token to send to the server"),

		OPT_INTEGER(0, "threads", &nr_threads, "number of client threads"),
		OPT_INTEGER(0, "requests", &nr_requests, "number of requests per thread"),

		OPT_END()
	};

	argc = parse_options(argc, argv, NULL, options, fsmonitor_client_usage, 0);

	if (argc != 1)
		usage_with_options(fsmonitor_client_usage, options);

	subcmd = argv[0];

	setup_git_directory();

	if (!strcmp(subcmd, "query"))
		return !!do_send_query(token);

	if (!strcmp(subcmd, "flush"))
		return !!do_send_flush();

	if (!strcmp(subcmd, "hammer"))
		return !!do_hammer(token, nr_threads, nr_requests);

	die("Unhandled subcommand: '%s'", subcmd);
}
#endif