~mordred/ubuntu/maverick/drizzle/prerelease

« back to all changes in this revision

Viewing changes to plugin/multi_thread/multi_thread.cc

  • Committer: Monty Taylor
  • Date: 2010-09-26 16:09:02 UTC
  • mto: This revision was merged to the branch mainline in revision 1383.
  • Revision ID: mordred@inaugust.com-20100926160902-r30v5hegk16cjk22
Tags: upstream-2010.09.1794
ImportĀ upstreamĀ versionĀ 2010.09.1794

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
#include <drizzled/module/option_map.h>
21
21
#include <drizzled/errmsg_print.h>
22
22
 
 
23
#include <boost/thread.hpp>
 
24
#include <boost/bind.hpp>
 
25
 
23
26
namespace po= boost::program_options;
24
27
using namespace std;
25
28
using namespace drizzled;
35
38
  extern size_t my_thread_stack_size;
36
39
}
37
40
 
38
 
/**
39
 
 * Function to be run as a thread for each session.
40
 
 */
41
 
namespace
42
 
{
43
 
  extern "C" pthread_handler_t session_thread(void *arg);
44
 
}
45
 
 
46
 
namespace
47
 
{
48
 
  extern "C" pthread_handler_t session_thread(void *arg)
49
 
  {
50
 
    Session *session= static_cast<Session*>(arg);
51
 
    MultiThreadScheduler *sched= static_cast<MultiThreadScheduler*>(session->scheduler);
52
 
    sched->runSession(session);
53
 
    return NULL;
54
 
  }
55
 
}
56
 
 
57
 
 
58
 
bool MultiThreadScheduler::addSession(Session *session)
59
 
{
60
 
  if (thread_count >= max_threads)
61
 
    return true;
62
 
 
 
41
void MultiThreadScheduler::runSession(drizzled::Session *session)
 
42
{
 
43
  if (drizzled::internal::my_thread_init())
 
44
  {
 
45
    session->disconnect(drizzled::ER_OUT_OF_RESOURCES, true);
 
46
    session->status_var.aborted_connects++;
 
47
    killSessionNow(session);
 
48
  }
 
49
  boost::this_thread::at_thread_exit(&internal::my_thread_end);
 
50
 
 
51
  session->thread_stack= (char*) &session;
 
52
  session->run();
 
53
  killSessionNow(session);
 
54
}
 
55
 
 
56
void MultiThreadScheduler::setStackSize()
 
57
{
 
58
  pthread_attr_t attr;
 
59
 
 
60
  (void) pthread_attr_init(&attr);
 
61
 
 
62
  /* Get the thread stack size that the OS will use and make sure
 
63
    that we update our global variable. */
 
64
  int err= pthread_attr_getstacksize(&attr, &my_thread_stack_size);
 
65
  pthread_attr_destroy(&attr);
 
66
 
 
67
  if (err != 0)
 
68
  {
 
69
    errmsg_printf(ERRMSG_LVL_ERROR, _("Unable to get thread stack size\n"));
 
70
    my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
 
71
  }
 
72
 
 
73
  if (my_thread_stack_size == 0)
 
74
  {
 
75
    my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
 
76
  }
63
77
#ifdef __sun
64
78
  /*
65
79
   * Solaris will return zero for the stack size in a call to
71
85
   * will be used.
72
86
   */
73
87
  if (my_thread_stack_size == 0)
 
88
  {
74
89
    my_thread_stack_size= 2 * 1024 * 1024;
 
90
  }
75
91
#endif
76
 
 
77
 
  /* Thread stack size of zero means just use the OS default */
78
 
  if (my_thread_stack_size != 0)
79
 
  {
80
 
    int err= pthread_attr_setstacksize(&attr, my_thread_stack_size);
81
 
 
82
 
    if (err != 0)
83
 
    {
84
 
      errmsg_printf(ERRMSG_LVL_ERROR,
85
 
                    _("Unable to set thread stack size to %" PRId64 "\n"),
86
 
                    static_cast<uint64_t>(my_thread_stack_size));
87
 
      return true;
88
 
    }
89
 
  }
90
 
  else
91
 
  {
92
 
    /* Get the thread stack size that the OS will use and make sure
93
 
       that we update our global variable. */
94
 
    int err= pthread_attr_getstacksize(&attr, &my_thread_stack_size);
95
 
 
96
 
    if (err != 0)
97
 
    {
98
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Unable to get thread stack size\n"));
99
 
      return true;
100
 
    }
101
 
  }
 
92
}
 
93
 
 
94
bool MultiThreadScheduler::addSession(Session *session)
 
95
{
 
96
  if (thread_count >= max_threads)
 
97
    return true;
102
98
 
103
99
  thread_count.increment();
104
100
 
105
 
  if (pthread_create(&session->real_id, &attr, session_thread,
106
 
                     static_cast<void*>(session)))
 
101
  boost::thread new_thread(boost::bind(&MultiThreadScheduler::runSession, this, session));
 
102
 
 
103
  if (not new_thread.joinable())
107
104
  {
108
105
    thread_count.decrement();
109
106
    return true;
118
115
  /* Locks LOCK_thread_count and deletes session */
119
116
  Session::unlink(session);
120
117
  thread_count.decrement();
121
 
  internal::my_thread_end();
122
 
  pthread_exit(0);
123
 
  /* We should never reach this point. */
124
118
}
125
119
 
126
120
MultiThreadScheduler::~MultiThreadScheduler()
127
121
{
128
 
  LOCK_thread_count.lock();
 
122
  boost::mutex::scoped_lock scopedLock(LOCK_thread_count);
129
123
  while (thread_count)
130
124
  {
131
 
    pthread_cond_wait(COND_thread_count.native_handle(), LOCK_thread_count.native_handle());
 
125
    COND_thread_count.wait(scopedLock);
132
126
  }
133
 
 
134
 
  LOCK_thread_count.unlock();
135
 
  (void) pthread_attr_destroy(&attr);
136
127
}
137
128
 
138
129