public void start(Future<Void> fut) { vertx .createHttpServer() .requestHandler(r -> { LocalDateTime start = LocalDateTime.now(); System.out.println("Request received - "+start.format(DateTimeFormatter.ISO_DATE_TIME)); final MyModel model = new MyModel(); try { for(int i=0;i<10000000;i++){ //some simple operation } model.data = start.format(DateTimeFormatter.ISO_DATE_TIME) +" - "+LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } r.response().end( new Gson().toJson(model) ); }) .listen(4568,result -> { if (result.succeeded()) { fut.complete(); } else { fut.fail(result.cause()); } }); System.out.println("Server started .."); }
>我只是想模拟一个长时间运行的请求处理程序来了解这个模型的工作原理.
>我观察到的是所谓的事件循环被阻塞,直到我的第一个请求完成.无论什么时间需要,后续请求不会被执行,直到前一个完成.
显然,我在这里错过了一件,这就是我在这里的问题.
根据答案编辑到目前为止:
>不接受被认为是异步的所有请求?如果一个新的
只有当前一个清除时才能接受连接
关闭,如何异步?
>假设典型的请求需要100 ms到1 sec之间(根据请求的种类和性质).所以这意味着
事件循环直到先前的请求才能接受新的连接
完成(即使它在一秒钟内风起来).如果我是一个程序员
必须考虑所有这些,并将这些请求处理程序推送到
工作线程,那么它如何与线程/连接有所不同
模型?
>我只是想了解传统线程/连接服务器型号的这个模型是如何更好的?假设没有I / O操作或
所有的I / O操作是异步处理的?它甚至如何解决
c10k问题,当它不能并行启动所有并发请求,并且必须等到上一个终止?
>即使我决定将所有这些操作推送到工作线程(pooled),那么我回到同样的问题呢?上下文切换线程?
编辑并填补这个问题的赏金
>不完全明白这个模型是如何声称异步的.
> Vert.x有一个异步JDBC客户端(Asyncronous是关键字),我试图用RXJava进行调整.
>这里是代码示例(相关部分)
server.requestStream().toObservable().subscribe(req – > {
LocalDateTime start = LocalDateTime.now(); System.out.println("Request for " + req.absoluteURI() +" received - " +start.format(DateTimeFormatter.ISO_DATE_TIME)); jdbc.getConnectionObservable().subscribe( conn -> { // Now chain some statements using flatmap composition Observable<ResultSet> resa = conn.queryObservable("SELECT * FROM CALL_OPTION WHERE UNDERLYING='NIFTY'"); // Subscribe to the final result resa.subscribe(resultSet -> { req.response().end(resultSet.getRows().toString()); System.out.println("Request for " + req.absoluteURI() +" Ended - " +LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)); },err -> { System.out.println("Database problem"); err.printStackTrace(); }); },// Could not connect err -> { err.printStackTrace(); } ); }); server.listen(4568);
>选择查询需要3秒钟,大约返回完整的表转储.
>当我启动并发请求(仅使用2)时,我看到第二个请求完全等待第一个请求完成.
>如果JDBC select是异步的,在等待select查询返回任何东西的时候,框架是否处理第二个连接是不是一个很好的期望?
解决方法
编辑您的更新:
首先,当您处理事件循环时,主线程对于所有请求都应该很快地工作.你不应该在这个循环中做任何长工作.当然你也不应该等到你对数据库的回应.
– 异步调度一个调用
– 将回调(处理程序)分配给结果
– 回调将在工作线程,非事件循环线程中执行.例如,此回调将返回对套接字的响应.
所以,你在事件循环中的操作应该只是调用所有异步操作与回调,并转到下一个请求,而不等待任何结果.
Assume a typical request takes anywhere between 100 ms to 1 sec (based on the kind and nature of the request).
在这种情况下,您的请求有一些计算昂贵的部分或访问IO – 您的事件循环中的代码不应该等待此操作的结果.
I’m just trying to understand how is this model better from a traditional thread/conn server models? Assume there is no I/O op or all the I/O op are handled asynchronously?
当您有太多的并发请求和传统的编程模型时,您将根据每个请求进行线程.这个线程会做什么?他们将主要等待IO操作(例如,数据库的结果).浪费资源在我们的事件循环模型中,您有一个主线程用于计划操作,并为长任务预先分配了工作线程数量.没有一个这个工作人员实际上等待响应,他们只需执行另一个代码,同时等待IO结果(可以实现为当前正在进行的IO作业的回调或定期检查状态).我建议您通过Java NIO和Java NIO 2来了解如何在框架内实际执行此异步IO. Green threads也是非常相关的概念,那将是很好的理解.绿色线程和协同程序是一种阴影事件循环,试图实现相同的事情 – 少线程,因为我们可以重用系统线程,而绿色线程等待某事.
How does it even solve c10k problem,when it can’t start all concurrent requests parallel and have to wait till the prevIoUs one terminates?
确定我们不要等待主线程发送先前请求的响应.获取请求,安排长/ IO任务执行,下次请求.
Even if I decide to push all these operations to a worker thread(pooled),then I’m back to the same problem isn’t it? Context switching between threads?
如果你使一切正确 – 不.更重要的是,您将获得良好的数据位置和执行流预测.一个cpu内核将执行您的短事件循环并调度异步工作,无需上下文切换,而无需再进行任何操作.其他核心调用数据库并返回响应,只有这样.在回调之间切换或检查不同通道的IO状态实际上不需要任何系统线程的上下文切换 – 它实际上在一个工作线程中工作.因此,我们每个核心都有一个工作线程,这个系统线程等待/检查从多个连接到数据库的结果可用性.回顾Java NIO概念,了解它如何以这种方式工作. (NIO代理服务器的典型示例 – 可以接受多个并行连接(数千个)的代理服务器,向其他一些远程服务器发送代理请求,收听响应并将响应发送回客户端,所有这些使用一个或两个线程)
关于你的代码,我为你做了一个样例project来证明一切都符合预期:
public class MyFirstVerticle extends AbstractVerticle { @Override public void start(Future<Void> fut) { JDBCClient client = JDBCClient.createShared(vertx,new JsonObject() .put("url","jdbc:hsqldb:mem:test?shutdown=true") .put("driver_class","org.hsqldb.jdbcDriver") .put("max_pool_size",30)); client.getConnection(conn -> { if (conn.Failed()) {throw new RuntimeException(conn.cause());} final sqlConnection connection = conn.result(); // create a table connection.execute("create table test(id int primary key,name varchar(255))",create -> { if (create.Failed()) {throw new RuntimeException(create.cause());} }); }); vertx .createHttpServer() .requestHandler(r -> { int requestId = new Random().nextInt(); System.out.println("Request " + requestId + " received"); client.getConnection(conn -> { if (conn.Failed()) {throw new RuntimeException(conn.cause());} final sqlConnection connection = conn.result(); connection.execute("insert into test values ('" + requestId + "','World')",insert -> { // query some data with arguments connection .queryWithParams("select * from test where id = ?",new JsonArray().add(requestId),rs -> { connection.close(done -> {if (done.Failed()) {throw new RuntimeException(done.cause());}}); System.out.println("Result " + requestId + " returned"); r.response().end("Hello"); }); }); }); }) .listen(8080,result -> { if (result.succeeded()) { fut.complete(); } else { fut.fail(result.cause()); } }); } } @RunWith(VertxUnitRunner.class) public class MyFirstVerticleTest { private Vertx vertx; @Before public void setUp(TestContext context) { vertx = Vertx.vertx(); vertx.deployVerticle(MyFirstVerticle.class.getName(),context.asyncAssertSuccess()); } @After public void tearDown(TestContext context) { vertx.close(context.asyncAssertSuccess()); } @Test public void testMyApplication(TestContext context) { for (int i = 0; i < 10; i++) { final Async async = context.async(); vertx.createHttpClient().getNow(8080,"localhost","/",response -> response.handler(body -> { context.assertTrue(body.toString().contains("Hello")); async.complete(); }) ); } } }
输出:
Request 1412761034 received Request -1781489277 received Request 1008255692 received Request -853002509 received Request -919489429 received Request 1902219940 received Request -2141153291 received Request 1144684415 received Request -1409053630 received Request -546435082 received Result 1412761034 returned Result -1781489277 returned Result 1008255692 returned Result -853002509 returned Result -919489429 returned Result 1902219940 returned Result -2141153291 returned Result 1144684415 returned Result -1409053630 returned Result -546435082 returned
所以,我们接受请求 – 调度请求到数据库,去下一个请求,我们消耗所有它们,并发送每个请求的响应只有当所有的数据库完成.
关于你的代码示例我看到两个可能的问题 – 首先,看起来你不关闭()连接,这是重要的返回到池.二,池如何配置?如果只有一个免费连接 – 这个请求将序列化等待这个连接.
我建议您添加一些打印时间戳的两个请求,以查找序列化的位置.你把一些让事件循环阻塞的调用.或者…检查您在测试中并发发送请求.以前没有得到响应之后.