xdasd_mqueue.c

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------------
00002  * Copyright (c) 2006, Novell, Inc.
00003  * All rights reserved.
00004  * 
00005  * Redistribution and use in source and binary forms, with or without 
00006  * modification, are permitted provided that the following conditions are 
00007  * met:
00008  * 
00009  *     * Redistributions of source code must retain the above copyright 
00010  *       notice, this list of conditions and the following disclaimer.
00011  *     * Redistributions in binary form must reproduce the above copyright 
00012  *       notice, this list of conditions and the following disclaimer in the 
00013  *       documentation and/or other materials provided with the distribution.
00014  *     * Neither the name of the Novell nor the names of its contributors 
00015  *       may be used to endorse or promote products derived from this 
00016  *       software without specific prior written permission.
00017  * 
00018  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
00019  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00020  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
00021  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR 
00022  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
00023  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
00024  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
00025  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
00026  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
00027  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
00028  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00029  *--------------------------------------------------------------------------*/
00030 
00044 #include "xdasd_mqueue.h"
00045 #include "xdasd_thread.h"
00046 
00047 #include <stdio.h>
00048 #include <string.h>
00049 #include <malloc.h>
00050 
00057 typedef struct mqueue_tag
00058 {
00059    int shutting_down;         
00060    thread_t bgproc_th;        
00061    mutex_t queue_mutex;       
00062    cv_t queue_cv;             
00063    MsgPut_t * msg_put;        
00064    MsgGet_t * msg_get;        
00065    MsgProc_t * msg_process;   
00066 } mqueue_t;
00067 
00077 static void * bg_proc(void * data)
00078 {
00079    mqueue_t * mqp = (mqueue_t *)data;
00080    int last_chance = 0;
00081 
00082    mutex_acquire(&mqp->queue_mutex);
00083 
00084    do 
00085    {
00086       void * msg;
00087       while ((msg = mqp->msg_get()) != 0)
00088       {
00089          mutex_release(&mqp->queue_mutex);
00090 
00091          mqp->msg_process(msg);     /* process msg OUTSIDE the lock */
00092 
00093          mutex_acquire(&mqp->queue_mutex);
00094       }
00095       if (!mqp->shutting_down)
00096          cv_wait(&mqp->queue_cv, &mqp->queue_mutex);
00097    } while (!mqp->shutting_down || !last_chance++);
00098 
00099    mutex_release(&mqp->queue_mutex);
00100 
00101    return 0;
00102 }
00103 
00107 /*--------------------------------------------------------------------------*/
00108 
00122 int xdasd_mqueue_append(MsgQueue mq, void * msg)
00123 {
00124    int err;
00125    mqueue_t * mqp = (mqueue_t *)mq;
00126 
00127    mutex_acquire(&mqp->queue_mutex);
00128 
00129    if ((err = mqp->msg_put(msg)) == 0)
00130       cv_signal(&mqp->queue_cv);
00131 
00132    mutex_release(&mqp->queue_mutex);
00133 
00134    return err;
00135 }
00136 
00148 MsgQueue xdasd_mqueue_create(MsgPut_t * put, MsgGet_t * get, MsgProc_t * proc)
00149 {
00150    mqueue_t * mqp;
00151 
00152    /* allocate queue object */
00153    if ((mqp = (mqueue_t *)malloc(sizeof(*mqp))) != 0)
00154    {
00155       memset(mqp, 0, sizeof(*mqp));
00156 
00157       /* store user-provided function pointers */
00158       mqp->msg_put = put;
00159       mqp->msg_get = get;
00160       mqp->msg_process = proc;
00161 
00162       /* create the queue cv */
00163       if (cv_create(&mqp->queue_cv) != 0)
00164       {
00165          free(mqp);
00166          return 0;
00167       }
00168    
00169       /* create the queue mutex */
00170       mutex_create(&mqp->queue_mutex);
00171    
00172       /* create background thread */
00173       if ((mqp->bgproc_th = thread_create(bg_proc, mqp)) == 0)
00174       {
00175          mutex_destroy(&mqp->queue_mutex);
00176          cv_destroy(&mqp->queue_cv);
00177          free(mqp);
00178          return 0;
00179       }
00180    }
00181    return (MsgQueue)mqp;
00182 }
00183 
00190 void xdasd_mqueue_destroy(MsgQueue mq)
00191 {
00192    mqueue_t * mqp = (mqueue_t *)mq;
00193 
00194    /* signal background thread to shutdown */
00195    mutex_acquire(&mqp->queue_mutex);
00196    mqp->shutting_down = 1;
00197    cv_signal(&mqp->queue_cv);
00198    mutex_release(&mqp->queue_mutex);
00199 
00200    /* wait for background logger thread to terminate */
00201    (void)thread_wait(mqp->bgproc_th);
00202 
00203    /* destroy sync resources */
00204    cv_destroy(&mqp->queue_cv);
00205    mutex_destroy(&mqp->queue_mutex);
00206    free(mqp);
00207 }
00208 
00209 /*==========================================================================*/
00210 
00211 #ifdef XDASD_MQUEUE_TEST
00212 
00213 #include <stdarg.h>
00214 
/**
 * Unit-test diagnostic printer.
 *
 * Forwards its arguments to vprintf when LOG_TO_STDOUT is defined at
 * compile time; otherwise it does nothing.
 *
 * @param fmt  printf-style format string, followed by its arguments.
 */
static void output(const char * fmt, ... )
{
#ifndef LOG_TO_STDOUT
   (void)fmt;              /* logging compiled out */
#else
   va_list ap;

   va_start(ap, fmt);
   vprintf(fmt, ap);
   va_end(ap);
#endif
}
00226 
/* Sentinel pointer value used as the only valid test message. */
#define TESTDATA ((void*)(-1))

/* Number of test messages currently "queued" (the queue is just a counter). */
static int msgque = 0;

/* Number of test messages that testproc has seen. */
static int processed = 0;

/**
 * Test put callback: counts one queued message when handed TESTDATA and
 * ignores anything else.
 *
 * @param data  Message pointer.
 *
 * @return Always 0.
 */
static int testput(void * data)
{
   if (data != TESTDATA)
      return 0;

   msgque++;
   return 0;
}

/**
 * Test get callback: takes one message off the counter.
 *
 * @return TESTDATA if a message was queued, 0 if the counter is empty.
 */
static void * testget(void)
{
   if (msgque != 0)
   {
      --msgque;
      return TESTDATA;
   }
   return 0;
}

/**
 * Test process callback: counts each TESTDATA message it is given.
 *
 * @param data  Message pointer; values other than TESTDATA are ignored.
 */
static void testproc(void * data)
{
   if (data != TESTDATA)
      return;

   ++processed;
}
00250 
00263 int main(void)
00264 {
00265 #define LOOP_COUNT 50000
00266 
00267    MsgQueue mq;
00268    int rv = 0;
00269    int err = 0;
00270 
00271    if ((mq = xdasd_mqueue_create(testput, testget, testproc)) != 0)
00272    {
00273       int i;
00274       for (i = 0; i < LOOP_COUNT; i++)
00275          if ((err = xdasd_mqueue_append(mq, TESTDATA)) != 0)
00276             break;
00277       xdasd_mqueue_destroy(mq);
00278    }
00279    if (!mq) rv++;
00280    if (err) rv++;
00281    if (msgque) rv++;
00282    if (processed != LOOP_COUNT) rv++;
00283    if (rv)
00284       output("Unit test error count: %d.\n", rv);
00285    return rv;
00286 }
00287 
00288 #endif
00289 

Generated on Thu Aug 20 22:33:06 2009 for OpenXDAS by  doxygen 1.5.6