/* Copyright (c) 2007 SNAP Innovation GmbH
 *
 * BLOB Streaming for MySQL
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Test JDBC-based BLOB streaming.
 */
import java.io.ByteArrayInputStream;
import java.io.CharArrayReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.rowset.serial.SerialBlob;
import javax.sql.rowset.serial.SerialException;
39
public class TestJDBC {
40
/**
 * Program entry point. The surviving statements print a banner and the class
 * path, load the MySQL JDBC driver by class name, open a connection to the
 * local "test" database with enableBlobStreaming=true, and create the test
 * table mybs_test_tab if it does not exist. An SQLException is reported by
 * printing its message, SQL state and vendor error code.
 *
 * NOTE(review): this listing is lossy. The bare numbers are residue of the
 * original listing's line numbers, and the gaps between them show that short
 * lines are missing: the try statements, the closing braces, whatever
 * statements followed the CREATE TABLE call, and the bodies of the
 * catch (Exception e1) / catch (Exception e2) handlers. Restore them from the
 * original file before compiling.
 */
public static void main (String args[]) {
41
// insert code here...
42
System.out.println("JDBC BLOB Streaming Test");
43
System.out.println("java.class.path: "+System.getProperty("java.class.path"));
46
// The newInstance() call is a work around for some
47
// broken Java implementations
49
Class.forName("com.mysql.jdbc.Driver").newInstance();
52
catch (Exception ex) {
56
Connection conn = null;
58
// Connects as root with an empty password; BLOB streaming is switched on in the URL.
conn = DriverManager.getConnection("jdbc:mysql://localhost/test?user=root&password=&enableBlobStreaming=true");
60
// The table is created with ENGINE=PBXT, so the server must provide that engine.
execute(conn, "CREATE TABLE IF NOT EXISTS mybs_test_tab (n_id int(11) NOT NULL PRIMARY KEY AUTO_INCREMENT, n_text LONGBLOB) ENGINE=PBXT");
64
catch (SQLException ex) {
66
System.out.println("SQLException: " + ex.getMessage());
67
System.out.println("SQLState: " + ex.getSQLState());
68
System.out.println("VendorError: " + ex.getErrorCode());
71
// NOTE(review): the body of this handler is missing from the listing.
catch (Exception e1) {
79
// NOTE(review): the body of this handler is missing from the listing;
// presumably it guards conn.close() in a finally block -- confirm against the original.
catch (Exception e2) {
86
static final int SET_ASCII_STREAM_TEST = 1; // setAsciiStream(int parameterIndex, InputStream x, long length)
87
static final int SET_BINARY_STREAM_TEST = 2; // setBinaryStream(int parameterIndex, InputStream x, long length)
88
static final int SET_BLOB_TEST = 3; // setBlob(int parameterIndex, Blob x)
89
static final int SET_BLOB_STREAM_TEST = 4; // setBlob(int parameterIndex, InputStream inputStream, long length)
90
static final int SET_CHAR_STREAM_TEST = 5; // setCharacterStream(int parameterIndex, Reader reader, long length)
91
static final int SET_CLOB_TEST = 6; // setClob(int parameterIndex, Clob x)
92
static final int SET_CLOB_READER_TEST = 7; // setClob(int parameterIndex, Reader reader, long length)
94
/**
 * InputStream subclass constructed from an int. bigDataTest() creates
 * instances with new BigByteStream(100000) and binds them as statement
 * parameters; execute() passes getLength() as the stream length.
 *
 * NOTE(review): this listing is lossy. Only the class header and two
 * signatures survive (the bare numbers are residue of the original listing's
 * line numbers). The fields, the constructor body, the read() implementation
 * that InputStream requires, the body of getLength() and the closing braces
 * are missing and must be restored from the original file.
 */
public static class BigByteStream extends InputStream {
98
// presumably s is the number of bytes the stream will produce -- TODO confirm
public BigByteStream(int s) {
111
// NOTE(review): body missing from the listing.
public int getLength()
117
public static class LocalBlob extends SerialBlob {
118
public LocalBlob(byte[] b) throws SerialException, SQLException
124
public byte[] getBytes(long pos, int length) throws SerialException
126
if (pos >= 1 && length == 0)
128
return super.getBytes(pos, length);
132
public static class StreamBuffer {
134
public byte buffer[];
136
public StreamBuffer(int typ, byte buf[])
142
public InputStream getStream()
144
return new ByteArrayInputStream(buffer);
147
public long getLength()
149
return (long) buffer.length;
152
public Blob getBlob() throws SerialException, SQLException
154
return new LocalBlob(buffer);
157
public char[] getChars()
159
char chars[] = new char[buffer.length];
161
for (int i=0; i<buffer.length; i++)
162
chars[i] = (char) buffer[i];
166
public Clob getClob() throws SerialException, SQLException
168
return new javax.sql.rowset.serial.SerialClob(getChars());
171
public Reader getReader()
173
return new CharArrayReader(getChars());
177
/**
 * Thread subclass used by bigDataTest(): bigDataTest() fills an array with
 * BigDownload instances. The surviving statements open their own connection,
 * select every n_text value from mybs_test_tab in n_id order, obtain each
 * value as a Blob and walk its binary stream for blob.length() iterations,
 * throwing IOException("Error reading stream") when a check fails.
 *
 * NOTE(review): this listing is lossy. The bare numbers are residue of the
 * original listing's line numbers; the run() method header, the try
 * statements, the row loop, the per-byte read and comparison that guard the
 * throw, the finally block and all closing braces are missing. Restore them
 * from the original file before compiling.
 */
public static class BigDownload extends Thread {
179
Connection conn = null;
182
// Each thread uses a private connection rather than sharing the caller's.
conn = DriverManager.getConnection("jdbc:mysql://localhost/test?user=root&password=&enableBlobStreaming=true");
184
ResultSet rs = executeQuery(conn, "select n_text from mybs_test_tab order by n_id");
186
Blob blob = rs.getBlob(1);
187
InputStream in = blob.getBinaryStream();
189
// NOTE(review): blob.length() is re-evaluated on every iteration; the loop
// body that reads and checks each byte is missing from the listing.
for (int i=0; i<blob.length(); i++) {
192
throw new IOException("Error reading stream");
200
catch (SQLException ex) {
202
System.out.println("SQLException: " + ex.getMessage());
203
System.out.println("SQLState: " + ex.getSQLState());
204
System.out.println("VendorError: " + ex.getErrorCode());
205
ex.printStackTrace();
207
catch (Exception e2) {
208
e2.printStackTrace();
215
// presumably guards conn.close() in a finally block -- confirm against the original
catch (Exception e3) {
216
e3.printStackTrace();
222
/**
 * Large-value test. The surviving statements empty mybs_test_tab, insert ten
 * rows each bound to a new BigByteStream(100000), read every n_text value
 * back through Blob.getBinaryStream() on the caller's connection, then build
 * an array of BigDownload threads and print "Done OK".
 *
 * NOTE(review): this listing is lossy. The bare numbers are residue of the
 * original listing's line numbers; the braces, the row loop, the per-byte
 * read and comparison that guard the throw, the declaration of
 * {@code threads}, the bodies of the second and third thread loops
 * (presumably start and join -- confirm) and the try statement for the final
 * catch are missing. Restore them from the original file before compiling.
 *
 * @param conn open connection to the test database
 * @throws IOException thrown with "Error reading stream" by the read-back check
 * @throws SQLException declared; raised by the JDBC calls
 */
public static void bigDataTest(Connection conn) throws IOException, SQLException
224
System.out.println("BIG DATA TEST:");
226
execute(conn, "delete from mybs_test_tab");
228
// Single-element parameter list reused for every insert.
Object list[] = new Object[1];
230
for (int i=0; i<10; i++) {
231
list[0] = new BigByteStream(100000);
232
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
235
ResultSet rs = executeQuery(conn, "select n_text from mybs_test_tab order by n_id");
237
Blob blob = rs.getBlob(1);
238
InputStream in = blob.getBinaryStream();
240
// NOTE(review): the loop body that reads and checks each byte is missing from the listing.
for (int i=0; i<blob.length(); i++) {
243
throw new IOException("Error reading stream");
251
System.out.println("Getting data using multiple threads...");
254
// NOTE(review): the declaration of 'threads' is missing from the listing.
BigDownload t[] = new BigDownload[threads];
256
for (int i=0; i<threads; i++)
257
t[i] = new BigDownload();
260
// NOTE(review): the bodies of the next two loops are missing from the listing.
for (int i=0; i<threads; i++)
262
for (int i=0; i<threads; i++)
265
catch (Exception e) {
269
System.out.println("Done OK");
272
/**
 * Functional round-trip test. The surviving statements empty mybs_test_tab,
 * insert the sentence "ABC This is a test stream DEF" and an empty value,
 * each bound in the SET_ASCII_STREAM_TEST, SET_BINARY_STREAM_TEST and
 * SET_BLOB_TEST modes, perform three further inserts, and then select all
 * rows once per GET_*_TEST read-back mode, printing a heading before each.
 *
 * NOTE(review): this listing is lossy. The bare numbers are residue of the
 * original listing's line numbers; the braces and the list[0] assignments
 * that precede the three inserts after the empty-value group are missing.
 * Restore them from the original file before compiling.
 *
 * @param conn open connection to the test database
 * @throws IOException declared; propagated from execute()
 * @throws SQLException declared; propagated from execute()
 */
public static void funcTest(Connection conn) throws IOException, SQLException
274
System.out.println("FUNCTIONALITY TEST:");
276
execute(conn, "delete from mybs_test_tab");
278
// Single-element parameter list reused for every insert.
Object list[] = new Object[1];
280
// NOTE(review): getBytes() without a charset uses the platform default;
// the literals are plain ASCII.
list[0] = new StreamBuffer(SET_ASCII_STREAM_TEST, "ABC This is a test stream DEF".getBytes());
281
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
283
list[0] = new StreamBuffer(SET_BINARY_STREAM_TEST, "ABC This is a test stream DEF".getBytes());
284
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
286
list[0] = new StreamBuffer(SET_BLOB_TEST, "ABC This is a test stream DEF".getBytes());
287
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
289
// Same three binding modes again, this time with a zero-length value.
list[0] = new StreamBuffer(SET_ASCII_STREAM_TEST, "".getBytes());
290
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
292
list[0] = new StreamBuffer(SET_BINARY_STREAM_TEST, "".getBytes());
293
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
295
list[0] = new StreamBuffer(SET_BLOB_TEST, "".getBytes());
296
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
299
// NOTE(review): the list[0] assignments for the next three inserts are
// missing from the listing, so the inserted values cannot be told from here.
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
302
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
305
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
307
// NOTE(review): the terminator of the block comment opened on the next line
// is missing from the listing; as the text stands, that comment only ends at
// the end of the "Read the data back" comment line further down.
/* Characters are not streamed
308
list[0] = new StreamBuffer(SET_CLOB_TEST, "ABC This is a test stream DEF".getBytes());
309
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
311
list[0] = new StreamBuffer(SET_CHAR_STREAM_TEST, "ABC This is a test stream DEF".getBytes());
312
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
316
list[0] = new StreamBuffer(SET_BLOB_STREAM_TEST, "ABC This is a test stream DEF".getBytes());
317
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
318
list[0] = new StreamBuffer(SET_CLOB_READER_TEST, "ABC This is a test stream DEF".getBytes());
319
execute(conn, "insert mybs_test_tab(n_text) values(?)", list);
322
/* Read the data back, in various ways: */
323
System.out.println("getBinaryStream():");
324
execute(conn, "select n_id, n_text from mybs_test_tab order by n_id", GET_BINARY_STREAM_TEST);
325
System.out.println("getString():");
326
execute(conn, "select n_id, n_text from mybs_test_tab order by n_id", GET_STRING_TEST);
327
System.out.println("getBytes():");
328
execute(conn, "select n_id, n_text from mybs_test_tab order by n_id", GET_BYTES_TEST);
329
System.out.println("getAsciiStream():");
330
execute(conn, "select n_id, n_text from mybs_test_tab order by n_id", GET_ASCII_STREAM_TEST);
331
System.out.println("getBlob():");
332
execute(conn, "select n_id, n_text from mybs_test_tab order by n_id", GET_BLOB_TEST);
333
System.out.println("getCharacterStream():");
334
execute(conn, "select n_id, n_text from mybs_test_tab order by n_id", GET_CHAR_STREAM_TEST);
335
System.out.println("getClob():");
336
execute(conn, "select n_id, n_text from mybs_test_tab order by n_id", GET_CLOB_TEST);
337
System.out.println("getObject():");
338
execute(conn, "select n_id, n_text from mybs_test_tab order by n_id", GET_OBJECT_TEST);
341
static final int GET_ASCII_STREAM_TEST = 1;
342
static final int GET_BINARY_STREAM_TEST = 2;
343
static final int GET_BLOB_TEST = 3;
344
static final int GET_BYTES_TEST = 4;
345
static final int GET_CHAR_STREAM_TEST = 5;
346
static final int GET_CLOB_TEST = 6;
347
static final int GET_STRING_TEST = 7;
348
static final int GET_OBJECT_TEST = 8;
350
public static class ByteBuffer {
352
byte buffer[] = null;
354
public void append(int val)
357
buffer = new byte[10];
358
else if (pos == buffer.length) {
361
new_buffer = new byte[buffer.length + 10];
362
System.arraycopy(buffer, 0, new_buffer, 0, buffer.length);
365
buffer[pos] = (byte) val;
375
new_buffer = new byte[pos];
376
System.arraycopy(buffer, 0, new_buffer, 0, pos);
381
public static void printValue(Object obj) throws SQLException, IOException
383
if (obj instanceof Blob) {
385
obj = ((Blob) obj).getBinaryStream();
387
else if (obj instanceof Clob) {
389
obj = ((Clob) obj).getCharacterStream();
392
if (obj instanceof InputStream) {
393
// getAsciiStream, getBinaryStream
394
ByteBuffer buf = new ByteBuffer();
400
ch = ((InputStream) obj).read();
407
((InputStream) obj).close();
409
obj = buf.getBytes();
412
if (obj instanceof Reader) {
413
// getCharacterStream
418
ch = ((Reader) obj).read();
421
System.out.print((char) ch);
426
((Reader) obj).close();
429
else if (obj instanceof String) {
431
System.out.print((String) obj);
433
else if (obj instanceof byte[]) {
435
byte buffer[] = (byte[]) obj;
436
boolean binary = false;
438
for (int i=0; i<buffer.length; i++) {
439
if (buffer[i] != '\n' && buffer[i] != '\r' && !(buffer[i] >= ' ' && buffer[i] <= 127)) {
448
for (int i=0; i<buffer.length; i++) {
450
if (ch >= 0 && ch <= 9)
451
System.out.print((char) ('0' + ch));
453
System.out.print((char) ('A' + ch - 10));
454
ch = buffer[i] & 0xF;
455
if (ch >= 0 && ch <= 9)
456
System.out.print((char) ('0' + ch));
458
System.out.print((char) ('A' + ch - 10));
462
for (int i=0; i<buffer.length; i++)
463
System.out.print((char) buffer[i]);
467
else if (obj == null)
468
System.out.print("null");
470
System.out.print(obj.toString());
474
/**
 * Prints the content of a result set. For every column the surviving
 * statements print "index> table.column type(scale,precision) ", fetch the
 * value with the ResultSet getter selected by testType, and print it; the
 * total row count is printed at the end.
 *
 * NOTE(review): this listing is lossy. The bare numbers are residue of the
 * original listing's line numbers; the braces, the row counter and row loop,
 * the declaration of {@code obj}, the switch header, every break, the case
 * labels for the Blob, bytes and Clob modes (together with the Blob and Clob
 * getter calls), the test that guards the "NULL" output and the call that
 * prints a non-null value are missing. Restore them from the original file
 * before compiling.
 *
 * @param rs the result set to print
 * @param testType one of the GET_*_TEST constants
 * @throws SQLException declared; raised by the ResultSet and metadata calls
 * @throws IOException declared; presumably propagated from printValue() -- confirm
 * @throws SerialException declared (a subclass of SQLException)
 */
public static void printResult(ResultSet rs, int testType) throws SQLException, IOException, SerialException
476
ResultSetMetaData md = rs.getMetaData();
477
int colsPerRow = md.getColumnCount();
483
// JDBC column indexes are 1-based.
for (int i=1; i<=colsPerRow; i++) {
484
String name = md.getTableName(i)+"."+md.getColumnName(i);
485
// Note the order: scale is printed before precision.
String type = md.getColumnTypeName(i) + "(" + md.getScale(i) + "," + md.getPrecision(i) + ")";
486
System.out.print(i + "> " + name + " " + type + " ");
490
case GET_ASCII_STREAM_TEST:
491
obj = rs.getAsciiStream(i);
493
case GET_BINARY_STREAM_TEST:
494
obj = rs.getBinaryStream(i);
500
// NOTE(review): the case label for this getter is missing from the listing
// (presumably GET_BYTES_TEST).
obj = rs.getBytes(i);
502
case GET_CHAR_STREAM_TEST:
503
obj = rs.getCharacterStream(i);
508
case GET_STRING_TEST:
509
obj = rs.getString(i);
511
case GET_OBJECT_TEST:
512
obj = rs.getObject(i);
516
System.out.println("NULL");
519
System.out.println("");
522
System.out.println("");
524
System.out.println("Row count = "+rows);
527
public static void printAll(Statement stat, boolean more, int testType) throws SQLException, IOException, SerialException
533
rs = stat.getResultSet();
534
printResult(rs, testType);
537
int cnt = stat.getUpdateCount();
540
System.out.println("Update count = " + cnt);
542
more = stat.getMoreResults();
544
System.out.println("Execution completed.");
547
public static void execute(Connection conn, String sql, int testType, Object list[]) throws SQLException, IOException, SerialException
553
st = conn.createStatement();
554
more = st.execute(sql);
557
PreparedStatement ps;
559
ps = conn.prepareStatement(sql);
560
for (int i=0; i<list.length; i++) {
561
if (list[i] instanceof StreamBuffer) {
562
StreamBuffer bb = (StreamBuffer) list[i];
564
case SET_ASCII_STREAM_TEST:
565
ps.setAsciiStream(i+1, bb.getStream(), (int) bb.getLength());
567
case SET_BINARY_STREAM_TEST:
568
ps.setBinaryStream(i+1, bb.getStream(), (int) bb.getLength());
571
ps.setBlob(i+1, bb.getBlob());
573
case SET_BLOB_STREAM_TEST:
574
//ps.setBlob(i+1, bb.getStream(), bb.getLength());
576
case SET_CHAR_STREAM_TEST:
577
ps.setCharacterStream(i+1, bb.getReader(), (int) bb.getLength());
580
ps.setClob(i+1, bb.getClob());
582
case SET_CLOB_READER_TEST:
583
//ps.setClob(i+1, bb.getReader(), bb.getLength());
587
else if (list[i] instanceof BigByteStream)
588
ps.setBinaryStream(i+1, (InputStream) list[i], (int) ((BigByteStream) list[i]).getLength());
590
ps.setObject(i+1, list[i]);
597
printAll(st, more, testType);
604
public static void insertBLOB(Connection conn, InputStream in, int length) throws SQLException
606
PreparedStatement ps = null;
609
ps = conn.prepareStatement("insert mybs_test_tab(n_text) values(?)");
610
ps.setBinaryStream(1, in, (int) length);
611
int rows = ps.executeUpdate();
619
/**
 * Looks up the n_text value of the mybs_test_tab row with the given n_id and
 * obtains it as a binary stream.
 *
 * NOTE(review): this listing is lossy. The bare numbers are residue of the
 * original listing's line numbers; the braces, the declaration of
 * {@code st}, the try statement, the call that positions the result set on a
 * row, any cleanup code and the return statement are missing. If the
 * original closes the statement before returning, whether the returned stream
 * stays readable depends on the driver -- confirm before relying on it.
 *
 * @param conn open connection
 * @param id value of n_id to look up
 * @throws SQLException declared; raised by the JDBC calls
 */
public static InputStream selectBLOB(Connection conn, int id) throws SQLException
622
InputStream in = null;
625
st = conn.createStatement();
626
// id is an int, so the concatenation cannot inject SQL.
ResultSet rs = st.executeQuery("select n_text from mybs_test_tab where n_id = "+id);
628
in = rs.getBinaryStream("n_text");
637
public static void execute(Connection conn, String sql, int testType) throws SQLException, IOException
639
execute(conn, sql, testType, null);
642
public static void execute(Connection conn, String sql) throws SQLException, IOException
644
execute(conn, sql, GET_STRING_TEST, null);
647
public static void execute(Connection conn, String sql, Object list[]) throws SQLException, IOException
649
execute(conn, sql, GET_STRING_TEST, list);
652
public static ResultSet executeQuery(Connection conn, String sql) throws SQLException, IOException
656
st = conn.createStatement();
657
return st.executeQuery(sql);