~ubuntu-branches/debian/sid/scala/sid

« back to all changes in this revision

Viewing changes to test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala

  • Committer: Package Import Robot
  • Author(s): Emmanuel Bourg, Mehdi Dogguy, Lucas Satabin, Frank S. Thomas, Emmanuel Bourg
  • Date: 2015-06-05 23:52:59 UTC
  • mfrom: (1.2.11)
  • Revision ID: package-import@ubuntu.com-20150605235259-wk00vgk83dh8o19g
Tags: 2.10.5-1
* Team upload.

[ Mehdi Dogguy ]
* New upstream release (Closes: #744278).

[ Lucas Satabin ]
* Update patches
* Update the clean target
* Update paths of elements to install
* Update watch file

[ Frank S. Thomas ]
* Remove myself from Uploaders.

[ Emmanuel Bourg ]
* The package has been adopted by the Java Team (Closes: #754935)
* Patched the build to avoid downloading libraries from the Internet
* Replaced the minified JavaScript files with unobfuscated ones
* No longer build scala-partest.jar until diffutils is packaged or replaced
* debian/watch: Fixed the versions matched (x.y.z instead of x.y.z..z)
* debian/rules:
  - Added the missing get-orig-source target (Closes: #724704)
  - Improved the clean target
* debian/control:
  - Build depend on scala (>= 2.10) and bnd
  - Use canonical URLs for the Vcs-* fields
  - Standards-Version updated to 3.9.6 (no changes)
* Switch to debhelper level 9

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**
 
2
 *    Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 
3
 */
 
4
 
 
5
package akka.dispatch
 
6
 
 
7
import akka.actor.{ ActorRef, Actor, IllegalActorStateException }
 
8
import akka.util.{ ReflectiveAccess, Switch }
 
9
 
 
10
import java.util.Queue
 
11
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
 
12
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue }
 
13
import util.DynamicVariable
 
14
 
 
15
/**
 
16
 * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
 
17
 * that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the
 
18
 * actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
 
19
 * <p/>
 
20
 * Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
 
21
 * best described as "work donating" because the actor of which work is being stolen takes the initiative.
 
22
 * <p/>
 
23
 * The preferred way of creating dispatchers is to use
 
24
 * the {@link akka.dispatch.Dispatchers} factory object.
 
25
 *
 
26
 * @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
 
27
 * @see akka.dispatch.Dispatchers
 
28
 *
 
29
 * @author Viktor Klang
 
30
 */
 
31
class ExecutorBasedEventDrivenWorkStealingDispatcher(
 
32
  _name: String,
 
33
  throughput: Int = Dispatchers.THROUGHPUT,
 
34
  throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
 
35
  mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
 
36
  config: ThreadPoolConfig = ThreadPoolConfig())
 
37
  extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) {
 
38
 
 
39
  def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
 
40
    this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
 
41
 
 
42
  def this(_name: String, throughput: Int, mailboxType: MailboxType) =
 
43
    this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
 
44
 
 
45
  def this(_name: String, throughput: Int) =
 
46
    this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
 
47
 
 
48
  def this(_name: String, _config: ThreadPoolConfig) =
 
49
    this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
 
50
 
 
51
  def this(_name: String, memberType: Class[_ <: Actor]) =
 
52
    this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
 
53
 
 
54
  def this(_name: String, mailboxType: MailboxType) =
 
55
    this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
 
56
 
 
57
  @volatile
 
58
  private var actorType: Option[Class[_]] = None
 
59
  @volatile
 
60
  private var members = Vector[ActorRef]()
 
61
  private val donationInProgress = new DynamicVariable(false)
 
62
 
 
63
  private[akka] override def register(actorRef: ActorRef) = {
 
64
    //Verify actor type conformity
 
65
    actorType match {
 
66
      case None => actorType = Some(actorRef.actor.getClass)
 
67
      case Some(aType) =>
 
68
        if (aType != actorRef.actor.getClass)
 
69
          throw new IllegalActorStateException(String.format(
 
70
            "Can't register actor %s in a work stealing dispatcher which already knows actors of type %s",
 
71
            actorRef, aType))
 
72
    }
 
73
 
 
74
    synchronized { members :+= actorRef } //Update members
 
75
    super.register(actorRef)
 
76
  }
 
77
 
 
78
  private[akka] override def unregister(actorRef: ActorRef) = {
 
79
    synchronized { members = members.filterNot(actorRef eq) } //Update members
 
80
    super.unregister(actorRef)
 
81
  }
 
82
 
 
83
  override private[akka] def dispatch(invocation: MessageInvocation) = {
 
84
    val mbox = getMailbox(invocation.receiver)
 
85
    if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) {
 
86
      //We were busy and we got to donate the message to some other lucky guy, we're done here
 
87
    } else {
 
88
      mbox enqueue invocation
 
89
      registerForExecution(mbox)
 
90
    }
 
91
  }
 
92
 
 
93
  override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
 
94
    try {
 
95
      donationInProgress.value = true
 
96
      while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor
 
97
    } finally { donationInProgress.value = false }
 
98
 
 
99
    if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution
 
100
      super.reRegisterForExecution(mbox)
 
101
  }
 
102
 
 
103
  /**
 
104
   * Returns true if it successfully donated a message
 
105
   */
 
106
  protected def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
 
107
    val actors = members // copy to prevent concurrent modifications having any impact
 
108
 
 
109
    // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
 
110
    // the dispatcher is being shut down...
 
111
    // Starts at is seeded by current time
 
112
    doFindDonorRecipient(donorMbox, actors, (System.currentTimeMillis % actors.size).asInstanceOf[Int]) match {
 
113
      case null      => false
 
114
      case recipient => donate(donorMbox.dequeue, recipient)
 
115
    }
 
116
  }
 
117
 
 
118
  /**
 
119
   * Returns true if the donation succeeded or false otherwise
 
120
   */
 
121
  protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = try {
 
122
    donationInProgress.value = true
 
123
    val actors = members // copy to prevent concurrent modifications having any impact
 
124
    doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match {
 
125
      case null      => false
 
126
      case recipient => donate(message, recipient)
 
127
    }
 
128
  } finally { donationInProgress.value = false }
 
129
 
 
130
  /**
 
131
   * Rewrites the message and adds that message to the recipients mailbox
 
132
   * returns true if the message is non-null
 
133
   */
 
134
  protected def donate(organ: MessageInvocation, recipient: ActorRef): Boolean = {
 
135
    if (organ ne null) {
 
136
      if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
 
137
        organ.message, recipient.timeout, organ.sender, organ.senderFuture)
 
138
      else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender)
 
139
      else recipient.postMessageToMailbox(organ.message, None)
 
140
      true
 
141
    } else false
 
142
  }
 
143
 
 
144
  /**
 
145
   * Returns an available recipient for the message, if any
 
146
   */
 
147
  protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorRef], startIndex: Int): ActorRef = {
 
148
    val prSz = potentialRecipients.size
 
149
    var i = 0
 
150
    var recipient: ActorRef = null
 
151
 
 
152
    while ((i < prSz) && (recipient eq null)) {
 
153
      val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap
 
154
      val mbox = getMailbox(actor)
 
155
 
 
156
      if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself
 
157
        recipient = actor //Found!
 
158
      }
 
159
 
 
160
      i += 1
 
161
    }
 
162
 
 
163
    recipient // nothing found, reuse same start index next time
 
164
  }
 
165
}