PQX stands for Priority Queue Execution. Inspired by the official tutorial, PQX-APP uses RabbitMQ as its message system and serves as an RPC client that pulls messages from the MQ, deserializes them, and executes commands. PQX-APP can also be deployed on different machines in order to execute machine-specific commands (via the mailing_to field, see below).
Retry functionality is based on the RabbitMQ plugin delayed_message_exchange; see the plugin's documentation for more detail.
Binaries currently provided:
- inspector: inspecting database table schemas, MQ settings/status, etc.
- initiator: initializing data, such as database tables, MQ settings, etc.
- rectifier: modifying initialized data (delete/recreate/...)
- subscriber: consuming messages from the MQ and executing commands
- publisher: sending messages to the MQ
A full command expressed in JSON looks like this 🧐:
{
  "mailing_to": [
    {
      "unique_key": "h1"
    },
    {
      "unique_key": "h2",
      "common_key": "dev"
    }
  ],
  "config": {
    "retry": 5,
    "poke": 60,
    "waiting_timeout": 180,
    "consuming_timeout": 270
  },
  "cmd": {
    "CondaPython": {
      "env": "py310",
      "dir": "$HOME/Code/pqx/scripts",
      "script": "print_csv_in_line.py"
    }
  }
}
where:
- mailing_to: a list of matching criteria combined with logical 'or' (meaning the message will be sent multiple times); the message is mailed to the queues that match one of these criteria. If mailing_to is empty, the message is sent to all queues (header-exchange mechanism);
- retry: the number of retries, default 0;
- poke: the interval between retries, in seconds;
- waiting_timeout: how long the message may live in the queue (seconds), default infinity;
- consuming_timeout: the acking timeout in a consumer (seconds);
- cmd: the command to be executed; for more detail see CmdArg in adt.rs.
and the full definition in Rust:
use std::collections::HashMap;

// A full command: where to send it, how to retry it, and what to run.
pub struct Command {
    pub mailing_to: Vec<HashMap<String, String>>,
    pub config: Config,
    pub cmd: CmdArg,
}

// All fields are optional; see the field descriptions above for defaults.
pub struct Config {
    pub retry: Option<u8>,
    pub poke: Option<u16>,
    pub waiting_timeout: Option<u32>,
    pub consuming_timeout: Option<u32>,
}

// The supported kinds of command.
pub enum CmdArg {
    Ping {
        addr: String,
    },
    Bash {
        cmd: Vec<String>,
    },
    Ssh {
        ip: String,
        user: String,
        cmd: Vec<String>,
    },
    Sshpass {
        ip: String,
        user: String,
        pass: String,
        cmd: Vec<String>,
    },
    CondaPython {
        env: String,
        dir: String,
        script: String,
    },
    DockerExec {
        container: String,
        cmd: Vec<String>,
    },
}
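To make the optional Config fields more concrete, here is a minimal sketch of how their documented defaults could be resolved. It only mirrors the Config struct above; the helper methods (retries, waiting, consuming, x_delay_ms) are hypothetical names and are not taken from the pqx source. The delayed_message_exchange plugin reads its delay from an x-delay header given in milliseconds; deriving that value directly from poke is an assumption about how pqx uses it.

```rust
use std::time::Duration;

/// Mirrors the `Config` struct above (redeclared so the sketch is self-contained).
#[derive(Debug, Default, Clone)]
pub struct Config {
    pub retry: Option<u8>,
    pub poke: Option<u16>,
    pub waiting_timeout: Option<u32>,
    pub consuming_timeout: Option<u32>,
}

// Hypothetical helpers, not part of pqx.
impl Config {
    /// Number of retries; the documented default is 0.
    pub fn retries(&self) -> u8 {
        self.retry.unwrap_or(0)
    }

    /// How long a message may wait in the queue; `None` stands for "infinity".
    pub fn waiting(&self) -> Option<Duration> {
        self.waiting_timeout.map(|s| Duration::from_secs(u64::from(s)))
    }

    /// Acking timeout in a consumer; `None` when unset.
    pub fn consuming(&self) -> Option<Duration> {
        self.consuming_timeout.map(|s| Duration::from_secs(u64::from(s)))
    }

    /// Value for the plugin's `x-delay` header (milliseconds).
    /// Assumption: the delay is simply `poke` converted from seconds.
    pub fn x_delay_ms(&self) -> Option<u64> {
        self.poke.map(|s| u64::from(s) * 1000)
    }
}

fn main() {
    // Same values as the "config" object in the JSON example.
    let cfg = Config {
        retry: Some(5),
        poke: Some(60),
        waiting_timeout: Some(180),
        consuming_timeout: Some(270),
    };
    println!("retries = {}", cfg.retries());
    println!("x-delay = {:?} ms", cfg.x_delay_ms());
    println!("waiting = {:?}, consuming = {:?}", cfg.waiting(), cfg.consuming());

    // An empty config falls back to the documented defaults.
    let empty = Config::default();
    println!("default retries = {}, waiting = {:?}", empty.retries(), empty.waiting());
}
```

Keeping the fields as Option and resolving defaults at the point of use keeps "unset" distinguishable from an explicit zero, which matters for waiting_timeout, where unset means the message never expires.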
- pqx-util: utilities
  - cfg: config and misc
  - db: persistent connection
  - logging: logging utils
  - mq: RabbitMQ management APIs
- pqx: library
  - ec: commands and executors
  - mq: publisher and subscriber
- pqx-app: applications
  - initiator: check existences | create tables | declare exchanges, queues, etc.
  - subscriber: app
.
├── pqx
│   └── src
│       ├── ec
│       │   ├── cmd.rs
│       │   ├── exec.rs
│       │   ├── mod.rs
│       │   └── util.rs
│       ├── mq
│       │   ├── client.rs
│       │   ├── consumer.rs
│       │   ├── mod.rs
│       │   ├── predefined.rs
│       │   ├── publish.rs
│       │   └── subscribe.rs
│       ├── error.rs
│       └── lib.rs
├── pqx-app
│   └── src
│       ├── bin
│       │   ├── initiator.rs
│       │   ├── inspector.rs
│       │   ├── publisher.rs
│       │   ├── rectifier.rs
│       │   └── subscriber.rs
│       ├── entities
│       │   ├── message_history.rs
│       │   ├── message_result.rs
│       │   └── mod.rs
│       ├── adt.rs
│       ├── cfg.rs
│       ├── exec.rs
│       ├── lib.rs
│       └── persist.rs
├── pqx-util
│   └── src
│       ├── db.rs
│       ├── error.rs
│       ├── lib.rs
│       ├── log.rs
│       ├── misc.rs
│       └── mq.rs
└── LICENSE
- Build the image for RabbitMQ (including plugins): make facilities-build
- Make sure RabbitMQ and PostgreSQL have been started, simply by executing make facilities-setup. Check docker-compose for the composition details.
- Add a RabbitMQ user: make mq-adduser; for the supervisor role (enables web UI operation): make mq-supervisor
- Run the test cases
- Follow the same steps described in Tests
- Build the image for Pqx: make pqx-build
- Create config files: make init-config, then modify the configs conn.yml & init.yml.
- Build and run a Pqx container: make pqx-build, then make pqx-setup
- Check the availability of the container and the initialization: docker exec pqx-dev inspector -o insp
- Create tables for message persistence and declare exchanges, queues and bindings: docker exec pqx-dev initiator -o init
- Subscribe to a specific queue: docker exec pqx-dev ./run.sh sub start; make sure ./docker/server/config/secret.env has been filled in
- Stop a subscriber: docker exec pqx-dev ./run.sh sub stop
- cmd: cmd module, commands composition and execution
- mq: mq module, basic pub/sub
- subscriber: pub/sub combined with a command executor
- dlx: dead letter exchange
- topics: topic exchange
- headers: header exchange
- custom consumer: a further test case from subscriber, with custom consumer, command execution and logging. Moreover, a Python script for message publishing is also provided.
- callback registration: connection & channel callback registration
- delay retry: based on plugin delayed_message_exchange, implementation of message retry
- message persistence: database interaction
- mq api: RabbitMQ management APIs
- If a retry happens, the message is sent back to the original exchange. Because the delayed exchange currently acts as a header-typed exchange, it delivers the retry message to all the matched queues, which means that if one consumer fails to process a message, all the other consumers receive it again. Hence, a strict publish header should be added to the message's headers so that the delayed exchange can deliver the retry message to the right place.
- The delayed exchange cannot be removed except by disabling the plugin; see the Makefile targets mq-disable-delayed-exchange and mq-enable-delayed-exchange.
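The retry routing problem described above can be illustrated with a small in-memory model of a headers exchange. This is only a sketch under stated assumptions: the queue names q.h1 and q.h2, the Queue type, and the strict_retry_headers helper are hypothetical, and the bindings are assumed to use RabbitMQ's x-match=any semantics (a queue receives the message if at least one of its binding pairs appears in the message headers). It does not reproduce how pqx actually declares its bindings.

```rust
use std::collections::HashMap;

type Headers = HashMap<String, String>;

/// A queue bound to a headers exchange (hypothetical model, not a pqx type).
struct Queue {
    name: &'static str,
    binding: Headers,
}

/// Build a header map from string pairs.
fn headers(pairs: &[(&str, &str)]) -> Headers {
    pairs
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

/// Assumed `x-match = any`: one matching key/value pair is enough.
fn matches_any(binding: &Headers, msg: &Headers) -> bool {
    binding.iter().any(|(k, v)| msg.get(k) == Some(v))
}

/// Names of the queues that would receive a message with these headers.
fn deliver(queues: &[Queue], msg: &Headers) -> Vec<&'static str> {
    queues
        .iter()
        .filter(|q| matches_any(&q.binding, msg))
        .map(|q| q.name)
        .collect()
}

/// Proposed fix (sketch): the retry message carries only the unique key
/// of the consumer that failed, so no other queue matches it.
fn strict_retry_headers(failed_unique_key: &str) -> Headers {
    headers(&[("unique_key", failed_unique_key)])
}

fn main() {
    let queues = vec![
        Queue { name: "q.h1", binding: headers(&[("unique_key", "h1"), ("common_key", "dev")]) },
        Queue { name: "q.h2", binding: headers(&[("unique_key", "h2"), ("common_key", "dev")]) },
    ];

    // Re-publishing with the original headers reaches every matching queue,
    // including consumers that already processed the message successfully.
    let original = headers(&[("common_key", "dev")]);
    println!("retry with original headers -> {:?}", deliver(&queues, &original));

    // With the strict header, only the failed consumer's queue is targeted.
    let strict = strict_retry_headers("h2");
    println!("retry with strict header    -> {:?}", deliver(&queues, &strict));
}
```

In this model the first retry is delivered to both queues, while the second reaches only q.h2, which is the behaviour the strict publish header is meant to achieve.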
- list all consumers (simply by MqQuery)
- flexible publisher (not only reading tasks from a JSON file)
- enhance Command, for instance accepting string replacement in CmdArg
- Module dynamic: dynamically set/del exchange/queue/binding
- message aggregator, see this