Skip to content

Commit

Permalink
fetch the key for nobody only at startup
Browse files Browse the repository at this point in the history
  • Loading branch information
dimkr committed Mar 2, 2024
1 parent ca79b22 commit e1f31e2
Show file tree
Hide file tree
Showing 32 changed files with 394 additions and 347 deletions.
5 changes: 3 additions & 2 deletions ap/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ap
import (
"context"
"database/sql"
"github.com/dimkr/tootik/httpsig"
"log/slog"
)

Expand All @@ -34,6 +35,6 @@ const (

// Resolver retrieves [Actor] objects given their ID.
type Resolver interface {
ResolveID(ctx context.Context, log *slog.Logger, db *sql.DB, from *Actor, id string, flags ResolverFlag) (*Actor, error)
Resolve(ctx context.Context, log *slog.Logger, db *sql.DB, from *Actor, host, name string, flags ResolverFlag) (*Actor, error)
ResolveID(ctx context.Context, log *slog.Logger, db *sql.DB, key httpsig.Key, id string, flags ResolverFlag) (*Actor, error)
Resolve(ctx context.Context, log *slog.Logger, db *sql.DB, key httpsig.Key, host, name string, flags ResolverFlag) (*Actor, error)
}
10 changes: 5 additions & 5 deletions cmd/tootik/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func main() {
panic(err)
}

nobody, err := user.CreateNobody(ctx, *domain, db)
_, nobodyKey, err := user.CreateNobody(ctx, *domain, db)
if err != nil {
panic(err)
}
Expand All @@ -196,7 +196,7 @@ func main() {
Config: &cfg,
DB: db,
Resolver: resolver,
Actor: nobody,
ActorKey: nobodyKey,
Log: log.With("listener", "https"),
Addr: *addr,
Cert: *cert,
Expand Down Expand Up @@ -278,7 +278,7 @@ func main() {
Log: log.With("queue", "inbox"),
DB: db,
Resolver: resolver,
Actor: nobody,
Key: nobodyKey,
},
},
{
Expand Down Expand Up @@ -326,7 +326,7 @@ func main() {
Log: log.With("job", "mover"),
DB: db,
Resolver: resolver,
Actor: nobody,
Key: nobodyKey,
},
},
{
Expand All @@ -338,7 +338,7 @@ func main() {
Log: log.With("job", "sync"),
DB: db,
Resolver: resolver,
Actor: nobody,
Key: nobodyKey,
},
},
{
Expand Down
38 changes: 38 additions & 0 deletions data/key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright 2023, 2024 Dima Krasner
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless ruired by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package data

import (
"crypto/x509"
"encoding/pem"
)

// ParsePrivateKey parses private keys.
func ParsePrivateKey(privateKeyPemString string) (any, error) {
privateKeyPem, _ := pem.Decode([]byte(privateKeyPemString))

privateKey, err := x509.ParsePKCS8PrivateKey(privateKeyPem.Bytes)
if err != nil {
// fallback for openssl<3.0.0
privateKey, err = x509.ParsePKCS1PrivateKey(privateKeyPem.Bytes)
if err != nil {
return nil, err
}
}

return privateKey, nil
}
27 changes: 17 additions & 10 deletions fed/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"fmt"
"github.com/dimkr/tootik/ap"
"github.com/dimkr/tootik/cfg"
"github.com/dimkr/tootik/data"
"github.com/dimkr/tootik/httpsig"
"log/slog"
"net/url"
"time"
Expand Down Expand Up @@ -57,30 +59,36 @@ func (q *Queue) Process(ctx context.Context) error {
func (q *Queue) process(ctx context.Context) error {
q.Log.Debug("Polling delivery queue")

rows, err := q.DB.QueryContext(ctx, `select outbox.attempts, outbox.activity, outbox.activity, outbox.inserted, outbox.received, persons.actor from outbox join persons on persons.id = outbox.sender where outbox.sent = 0 and (outbox.attempts = 0 or (outbox.attempts < ? and outbox.last <= unixepoch() - ?)) order by outbox.attempts asc, outbox.last asc limit ?`, q.Config.MaxDeliveryAttempts, q.Config.DeliveryRetryInterval, q.Config.DeliveryBatchSize)
rows, err := q.DB.QueryContext(ctx, `select outbox.attempts, outbox.activity, outbox.activity, outbox.inserted, outbox.received, persons.actor, persons.privkey from outbox join persons on persons.id = outbox.sender where outbox.sent = 0 and (outbox.attempts = 0 or (outbox.attempts < ? and outbox.last <= unixepoch() - ?)) order by outbox.attempts asc, outbox.last asc limit ?`, q.Config.MaxDeliveryAttempts, q.Config.DeliveryRetryInterval, q.Config.DeliveryBatchSize)
if err != nil {
return fmt.Errorf("failed to fetch posts to deliver: %w", err)
}
defer rows.Close()

for rows.Next() {
var activity ap.Activity
var rawActivity string
var rawActivity, privKeyPem string
var actor ap.Actor
var inserted int64
var recipients ap.Audience
var deliveryAttempts int
if err := rows.Scan(&deliveryAttempts, &activity, &rawActivity, &inserted, &recipients, &actor); err != nil {
if err := rows.Scan(&deliveryAttempts, &activity, &rawActivity, &inserted, &recipients, &actor, &privKeyPem); err != nil {
q.Log.Error("Failed to fetch post to deliver", "error", err)
continue
}

privKey, err := data.ParsePrivateKey(privKeyPem)
if err != nil {
q.Log.Error("Failed to parse private key", "error", err)
continue
}

if _, err := q.DB.ExecContext(ctx, `update outbox set last = unixepoch(), attempts = ? where activity->>'$.id' = ?`, deliveryAttempts+1, activity.ID); err != nil {
q.Log.Error("Failed to save last delivery attempt time", "id", activity.ID, "attempts", deliveryAttempts, "error", err)
continue
}

if err := q.deliverWithTimeout(ctx, &activity, []byte(rawActivity), &actor, time.Unix(inserted, 0), &recipients); err != nil {
if err := q.deliverWithTimeout(ctx, &activity, []byte(rawActivity), &actor, httpsig.Key{ID: actor.PublicKey.ID, PrivateKey: privKey}, time.Unix(inserted, 0), &recipients); err != nil {
q.Log.Warn("Failed to deliver activity", "id", activity.ID, "attempts", deliveryAttempts, "error", err)

if _, err := q.DB.ExecContext(ctx, `update outbox set received = ? where activity->>'$.id' = ?`, &recipients, activity.ID); err != nil {
Expand All @@ -102,13 +110,13 @@ func (q *Queue) process(ctx context.Context) error {
return nil
}

func (q *Queue) deliverWithTimeout(parent context.Context, activity *ap.Activity, rawActivity []byte, actor *ap.Actor, inserted time.Time, sent *ap.Audience) error {
func (q *Queue) deliverWithTimeout(parent context.Context, activity *ap.Activity, rawActivity []byte, actor *ap.Actor, key httpsig.Key, inserted time.Time, sent *ap.Audience) error {
ctx, cancel := context.WithTimeout(parent, q.Config.DeliveryTimeout)
defer cancel()
return q.deliver(ctx, activity, rawActivity, actor, inserted, sent)
return q.deliver(ctx, activity, rawActivity, actor, key, inserted, sent)
}

func (q *Queue) deliver(ctx context.Context, activity *ap.Activity, rawActivity []byte, actor *ap.Actor, inserted time.Time, received *ap.Audience) error {
func (q *Queue) deliver(ctx context.Context, activity *ap.Activity, rawActivity []byte, actor *ap.Actor, key httpsig.Key, inserted time.Time, received *ap.Audience) error {
activityID, err := url.Parse(activity.ID)
if err != nil {
return err
Expand Down Expand Up @@ -167,7 +175,6 @@ func (q *Queue) deliver(ctx context.Context, activity *ap.Activity, rawActivity

sent := map[string]struct{}{}

var key key
var followers partialFollowers
if recipients.Contains(actor.Followers) {
followers = partialFollowers{}
Expand All @@ -179,7 +186,7 @@ func (q *Queue) deliver(ctx context.Context, activity *ap.Activity, rawActivity
return true
}

to, err := q.Resolver.ResolveID(ctx, q.Log, q.DB, actor, actorID, 0)
to, err := q.Resolver.ResolveID(ctx, q.Log, q.DB, key, actorID, 0)
if err != nil {
q.Log.Warn("Failed to resolve a recipient", "to", actorID, "activity", activity.ID, "error", err)
if !errors.Is(err, ErrActorGone) && !errors.Is(err, ErrBlockedDomain) {
Expand Down Expand Up @@ -211,7 +218,7 @@ func (q *Queue) deliver(ctx context.Context, activity *ap.Activity, rawActivity

q.Log.Info("Delivering activity to recipient", "to", actorID, "inbox", inbox, "activity", activity.ID)

if err := q.Resolver.post(ctx, q.Log, q.DB, actor, &key, followers, inbox, rawActivity); err != nil {
if err := q.Resolver.post(ctx, q.Log, q.DB, actor, key, followers, inbox, rawActivity); err != nil {
q.Log.Warn("Failed to send an activity", "from", actor.ID, "to", actorID, "activity", activity.ID, "error", err)
if !errors.Is(err, ErrBlockedDomain) {
anyFailed = true
Expand Down
12 changes: 6 additions & 6 deletions fed/followers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"github.com/dimkr/tootik/ap"
"github.com/dimkr/tootik/cfg"
"github.com/dimkr/tootik/httpsig"
"github.com/dimkr/tootik/outbox"
"io"
"log/slog"
Expand All @@ -43,7 +44,7 @@ type Syncer struct {
Log *slog.Logger
DB *sql.DB
Resolver *Resolver
Actor *ap.Actor
Key httpsig.Key
}

type followersDigest struct {
Expand Down Expand Up @@ -116,7 +117,7 @@ func (f partialFollowers) Digest(ctx context.Context, db *sql.DB, domain string,
func (l *Listener) handleFollowers(w http.ResponseWriter, r *http.Request) {
name := r.PathValue("username")

sender, err := verify(r.Context(), l.Domain, l.Config, l.Log, r, nil, l.DB, l.Resolver, l.Actor, ap.InstanceActor)
sender, err := verify(r.Context(), l.Domain, l.Config, l.Log, r, nil, l.DB, l.Resolver, l.ActorKey, ap.InstanceActor)
if err != nil {
l.Log.Warn("Failed to verify followers request", "error", err)
w.WriteHeader(http.StatusUnauthorized)
Expand Down Expand Up @@ -221,7 +222,7 @@ func (l *Listener) saveFollowersDigest(ctx context.Context, sender *ap.Actor, he
return nil
}

func (d *followersDigest) Sync(ctx context.Context, domain string, cfg *cfg.Config, log *slog.Logger, db *sql.DB, resolver *Resolver, from *ap.Actor) error {
func (d *followersDigest) Sync(ctx context.Context, domain string, cfg *cfg.Config, log *slog.Logger, db *sql.DB, resolver *Resolver, key httpsig.Key) error {
if digest, err := digestFollowers(ctx, db, d.Followed, domain); err != nil {
return err
} else if err == nil && digest == d.Digest {
Expand All @@ -236,8 +237,7 @@ func (d *followersDigest) Sync(ctx context.Context, domain string, cfg *cfg.Conf

log.Info("Synchronizing followers", "followed", d.Followed)

var key key
resp, err := resolver.get(ctx, log, db, from, &key, d.URL)
resp, err := resolver.get(ctx, log, db, key, d.URL)
if err != nil {
return err
}
Expand Down Expand Up @@ -338,7 +338,7 @@ func (s *Syncer) processBatch(ctx context.Context) (int, error) {
rows.Close()

for _, job := range jobs {
if err := job.Sync(ctx, s.Domain, s.Config, s.Log, s.DB, s.Resolver, s.Actor); err != nil {
if err := job.Sync(ctx, s.Domain, s.Config, s.Log, s.DB, s.Resolver, s.Key); err != nil {
s.Log.Warn("Failed to sync followers", "actor", job.Followed, "error", err)
}

Expand Down
2 changes: 1 addition & 1 deletion fed/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (l *Listener) handleInbox(w http.ResponseWriter, r *http.Request) {
flags |= ap.Offline
}

sender, err := verify(r.Context(), l.Domain, l.Config, l.Log, r, body, l.DB, l.Resolver, l.Actor, flags)
sender, err := verify(r.Context(), l.Domain, l.Config, l.Log, r, body, l.DB, l.Resolver, l.ActorKey, flags)
if err != nil {
if errors.Is(err, ErrActorGone) {
w.WriteHeader(http.StatusOK)
Expand Down
62 changes: 0 additions & 62 deletions fed/key.go

This file was deleted.

4 changes: 2 additions & 2 deletions fed/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"crypto/tls"
"database/sql"
"errors"
"github.com/dimkr/tootik/ap"
"github.com/dimkr/tootik/cfg"
"github.com/dimkr/tootik/httpsig"
"github.com/fsnotify/fsnotify"
"log/slog"
"math"
Expand All @@ -40,7 +40,7 @@ type Listener struct {
Config *cfg.Config
DB *sql.DB
Resolver *Resolver
Actor *ap.Actor
ActorKey httpsig.Key
Log *slog.Logger
Addr string
Cert string
Expand Down
Loading

0 comments on commit e1f31e2

Please sign in to comment.