diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..1ff0c42 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,63 @@ +############################################################################### +# Set default behavior to automatically normalize line endings. +############################################################################### +* text=auto + +############################################################################### +# Set default behavior for command prompt diff. +# +# This is need for earlier builds of msysgit that does not have it on by +# default for csharp files. +# Note: This is only used by command line +############################################################################### +#*.cs diff=csharp + +############################################################################### +# Set the merge driver for project and solution files +# +# Merging from the command prompt will add diff markers to the files if there +# are conflicts (Merging from VS is not affected by the settings below, in VS +# the diff markers are never inserted). Diff markers may cause the following +# file extensions to fail to load in VS. An alternative would be to treat +# these files as binary and thus will always conflict and require user +# intervention with every merge. To do so, just uncomment the entries below +############################################################################### +#*.sln merge=binary +#*.csproj merge=binary +#*.vbproj merge=binary +#*.vcxproj merge=binary +#*.vcproj merge=binary +#*.dbproj merge=binary +#*.fsproj merge=binary +#*.lsproj merge=binary +#*.wixproj merge=binary +#*.modelproj merge=binary +#*.sqlproj merge=binary +#*.wwaproj merge=binary + +############################################################################### +# behavior for image files +# +# image files are treated as binary by default. +############################################################################### +#*.jpg binary +#*.png binary +#*.gif binary + +############################################################################### +# diff behavior for common document formats +# +# Convert binary document formats to text before diffing them. This feature +# is only available from the command line. Turn it on by uncommenting the +# entries below. +############################################################################### +#*.doc diff=astextplain +#*.DOC diff=astextplain +#*.docx diff=astextplain +#*.DOCX diff=astextplain +#*.dot diff=astextplain +#*.DOT diff=astextplain +#*.pdf diff=astextplain +#*.PDF diff=astextplain +#*.rtf diff=astextplain +#*.RTF diff=astextplain diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b7f3865 --- /dev/null +++ b/.gitignore @@ -0,0 +1,18 @@ +# Compiled files +*.o +*.so +*.rlib +*.dll +*.rs.bk + +# Executables +*.exe + +Cargo.lock + +# Visual Studio +*.user +*.suo + +/target/ +/obj/ \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..005bcc2 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "elastic_rotor" +version = "0.0.1" +authors = ["Ashley Mannix "] + +[profile.release] +debug = true + +[dependencies] +url = "*" +rotor = "*" +futures = "*" +rotor-http = { version = "*", git = "https://github.com/KodrAus/rotor-http" } +rotor-tools = "*" +lazy_static = "*" +crossbeam = "*" + +time = "*" +stopwatch = "*" \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..876f14b --- /dev/null +++ b/README.md @@ -0,0 +1,13 @@ +# `elastic_rotor` + +This repo is an experiment to provide an asynchronous [`rotor_http`](https://github.com/tailhook/rotor-http) implementation of the Elasticsearch REST API. + +## Notes + +The idea is to provide an out-of-the-box connection pool that's suitable for streaming or other high-throughput use cases. +`rotor` itself is probably going to lose a lot of love to `tokio`, so whether this particular implementation goes anywhere is a valid question. +Personally, I think it'll depend on just _how fast_ the `futures` implementation in `hyper` ends up being. +If the difference is enough to justify `rotor` and its lack of DNS or TLS support then it might fit a niche for internal communication in a closed network. + +If that turns out the be the case, then to make this client useful, it will also need to do a lot of network programming that won't be necessary in a `tokio` implementation, like self-healing in the face of network errors. +This is an interesting requirement though, because it means we could bake support for Elasticsearch's cluster health right into the protocol. \ No newline at end of file diff --git a/src/client/client.rs b/src/client/client.rs new file mode 100644 index 0000000..16953a9 --- /dev/null +++ b/src/client/client.rs @@ -0,0 +1,111 @@ +use std::thread; +use std::net::{ SocketAddr, SocketAddrV4, Ipv4Addr }; + +use futures::{ oneshot, Oneshot }; +use rotor::{ Config, Notifier, GenericScope, Response, Void, Loop }; +use rotor::mio::tcp::TcpStream; +use rotor_http::client::Persistent; + +use super::{ Request, Queue, ResponseFuture }; +use super::fsm::{ Fsm, Context }; + +/// A client-side handle to send request messages to a running loop. +#[derive(Clone)] +pub struct Client<'a> { + q: &'a Queue, + notifiers: Vec +} + +impl <'a> Client<'a> { + /// Create a new handle with no listeners. + fn new(q: &'a Queue) -> Self { + Client { + q: q, + notifiers: Vec::new() + } + } + + /// Add a machine as a listener on this handle's queue. + fn add_listener(&mut self, notifier: Notifier) -> &'a Queue { + self.notifiers.push(notifier); + &self.q + } + + /// Push a message to the queue and return a promise representing the response. + pub fn req(&self, req: Request) -> ResponseFuture { + let (c, p) = oneshot(); + + self.q.push((req, c)); + self.wakeup(); + + p + } + + /// Attempt to wake up any active connection handlers. + /// + /// This will ensure that any messages added to the request queue outside of + /// this `Handler` will get picked up as quickly as possible. + fn wakeup(&self) { + for notifier in &self.notifiers { + notifier.wakeup().unwrap(); + } + } +} + +pub struct ClientBuilder { + client: Client<'static>, + addrs: Vec +} + +impl ClientBuilder { + pub fn new(queue: &'static Queue) -> Self { + ClientBuilder { + client: Client::new(queue), + addrs: Vec::new() + } + } + + pub fn connect_localhost(mut self) -> Self { + self.addrs.push(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 9200))); + + self + } + + pub fn connect(mut self, addr: SocketAddr) -> Self { + self.addrs.push(addr); + + self + } + + pub fn build(self) -> Oneshot> { + let (c, p) = oneshot(); + + let addrs = self.addrs; + let mut client = self.client; + + thread::spawn(move || { + let creator = Loop::new(&Config::new()).unwrap(); + let mut pool = creator.instantiate(Context); + + for addr in addrs { + pool.add_machine_with(|scope| { + connect_addr(scope, addr, &mut client) + }).unwrap(); + } + + c.complete(client); + + pool.run().unwrap(); + }); + + p + } +} + +/// Connect a persistent state machine to a node running at the given address. +fn connect_addr(scope: &mut S, addr: SocketAddr, client: &mut Client<'static>) +-> Response, TcpStream>, Void> { + let q = client.add_listener(scope.notifier()); + + Persistent::connect(scope, addr, q) +} \ No newline at end of file diff --git a/src/client/fsm.rs b/src/client/fsm.rs new file mode 100644 index 0000000..cb2db90 --- /dev/null +++ b/src/client/fsm.rs @@ -0,0 +1,67 @@ +use std::marker::PhantomData; +use std::time::Duration; + +use rotor::Scope; +use rotor_http::client::{ Client as RotorClient, Requester, Connection, ProtocolError, Task }; + +use super::Queue; +use super::req::ApiRequest; + +const DEFAULT_TIMEOUT: u64 = 500; + +/* +TODO: We should probably have a single 'wakeup' machine that uses the `Context` to wake the other machines up +This way, when a connection needs to be respawned, we don't have to try and change anything client side +So it'll be more like the sniffed pool, just without an external source of truth +This means the only notifier the Handler has is one to the wakeup machine. +It also opens the door for only waking up machines that aren't currently busy or for doing other match logic +In that case though, we'd need to put connection machines on separate queues +*/ + +#[doc(hidden)] +pub struct Context; + +/// A state machine for managing a persistent connection to an Elasticsearch node. +pub struct Fsm<'a, C> { + q: &'a Queue, + _c: PhantomData +} + +impl <'a, C> RotorClient for Fsm<'a, C> { + type Requester = ApiRequest; + type Seed = &'a Queue; + + fn create(seed: Self::Seed, _scope: &mut Scope<::Context>) -> Self { + Fsm { + q: seed, + _c: PhantomData + } + } + + fn connection_idle(self, _conn: &Connection, scope: &mut Scope) -> Task { + if let Some((req, fut)) = self.q.try_pop() { + Task::Request(self, ApiRequest::for_req(req, fut)) + } + else { + Task::Sleep(self, scope.now() + Duration::from_millis(DEFAULT_TIMEOUT)) + } + } + + fn wakeup(self, conn: &Connection, scope: &mut Scope<::Context>) -> Task { + self.connection_idle(conn, scope) + } + + fn timeout(self, conn: &Connection, scope: &mut Scope<::Context>) -> Task { + if conn.is_idle() { + self.connection_idle(conn, scope) + } + else { + Task::Sleep(self, scope.now() + Duration::from_millis(DEFAULT_TIMEOUT)) + } + } + + fn connection_error(self, _err: &ProtocolError, _scope: &mut Scope) { + //TODO: On connection error, we need to do something... The handler needs to know things have changed + unimplemented!() + } +} \ No newline at end of file diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..57872a5 --- /dev/null +++ b/src/client/mod.rs @@ -0,0 +1,70 @@ +use crossbeam::sync::SegQueue; +use futures::{ Oneshot, Complete }; + +mod client; +mod fsm; +mod req; + +pub use self::client::*; + +/// A request message. +/// +/// This is what you supply to kick off a request. +pub struct Request { + url: String, + verb: &'static str, + body: Option> +} +impl Request { + /// Create a new GET request. + pub fn get>(url: I) -> Self { + Request { + url: url.into(), + verb: "GET", + body: None + } + } + + /// Create a new POST request. + pub fn post>(url: I, body: &[u8]) -> Self { + Request { + url: url.into(), + verb: "POST", + body: Some(body.to_vec()) + } + } +} + +//TODO: Proper error type here +/// A common data format shared between producer and consumer. +pub type Response = Result, &'static str>; + +/// The promise part of a request future. +pub type ResponseFuture = Oneshot; + +/// The completion part of a request future. +type ResponseComplete = Complete; + +/// A message representing a request and the completion part of a response. +type Message = (Request, ResponseComplete); + +/// A queue to link a client with a connection pool. +/// +/// This is essentially just a wrapped `SegQueue`. +/// Messages can't be put onto this queue directly, you need to use the +/// appropriate `Client` structure. +pub struct Queue(SegQueue); +impl Queue { + pub fn new() -> Self { + Queue(SegQueue::new()) + } + + fn push(&self, msg: Message) { + self.0.push(msg); + } + + fn try_pop(&self) -> Option { + self.0.try_pop() + } +} + diff --git a/src/client/req.rs b/src/client/req.rs new file mode 100644 index 0000000..1cc1509 --- /dev/null +++ b/src/client/req.rs @@ -0,0 +1,80 @@ +use std::marker::PhantomData; +use std::time::Duration; + +use rotor::{ Scope, Time }; +use rotor_http::client::{ Request as RotorRequest, Requester, ResponseError, Head, RecvMode, Version }; + +use super::{ Request, ResponseComplete }; + +const DEFAULT_TIMEOUT: u64 = 1000; + +/// A state machine for managing the HTTP component of an Elasticsearch connection. +pub struct ApiRequest { + req: Request, + fut: ResponseComplete, + _c: PhantomData +} + +impl ApiRequest { + pub fn for_req(req: Request, fut: ResponseComplete) -> Self { + ApiRequest { + req: req, + fut: fut, + _c: PhantomData + } + } +} + +impl Requester for ApiRequest { + type Context = C; + + fn prepare_request(self, r: &mut RotorRequest, _scope: &mut Scope) -> Option { + r.start(&self.req.verb, &self.req.url, Version::Http11); + + r.add_header("Content-Type", b"application/json").unwrap(); + + if let Some(ref body) = self.req.body { + r.add_length(body.len() as u64).unwrap(); + r.done_headers().unwrap(); + r.write_body(body); + } + else { + r.done_headers().unwrap(); + } + + r.done(); + + Some(self) + } + + fn headers_received(self, _head: Head, _req: &mut RotorRequest, scope: &mut Scope) -> Option<(Self, RecvMode, Time)> { + //NOTE: 404's will come through here too, so we can set a correct error response + Some((self, RecvMode::Buffered(1 << 20), scope.now() + Duration::new(DEFAULT_TIMEOUT, 0))) + } + + fn response_received(self, data: &[u8], _req: &mut RotorRequest, _scope: &mut Scope) { + self.fut.complete(Ok(data.to_vec())); + } + + fn bad_response(self, _err: &ResponseError, _scope: &mut Scope) { + self.fut.complete(Err("nah it's broke m8. should use a proper error type here.")); + } + + fn response_chunk(self, _chunk: &[u8], _req: &mut RotorRequest, _scope: &mut Scope) -> Option { + unreachable!(); + } + + fn response_end(self, _req: &mut RotorRequest, _scope: &mut Scope) { + unreachable!(); + } + + fn timeout(self, _req: &mut RotorRequest, scope: &mut Scope) -> Option<(Self, Time)> { + //TODO: Check for cancellation + Some((self, scope.now() + Duration::new(DEFAULT_TIMEOUT, 0))) + } + + fn wakeup(self, _req: &mut RotorRequest, _scope: &mut Scope) -> Option { + //TODO: Check for cancellation + Some(self) + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..e231f82 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,52 @@ +//! # `elastic_rotor` +//! +//! A WIP implementation of an asynchronous http client for Elasticsearch. +//! +//! Only _sort of_ works... But will follow the following design: +//! - Provide a simple, fast constant connection pool +//! - Provide a more complex, but robust, sniffed connection pool +//! +//! Communication to the loop is through a non-blocking `Queue`, wrapped in a `Client`. + +extern crate time; + +extern crate crossbeam; +extern crate futures; +extern crate rotor; +extern crate rotor_http; +extern crate rotor_tools; + +#[macro_use] +extern crate lazy_static; + +mod client; +pub use client::*; + +//Test usage +use futures::Future; + +lazy_static! { + static ref QUEUE: Queue = Queue::new(); +} + +fn main() { + //Build a client + let builder = ClientBuilder::new(&QUEUE) + .connect_localhost(); + + let cli = builder.build().wait().unwrap(); + + //Run a post request asynchronously + cli.req(Request::post("/testindex/testtype/1", b"{\"id\":1}")) + .wait() + .unwrap() + .unwrap(); + + //Run some search requests asynchronously + let total_reqs = 100; + let search_reqs: Vec = (0..total_reqs).map(|_| { + cli.req(Request::get("/testindex/testtype/_search")) + }).collect(); + + futures::collect(search_reqs).wait().unwrap(); +} \ No newline at end of file