/*
 * xdasd_mqueue.c
 *
 * Message queue whose entries are handed to a background worker thread
 * for processing.
 */
#include "xdasd_mqueue.h"
#include "xdasd_thread.h"

#include <malloc.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
00050
00057 typedef struct mqueue_tag
00058 {
00059 int shutting_down;
00060 thread_t bgproc_th;
00061 mutex_t queue_mutex;
00062 cv_t queue_cv;
00063 MsgPut_t * msg_put;
00064 MsgGet_t * msg_get;
00065 MsgProc_t * msg_process;
00066 } mqueue_t;
00067
00077 static void * bg_proc(void * data)
00078 {
00079 mqueue_t * mqp = (mqueue_t *)data;
00080 int last_chance = 0;
00081
00082 mutex_acquire(&mqp->queue_mutex);
00083
00084 do
00085 {
00086 void * msg;
00087 while ((msg = mqp->msg_get()) != 0)
00088 {
00089 mutex_release(&mqp->queue_mutex);
00090
00091 mqp->msg_process(msg);
00092
00093 mutex_acquire(&mqp->queue_mutex);
00094 }
00095 if (!mqp->shutting_down)
00096 cv_wait(&mqp->queue_cv, &mqp->queue_mutex);
00097 } while (!mqp->shutting_down || !last_chance++);
00098
00099 mutex_release(&mqp->queue_mutex);
00100
00101 return 0;
00102 }
00103
00107
00108
00122 int xdasd_mqueue_append(MsgQueue mq, void * msg)
00123 {
00124 int err;
00125 mqueue_t * mqp = (mqueue_t *)mq;
00126
00127 mutex_acquire(&mqp->queue_mutex);
00128
00129 if ((err = mqp->msg_put(msg)) == 0)
00130 cv_signal(&mqp->queue_cv);
00131
00132 mutex_release(&mqp->queue_mutex);
00133
00134 return err;
00135 }
00136
00148 MsgQueue xdasd_mqueue_create(MsgPut_t * put, MsgGet_t * get, MsgProc_t * proc)
00149 {
00150 mqueue_t * mqp;
00151
00152
00153 if ((mqp = (mqueue_t *)malloc(sizeof(*mqp))) != 0)
00154 {
00155 memset(mqp, 0, sizeof(*mqp));
00156
00157
00158 mqp->msg_put = put;
00159 mqp->msg_get = get;
00160 mqp->msg_process = proc;
00161
00162
00163 if (cv_create(&mqp->queue_cv) != 0)
00164 {
00165 free(mqp);
00166 return 0;
00167 }
00168
00169
00170 mutex_create(&mqp->queue_mutex);
00171
00172
00173 if ((mqp->bgproc_th = thread_create(bg_proc, mqp)) == 0)
00174 {
00175 mutex_destroy(&mqp->queue_mutex);
00176 cv_destroy(&mqp->queue_cv);
00177 free(mqp);
00178 return 0;
00179 }
00180 }
00181 return (MsgQueue)mqp;
00182 }
00183
00190 void xdasd_mqueue_destroy(MsgQueue mq)
00191 {
00192 mqueue_t * mqp = (mqueue_t *)mq;
00193
00194
00195 mutex_acquire(&mqp->queue_mutex);
00196 mqp->shutting_down = 1;
00197 cv_signal(&mqp->queue_cv);
00198 mutex_release(&mqp->queue_mutex);
00199
00200
00201 (void)thread_wait(mqp->bgproc_th);
00202
00203
00204 cv_destroy(&mqp->queue_cv);
00205 mutex_destroy(&mqp->queue_mutex);
00206 free(mqp);
00207 }
00208
00209
00210
00211 #ifdef XDASD_MQUEUE_TEST
00212
00213 #include <stdarg.h>
00214
/**
 * printf-style logging helper for the unit test.
 *
 * Forwards to vprintf when LOG_TO_STDOUT is defined; otherwise the call
 * does nothing.
 *
 * @param fmt  printf format string, followed by its arguments.
 */
static void output(const char * fmt, ... )
{
#ifndef LOG_TO_STDOUT
    (void)fmt;  /* logging compiled out */
#else
    va_list ap;

    va_start(ap, fmt);
    vprintf(fmt, ap);
    va_end(ap);
#endif
}
00226
/* Sentinel message pointer recognised by the test callbacks. */
#define TESTDATA ((void *)(-1))

/* Number of sentinel messages put but not yet fetched. */
static int msgque = 0;

/* Number of sentinel messages seen by testproc. */
static int processed = 0;
00231
00232 static int testput(void * data)
00233 {
00234 if (data == TESTDATA)
00235 ++msgque;
00236 return 0;
00237 }
00238 static void * testget(void)
00239 {
00240 if (msgque == 0)
00241 return 0;
00242 msgque--;
00243 return TESTDATA;
00244 }
00245 static void testproc(void * data)
00246 {
00247 if (data == TESTDATA)
00248 processed++;
00249 }
00250
00263 int main(void)
00264 {
00265 #define LOOP_COUNT 50000
00266
00267 MsgQueue mq;
00268 int rv = 0;
00269 int err = 0;
00270
00271 if ((mq = xdasd_mqueue_create(testput, testget, testproc)) != 0)
00272 {
00273 int i;
00274 for (i = 0; i < LOOP_COUNT; i++)
00275 if ((err = xdasd_mqueue_append(mq, TESTDATA)) != 0)
00276 break;
00277 xdasd_mqueue_destroy(mq);
00278 }
00279 if (!mq) rv++;
00280 if (err) rv++;
00281 if (msgque) rv++;
00282 if (processed != LOOP_COUNT) rv++;
00283 if (rv)
00284 output("Unit test error count: %d.\n", rv);
00285 return rv;
00286 }
00287
00288 #endif
00289