-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit df7f766
Showing
9 changed files
with
493 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
############################################################################### | ||
# Set default behavior to automatically normalize line endings. | ||
############################################################################### | ||
* text=auto | ||
|
||
############################################################################### | ||
# Set default behavior for command prompt diff. | ||
# | ||
# This is need for earlier builds of msysgit that does not have it on by | ||
# default for csharp files. | ||
# Note: This is only used by command line | ||
############################################################################### | ||
#*.cs diff=csharp | ||
|
||
############################################################################### | ||
# Set the merge driver for project and solution files | ||
# | ||
# Merging from the command prompt will add diff markers to the files if there | ||
# are conflicts (Merging from VS is not affected by the settings below, in VS | ||
# the diff markers are never inserted). Diff markers may cause the following | ||
# file extensions to fail to load in VS. An alternative would be to treat | ||
# these files as binary and thus will always conflict and require user | ||
# intervention with every merge. To do so, just uncomment the entries below | ||
############################################################################### | ||
#*.sln merge=binary | ||
#*.csproj merge=binary | ||
#*.vbproj merge=binary | ||
#*.vcxproj merge=binary | ||
#*.vcproj merge=binary | ||
#*.dbproj merge=binary | ||
#*.fsproj merge=binary | ||
#*.lsproj merge=binary | ||
#*.wixproj merge=binary | ||
#*.modelproj merge=binary | ||
#*.sqlproj merge=binary | ||
#*.wwaproj merge=binary | ||
|
||
############################################################################### | ||
# behavior for image files | ||
# | ||
# image files are treated as binary by default. | ||
############################################################################### | ||
#*.jpg binary | ||
#*.png binary | ||
#*.gif binary | ||
|
||
############################################################################### | ||
# diff behavior for common document formats | ||
# | ||
# Convert binary document formats to text before diffing them. This feature | ||
# is only available from the command line. Turn it on by uncommenting the | ||
# entries below. | ||
############################################################################### | ||
#*.doc diff=astextplain | ||
#*.DOC diff=astextplain | ||
#*.docx diff=astextplain | ||
#*.DOCX diff=astextplain | ||
#*.dot diff=astextplain | ||
#*.DOT diff=astextplain | ||
#*.pdf diff=astextplain | ||
#*.PDF diff=astextplain | ||
#*.rtf diff=astextplain | ||
#*.RTF diff=astextplain |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
# Compiled files | ||
*.o | ||
*.so | ||
*.rlib | ||
*.dll | ||
*.rs.bk | ||
|
||
# Executables | ||
*.exe | ||
|
||
Cargo.lock | ||
|
||
# Visual Studio | ||
*.user | ||
*.suo | ||
|
||
/target/ | ||
/obj/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
[package] | ||
name = "elastic_rotor" | ||
version = "0.0.1" | ||
authors = ["Ashley Mannix <ashleymannix@live.com.au>"] | ||
|
||
[profile.release] | ||
debug = true | ||
|
||
[dependencies] | ||
url = "*" | ||
rotor = "*" | ||
futures = "*" | ||
rotor-http = { version = "*", git = "https://github.com/KodrAus/rotor-http" } | ||
rotor-tools = "*" | ||
lazy_static = "*" | ||
crossbeam = "*" | ||
|
||
time = "*" | ||
stopwatch = "*" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
# `elastic_rotor` | ||
|
||
This repo is an experiment to provide an asynchronous [`rotor_http`](https://github.com/tailhook/rotor-http) implementation of the Elasticsearch REST API. | ||
|
||
## Notes | ||
|
||
The idea is to provide an out-of-the-box connection pool that's suitable for streaming or other high-throughput use cases. | ||
`rotor` itself is probably going to lose a lot of love to `tokio`, so whether this particular implementation goes anywhere is a valid question. | ||
Personally, I think it'll depend on just _how fast_ the `futures` implementation in `hyper` ends up being. | ||
If the difference is enough to justify `rotor` and its lack of DNS or TLS support then it might fit a niche for internal communication in a closed network. | ||
|
||
If that turns out the be the case, then to make this client useful, it will also need to do a lot of network programming that won't be necessary in a `tokio` implementation, like self-healing in the face of network errors. | ||
This is an interesting requirement though, because it means we could bake support for Elasticsearch's cluster health right into the protocol. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
use std::thread; | ||
use std::net::{ SocketAddr, SocketAddrV4, Ipv4Addr }; | ||
|
||
use futures::{ oneshot, Oneshot }; | ||
use rotor::{ Config, Notifier, GenericScope, Response, Void, Loop }; | ||
use rotor::mio::tcp::TcpStream; | ||
use rotor_http::client::Persistent; | ||
|
||
use super::{ Request, Queue, ResponseFuture }; | ||
use super::fsm::{ Fsm, Context }; | ||
|
||
/// A client-side handle to send request messages to a running loop. | ||
#[derive(Clone)] | ||
pub struct Client<'a> { | ||
q: &'a Queue, | ||
notifiers: Vec<Notifier> | ||
} | ||
|
||
impl <'a> Client<'a> { | ||
/// Create a new handle with no listeners. | ||
fn new(q: &'a Queue) -> Self { | ||
Client { | ||
q: q, | ||
notifiers: Vec::new() | ||
} | ||
} | ||
|
||
/// Add a machine as a listener on this handle's queue. | ||
fn add_listener(&mut self, notifier: Notifier) -> &'a Queue { | ||
self.notifiers.push(notifier); | ||
&self.q | ||
} | ||
|
||
/// Push a message to the queue and return a promise representing the response. | ||
pub fn req(&self, req: Request) -> ResponseFuture { | ||
let (c, p) = oneshot(); | ||
|
||
self.q.push((req, c)); | ||
self.wakeup(); | ||
|
||
p | ||
} | ||
|
||
/// Attempt to wake up any active connection handlers. | ||
/// | ||
/// This will ensure that any messages added to the request queue outside of | ||
/// this `Handler` will get picked up as quickly as possible. | ||
fn wakeup(&self) { | ||
for notifier in &self.notifiers { | ||
notifier.wakeup().unwrap(); | ||
} | ||
} | ||
} | ||
|
||
pub struct ClientBuilder { | ||
client: Client<'static>, | ||
addrs: Vec<SocketAddr> | ||
} | ||
|
||
impl ClientBuilder { | ||
pub fn new(queue: &'static Queue) -> Self { | ||
ClientBuilder { | ||
client: Client::new(queue), | ||
addrs: Vec::new() | ||
} | ||
} | ||
|
||
pub fn connect_localhost(mut self) -> Self { | ||
self.addrs.push(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 9200))); | ||
|
||
self | ||
} | ||
|
||
pub fn connect(mut self, addr: SocketAddr) -> Self { | ||
self.addrs.push(addr); | ||
|
||
self | ||
} | ||
|
||
pub fn build(self) -> Oneshot<Client<'static>> { | ||
let (c, p) = oneshot(); | ||
|
||
let addrs = self.addrs; | ||
let mut client = self.client; | ||
|
||
thread::spawn(move || { | ||
let creator = Loop::new(&Config::new()).unwrap(); | ||
let mut pool = creator.instantiate(Context); | ||
|
||
for addr in addrs { | ||
pool.add_machine_with(|scope| { | ||
connect_addr(scope, addr, &mut client) | ||
}).unwrap(); | ||
} | ||
|
||
c.complete(client); | ||
|
||
pool.run().unwrap(); | ||
}); | ||
|
||
p | ||
} | ||
} | ||
|
||
/// Connect a persistent state machine to a node running at the given address. | ||
fn connect_addr<S: GenericScope, C>(scope: &mut S, addr: SocketAddr, client: &mut Client<'static>) | ||
-> Response<Persistent<Fsm<'static, C>, TcpStream>, Void> { | ||
let q = client.add_listener(scope.notifier()); | ||
|
||
Persistent::connect(scope, addr, q) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
use std::marker::PhantomData; | ||
use std::time::Duration; | ||
|
||
use rotor::Scope; | ||
use rotor_http::client::{ Client as RotorClient, Requester, Connection, ProtocolError, Task }; | ||
|
||
use super::Queue; | ||
use super::req::ApiRequest; | ||
|
||
const DEFAULT_TIMEOUT: u64 = 500; | ||
|
||
/* | ||
TODO: We should probably have a single 'wakeup' machine that uses the `Context` to wake the other machines up | ||
This way, when a connection needs to be respawned, we don't have to try and change anything client side | ||
So it'll be more like the sniffed pool, just without an external source of truth | ||
This means the only notifier the Handler has is one to the wakeup machine. | ||
It also opens the door for only waking up machines that aren't currently busy or for doing other match logic | ||
In that case though, we'd need to put connection machines on separate queues | ||
*/ | ||
|
||
#[doc(hidden)] | ||
pub struct Context; | ||
|
||
/// A state machine for managing a persistent connection to an Elasticsearch node. | ||
pub struct Fsm<'a, C> { | ||
q: &'a Queue, | ||
_c: PhantomData<C> | ||
} | ||
|
||
impl <'a, C> RotorClient for Fsm<'a, C> { | ||
type Requester = ApiRequest<C>; | ||
type Seed = &'a Queue; | ||
|
||
fn create(seed: Self::Seed, _scope: &mut Scope<<Self::Requester as Requester>::Context>) -> Self { | ||
Fsm { | ||
q: seed, | ||
_c: PhantomData | ||
} | ||
} | ||
|
||
fn connection_idle(self, _conn: &Connection, scope: &mut Scope<C>) -> Task<Self> { | ||
if let Some((req, fut)) = self.q.try_pop() { | ||
Task::Request(self, ApiRequest::for_req(req, fut)) | ||
} | ||
else { | ||
Task::Sleep(self, scope.now() + Duration::from_millis(DEFAULT_TIMEOUT)) | ||
} | ||
} | ||
|
||
fn wakeup(self, conn: &Connection, scope: &mut Scope<<Self::Requester as Requester>::Context>) -> Task<Self> { | ||
self.connection_idle(conn, scope) | ||
} | ||
|
||
fn timeout(self, conn: &Connection, scope: &mut Scope<<Self::Requester as Requester>::Context>) -> Task<Self> { | ||
if conn.is_idle() { | ||
self.connection_idle(conn, scope) | ||
} | ||
else { | ||
Task::Sleep(self, scope.now() + Duration::from_millis(DEFAULT_TIMEOUT)) | ||
} | ||
} | ||
|
||
fn connection_error(self, _err: &ProtocolError, _scope: &mut Scope<C>) { | ||
//TODO: On connection error, we need to do something... The handler needs to know things have changed | ||
unimplemented!() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
use crossbeam::sync::SegQueue; | ||
use futures::{ Oneshot, Complete }; | ||
|
||
mod client; | ||
mod fsm; | ||
mod req; | ||
|
||
pub use self::client::*; | ||
|
||
/// A request message. | ||
/// | ||
/// This is what you supply to kick off a request. | ||
pub struct Request { | ||
url: String, | ||
verb: &'static str, | ||
body: Option<Vec<u8>> | ||
} | ||
impl Request { | ||
/// Create a new GET request. | ||
pub fn get<I: Into<String>>(url: I) -> Self { | ||
Request { | ||
url: url.into(), | ||
verb: "GET", | ||
body: None | ||
} | ||
} | ||
|
||
/// Create a new POST request. | ||
pub fn post<I: Into<String>>(url: I, body: &[u8]) -> Self { | ||
Request { | ||
url: url.into(), | ||
verb: "POST", | ||
body: Some(body.to_vec()) | ||
} | ||
} | ||
} | ||
|
||
//TODO: Proper error type here | ||
/// A common data format shared between producer and consumer. | ||
pub type Response = Result<Vec<u8>, &'static str>; | ||
|
||
/// The promise part of a request future. | ||
pub type ResponseFuture = Oneshot<Response>; | ||
|
||
/// The completion part of a request future. | ||
type ResponseComplete = Complete<Response>; | ||
|
||
/// A message representing a request and the completion part of a response. | ||
type Message = (Request, ResponseComplete); | ||
|
||
/// A queue to link a client with a connection pool. | ||
/// | ||
/// This is essentially just a wrapped `SegQueue`. | ||
/// Messages can't be put onto this queue directly, you need to use the | ||
/// appropriate `Client` structure. | ||
pub struct Queue(SegQueue<Message>); | ||
impl Queue { | ||
pub fn new() -> Self { | ||
Queue(SegQueue::new()) | ||
} | ||
|
||
fn push(&self, msg: Message) { | ||
self.0.push(msg); | ||
} | ||
|
||
fn try_pop(&self) -> Option<Message> { | ||
self.0.try_pop() | ||
} | ||
} | ||
|
Oops, something went wrong.