forked from gophish/gophish
-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathworker.go
157 lines (146 loc) · 4.11 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package worker
import (
"context"
"time"
log "github.com/gophish/gophish/logger"
"github.com/gophish/gophish/mailer"
"github.com/gophish/gophish/models"
"github.com/sirupsen/logrus"
)
// Worker is an interface that defines the operations needed for a background worker
type Worker interface {
// Start begins the worker's background processing.
Start()
// LaunchCampaign starts sending the mail belonging to campaign c.
LaunchCampaign(c models.Campaign)
// SendTestEmail sends the test email described by s and returns the
// resulting error, if any.
SendTestEmail(s *models.EmailRequest) error
}
// DefaultWorker is the background worker that handles watching for new campaigns and sending emails appropriately.
type DefaultWorker struct {
// mailer is where the worker queues batches of mail. New initialises it
// with mailer.NewMailWorker(); the function returned by WithMailer
// overwrites it.
mailer mailer.Mailer
}
// New builds a DefaultWorker backed by the default mail worker and then
// applies each of the supplied options to it, in order.
//
// If any option returns an error, construction stops immediately and that
// error is returned together with a nil Worker.
func New(options ...func(Worker) error) (Worker, error) {
	worker := &DefaultWorker{mailer: mailer.NewMailWorker()}
	for i := range options {
		if err := options[i](worker); err != nil {
			return nil, err
		}
	}
	return worker, nil
}
// WithMailer returns a function that replaces a worker's mailer with m.
// By default, workers use a standard, default mailworker.
//
// NOTE(review): the returned function takes a *DefaultWorker, whereas New
// accepts options of type func(Worker) error, so the result cannot be handed
// to New as-is; it has to be applied to a *DefaultWorker directly.
func WithMailer(m mailer.Mailer) func(*DefaultWorker) error {
	setMailer := func(dw *DefaultWorker) error {
		dw.mailer = m
		return nil
	}
	return setMailer
}
// processCampaigns loads maillogs scheduled to be sent before the provided
// time and sends them to the mailer.
//
// The maillogs are locked before they are prepared. If preparing the batch
// fails (a campaign context cannot be loaded or cannot be attached to a
// maillog), every maillog in the batch is unlocked again before the error is
// returned, so an aborted run does not leave the batch locked.
//
// Queuing happens in one goroutine per campaign; processCampaigns returns
// without waiting for those goroutines to finish.
func (w *DefaultWorker) processCampaigns(t time.Time) error {
	ms, err := models.GetQueuedMailLogs(t.UTC())
	if err != nil {
		log.Error(err)
		return err
	}
	// Lock the MailLogs (they will be unlocked after processing)
	err = models.LockMailLogs(ms, true)
	if err != nil {
		return err
	}
	// unlockAll releases the lock taken above. It is only used on the error
	// paths below, where nothing has been handed to the mailer yet.
	unlockAll := func() {
		for _, m := range ms {
			m.Unlock()
		}
	}
	campaignCache := make(map[int64]models.Campaign)
	// We'll group the maillogs by campaign ID to (roughly) group
	// them by sending profile. This lets the mailer re-use the Sender
	// instead of having to re-connect to the SMTP server for every
	// email.
	msg := make(map[int64][]mailer.Mail)
	for _, m := range ms {
		// We cache the campaign here to greatly reduce the time it takes to
		// generate the message (ref #1726)
		c, ok := campaignCache[m.CampaignId]
		if !ok {
			c, err = models.GetCampaignMailContext(m.CampaignId, m.UserId)
			if err != nil {
				unlockAll()
				return err
			}
			// Store under the same key used for the lookup above and for
			// the grouping below, so the cache cannot miss because the
			// loaded campaign reports a different Id.
			campaignCache[m.CampaignId] = c
		}
		// CacheCampaign reports an error (LaunchCampaign checks it too);
		// do not queue a maillog whose campaign could not be attached.
		if err = m.CacheCampaign(&c); err != nil {
			unlockAll()
			return err
		}
		msg[m.CampaignId] = append(msg[m.CampaignId], m)
	}
	// Next, we process each group of maillogs in parallel. campaignCache is
	// no longer written at this point, so the goroutines only read from it.
	for cid, msc := range msg {
		go func(cid int64, msc []mailer.Mail) {
			c := campaignCache[cid]
			if c.Status == models.CampaignQueued {
				err := c.UpdateStatus(models.CampaignInProgress)
				if err != nil {
					log.Error(err)
					return
				}
			}
			log.WithFields(logrus.Fields{
				"num_emails": len(msc),
			}).Info("Sending emails to mailer for processing")
			w.mailer.Queue(msc)
		}(cid, msc)
	}
	return nil
}
// Start runs the worker: it starts the mailer in its own goroutine and then,
// once per minute, processes the maillogs that are due at that tick.
//
// Errors from a processing pass are logged and the loop carries on with the
// next tick. Start never returns.
func (w *DefaultWorker) Start() {
	log.Info("Background Worker Started Successfully - Waiting for Campaigns")
	go w.mailer.Start(context.Background())
	ticks := time.Tick(time.Minute)
	for {
		now := <-ticks
		if err := w.processCampaigns(now); err != nil {
			log.Error(err)
		}
	}
}
// LaunchCampaign starts a campaign: it locks the campaign's maillogs and
// queues with the mailer every maillog whose send date is not in the future.
// Maillogs scheduled for later are unlocked again and left alone.
//
// Failures are logged rather than returned:
//   - if the campaign context or the maillogs cannot be loaded, or the
//     maillogs cannot be locked, nothing is queued;
//   - if the campaign context cannot be attached to an individual maillog,
//     that maillog is unlocked and skipped while the rest are still queued.
func (w *DefaultWorker) LaunchCampaign(c models.Campaign) {
	// Load the campaign context before anything is locked, so a failure
	// here cannot leave maillogs locked.
	campaignMailCtx, err := models.GetCampaignMailContext(c.Id, c.UserId)
	if err != nil {
		log.Error(err)
		return
	}
	ms, err := models.GetMailLogsByCampaign(c.Id)
	if err != nil {
		log.Error(err)
		return
	}
	// Do not queue maillogs that could not be locked.
	if err = models.LockMailLogs(ms, true); err != nil {
		log.Error(err)
		return
	}
	// This is required since you cannot pass a slice of values
	// that implements an interface as a slice of that interface.
	mailEntries := make([]mailer.Mail, 0, len(ms))
	currentTime := time.Now().UTC()
	for _, m := range ms {
		// Only send the emails scheduled to be sent for the past minute to
		// respect the campaign scheduling options
		if m.SendDate.After(currentTime) {
			m.Unlock()
			continue
		}
		if err = m.CacheCampaign(&campaignMailCtx); err != nil {
			// Skip only the offending maillog and release its lock;
			// returning here would drop the entries already prepared and
			// leave them locked.
			log.Error(err)
			m.Unlock()
			continue
		}
		mailEntries = append(mailEntries, m)
	}
	w.mailer.Queue(mailEntries)
}
// SendTestEmail queues the test email s with the mailer from a separate
// goroutine, then blocks until a value arrives on s.ErrorChan and returns
// that value.
func (w *DefaultWorker) SendTestEmail(s *models.EmailRequest) error {
	enqueue := func() {
		w.mailer.Queue([]mailer.Mail{s})
	}
	go enqueue()
	result := <-s.ErrorChan
	return result
}