PQX stands for Priority Queue Execution. Inspired by the official tutorial, PQX-APP uses RabbitMQ as its message system and serves as an RPC client: it pulls messages from the MQ, deserializes them, and then executes the commands they carry.
Retry functionality is based on the RabbitMQ plugin `delayed_message_exchange`; check this for more detail.
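The delayed-message plugin postpones delivery by the number of milliseconds given in a message's `x-delay` header. As a rough illustration only, here is a minimal sketch of how a retry budget and a retry interval in seconds might be turned into that header value. The function name `next_delay_ms`, the `attempts_made` counter, and the choice to treat a missing interval as "retry immediately" are assumptions made for this example, not taken from the pqx source; the `retry` and `poke` parameters mirror the command fields described below.

```rust
/// Hypothetical helper: returns the `x-delay` value (in milliseconds) for the
/// next retry, or `None` once the retry budget is spent.
///
/// * `attempts_made`: retries already performed (assumed bookkeeping)
/// * `retry`: maximum number of retries; a missing value means 0
/// * `poke`: retry interval in seconds
pub fn next_delay_ms(attempts_made: u8, retry: Option<u8>, poke: Option<u16>) -> Option<u64> {
    let budget = retry.unwrap_or(0);
    if attempts_made >= budget {
        // No retries left: the caller would give up instead of republishing.
        return None;
    }
    // Assumption: a missing `poke` means "retry without delay".
    let seconds = u64::from(poke.unwrap_or(0));
    // The plugin expects milliseconds in the `x-delay` header.
    Some(seconds * 1000)
}
```

With `retry = 5` and `poke = 60`, every retry up to the fifth would be republished with an `x-delay` of 60000; a command without `retry` is never retried.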
A full command expressed in JSON looks like this:
{
  "consumer_ids": ["h1", "h3"],
  "retry": 5,
  "poke": 60,
  "waiting_timeout": 180,
  "consuming_timeout": 270,
  "cmd": {
    "CondaPython": {
      "env": "py310",
      "dir": "$HOME/Code/pqx/scripts",
      "script": "print_csv_in_line.py"
    }
  }
}
where:
- `consumer_ids`: which consumers should receive this message;
- `retry`: the number of retries, default `0`;
- `poke`: the retry interval in seconds;
- `waiting_timeout`: how long the message lives in the queue (in seconds), default infinity;
- `consuming_timeout`: the acking timeout in a consumer (in seconds);
- `cmd`: the command to be executed; for more detail see `CmdArg` in adt.rs.
and the full definition in Rust:
pub struct Command {
    pub consumer_ids: Vec<String>,
    pub retry: Option<u8>,
    pub poke: Option<u16>,
    pub waiting_timeout: Option<u32>,
    pub consuming_timeout: Option<u32>,
    pub cmd: CmdArg,
}

pub enum CmdArg {
    Ping {
        addr: String,
    },
    Bash {
        cmd: Vec<String>,
    },
    Ssh {
        ip: String,
        user: String,
        cmd: Vec<String>,
    },
    Sshpass {
        ip: String,
        user: String,
        pass: String,
        cmd: Vec<String>,
    },
    CondaPython {
        env: String,
        dir: String,
        script: String,
    },
    DockerExec {
        container: String,
        cmd: Vec<String>,
    },
}
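To make the optional fields more concrete, the following is a minimal sketch of how they might be resolved into effective values before a message is published. It only encodes the two defaults stated above (`retry` defaults to 0, `waiting_timeout` defaults to no expiry). The `CommandOptions` and `Resolved` types and the `resolve` function are hypothetical names introduced for this example, and `consumer_ids` and `cmd` are left out for brevity.

```rust
use std::time::Duration;

/// Hypothetical mirror of the optional fields of `Command`.
#[derive(Debug, Clone, Default)]
pub struct CommandOptions {
    pub retry: Option<u8>,
    pub poke: Option<u16>,
    pub waiting_timeout: Option<u32>,
    pub consuming_timeout: Option<u32>,
}

/// Hypothetical resolved view with seconds converted to `Duration`.
#[derive(Debug, PartialEq)]
pub struct Resolved {
    /// Number of retries; 0 when the field is absent.
    pub retry: u8,
    /// Retry interval; `None` when the field is absent.
    pub poke: Option<Duration>,
    /// Time the message may wait in the queue; `None` means no expiry.
    pub waiting_timeout: Option<Duration>,
    /// Acking timeout in a consumer; `None` when the field is absent.
    pub consuming_timeout: Option<Duration>,
}

pub fn resolve(opts: &CommandOptions) -> Resolved {
    // All time-related fields are expressed in seconds in the command.
    let secs = |s: u32| Duration::from_secs(u64::from(s));
    Resolved {
        retry: opts.retry.unwrap_or(0),
        poke: opts.poke.map(|p| secs(u32::from(p))),
        waiting_timeout: opts.waiting_timeout.map(secs),
        consuming_timeout: opts.consuming_timeout.map(secs),
    }
}
```

Keeping "absent" as `None` rather than inventing a sentinel value leaves the decision about unspecified timeouts to the caller, which matches the `Option` types in the definition above.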
- `pqx-util`: utilities
  - `cfg`: config and misc
  - `db`: persistent connection
  - `logging`: logging utils
  - `mq`: RabbitMQ management APIs
- `pqx`: library
  - `ec`: commands and executors
  - `mq`: publisher and subscriber
- `pqx-app`: applications
  - `initiator`: check existences | create tables | declare exchanges, queues, etc.
  - `subscriber`: app
.
├── pqx
│   └── src
│       ├── ec
│       │   ├── cmd.rs
│       │   ├── exec.rs
│       │   ├── mod.rs
│       │   └── util.rs
│       ├── mq
│       │   ├── client.rs
│       │   ├── consumer.rs
│       │   ├── mod.rs
│       │   ├── predefined.rs
│       │   ├── publish.rs
│       │   └── subscribe.rs
│       ├── error.rs
│       └── lib.rs
├── pqx-app
│   └── src
│       ├── bin
│       │   ├── initiator.rs
│       │   └── subscriber.rs
│       ├── entities
│       │   ├── message_history.rs
│       │   ├── message_result.rs
│       │   └── mod.rs
│       ├── adt.rs
│       ├── cfg.rs
│       ├── exec.rs
│       ├── lib.rs
│       └── persist.rs
├── pqx-util
│   └── src
│       ├── db.rs
│       ├── error.rs
│       ├── lib.rs
│       ├── log.rs
│       ├── misc.rs
│       └── mq.rs
└── LICENSE
- Build the image for RabbitMQ (including plugins): `make facilities-build`
- Make sure RabbitMQ and PostgreSQL have been started, simply by executing `make facilities-setup`. Check docker-compose for composing detail.
- Add a RabbitMQ user: `make mq-adduser`; for the supervisor role (enables website operation): `make mq-supervisor`
- Run the test cases.
- Follow the same steps described in Tests.
- Build the image for Pqx: `make pqx-build`
- Following the template files under `./docker/server/config`, create the config files `conn.yml` & `init.yml`.
- Build and run a Pqx container: `make pqx-build`, then `make pqx-setup`
- Check the availability of the container and its initialization: `docker exec pqx-dev initiator -o insp`
- Create tables for message persistence and declare exchanges, queues and bindings: `docker exec pqx-dev initiator -o init`
- cmd: `cmd` module, command composition and execution
- mq: `mq` module, basic pub/sub
- subscriber: pub/sub combined with a command executor
- dlx: dead letter exchange
- topics: topic exchange
- headers: headers exchange
- custom consumer: a further test case building on subscriber, with a custom consumer, command execution and logging. A Python script for message publishing is also provided.
- callback registration: connection & channel callback registration
- delay retry: message retry implemented on top of the delayed_message_exchange plugin
- message persistence: database interaction
- mq api: RabbitMQ management APIs
- By using a headers exchange, we can set up more complicated message delivery, which is why a flexible `Command` is required.
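For readers unfamiliar with headers exchanges: a queue is bound with a set of header key/value pairs plus an `x-match` argument, and a message is routed to the queue when all (`x-match = all`) or any (`x-match = any`) of those pairs appear in the message headers. The sketch below models that matching rule in plain Rust. It is a simplification: header values are plain strings, and the header keys used in the usage note (`consumer`, `kind`) are hypothetical, not the ones pqx actually sets.

```rust
use std::collections::HashMap;

/// The two matching modes of a headers exchange binding.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum XMatch {
    /// Every bound key/value pair must be present in the message headers.
    All,
    /// At least one bound key/value pair must be present.
    Any,
}

/// Simplified model of headers-exchange routing: returns `true` when a message
/// carrying `headers` would be delivered through a binding declared with
/// `binding` and `x_match`.
pub fn headers_match(
    binding: &HashMap<String, String>,
    x_match: XMatch,
    headers: &HashMap<String, String>,
) -> bool {
    // For each bound pair, check whether the message carries the same value.
    let mut hits = binding.iter().map(|(k, v)| headers.get(k) == Some(v));
    match x_match {
        XMatch::All => hits.all(|hit| hit),
        XMatch::Any => hits.any(|hit| hit),
    }
}
```

For instance, a binding on `consumer = h1` and `kind = CondaPython` with `XMatch::Any` accepts a message that only carries `consumer = h1`, while the same binding with `XMatch::All` rejects it until the message also carries `kind = CondaPython`.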