Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async support in WS #134

Merged
merged 7 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ app.start(port=5000)

## Features
- Under active development!
- Written in Russt, btw xD
- Written in Rust, btw xD
- A multithreaded Runtime
- Extensible
- A simple API
Expand Down
38 changes: 37 additions & 1 deletion docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ app = Robyn(__file__)
websocket = WS(app, "/web_socket")
```

Now, you can define 3 methods for every web_socket for their lifecycle, they are as follows:
Now, you can define 3 methods for every web_socket for their life cycle, they are as follows:

```python3
@websocket.on("message")
Expand Down Expand Up @@ -196,6 +196,42 @@ The three methods:

To see a complete service in action, you can go to the folder [../integration_tests/base_routes.py](../integration_tests/base_routes.py)

### Update(20/12/21)

Async functions are supported in Web Sockets now!

Async functions are executed out of order for web sockets. i.e. the order of response is not guaranteed. This is done to achieve a non blocking concurrent effect.

A blocking async web socket is in plans for the future.

### Usage

```python3
@websocket.on("message")
async def connect():
global i
i+=1
if i==0:
return "Whaaat??"
elif i==1:
return "Whooo??"
elif i==2:
return "*chika* *chika* Slim Shady."
elif i==3:
i= -1
return ""

@websocket.on("close")
async def close():
return "Goodbye world, from ws"

@websocket.on("connect")
async def message():
return "Hello world, from ws"

```


## MutliCore Scaling

To run Robyn across multiple cores, you can use the following command:
Expand Down
6 changes: 2 additions & 4 deletions integration_tests/base_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
i = -1

@websocket.on("message")
def connect():
async def connect():
global i
i+=1
if i==0:
Expand All @@ -21,12 +21,10 @@ def connect():

@websocket.on("close")
def close():
print("Hello world")
return "Hello world, from ws"
return "GoodBye world, from ws"

@websocket.on("connect")
def message():
print("Hello world")
return "Hello world, from ws"


Expand Down
4 changes: 2 additions & 2 deletions integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

@pytest.fixture
def session():
subprocess.call(["freeport", "5000"])
subprocess.call(["yes | freeport 5000"], shell=True)
os.environ["ROBYN_URL"] = "127.0.0.1"
current_file_path = pathlib.Path(__file__).parent.resolve()
base_routes = os.path.join(current_file_path, "./base_routes.py")
process = subprocess.Popen(["python3", base_routes])
time.sleep(1)
time.sleep(5)
yield
process.terminate()
del os.environ["ROBYN_URL"]
Expand Down
1 change: 1 addition & 0 deletions integration_tests/test_web_sockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
def test_web_socket(session):
async def start_ws(uri):
async with connect(uri) as websocket:
assert( await websocket.recv() == "Hello world, from ws")
await websocket.send("My name is?")
assert( await websocket.recv() == "Whaaat??")
await websocket.send("My name is?")
Expand Down
44 changes: 21 additions & 23 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use pyo3::prelude::*;
use pyo3::types::PyAny;

use actix_web::http::Method;
use dashmap::DashMap;
use matchit::Node;

/// Contains the thread safe hashmaps of different routes
Expand All @@ -21,7 +20,7 @@ pub struct Router {
options_routes: Arc<RwLock<Node<(PyFunction, u8)>>>,
connect_routes: Arc<RwLock<Node<(PyFunction, u8)>>>,
trace_routes: Arc<RwLock<Node<(PyFunction, u8)>>>,
web_socket_routes: DashMap<String, HashMap<String, (PyFunction, u8)>>,
web_socket_routes: Arc<RwLock<HashMap<String, HashMap<String, (PyFunction, u8)>>>>,
}

impl Router {
Expand All @@ -36,7 +35,7 @@ impl Router {
options_routes: Arc::new(RwLock::new(Node::new())),
connect_routes: Arc::new(RwLock::new(Node::new())),
trace_routes: Arc::new(RwLock::new(Node::new())),
web_socket_routes: DashMap::new(),
web_socket_routes: Arc::new(RwLock::new(HashMap::new())),
}
}

Expand All @@ -57,7 +56,9 @@ impl Router {
}

#[inline]
pub fn get_web_socket_map(&self) -> &DashMap<String, HashMap<String, (PyFunction, u8)>> {
pub fn get_web_socket_map(
&self,
) -> &Arc<RwLock<HashMap<String, HashMap<String, (PyFunction, u8)>>>> {
&self.web_socket_routes
}

Expand Down Expand Up @@ -117,42 +118,39 @@ impl Router {
let (close_route_function, close_route_is_async, close_route_params) = close_route;
let (message_route_function, message_route_is_async, message_route_params) = message_route;

let insert_in_router = |table: &DashMap<String, HashMap<String, (PyFunction, u8)>>,
handler: Py<PyAny>,
is_async: bool,
number_of_params: u8,
socket_type: &str| {
let function = if is_async {
PyFunction::CoRoutine(handler)
} else {
PyFunction::SyncFunction(handler)
let insert_in_router =
|handler: Py<PyAny>, is_async: bool, number_of_params: u8, socket_type: &str| {
let function = if is_async {
PyFunction::CoRoutine(handler)
} else {
PyFunction::SyncFunction(handler)
};

println!("socket type is {:?} {:?}", table, route);

table
.write()
.unwrap()
.entry(route.to_string())
.or_default()
.insert(socket_type.to_string(), (function, number_of_params))
};

let mut route_map = HashMap::new();
route_map.insert(socket_type.to_string(), (function, number_of_params));

println!("{:?}", table);
table.insert(route.to_string(), route_map);
};

insert_in_router(
table,
connect_route_function,
connect_route_is_async,
connect_route_params,
"connect",
);

insert_in_router(
table,
close_route_function,
close_route_is_async,
close_route_params,
"close",
);

insert_in_router(
table,
message_route_function,
message_route_is_async,
message_route_params,
Expand Down
22 changes: 12 additions & 10 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,10 @@ impl Server {
return Ok(());
}

println!("{}", name);

let borrow = socket.try_borrow_mut()?;
let held_socket: &SocketHeld = &*borrow;

let raw_socket = held_socket.get_socket();
println!("Got our socket {:?}", raw_socket);

let router = self.router.clone();
let headers = self.headers.clone();
Expand Down Expand Up @@ -123,17 +120,23 @@ impl Server {
.app_data(web::Data::new(headers.clone()));

let web_socket_map = router_copy.get_web_socket_map();
for elem in (web_socket_map).iter() {
let route = elem.key().clone();
let params = elem.value().clone();
for (elem, value) in (web_socket_map.read().unwrap()).iter() {
let route = elem.clone();
let params = value.clone();
let event_loop_hdl = event_loop_hdl.clone();
app = app.route(
&route,
&route.clone(),
web::get().to(
move |_router: web::Data<Arc<Router>>,
_headers: web::Data<Arc<Headers>>,
stream: web::Payload,
req: HttpRequest| {
start_web_socket(req, stream, Arc::new(params.clone()))
start_web_socket(
req,
stream,
Arc::new(params.clone()),
event_loop_hdl.clone(),
)
},
),
);
Expand Down Expand Up @@ -205,14 +208,13 @@ impl Server {
/// Add a new web socket route to the routing tables
/// can be called after the server has been started
pub fn add_web_socket_route(
&self,
&mut self,
route: &str,
// handler, is_async, number of params
connect_route: (Py<PyAny>, bool, u8),
close_route: (Py<PyAny>, bool, u8),
message_route: (Py<PyAny>, bool, u8),
) {
println!("WS Route added for {} ", route);
self.router
.add_websocket_route(route, connect_route, close_route, message_route);
}
Expand Down
Loading