summaryrefslogtreecommitdiffstats
path: root/ml/dlib/examples/pipe_ex.cpp
blob: 9298dba1b12bdaa4299c255179f3ea08a1a6a82d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// The contents of this file are in the public domain. See LICENSE_FOR_EXAMPLE_PROGRAMS.txt


/*
    This is an example illustrating the use of the threading API and pipe object 
    from the dlib C++ Library.

    In this example we will create three threads that will read "jobs" off the end of 
    a pipe object and process them.  It shows you how you can use the pipe object
    to communicate between threads.  


    Example program output:
    0 INFO  [0] pipe_example: Add job 0 to pipe
    0 INFO  [0] pipe_example: Add job 1 to pipe
    0 INFO  [0] pipe_example: Add job 2 to pipe
    0 INFO  [0] pipe_example: Add job 3 to pipe
    0 INFO  [0] pipe_example: Add job 4 to pipe
    0 INFO  [0] pipe_example: Add job 5 to pipe
    0 INFO  [1] pipe_example: got job 0
    0 INFO  [0] pipe_example: Add job 6 to pipe
    0 INFO  [2] pipe_example: got job 1
    0 INFO  [0] pipe_example: Add job 7 to pipe
    0 INFO  [3] pipe_example: got job 2
  103 INFO  [0] pipe_example: Add job 8 to pipe
  103 INFO  [1] pipe_example: got job 3
  103 INFO  [0] pipe_example: Add job 9 to pipe
  103 INFO  [2] pipe_example: got job 4
  103 INFO  [0] pipe_example: Add job 10 to pipe
  103 INFO  [3] pipe_example: got job 5
  207 INFO  [0] pipe_example: Add job 11 to pipe
  207 INFO  [1] pipe_example: got job 6
  207 INFO  [0] pipe_example: Add job 12 to pipe
  207 INFO  [2] pipe_example: got job 7
  207 INFO  [0] pipe_example: Add job 13 to pipe
  207 INFO  [3] pipe_example: got job 8
  311 INFO  [1] pipe_example: got job 9
  311 INFO  [2] pipe_example: got job 10
  311 INFO  [3] pipe_example: got job 11
  311 INFO  [0] pipe_example: Add job 14 to pipe
  311 INFO  [0] pipe_example: main ending
  311 INFO  [0] pipe_example: destructing pipe object: wait for job_pipe to be empty
  415 INFO  [1] pipe_example: got job 12
  415 INFO  [2] pipe_example: got job 13
  415 INFO  [3] pipe_example: got job 14
  415 INFO  [0] pipe_example: destructing pipe object: job_pipe is empty
  519 INFO  [1] pipe_example: thread ending
  519 INFO  [2] pipe_example: thread ending
  519 INFO  [3] pipe_example: thread ending
  519 INFO  [0] pipe_example: destructing pipe object: all threads have ended


  The first column is the number of milliseconds since program start, the second
  column is the logging level, the third column is the thread id, and the rest
  is the log message.
*/


#include <dlib/threads.h>
#include <dlib/misc_api.h>  // for dlib::sleep
#include <dlib/pipe.h>
#include <dlib/logger.h>

using namespace dlib;

struct job
{
    /*
        This object represents the jobs we are going to send out to our threads.  
    */
    int id;
};

dlib::logger dlog("pipe_example");

// ----------------------------------------------------------------------------------------

class pipe_example : private multithreaded_object
{
public:
    pipe_example(
    ) : 
        job_pipe(4) // This 4 here is the size of our job_pipe.  The significance is that
                    // if you try to enqueue more than 4 jobs onto the pipe then enqueue() will
                    // block until there is room.  
    {
        // register 3 threads
        register_thread(*this,&pipe_example::thread);
        register_thread(*this,&pipe_example::thread);
        register_thread(*this,&pipe_example::thread);

        // start the 3 threads we registered above
        start();
    }

    ~pipe_example (
    )
    {
        dlog << LINFO << "destructing pipe object: wait for job_pipe to be empty";
        // wait for all the jobs to be processed
        job_pipe.wait_until_empty();

        dlog << LINFO << "destructing pipe object: job_pipe is empty";

        // now disable the job_pipe.  doing this will cause all calls to 
        // job_pipe.dequeue() to return false so our threads will terminate
        job_pipe.disable();

        // now block until all the threads have terminated
        wait();
        dlog << LINFO << "destructing pipe object: all threads have ended";
    }

    // Here we declare our pipe object.  It will contain our job objects.
    // There are only two requirements on the type of objects you can use in a
    // pipe, first they must have a default constructor and second they must
    // be swappable by a global swap().
    dlib::pipe<job> job_pipe;

private:
    void thread ()
    {
        job j;
        // Here we loop on jobs from the job_pipe.  
        while (job_pipe.dequeue(j))
        {
            // process our job j in some way.   
            dlog << LINFO << "got job " << j.id;

            // sleep for 0.1 seconds
            dlib::sleep(100);
        }
        dlog << LINFO << "thread ending";
    }

};

// ----------------------------------------------------------------------------------------

int main()
{
    // Set the dlog object so that it logs everything.
    dlog.set_level(LALL);

    pipe_example pe;

    for (int i = 0; i < 15; ++i)
    {
        dlog << LINFO << "Add job " << i << " to pipe";
        job j;
        j.id = i;


        // Add this job to the pipe.  One of our three threads will get it and process it.
        // It should also be pointed out that the enqueue() function uses the global
        // swap function to move jobs into the pipe.  This means that it modifies the
        // jobs we are passing in to it.  This allows you to implement a fast swap 
        // operator for your jobs.  For example, std::vector objects have a global
        // swap and it can execute in constant time by just swapping pointers inside 
        // std::vector.  This means that the dlib::pipe is effectively a zero-copy 
        // message passing system if you setup global swap for your jobs.   
        pe.job_pipe.enqueue(j);
    }

    dlog << LINFO << "main ending";

    // the main function won't really terminate here.  It will call the destructor for pe
    // which will block until all the jobs have been processed.
}

// ----------------------------------------------------------------------------------------