<dependency> <groupId>io.vertx</groupId> <artifactId>vertx-mysql-client</artifactId> <version>3.8.0</version> </dependency>
Reactive MySQL Client是MySQL的客户端,具备直观的API,侧重于可伸缩性和低开销。javascript
特征html
事件驱动java
轻量级mysql
内置链接池sql
准备好的查询缓存api
游标支持缓存
行流服务器
RxJava 1和RxJava 2架构
将内存直接写入对象而不须要没必要要app
Java 8日期和时间
MySQL实用程序命令支持
兼容MySQL 5.6和5.7
要使用Reactive MySQL Client,请将如下依赖项添加到构建描述符的dependencies部分:
Maven(在你的pom.xml
):
<dependency> <groupId>io.vertx</groupId> <artifactId>vertx-mysql-client</artifactId> <version>3.8.0</version> </dependency>
Gradle(在您的build.gradle
文件中):
dependencies {
compile 'io.vertx:vertx-mysql-client:3.8.0' }
这是链接,查询和断开链接的最简单方法
MySQLConnectOptions connectOptions = new MySQLConnectOptions() .setPort(3306) .setHost("the-host") .setDatabase("the-db") .setUser("user") .setPassword("secret"); // Pool options PoolOptions poolOptions = new PoolOptions() .setMaxSize(5); // Create the client pool MySQLPool client = MySQLPool.pool(connectOptions, poolOptions); // A simple query client.query("SELECT * FROM users WHERE id='julien'", ar -> { if (ar.succeeded()) { RowSet result = ar.result(); System.out.println("Got " + result.size() + " rows "); } else { System.out.println("Failure: " + ar.cause().getMessage()); } // Now close the pool client.close(); });
大多数状况下,您将使用池链接到MySQL:
MySQLConnectOptions connectOptions = new MySQLConnectOptions() .setPort(3306) .setHost("the-host") .setDatabase("the-db") .setUser("user") .setPassword("secret"); // Pool options PoolOptions poolOptions = new PoolOptions() .setMaxSize(5); // Create the pooled client MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);
池化客户端使用链接池,任何操做都将从池中借用链接以执行操做并将其释放到池中。
若是您使用Vert.x运行,能够将Vertx实例传递给它:
MySQLConnectOptions connectOptions = new MySQLConnectOptions() .setPort(3306) .setHost("the-host") .setDatabase("the-db") .setUser("user") .setPassword("secret"); // Pool options PoolOptions poolOptions = new PoolOptions() .setMaxSize(5); // Create the pooled client MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);
您须要在再也不须要时释放池:
pool.close();
当您须要在同一链接上执行多个操做时,您须要使用客户端 connection
。
您能够从游泳池轻松得到一个:
MySQLConnectOptions connectOptions = new MySQLConnectOptions() .setPort(3306) .setHost("the-host") .setDatabase("the-db") .setUser("user") .setPassword("secret"); // Pool options PoolOptions poolOptions = new PoolOptions() .setMaxSize(5); // Create the pooled client MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions); // Get a connection from the pool client.getConnection(ar1 -> { if (ar1.succeeded()) { System.out.println("Connected"); // Obtain our connection SqlConnection conn = ar1.result(); // All operations execute on the same connection conn.query("SELECT * FROM users WHERE id='julien'", ar2 -> { if (ar2.succeeded()) { conn.query("SELECT * FROM users WHERE id='emad'", ar3 -> { // Release the connection to the pool conn.close(); }); } else { // Release the connection to the pool conn.close(); } }); } else { System.out.println("Could not connect: " + ar1.cause().getMessage()); } });
完成链接后,必须将其关闭以将其释放到池中,以即可以重复使用。
您可使用多种方法来配置客户端。
配置客户端的一种简单方法是指定MySQLConnectOptions
数据对象。
MySQLConnectOptions connectOptions = new MySQLConnectOptions() .setPort(3306) .setHost("the-host") .setDatabase("the-db") .setUser("user") .setPassword("secret"); // Pool Options PoolOptions poolOptions = new PoolOptions().setMaxSize(5); // Create the pool from the data object MySQLPool pool = MySQLPool.pool(vertx, connectOptions, poolOptions); pool.getConnection(ar -> { // Handling your connection });
您还可使用setProperties
或addProperty
方法配置链接属性。注意setProperties
将覆盖默认的客户端属性。
MySQLConnectOptions connectOptions = new MySQLConnectOptions(); // Add a connection attribute connectOptions.addProperty("_java_version", "1.8.0_212"); // Override the attributes Map<String, String> attributes = new HashMap<>(); attributes.put("_client_name", "myapp"); attributes.put("_client_version", "1.0.0"); connectOptions.setProperties(attributes);
有关客户端链接属性的更多信息,请参阅MySQL参考手册。
除了使用MySQLConnectOptions
数据对象进行配置以外,当您要使用链接URI进行配置时,咱们还为您提供了另外一种链接方式:
String connectionUri = "mysql://dbuser:secretpassword@database.server.com:3211/mydb"; // Create the pool from the connection URI MySQLPool pool = MySQLPool.pool(connectionUri); // Create the connection from the connection URI MySQLConnection.connect(vertx, connectionUri, res -> { // Handling your connection });
有关链接字符串格式的更多信息,请参阅MySQL参考手册。
目前,客户端支持链接uri中的如下参数关键字
主办
港口
用户
密码
模式
插座
当您不须要事务或运行单个查询时,您能够直接在池上运行查询; 池将使用其中一个链接来运行查询并将结果返回给您。
如下是如何运行简单查询:
client.query("SELECT * FROM users WHERE id='julien'", ar -> { if (ar.succeeded()) { RowSet result = ar.result(); System.out.println("Got " + result.size() + " rows "); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
您可使用准备好的查询执行相同操做。
SQL字符串能够经过位置参考参数,使用$1
,$2
等...
client.preparedQuery("SELECT * FROM users WHERE id=?", Tuple.of("julien"), ar -> { if (ar.succeeded()) { RowSet rows = ar.result(); System.out.println("Got " + rows.size() + " rows "); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
查询方法提供了一个RowSet
适用于SELECT查询的异步实例
client.preparedQuery("SELECT first_name, last_name FROM users", ar -> { if (ar.succeeded()) { RowSet rows = ar.result(); for (Row row : rows) { System.out.println("User " + row.getString(0) + " " + row.getString(1)); } } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
或UPDATE / INSERT查询:
client.preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)", Tuple.of("Julien", "Viet"), ar -> { if (ar.succeeded()) { RowSet rows = ar.result(); System.out.println(rows.rowCount()); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
将Row
让你经过索引访问您的数据
System.out.println("User " + row.getString(0) + " " + row.getString(1));
或按名称
System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
您能够访问各类类型
String firstName = row.getString("first_name"); Boolean male = row.getBoolean("male"); Integer age = row.getInteger("age");
您能够执行准备批处理
您能够缓存准备好的查询:
您能够在查询中使用“RETURNING”子句获取生成的键:
当您须要执行顺序查询(没有事务)时,您能够建立新链接或从池中借用一个链接:
pool.getConnection(ar1 -> {
if (ar1.succeeded()) { SqlConnection connection = ar1.result(); connection.query("SELECT * FROM users WHERE id='julien'", ar2 -> { if (ar1.succeeded()) { connection.query("SELECT * FROM users WHERE id='paulo'", ar3 -> { // Do something with rows and return the connection to the pool connection.close(); }); } else { // Return the connection to the pool connection.close(); } }); } });
能够建立准备好的查询:
connection.prepare("SELECT * FROM users WHERE first_name LIKE ?", ar1 -> { if (ar1.succeeded()) { PreparedQuery pq = ar1.result(); pq.execute(Tuple.of("julien"), ar2 -> { if (ar2.succeeded()) { // All rows RowSet rows = ar2.result(); } }); } });
注意
|
准备好的查询缓存取决于setCachePreparedStatements 而且不依赖于您是建立准备好的查询仍是使用direct prepared queries |
PreparedQuery
能够执行有效的批处理:
默认状况下,准备好的查询执行会获取全部行,您可使用a Cursor
来控制要读取的行数:
connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> { if (ar1.succeeded()) { PreparedQuery pq = ar1.result(); // Create a cursor Cursor cursor = pq.cursor(Tuple.of(18)); // Read 50 rows cursor.read(50, ar2 -> { if (ar2.succeeded()) { RowSet rows = ar2.result(); // Check for more ? if (cursor.hasMore()) { // Repeat the process... } else { // No more rows - close the cursor cursor.close(); } } }); } });
PostreSQL在事务结束时销毁游标,所以游标API将在事务中使用,不然您可能会收到34000
PostgreSQL错误。
游标过早释放时应关闭:
cursor.read(50, ar2 -> { if (ar2.succeeded()) { // Close the cursor cursor.close(); } });
流API也可用于游标,这能够更方便,特别是使用Rxified版本。
connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> { if (ar1.succeeded()) { PreparedQuery pq = ar1.result(); // Fetch 50 rows at a time RowStream<Row> stream = pq.createStream(50, Tuple.of(18)); // Use the stream stream.exceptionHandler(err -> { System.out.println("Error: " + err.getMessage()); }); stream.endHandler(v -> { System.out.println("End of stream"); }); stream.handler(row -> { System.out.println("User: " + row.getString("last_name")); }); } });
流按批次读取行50
并流式传输,当行已传递给处理程序时,将50
读取新批次,依此类推。
流能够恢复或暂停,加载的行将保留在内存中,直到它们被传递而且光标将中止迭代。
目前,客户端支持如下MySQL类型
BOOL,BOOLEAN(java.lang.Byte
)
TINYINT(java.lang.Byte
)
SMALLINT(java.lang.Short
)
MEDIUMINT(java.lang.Integer
)
INT,INTEGER(java.lang.Integer
)
BIGINT(java.lang.Long
)
FLOAT(java.lang.Float
)
DOUBLE(java.lang.Double
)
NUMERIC(io.vertx.sqlclient.data.Numeric
)
日期(java.time.LocalDate
)
DATETIME(java.time.LocalDateTime
)
时间(java.time.Duration
)
TIMESTAMP(java.time.LocalDateTime
)
年(java.lang.Short
)
CHAR(java.lang.String
)
VARCHAR(java.lang.String
)
BINARY(io.vertx.core.buffer.Buffer
)
VARBINARY(io.vertx.core.buffer.Buffer
)
TINYBLOB(io.vertx.core.buffer.Buffer
)
TINYTEXT(java.lang.String
)
BLOB(io.vertx.core.buffer.Buffer
)
文字(java.lang.String
)
MEDIUMBLOB(io.vertx.core.buffer.Buffer
)
MEDIUMTEXT(java.lang.String
)
LONGBLOB(io.vertx.core.buffer.Buffer
)
LONGTEXT(java.lang.String
)
元组解码在存储值时使用上述类型
在MySQL中BOOLEAN
,BOOL
数据类型是同义词TINYINT(1)
。零值被视为假,非零值被视为真。一个BOOLEAN
数据类型值存储在Row
或Tuple
做为java.lang.Byte
类型,你能够调用Row#getValue
来检索它的java.lang.Byte
值,或者能够称之为Row#getBoolean
检索它java.lang.Boolean
的价值。
client.query("SELECT graduated FROM students WHERE id = 0", ar -> { if (ar.succeeded()) { RowSet rowSet = ar.result(); for (Row row : rowSet) { int pos = row.getColumnIndex("graduated"); Byte value = row.get(Byte.class, pos); Boolean graduated = row.getBoolean("graduated"); } } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
若是要使用BOOLEAN
值的参数执行预准备语句,只需将该java.lang.Boolean
值添加到参数列表便可。
client.preparedQuery("UPDATE students SET graduated = ? WHERE id = 0", Tuple.of(true), ar -> { if (ar.succeeded()) { System.out.println("Updated with the boolean value"); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
该Numeric
Java类型用于表示MySQL的NUMERIC
类型。
Numeric numeric = row.get(Numeric.class, 0); if (numeric.isNaN()) { // Handle NaN } else { BigDecimal value = numeric.bigDecimalValue(); }
您能够将Java收集器与查询API一块儿使用:
Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap( row -> row.getLong("id"), row -> row.getString("last_name")); // Run the query with the collector client.query("SELECT * FROM users", collector, ar -> { if (ar.succeeded()) { SqlResult<Map<Long, String>> result = ar.result(); // Get the map created by the collector Map<Long, String> map = result.value(); System.out.println("Got " + map); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
收集器处理不能保留对它的引用,Row
由于有一行用于处理整个集合。
Java Collectors
提供了许多有趣的预约义收集器,例如,您能够直接从行集建立一个字符串:
Collector<Row, ?, String> collector = Collectors.mapping( row -> row.getString("last_name"), Collectors.joining(",", "(", ")") ); // Run the query with the collector client.query("SELECT * FROM users", collector, ar -> { if (ar.succeeded()) { SqlResult<String> result = ar.result(); // Get the string created by the collector String list = result.value(); System.out.println("Got " + list); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
有时您想使用MySQL实用程序命令,咱们为此提供支持。能够在MySQL实用程序命令中找到更多信息。
您可使用COM_PING
命令检查服务器是否处于活动状态。若是服务器响应PING,将通知处理程序,不然将永远不会调用处理程序。
connection.ping(ar -> {
System.out.println("The server has responded to the PING"); });
您可使用COM_RESET_CONNECTION
命令重置会话状态,这将重置链接状态,如: - 用户变量 - 临时表 - 预准备语句
connection.resetConnection(ar -> {
if (ar.succeeded()) { System.out.println("Connection has been reset now"); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
您能够更改当前链接的用户,这将执行从新身份验证并重置链接状态COM_RESET_CONNECTION
。
MySQLConnectOptions authenticationOptions = new MySQLConnectOptions() .setUser("newuser") .setPassword("newpassword") .setDatabase("newdatabase"); connection.changeUser(authenticationOptions, ar -> { if (ar.succeeded()) { System.out.println("User of current connection has been changed."); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
您可使用COM_INIT_DB
命令更改链接的默认架构。
connection.specifySchema("newschema", ar -> { if (ar.succeeded()) { System.out.println("Default schema changed to newschema"); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
您可使用COM_STATISTICS
命令在MySQL服务器中获取一些人类可读的内部状态变量字符串。
connection.getInternalStatistics(ar -> {
if (ar.succeeded()) { System.out.println("Statistics: " + ar.result()); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
您可使用COM_DEBUG
命令将调试信息转储到MySQL服务器的STDOUT。
connection.debug(ar -> {
if (ar.succeeded()) { System.out.println("Debug info dumped to server's STDOUT"); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
您可使用COM_SET_OPTION
命令为当前链接设置选项。目前只能CLIENT_MULTI_STATEMENTS
设置。
例如,您能够CLIENT_MULTI_STATEMENTS
使用此命令禁用。
connection.setOption(MySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF, ar -> { if (ar.succeeded()) { System.out.println("CLIENT_MULTI_STATEMENTS is off now"); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });