Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,3 @@
path = vendor/liburing
url = https://github.com/axboe/liburing

[submodule "vendor/libressl"]
path = vendor/libressl
url = https://github.com/libressl/portable
9 changes: 9 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
source 'https://gem.coop'

gemspec

group :development do
gem 'localhost'
gem 'rake-compiler', '~>1.3.0'
gem 'minitest', '~>6.0.1'
gem 'benchmark'
gem 'benchmark-ips'
gem 'http_parser.rb', '~>0.8.0'
end
30 changes: 21 additions & 9 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,29 @@

- Fix all futex value (Queue, Mutex) to be aligned

## Sidecar thread
## OpenSSL

The sidecar thread is an auxiliary thread that is used to wait for CQEs. It
calls `io_uring_wait_cqe` (or equivalent lower-level interface) in a loop, and
each time a CQE is available, it signals this to the primary UringMachine
thread (using a futex).
Since the custom BIO PR I submitted will probably not be accepted, an
alternative plan of action is:

The primary UringMachine thread runs fibers from the runqueue. When the runqueue
is exhausted, it performs a `io_uring_submit` for unsubmitted ops. It then waits
for the futex to become signalled (non-zero), and then processes all available
completions.
- Add UM API for setting up a custom BIO for an SSL connection. Example usage:

```ruby
ssl = OpenSSL::SSL::SSLSocket.open("127.0.0.1", 1234)
@machine.ssl_set_bio(ssl)
```

In this context, since the SSLSocket object wraps an `SSL` C struct, we can
simply use `RTYPEDDATA_GET_DATA` to get at the underlying SSL C struct, and
install the custom BIO.

- We can also add APIs for directly invoking `SSL_read` and `SSL_write` with the
custom BIO:

```ruby
len_received = @machine.ssl_read(ssl, (buf = +''), 8192)
len_sent = @machine.ssl_write(ssl, buf, buf.bytesize)
```

## Buffer rings - automatic management

Expand Down
2 changes: 2 additions & 0 deletions benchmark/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
gem 'async'
gem 'pg'
gem 'gvltools'
gem 'openssl'
gem 'localhost'
end

require 'uringmachine/fiber_scheduler'
Expand Down
77 changes: 77 additions & 0 deletions benchmark/openssl.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# frozen_string_literal: true

require 'bundler/inline'

gemfile do
source 'https://rubygems.org'
gem 'uringmachine', path: '..'
gem 'benchmark'
gem 'benchmark-ips'
gem 'openssl'
gem 'localhost'
end

require 'uringmachine'
require 'benchmark/ips'
require 'openssl'
require 'localhost/authority'

authority = Localhost::Authority.fetch
ctx = authority.server_context
ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE

Socket.do_not_reverse_lookup = true
tcps = TCPServer.new("127.0.0.1", 0)
port = tcps.connect_address.ip_port

ssls = OpenSSL::SSL::SSLServer.new(tcps, ctx)

Thread.new do
Thread.current.report_on_exception = false
loop do
begin
ssl = ssls.accept
rescue OpenSSL::SSL::SSLError, IOError, Errno::EBADF, Errno::EINVAL,
Errno::ECONNABORTED, Errno::ENOTSOCK, Errno::ECONNRESET
retry
end

Thread.new do
Thread.current.report_on_exception = false

begin
while line = ssl.gets
ssl.write(line)
end
ensure
ssl.close
end
true
end
end
end

@ssl_stock = OpenSSL::SSL::SSLSocket.open("127.0.0.1", port)
@ssl_stock.sync_close = true
@ssl_stock.connect

um = UM.new

@ssl_um = OpenSSL::SSL::SSLSocket.open("127.0.0.1", port)
@ssl_um.sync_close = true
um.ssl_set_bio(@ssl_um)
@ssl_um.connect

@msg = 'abc' * 1000

def do_io(ssl)
ssl.puts @msg
ssl.gets
end

Benchmark.ips do |x|
x.report('stock') { do_io(@ssl_stock) }
x.report('UM BIO') { do_io(@ssl_um) }

x.compare!(order: :baseline)
end
112 changes: 112 additions & 0 deletions benchmark/openssl_socketpair.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# frozen_string_literal: true

require 'bundler/inline'

gemfile do
source 'https://rubygems.org'
gem 'uringmachine', path: '..'
gem 'benchmark'
gem 'io-event'
gem 'async'
gem 'pg'
gem 'gvltools'
gem 'openssl'
gem 'localhost'
end

require 'uringmachine/fiber_scheduler'
require 'socket'
require 'openssl'
require 'localhost/authority'

GROUPS = 48
ITERATIONS = 10000

SIZE = 1 << 14
DATA = '*' * SIZE

AUTHORITY = Localhost::Authority.fetch

def do_io(um)
GROUPS.times do
r, w = Socket.socketpair(:AF_UNIX, :SOCK_STREAM, 0)
r.sync = true
w.sync = true
ctx = AUTHORITY.server_context
ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE
s1 = OpenSSL::SSL::SSLSocket.new(r, ctx)
s1.sync_close = true
um.ssl_set_bio(s1) if um
Fiber.schedule do
s1.accept
ITERATIONS.times { s1.readpartial(SIZE) }
s1.close rescue nil
rescue => e
p e
p e.backtrace
end

s2 = OpenSSL::SSL::SSLSocket.new(w, OpenSSL::SSL::SSLContext.new)
s2.sync_close = true
um.ssl_set_bio(s2) if um
Fiber.schedule do
s2.connect
ITERATIONS.times { s2.write(DATA) }
s2.close rescue nil
rescue => e
p e
p e.backtrace
end
end
end

def do_um_io(um)
GROUPS.times do
r, w = Socket.socketpair(:AF_UNIX, :SOCK_STREAM, 0)
r.sync = true
w.sync = true
ctx = AUTHORITY.server_context
ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE
s1 = OpenSSL::SSL::SSLSocket.new(r, ctx)
s1.sync_close = true
um.ssl_set_bio(s1)
Fiber.schedule do
s1.accept
ITERATIONS.times { um.ssl_read(s1, +'', SIZE) }
s1.close rescue nil
rescue => e
p e
p e.backtrace
end

s2 = OpenSSL::SSL::SSLSocket.new(w, OpenSSL::SSL::SSLContext.new)
s2.sync_close = true
um.ssl_set_bio(s2)
Fiber.schedule do
s2.connect
ITERATIONS.times { um.ssl_write(s2, DATA, DATA.bytesize) }
s2.close rescue nil
rescue => e
p e
p e.backtrace
end
end
end

def run(custom_bio, um_io)
machine = UM.new
scheduler = UM::FiberScheduler.new(machine)
Fiber.set_scheduler(scheduler)
if um_io
do_um_io(machine)
else
do_io(custom_bio ? machine : nil)
end
scheduler.join
end

Benchmark.bm do |b|
b.report("stock") { run(false, false) }
b.report("UM BIO") { run(true, false) }
b.report("UM I/O") { run(true, true) }
end
15 changes: 15 additions & 0 deletions ext/um/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,21 @@ def get_config
raise "Couldn't find liburing.a"
end

if !have_header("openssl/ssl.h")
raise "Couldn't find OpenSSL headers"
end

if !have_library("ssl", "SSL_new")
raise "Couldn't find OpenSSL library"
end

version_ok = checking_for("OpenSSL version >= 1.1.1") {
try_static_assert("OPENSSL_VERSION_NUMBER >= 0x10101000L", "openssl/opensslv.h")
}
unless version_ok
raise "OpenSSL >= 1.1.1 or LibreSSL >= 3.9.0 is required"
end

have_func("&rb_process_status_new")

$defs << "-DUM_KERNEL_VERSION=#{config[:kernel_version]}"
Expand Down
16 changes: 16 additions & 0 deletions ext/um/um.c
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,22 @@ VALUE um_write(struct um *machine, int fd, VALUE buffer, size_t len, __u64 file_
return ret;
}

size_t um_write_raw(struct um *machine, int fd, const char *buffer, size_t maxlen) {
struct um_op op;
um_prep_op(machine, &op, OP_WRITE, 0);
struct io_uring_sqe *sqe = um_get_sqe(machine, &op);
io_uring_prep_write(sqe, fd, buffer, maxlen, 0);

VALUE ret = um_yield(machine);

if (um_check_completion(machine, &op))
return op.result.res;

RAISE_IF_EXCEPTION(ret);
RB_GC_GUARD(ret);
return 0;
}

VALUE um_writev(struct um *machine, int fd, int argc, VALUE *argv) {
__u64 file_offset = -1;
if (TYPE(argv[argc - 1]) == T_FIXNUM) {
Expand Down
5 changes: 5 additions & 0 deletions ext/um/um.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ VALUE um_read(struct um *machine, int fd, VALUE buffer, size_t maxlen, ssize_t b
size_t um_read_raw(struct um *machine, int fd, char *buffer, size_t maxlen);
VALUE um_read_each(struct um *machine, int fd, int bgid);
VALUE um_write(struct um *machine, int fd, VALUE buffer, size_t len, __u64 file_offset);
size_t um_write_raw(struct um *machine, int fd, const char *buffer, size_t maxlen);
VALUE um_writev(struct um *machine, int fd, int argc, VALUE *argv);
VALUE um_write_async(struct um *machine, int fd, VALUE buffer, size_t len, __u64 file_offset);
VALUE um_close(struct um *machine, int fd);
Expand Down Expand Up @@ -368,4 +369,8 @@ void um_sidecar_teardown(struct um *machine);
void um_sidecar_signal_wait(struct um *machine);
void um_sidecar_signal_wake(struct um *machine);

void um_ssl_set_bio(struct um *machine, VALUE ssl_obj);
int um_ssl_read(struct um *machine, VALUE ssl, VALUE buf, int maxlen);
int um_ssl_write(struct um *machine, VALUE ssl, VALUE buf, int len);

#endif // UM_H
22 changes: 22 additions & 0 deletions ext/um/um_class.c
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,24 @@ VALUE UM_prep_timeout(VALUE self, VALUE interval) {
return um_prep_timeout(machine, NUM2DBL(interval));
}

VALUE UM_ssl_set_bio(VALUE self, VALUE ssl) {
struct um *machine = um_get_machine(self);
um_ssl_set_bio(machine, ssl);
return self;
}

VALUE UM_ssl_read(VALUE self, VALUE ssl, VALUE buf, VALUE maxlen) {
struct um *machine = um_get_machine(self);
int ret = um_ssl_read(machine, ssl, buf, NUM2INT(maxlen));
return INT2NUM(ret);
}

VALUE UM_ssl_write(VALUE self, VALUE ssl, VALUE buf, VALUE len) {
struct um *machine = um_get_machine(self);
int ret = um_ssl_write(machine, ssl, buf, NUM2INT(len));
return INT2NUM(ret);
}

VALUE UM_pipe(VALUE self) {
int fds[2];
int ret = pipe(fds);
Expand Down Expand Up @@ -697,6 +715,10 @@ void Init_UM(void) {
rb_define_method(cUM, "synchronize", UM_mutex_synchronize, 1);
rb_define_method(cUM, "unshift", UM_queue_unshift, 2);

rb_define_method(cUM, "ssl_set_bio", UM_ssl_set_bio, 1);
rb_define_method(cUM, "ssl_read", UM_ssl_read, 3);
rb_define_method(cUM, "ssl_write", UM_ssl_write, 3);

eUMError = rb_define_class_under(cUM, "Error", rb_eStandardError);

um_define_net_constants(cUM);
Expand Down
Loading