summaryrefslogtreecommitdiffstats
path: root/test/threadtest5.c
blob: 6e6610ff66319638365206ebc0e4a0ef9db213e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
/*
** 2021-05-12
**
** The author disclaims copyright to this source code.  In place of
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**
** Testing threading behavior when multiple database connections in separate
** threads of the same process are all talking to the same database file.
**
** For best results, ensure that SQLite is compiled with HAVE_USLEEP=1
**
** Only works on unix platforms.
**
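** Usage:
**
**      ./threadtest5  ?OPTIONS?  ?DATABASE?
**
** OPTIONS are "-num-workers N" to run N worker threads (default 4) and
** "-v" for debugging output.  If DATABASE is omitted, it defaults to
** using file:/mem?vfs=memdb.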
*/
#include "sqlite3.h"
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>

/* Name (or URI) of the database opened by every connection.  Taken from
** the first non-option command-line argument in main(), or set to
** "file:/mem?vfs=memdb" when no such argument is given. */
static char *zDbName = 0;

/* True for debugging output.  Set by the -v command-line option. */
static int eVerbose = 0;

/* Stop the test with a diagnostic on stderr unless rc is SQLITE_OK.
**
**   rc       Result code to check
**   zCtx     Name of the operation that produced rc
**   lineno   Source line of the call site
**
** Returns normally only when rc is SQLITE_OK; otherwise exits the process.
*/
static void error_out(int rc, const char *zCtx, int lineno){
  if( rc==SQLITE_OK ) return;
  fprintf(stderr, "error %d at %d in \"%s\"\n", rc, lineno, zCtx);
  exit(-1);
}

#if 0
/* Return the number of milliseconds since the Julian epoch (-4714-11-24).
**
** The value is read through the xCurrentTimeInt64 method of the VFS
** returned by sqlite3_vfs_find(0).  This routine is compiled out by the
** enclosing "#if 0".
*/
static sqlite3_int64 gettime(void){
  sqlite3_int64 tm;
  sqlite3_vfs *pVfs = sqlite3_vfs_find(0);
  pVfs->xCurrentTimeInt64(pVfs, &tm);
  return tm;
}
#endif

/* Format an SQL string printf-style and run it with sqlite3_exec().
**
**   db        Database connection on which to run the SQL
**   zId       Label for the caller, used only in debugging output
**   lineno    Source line of the call, used only in debugging output
**   zFormat   sqlite3_vmprintf() format string that produces the SQL text
**   ...       Arguments consumed by zFormat
**
** Returns the sqlite3_exec() result code, or SQLITE_NOMEM if the SQL text
** could not be allocated.  When eVerbose is set, the SQL text and any
** non-zero result code are echoed to stdout.
*/
static int exec(
  sqlite3 *db,
  const char *zId,
  int lineno,
  const char *zFormat,
  ...
){
  int rc;
  va_list ap;
  char *zSql;
  va_start(ap, zFormat);
  zSql = sqlite3_vmprintf(zFormat, ap);
  va_end(ap);
  if( zSql==0 ){
    /* sqlite3_vmprintf() yields NULL when it cannot allocate the string.
    ** Report that as an error rather than passing a NULL pointer on to
    ** printf("%s") and sqlite3_exec(). */
    if( eVerbose ){
      printf("%s:%d: return-code %d\n", zId, lineno, SQLITE_NOMEM);
      fflush(stdout);
    }
    return SQLITE_NOMEM;
  }
  if( eVerbose ){
    printf("%s:%d: [%s]\n", zId, lineno, zSql);
    fflush(stdout);
  }
  rc = sqlite3_exec(db, zSql, 0, 0, 0);
  if( rc && eVerbose ){
    printf("%s:%d: return-code %d\n", zId, lineno, rc);
    fflush(stdout);
  }
  sqlite3_free(zSql);
  return rc;
}

/* Generate a prepared statement from printf-style formatted SQL.
**
**   db        Database connection on which to prepare the statement
**   zId       Label for the caller, used in debugging and error output
**   lineno    Source line of the call, used in debugging and error output
**   zFormat   sqlite3_vmprintf() format string that produces the SQL text
**   ...       Arguments consumed by zFormat
**
** Returns the statement produced by sqlite3_prepare_v2(); the caller is
** responsible for passing it to sqlite3_finalize().  If the SQL text
** cannot be allocated or the statement cannot be prepared, an error
** message is printed and the process exits, so this routine does not
** return on failure.
*/
static sqlite3_stmt *prepare(
  sqlite3 *db,
  const char *zId,
  int lineno,
  const char *zFormat,
  ...
){
  int rc;
  va_list ap;
  char *zSql;
  sqlite3_stmt *pStmt = 0;
  va_start(ap, zFormat);
  zSql = sqlite3_vmprintf(zFormat, ap);
  va_end(ap);
  if( zSql==0 ){
    /* sqlite3_vmprintf() yields NULL when it cannot allocate the string.
    ** Fail here rather than passing NULL to printf("%s") below. */
    printf("%s:%d: ERROR - out of memory\n", zId, lineno);
    exit(-1);
  }
  if( eVerbose ){
    printf("%s:%d: [%s]\n", zId, lineno, zSql);
    fflush(stdout);
  }

  rc = sqlite3_prepare_v2(db, zSql, -1, &pStmt, 0);
  sqlite3_free(zSql);  /* SQL text is no longer needed on either path */
  if( rc ){
    printf("%s:%d: ERROR - %s\n", zId, lineno, sqlite3_errmsg(db));
    exit(-1);
  }
  return pStmt;
}

/*
** Block until an entry named zTable shows up in sqlite_schema.
**
** The schema is polled through connection db, with a sqlite3_sleep(1)
** between attempts.  zWorker is the caller's label, passed through to
** prepare() for its debugging output.  There is no timeout: this routine
** returns only once the entry has been seen.
*/
static void waitOnTable(sqlite3 *db, const char *zWorker, const char *zTable){
  for(;;){
    sqlite3_stmt *pQuery;
    int bExists;
    pQuery = prepare(db, zWorker, __LINE__,
             "SELECT 1 FROM sqlite_schema WHERE name=%Q", zTable);
    bExists = sqlite3_step(pQuery)==SQLITE_ROW
                && sqlite3_column_int(pQuery,0)!=0;
    sqlite3_finalize(pQuery);
    if( bExists ) break;
    sqlite3_sleep(1);
  }
}

/*
** Return true if x is a prime number.
**
** Values less than 2 are deliberately reported as prime.  The workers
** build table p1 from this routine starting at 1, and build table p2 by
** deleting multiples from 1..10000, which never removes 1.  main()
** requires p1 and p2 to hold the same values, so 1 must count as "prime"
** here.
**
** The loop bound is written as i<=x/i rather than i*i<=x so that the
** test cannot overflow a signed int when x is close to INT_MAX.
*/
static int isPrime(int x){
  int i;
  if( x<2 ) return 1;
  for(i=2; i<=x/i; i++){
    if( (x%i)==0 ) return 0;
  }
  return 1;
}

/* Thread entry point: each worker thread runs one instance of this.
**
** pArg is the worker's name as a NUL-terminated string.  The worker opens
** its own connection to zDbName and then repeatedly claims one unclaimed
** row of the "task" table (by writing its name into doneby) and performs
** the work attached to that task id:
**
**    tid 1        Create table p1
**    tid 2..51    Wait for p1, then insert every i in a 200-wide range
**                 for which isPrime(i) is true.  Together the ranges
**                 cover 1..10000.
**    tid 52       Create table p2 and fill it with 1..10000
**    tid 53..62   Wait for p2, then for each of ten divisors d delete the
**                 rows with x>d that are multiples of d.  Together the
**                 divisors cover 2..101.
**    other        No work
**
** The loop ends as soon as the claim statement yields no row.  Result
** codes from exec() are not checked here.  Always returns 0.
*/
static void *worker(void *pArg){
  const char *zName = (const char*)pArg;
  sqlite3 *db = 0;
  int rc;

  if( eVerbose ){
    printf("%s: startup\n", zName);
    fflush(stdout);
  }

  rc = sqlite3_open(zDbName, &db);
  error_out(rc, "sqlite3_open", __LINE__);
  sqlite3_busy_timeout(db, 2000);

  for(;;){
    int tid;
    int k;

    /* Claim one unclaimed task and learn its id in a single statement */
    sqlite3_stmt *pClaim = prepare(db, zName, __LINE__,
            "UPDATE task SET doneby=%Q"
            " WHERE tid=(SELECT tid FROM task WHERE doneby IS NULL LIMIT 1)"
            "RETURNING tid", zName
    );
    tid = sqlite3_step(pClaim)==SQLITE_ROW ? sqlite3_column_int(pClaim,0) : -1;
    sqlite3_finalize(pClaim);
    if( tid<0 ) break;   /* Nothing was claimed */

    if( eVerbose ){
      printf("%s: starting task %d\n", zName, tid);
      fflush(stdout);
    }

    if( tid==1 ){
      exec(db, zName, __LINE__,
         "CREATE TABLE IF NOT EXISTS p1(x INTEGER PRIMARY KEY);"
      );
    }else if( tid>=2 && tid<=51 ){
      int iFirst, iEnd;
      waitOnTable(db, zName, "p1");
      iFirst = (tid-2)*200 + 1;
      iEnd = iFirst + 200;          /* One past the last candidate */
      for(k=iFirst; k<iEnd; k++){
        if( !isPrime(k) ) continue;
        exec(db, zName, __LINE__,
            "INSERT INTO p1(x) VALUES(%d)", k);
      }
    }else if( tid==52 ){
      exec(db, zName, __LINE__,
         "CREATE TABLE IF NOT EXISTS p2(x INTEGER PRIMARY KEY);"
         "WITH RECURSIVE"
         "  c(x) AS (VALUES(1) UNION ALL SELECT x+1 FROM c WHERE x<10000)"
         "INSERT INTO p2(x) SELECT x FROM c;"
      );
    }else if( tid>=53 && tid<=62 ){
      int iFirst, iLast;
      waitOnTable(db, zName, "p2");
      iFirst = (tid-53)*10 + 2;
      iLast = iFirst + 9;           /* Last divisor, inclusive */
      for(k=iFirst; k<=iLast; k++){
        exec(db, zName, __LINE__,
          "DELETE FROM p2 WHERE x>%d AND (x %% %d)==0", k, k);
      }
    }

    if( eVerbose ){
      printf("%s: completed task %d\n", zName, tid);
      fflush(stdout);
    }
    sqlite3_sleep(1);
  }

  sqlite3_close(db);

  if( eVerbose ){
    printf("%s: exit\n", zName);
    fflush(stdout);
  }
  return 0;
}

/* Print a usage comment on stdout and exit with status 1.
**
** argv0 is the program name to show in the message.  The optional
** DATABASE argument is listed because main() accepts one non-option
** argument as the database name.
*/
static void usage(const char *argv0){
  printf("Usage: %s [options] [DATABASE]\n", argv0);
  printf(
    "  -num-workers N      Run N worker threads\n"
    "  -v                  Debugging output\n"
  );
  exit(1);
}

/* Maximum number of threads.  This sizes the aWorker[] and aWorkerName[]
** arrays in main() and is the largest value accepted for -num-workers. */
#define MX_WORKER 100

/*
** Main routine
**
** Steps:
**
**   1.  Parse the command line: an optional database name plus the
**       -v and -num-workers options.
**   2.  Open the database, drop any tables left over from a previous
**       run, and create the "task" table holding task ids 1 through 100.
**   3.  Start nWorker threads running worker() and wait for all of them.
**   4.  Print the list of task ids completed by each worker.
**   5.  Verify that p2 has at least 10 rows and that p1 and p2 hold
**       exactly the same set of values.
**
** Prints "OK" and returns 0 on success.  Exits with a non-zero status on
** a usage error, an SQLite error, a thread-creation failure, or a failed
** verification.
*/
int main(int argc, char **argv){
  int i;
  int nWorker = 4;
  int rc;
  sqlite3 *db = 0;
  sqlite3_stmt *q;
  pthread_t aWorker[MX_WORKER];
  char aWorkerName[MX_WORKER][8];

  for(i=1; i<argc; i++){
    const char *zArg = argv[i];
    if( zArg[0]!='-' ){
      /* The first non-option argument is the database name */
      if( zDbName==0 ){
        zDbName = argv[i];
        continue;
      }
      printf("unknown argument: %s\n", zArg);
      usage(argv[0]);
    }
    if( zArg[1]=='-' ) zArg++;   /* Accept "--option" as well as "-option" */
    if( strcmp(zArg, "-v")==0 ){
      eVerbose = 1;
      continue;
    }
    if( strcmp(zArg, "-num-workers")==0 && i+1<argc ){
      nWorker = atoi(argv[++i]);
      if( nWorker<1 || nWorker>MX_WORKER ){
        printf("number of threads must be between 1 and %d\n", MX_WORKER);
        exit(1);
      }
      continue;
    }
    printf("unknown option: %s\n", argv[i]);
    usage(argv[0]);
  }
  if( zDbName==0 ) zDbName = "file:/mem?vfs=memdb";

  sqlite3_config(SQLITE_CONFIG_URI, (int)1);
  rc = sqlite3_open(zDbName, &db);
  error_out(rc, "sqlite3_open", __LINE__);

  rc = exec(db, "SETUP", __LINE__,
    "DROP TABLE IF EXISTS task;\n"
    "DROP TABLE IF EXISTS p1;\n"
    "DROP TABLE IF EXISTS p2;\n"
    "DROP TABLE IF EXISTS verify;\n"
    "CREATE TABLE IF NOT EXISTS task(\n"
    "  tid INTEGER PRIMARY KEY,\n"
    "  doneby TEXT\n"
    ");\n"
    "WITH RECURSIVE c(x) AS (VALUES(1) UNION ALL SELECT x+1 FROM c WHERE x<100)"
    "INSERT INTO task(tid) SELECT x FROM c;\n"
  );
  error_out(rc, "sqlite3_exec", __LINE__);

  for(i=0; i<nWorker; i++){
    sqlite3_snprintf(sizeof(aWorkerName[i]), aWorkerName[i],
             "W%02d", i);
    rc = pthread_create(&aWorker[i], 0, worker, aWorkerName[i]);
    if( rc ){
      /* Without this check, the join loop below would be handed a
      ** pthread_t that was never initialized. */
      fprintf(stderr, "pthread_create failed for %s: %s\n",
              aWorkerName[i], strerror(rc));
      exit(-1);
    }
  }
  for(i=0; i<nWorker; i++){
    pthread_join(aWorker[i], 0);
  }

  /* Report which tasks each worker completed */
  for(i=0; i<nWorker; i++){
    q = prepare(db, "MAIN", __LINE__,
          "SELECT group_concat(tid,',') FROM task WHERE doneby=%Q",
          aWorkerName[i]);
    if( sqlite3_step(q)==SQLITE_ROW ){
      /* The column is NULL for a worker that claimed no task, and a NULL
      ** pointer must not be passed to printf("%s"). */
      const unsigned char *zDone = sqlite3_column_text(q,0);
      printf("%s: %s\n", aWorkerName[i],
             zDone ? (const char*)zDone : "(none)");
    }
    sqlite3_finalize(q);
  }

  /* Verification: p2 must not be nearly empty, and p1 and p2 must hold
  ** the same set of values. */
  q = prepare(db, "MAIN", __LINE__, "SELECT count(*) FROM p2");
  if( sqlite3_step(q)!=SQLITE_ROW || sqlite3_column_int(q,0)<10 ){
    printf("incorrect result\n");
    exit(-1);
  }
  sqlite3_finalize(q);
  q = prepare(db, "MAIN", __LINE__, "SELECT x FROM p1 EXCEPT SELECT x FROM p2");
  if( sqlite3_step(q)==SQLITE_ROW ){
    printf("incorrect result\n");
    exit(-1);
  }
  sqlite3_finalize(q);
  q = prepare(db, "MAIN", __LINE__, "SELECT x FROM p2 EXCEPT SELECT x FROM p1");
  if( sqlite3_step(q)==SQLITE_ROW ){
    printf("incorrect result\n");
    exit(-1);
  }
  sqlite3_finalize(q);
  printf("OK\n");

  sqlite3_close(db);
  return 0;
}