/**
 * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 */
7
import akka.actor.{ ActorRef, Actor, IllegalActorStateException }
8
import akka.util.{ ReflectiveAccess, Switch }
10
import java.util.Queue
11
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
12
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue }
13
import util.DynamicVariable
/**
 * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors.
 * It is assumed that all actors using the same instance of this dispatcher can process all messages that have
 * been sent to one of the actors, i.e. the actors belong to a pool of actors, and to the client there is no
 * guarantee about which actor instance actually processes a given message.
 *
 * Although the technique used in this implementation is commonly known as "work stealing", the actual
 * implementation is probably best described as "work donating", because the actor from which work is being
 * stolen takes the initiative.
 *
 * The preferred way of creating dispatchers is to use
 * the {@link akka.dispatch.Dispatchers} factory object.
 *
 * @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
 * @see akka.dispatch.Dispatchers
 *
 * @author Viktor Klang
 */
class ExecutorBasedEventDrivenWorkStealingDispatcher(
  _name: String,
  throughput: Int = Dispatchers.THROUGHPUT,
  throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
  mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
  config: ThreadPoolConfig = ThreadPoolConfig())
  extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) {

  def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
    this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage

  def this(_name: String, throughput: Int, mailboxType: MailboxType) =
    this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage

  def this(_name: String, throughput: Int) =
    this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage

  def this(_name: String, _config: ThreadPoolConfig) =
    this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)

  // NOTE: `memberType` is accepted but not forwarded anywhere; the member type is learned in `register`.
  def this(_name: String, memberType: Class[_ <: Actor]) =
    this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage

  def this(_name: String, mailboxType: MailboxType) =
    this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage

  // Class of the first registered actor; every later registration must have exactly this class.
  @volatile
  private var actorType: Option[Class[_]] = None

  // Written only while holding this dispatcher's monitor, but read lock-free by the donation
  // methods, hence @volatile. The Vector itself is immutable, so a read is a consistent snapshot.
  @volatile
  private var members = Vector[ActorRef]()

  // Thread-scoped flag: true while the current thread is handing a message over to another actor,
  // so that the re-posted message is enqueued by `dispatch` instead of being donated once more.
  private val donationInProgress = new DynamicVariable(false)

  /**
   * Adds the actor to the member pool after verifying that it has the same class as the
   * previously registered actors.
   *
   * @throws IllegalActorStateException if the actor's class differs from the class of the first registered actor
   */
  private[akka] override def register(actorRef: ActorRef) = {
    // Resolve the class before taking the lock so the lock is held as briefly as possible.
    val candidateType = actorRef.actor.getClass

    // Type check and membership update share one critical section, so two concurrent first
    // registrations cannot both observe `None` and install different types.
    synchronized {
      actorType match {
        case None => actorType = Some(candidateType)
        case Some(aType) =>
          if (aType != candidateType)
            throw new IllegalActorStateException(String.format(
              "Can't register actor %s in a work stealing dispatcher which already knows actors of type %s",
              actorRef, aType))
      }
      members :+= actorRef //Update members
    }
    super.register(actorRef)
  }

  /**
   * Removes the actor (compared by reference) from the member pool and unregisters it from the dispatcher.
   */
  private[akka] override def unregister(actorRef: ActorRef) = {
    synchronized { members = members.filterNot(_ eq actorRef) } //Update members
    super.unregister(actorRef)
  }

  /**
   * Delivers the invocation. If the receiver is busy (non-empty mailbox or its dispatcher lock is held)
   * and this thread is not already in the middle of a donation, the message is first offered to another
   * member; only when that fails is it enqueued in the receiver's own mailbox.
   */
  override private[akka] def dispatch(invocation: MessageInvocation) = {
    val mbox = getMailbox(invocation.receiver)
    val donated =
      !donationInProgress.value &&
        (!mbox.isEmpty || mbox.dispatcherLock.locked) &&
        attemptDonationOf(invocation, mbox)

    // If we were busy and managed to donate the message to some other lucky guy, we're done here
    if (!donated) {
      mbox enqueue invocation
      registerForExecution(mbox)
    }
  }

  /**
   * Before rescheduling the mailbox, donates as many of its pending messages as possible to other members.
   */
  override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
    // withValue restores the previous flag value even if a donation throws
    donationInProgress.withValue(true) {
      while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor
    }

    if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution
      super.reRegisterForExecution(mbox)
  }

  /**
   * Tries to move one message from the donor mailbox to another member.
   *
   * @return true if it successfully donated a message
   */
  protected def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
    val actors = members // copy to prevent concurrent modifications having any impact

    // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
    // the dispatcher is being shut down...
    if (actors.isEmpty) false // nobody to donate to; also avoids a modulo by zero below
    else {
      // The start index is seeded by the current time
      val startIndex = (System.currentTimeMillis % actors.size).toInt
      doFindDonorRecipient(donorMbox, actors, startIndex) match {
        case null      => false
        case recipient => donate(donorMbox.dequeue, recipient)
      }
    }
  }

  /**
   * Tries to hand the given (not yet enqueued) message to another member.
   *
   * @return true if the donation succeeded or false otherwise
   */
  protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean =
    donationInProgress.withValue(true) {
      val actors = members // copy to prevent concurrent modifications having any impact
      if (actors.isEmpty) false // nobody to donate to; also avoids a modulo by zero below
      else {
        // identityHashCode is not specified to be non-negative; clear the sign bit so the
        // remainder is always a valid index
        val startIndex = (System.identityHashCode(message) & Int.MaxValue) % actors.size
        doFindDonorRecipient(donorMbox, actors, startIndex) match {
          case null      => false
          case recipient => donate(message, recipient)
        }
      }
    }

  /**
   * Rewrites the message and adds that message to the recipient's mailbox.
   *
   * @param organ the message to hand over; may be null (nothing to donate)
   * @return true if the message is non-null
   */
  protected def donate(organ: MessageInvocation, recipient: ActorRef): Boolean = {
    if (organ eq null) false
    else {
      if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
        organ.message, recipient.timeout, organ.sender, organ.senderFuture)
      else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender)
      else recipient.postMessageToMailbox(organ.message, None)
      true
    }
  }

  /**
   * Returns an available recipient for the message, if any.
   *
   * Scans the potential recipients for one full lap, beginning at `startIndex`, and picks the first
   * actor whose mailbox is empty and is not the donor's own mailbox.
   *
   * @return the chosen recipient, or null if no member is currently idle
   */
  protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorRef], startIndex: Int): ActorRef = {
    val prSz = potentialRecipients.size
    if (prSz == 0) null
    else {
      // Normalise into [0, prSz) so that any start index, including a negative one, is safe
      val start = ((startIndex % prSz) + prSz) % prSz
      var i = 0
      var recipient: ActorRef = null

      while ((i < prSz) && (recipient eq null)) {
        val actor = potentialRecipients((start + i) % prSz) //Wrap-around, one full lap
        val mbox = getMailbox(actor)

        if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself
          recipient = actor //Found!
        }

        i += 1
      }

      recipient // null when nothing was found
    }
  }
}