1
import java.sql.Timestamp;
3
import org.apache.log4j.*;
6
* This class prunes superfluous data from the records_logs table.
8
* Its primary method (prune) is invoked every time an instance report is
9
* generated. It can also be invoked from the command line.
15
// Maximum number of rows a single prune() scan reads; prune() appends it to its
// SELECT as a LIMIT clause. A null or non-positive value makes prune() skip the clause.
private static Integer QUERY_LIMIT = 1000000 //HACK to avoid MySQL memory leak; @see pruneRepeatedly
16
// Class-wide log4j logger; main() adjusts its level.
private static Logger LOG = Logger.getLogger( Pruner.class )
21
public Pruner(Sql sql)
29
* Prunes redundant data from the database, by deleting all data except the
30
* earliest and latest n rows for each instance.
32
* It's necessary to delete superfluous data because of a bug in the code that re-adds
33
* instance data to the log table every 20 seconds. That bug creates a vast amount of
34
* superfluous log data for long-running instances, which slows down report generation.
36
* This function can be run repeatedly, and will incrementally re-prune the
37
* records_logs table each time.
39
* This function uses an algorithm which requires only one full table scan and does
40
* not require mysql to sort the results.
42
* @param redundantRowsDeleteThreshold How many redundant rows an instance must have
44
* @param targetRowsNum The number of rows to retain after pruning for each instance,
45
* at the beginning and at the end (n rows at the beginning, and n at the end).
47
public void prune(Integer redundantRowsDeleteThreshold=100, Integer targetRowsNum=80,
48
Long onlyAfterTimestamp=0, Long onlyBeforeTimestamp=9999999999) {
50
// Argument validation: nothing may be null, the timestamp window must be
// non-empty (after < before), and both row counts must be positive.
assert redundantRowsDeleteThreshold != null
51
assert targetRowsNum != null
52
assert onlyAfterTimestamp != null
53
assert onlyBeforeTimestamp != null
54
assert onlyAfterTimestamp < onlyBeforeTimestamp
55
assert redundantRowsDeleteThreshold > 0
56
assert targetRowsNum > 0
58
LOG.info("Begin prune")
60
/* Find earliest and latest Nth rows, according to the following algorithm.
61
* Establish two lists for each instance: one for the earliest nth rows, and one
62
* for the latest nth rows. Whenever we encounter a row for an instance which
63
* is earlier than the latest row in the early list, displace the earliest
64
* row in the early list and add the current row. Likewise (but in reverse)
65
* with the latest rows. At the end of this procedure, we will have two
66
* lists for each instance that contain the earliest and latest n rows for
67
* that instance. Then, delete all rows for the instance with a timestamp
68
* greater than the latest timestamp in the early list, and less than the
69
* earliest timestamp in the late list. What remains will be the earliest n
70
* and latest n rows for an instance; everything in between will be deleted.
72
* This algorithm was chosen for the following reasons: 1) It requires only
73
* one full table scan of the data; 2) it does not require sorting the data,
74
* which would cause MySQL to do a slow disk sort; 3) it uses a minimum of heap
75
* space; 4) it can be run incrementally without storing any data between
76
* invocations such as last pruning timestamp
79
/* Find earliest and latest n rows */
81
SELECT record_correlation_id, UNIX_TIMESTAMP(record_timestamp) as ts
83
WHERE record_class = 'VM'
84
AND UNIX_TIMESTAMP(record_timestamp) > ?
85
AND UNIX_TIMESTAMP(record_timestamp) < ?
88
/* HACK to avoid memory leak from MySQL driver; add LIMIT clause to SQL */
89
// NOTE(review): no ORDER BY or OFFSET is visible in the query, so which rows fall
// inside the LIMIT is decided by MySQL; confirm this is acceptable for repeated runs.
if (QUERY_LIMIT != null && QUERY_LIMIT > 0) { query = query + " LIMIT " + QUERY_LIMIT }
91
// Keyed by record_correlation_id; each value is the InstanceInfo accumulated for
// that instance during the scan below.
def instanceInfoMap = [:]
93
// Single pass over the matching rows, bounded by the two timestamp parameters.
this.sql.eachRow( query, [onlyAfterTimestamp,onlyBeforeTimestamp] ) {
95
// First row seen for this instance: start a fresh accumulator.
if (! instanceInfoMap.containsKey(it.record_correlation_id)) {
96
instanceInfoMap[it.record_correlation_id]=new InstanceInfo()
97
LOG.debug("Found new instance:" + it.record_correlation_id)
99
InstanceInfo info = instanceInfoMap[it.record_correlation_id]
101
//latestEarlyTs is only set when earlyList has reached n rows
102
// Fill phase: collect every row until the early list holds targetRowsNum entries,
// then sort once and record its largest timestamp as the early boundary.
if (info.latestEarlyTs==null) {
103
info.earlyList.add(it.ts)
104
if (info.earlyList.size() >= targetRowsNum) {
105
info.earlyList.sort()
106
info.latestEarlyTs = info.earlyList.last()
108
// Replace phase: a row at or before the boundary pushes out the current latest
// entry, keeping the list at its size, and the boundary is refreshed.
} else if (it.ts <= info.latestEarlyTs) {
110
// NOTE(review): ts is not declared on any visible line; the fill branch above
// uses it.ts, so this presumably should be it.ts as well. Confirm against the full file.
info.earlyList.add(ts)
111
info.earlyList.sort()
112
info.earlyList.remove(info.earlyList.last())
113
info.latestEarlyTs = info.earlyList.last()
116
//earliestLateTs is only set when lateList has reached n rows
117
// Mirror image of the early list: collect rows until the late list is full, then
// track its smallest timestamp as the late boundary.
if (info.earliestLateTs==null) {
118
info.lateList.add(it.ts)
119
if (info.lateList.size() >= targetRowsNum) {
// NOTE(review): unlike the early list, no sort() call on lateList is visible before
// first() here or in the branch below; first() only yields the minimum if the list
// is ordered. Confirm the list is sorted in the full file.
121
info.earliestLateTs = info.lateList.first()
123
} else if (it.ts >= info.earliestLateTs) {
125
// NOTE(review): same undeclared ts as in the early-list branch; presumably it.ts.
info.lateList.add(ts)
127
info.lateList.remove(info.lateList.first())
128
info.earliestLateTs = info.lateList.first()
131
// Count every scanned row for this instance, kept or not.
info.rowCnt = info.rowCnt+1
136
/* Delete data for each instance which is later than the latest early timestamp,
137
* but earlier than the earliest late timestamp.
140
DELETE FROM records_logs
141
WHERE record_correlation_id = ?
142
AND record_class = 'VM'
143
AND UNIX_TIMESTAMP(record_timestamp) > ?
144
AND UNIX_TIMESTAMP(record_timestamp) < ?
146
LOG.debug("Begin deleting")
147
Integer redundantRowsCnt
148
instanceInfoMap.each { key, value ->
149
// Rows beyond the 2 * targetRowsNum that are meant to survive (early plus late).
redundantRowsCnt = value.rowCnt-(targetRowsNum*2)
150
LOG.debug("INSTANCE id:${key} rowsAboveThreshold:${redundantRowsCnt}")
151
// Delete only when the surplus exceeds the threshold. In that case rowCnt is
// larger than targetRowsNum, so both boundary timestamps have been set above.
// The DELETE bounds are strict, so rows exactly at either boundary are kept.
if (value.rowCnt-(targetRowsNum*2) > redundantRowsDeleteThreshold) {
152
this.sql.executeUpdate(query, [key, value.latestEarlyTs, value.earliestLateTs])
153
LOG.debug(String.format("DELETE id:%s %d-%d",
154
key, value.latestEarlyTs, value.earliestLateTs))
158
LOG.info("End prune")
159
} //end: prune method
163
* HACK to handle memory leak in mysql/groovy. The MySQL JDBC driver will throw an
164
* OutOfMemory exception sometimes for ResultSets greater than 10M rows or so.
165
* This happens even when you execute the query by itself in a groovy script with no
168
* To get around this problem, I added a "LIMIT" clause to the prune() method which
169
* limits result sets to QUERY_LIMIT rows (currently 1000000).
171
* This method calls prune repeatedly, so the entire log will be pruned (QUERY_LIMIT rows at
174
public void pruneRepeatedly()
176
// Count every row in records_logs to decide how many passes to run. Note that this
// count covers all record classes, while prune() itself only scans record_class VM rows.
def res = sql.firstRow("SELECT count(*) AS cnt FROM records_logs")
177
if (QUERY_LIMIT != null) {
178
// One pass per full QUERY_LIMIT-sized slice of the table (integer division).
for (int i=0; i<res.cnt.intdiv(QUERY_LIMIT); i++) {
179
LOG.info("Prune iteration ${i}")
180
prune() //prune up to QUERY_LIMIT rows, using the default thresholds and time window
183
LOG.info("Prune iteration final")
184
prune() //prune remainder of rows, fewer than QUERY_LIMIT
189
* Command-line invocation of the prune method
191
public static void main(String[] args) {
193
// Command-line entry point: parse options, configure logging, open a MySQL
// connection and run either prune or pruneRepeatedly.
/* Read cmd-line args */
194
// Only -p (password) is required; every other option falls back to a default below.
// NOTE(review): -h is bound to the host option, so no help flag is defined here.
CliBuilder cli = new CliBuilder(usage:"Pruner.groovy -p mysqlPassword [options]")
195
cli.D(args:1, required:false, argName:"dbName", "Database Name (default eucalyptus_records)")
196
cli.u(args:1, required:false, argName:"username", "username (default eucalyptus)")
197
cli.p(args:1, required:true, argName:"password", "password for mysql")
198
cli.h(args:1, required:false, argName:"host", "host of mysqld (default localhost)")
199
// NOTE(review): the help text below reads "port or mysqld"; probably meant "port of mysqld".
cli.P(args:1, required:false, argName:"port", "port or mysqld (default 8777)")
200
cli.g(args:0, required:false, argName:"debug", "debugging output (default false)")
201
cli.a(args:1, required:false, argName:"onlyAfterTimestamp", "only prunes after timestamp in secs (default 0)")
202
cli.b(args:1, required:false, argName:"onlyBeforeTimestamp", "only prunes before timestamp in secs (default max timestamp)")
203
cli.t(args:1, required:false, argName:"redundantRowsDeleteThreshold", "only prunes instances more than n redundant rows (default 100)")
204
cli.r(args:1, required:false, argName:"targetRowsNum", "how many rows to preserve at the beginning and end for each instance (default 80)")
205
cli.e(args:0, required:false, argName:"pruneRepeatedly", "repeatedly prunes one section at a time")
206
def options = cli.parse(args)
207
// Exit with status -1 when parsing yields no usable options object.
if (!options) System.exit(-1)
209
/* Parse cmd-line args into appropriate types and provide default values */
211
// NOTE(review): the declaration of optsMap is not visible here; presumably a plain
// map created just above this line.
optsMap['D']=options.D ? options.D : "eucalyptus_records"
212
optsMap['u']=options.u ? options.u : "eucalyptus"
213
optsMap['p']=options.p
214
optsMap['g']=options.g
215
optsMap['h']=options.h ? options.h : "localhost"
216
// Port and timestamps are parsed as Long, threshold and target row count as Integer.
optsMap['P']=options.P ? Long.parseLong(options.P) : 8777l
217
optsMap['a']=options.a ? Long.parseLong(options.a) : 0l
218
optsMap['b']=options.b ? Long.parseLong(options.b) : 9999999999l
219
optsMap['t']=options.t ? Integer.parseInt(options.t) : 100
220
optsMap['r']=options.r ? Integer.parseInt(options.r) : 80
221
optsMap['e']=options.e
224
// NOTE(review): the conditional choosing between the two log levels below is not
// visible; presumably DEBUG when -g was given and OFF otherwise.
LOG.setLevel(Level.DEBUG)
226
LOG.setLevel(Level.OFF)
229
// Echo the effective settings (the password is deliberately not among them).
LOG.debug(String.format("Using db:%s user:%s host:%s port:%d debug:%s " +
230
"after:%d before:%d threshold:%d target:%d i:%b",
231
optsMap.D, optsMap.u, optsMap.h, optsMap.P, optsMap.g,
232
optsMap.a, optsMap.b, optsMap.t, optsMap.r, optsMap.e))
235
/* Create a mysql connection, then prune */
236
def connStr = "jdbc:mysql://${optsMap['h']}:${optsMap['P']}/${optsMap['D']}"
237
Sql sql = Sql.newInstance(connStr, optsMap['u'],optsMap['p'],'com.mysql.jdbc.Driver')
239
Pruner pruner = new Pruner(sql);
241
// NOTE(review): the branch selecting between the two calls below is not visible;
// presumably pruneRepeatedly runs when -e was given. pruneRepeatedly takes no
// arguments, so in that mode the -t, -r, -a and -b values are not passed on.
pruner.pruneRepeatedly()
243
pruner.prune(optsMap['t'], optsMap['r'], optsMap['a'], optsMap['b'])
249
} //end: Pruner class
255
Long latestEarlyTs = null
256
Long earliestLateTs = null