Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add file upload support #23

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add file upload support
  • Loading branch information
clementauger committed Aug 25, 2020
commit c5c2de7233d86b42ab91a7cb70dd9facb003bb4f
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ module github.com/knadh/niltalk
go 1.13

require (
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d
github.com/go-chi/chi v4.1.0+incompatible
github.com/gomodule/redigo v2.0.0+incompatible
github.com/gorilla/websocket v1.4.2
github.com/karrick/tparse/v2 v2.8.2
github.com/knadh/koanf v0.9.1
github.com/knadh/stuffbin v1.1.0
github.com/kr/pretty v0.1.0 // indirect
github.com/spf13/pflag v1.0.5
golang.org/x/crypto v0.0.0-20200403201458-baeed622b8d8
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -14,6 +16,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/karrick/tparse/v2 v2.8.2 h1:NhvrrB7nXYa0VLn0JKn9L3oG/GZN+LB/+g5QfWE30rU=
github.com/karrick/tparse/v2 v2.8.2/go.mod h1:OzmKMqNal7LYYHaO/Ie1f/wXmLWAaGKwJmxUFNQCVxg=
github.com/knadh/koanf v0.9.1 h1:qfcwiF9/Z8buTJ0QXaZvOxJ6eKJmOiiWKP/PktiW5RE=
github.com/knadh/koanf v0.9.1/go.mod h1:31bzRSM7vS5Vm9LNLo7B2Re1zhLOZT6EQKeodixBikE=
github.com/knadh/stuffbin v1.1.0 h1:f5S5BHzZALjuJEgTIOMC9NidEnBJM7Ze6Lu1GHR/lwU=
Expand All @@ -35,6 +39,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200403201458-baeed622b8d8 h1:fpnn/HnJONpIu6hkXi1u/7rR0NzilgWr4T0JmWkEitk=
golang.org/x/crypto v0.0.0-20200403201458-baeed622b8d8/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand All @@ -46,8 +52,11 @@ golang.org/x/sys v0.0.0-20200331124033-c3d80250170d h1:nc5K6ox/4lTFbMVSL9WRR81ix
golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e h1:EHBhcS0mlXEAVwNyO2dLfjToGsyY4j24pTs2ScHnX7s=
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
108 changes: 108 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"sync"
"time"

"github.com/go-chi/chi"
"github.com/gorilla/websocket"
"github.com/knadh/niltalk/internal/hub"
"github.com/knadh/niltalk/internal/upload"
"golang.org/x/crypto/bcrypt"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -327,3 +334,104 @@ func readJSONReq(r *http.Request, o interface{}) error {
}
return json.Unmarshal(b, o)
}

// handleUpload handles file uploads.
func handleUpload(store *upload.Store, maxUploadSize int64, rlPeriod time.Duration, rlCount float64, rlBurst int) func(w http.ResponseWriter, r *http.Request) {

type roomLimiter struct {
limiter *rate.Limiter
expire time.Time
}
var mu sync.Mutex
roomLimiters := map[string]roomLimiter{}
go func() {
t := time.NewTicker(rlPeriod + (time.Minute))
defer t.Stop()
for range t.C {
now := time.Now()
mu.Lock()
for k, r := range roomLimiters {
if r.expire.Before(now) {
delete(roomLimiters, k)
}
}
mu.Unlock()
}
}()

return func(w http.ResponseWriter, r *http.Request) {
r.ParseMultipartForm(maxUploadSize)

roomID := chi.URLParam(r, "roomID")
mu.Lock()
// no defer here becasue file upload can be slow, thus lock for too long
x, ok := roomLimiters[roomID]
if !ok {
x = roomLimiter{
limiter: rate.NewLimiter(rate.Every(rlPeriod/time.Duration(rlCount)), rlBurst),
expire: time.Now().Add(time.Minute * 10),
}
roomLimiters[roomID] = x
}
x.expire = time.Now().Add(time.Minute * 10)
roomLimiters[roomID] = x
mu.Unlock()
if !x.limiter.Allow() {
err := errors.New(http.StatusText(http.StatusTooManyRequests))
respondJSON(w, nil, err, http.StatusTooManyRequests)
return
}

var ids []string
for i := 0; i < 20; i++ {
key := fmt.Sprintf("file%v", i)
file, handler, err := r.FormFile(key)
if err == http.ErrMissingFile {
break
}
if err != nil {
continue
}
defer file.Close()
b, err := ioutil.ReadAll(file)
if err != nil {
continue
}
mimeType := http.DetectContentType(b)
if mimeType == "image/gif" || mimeType == "image/jpeg" || mimeType == "image/png" {
name := handler.Filename
up, err := store.Add(name, mimeType, b)
if err != nil {
continue
}
ids = append(ids, fmt.Sprintf("%v_%v", up.ID, up.Name))
}
}

respondJSON(w, struct {
IDs []string `json:"ids"`
}{ids}, nil, http.StatusOK)
}
}

// handleUploaded uploaded files display.
func handleUploaded(store *upload.Store, maxAge time.Duration) func(w http.ResponseWriter, r *http.Request) {
maxAgeHeader := fmt.Sprintf("max-age=%v", int64(maxAge/time.Second))
return func(w http.ResponseWriter, r *http.Request) {
fileID := chi.URLParam(r, "fileID")
fileID = strings.Split(fileID, "_")[0]
up, err := store.Get(fileID)
if err != nil {
log.Println(err)
respondJSON(w, nil, err, http.StatusNotFound)
return
}
w.Header().Add("Content-Type", up.MimeType)
w.Header().Add("Content-Length", fmt.Sprint(len(up.Data)))
if maxAge > 0 {
w.Header().Add("Cache-Control", maxAgeHeader)
}
w.WriteHeader(http.StatusOK)
w.Write(up.Data)
}
}
1 change: 1 addition & 0 deletions internal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
const (
TypeTyping = "typing"
TypeMessage = "message"
TypeUpload = "upload"
TypePeerList = "peer.list"
TypePeerInfo = "peer.info"
TypePeerJoin = "peer.join"
Expand Down
31 changes: 30 additions & 1 deletion internal/hub/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,36 @@ func (p *Peer) processMessage(b []byte) {
// TODO: Respond
return
}
p.room.Broadcast(p.room.makeMessagePayload(msg, p), true)
p.room.Broadcast(p.room.makeMessagePayload(msg, p, m.Type), true)

case TypeUpload:
// Check rate limits and update counters.
now := time.Now()
if p.numMessages > 0 {
if (p.numMessages%p.room.hub.cfg.RateLimitMessages+1) >= p.room.hub.cfg.RateLimitMessages &&
time.Since(p.lastMessage) < p.room.hub.cfg.RateLimitInterval {
p.room.hub.Store.RemoveSession(p.ID, p.room.ID)
p.writeWSControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, TypePeerRateLimited))
p.ws.Close()
return
}
}
p.lastMessage = now
p.numMessages++

msgs, ok := m.Data.([]interface{})
if !ok {
// TODO: Respond
return
}
for _, msg := range msgs {
x, ok := msg.(string)
if !ok {
continue
}
p.room.Broadcast(p.room.makeMessagePayload(x, p, m.Type), true)
}

// "Typing" status.
case TypeTyping:
Expand Down
4 changes: 2 additions & 2 deletions internal/hub/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,13 @@ func (r *Room) makePeerUpdatePayload(p *Peer, peerUpdateType string) []byte {
}

// makeMessagePayload prepares a chat message.
func (r *Room) makeMessagePayload(msg string, p *Peer) []byte {
func (r *Room) makeMessagePayload(msg string, p *Peer, typ string) []byte {
d := payloadMsgChat{
PeerID: p.ID,
PeerHandle: p.Handle,
Msg: msg,
}
return r.makePayload(d, TypeMessage)
return r.makePayload(d, typ)
}

// makePayload prepares a message payload.
Expand Down
102 changes: 102 additions & 0 deletions internal/upload/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package upload

import (
"crypto/sha1"
"errors"
"fmt"
"sync"
"time"
)

// Config represents the file upload options.
type Config struct {
MaxMemory string `koanf:"max-memory"`
MaxUploadSize string `koanf:"max-upload-size"`
MaxAge string `koanf:"max-age"`
RateLimitPeriod string `koanf:"rate-limit-period"`
RateLimitCount string `koanf:"rate-limit-count"`
RateLimitBurst string `koanf:"rate-limit-burst"`
}

// Store file uploads in memory.
type Store struct {
cfg Config
maxMem int64
mu sync.Mutex
items map[string]File
size int64
}

// File represents an upload.
type File struct {
CreatedAt time.Time
Data []byte
ID string
Name string
MimeType string
}

// New returns a new file uplod store.
func New(cfg Config, maxMemory int64) *Store {
return &Store{
cfg: cfg,
maxMem: maxMemory,
items: make(map[string]File),
}
}

// Add a new item to the store.
func (s *Store) Add(name, mimeType string, data []byte) (File, error) {
h := sha1.New()
h.Write(data)
id := fmt.Sprintf("%x", h.Sum(nil))
s.mu.Lock()
defer s.mu.Unlock()
up, ok := s.items[id]
if ok {
return up, nil
}
up.CreatedAt = time.Now()
up.ID = id
up.Name = name
up.MimeType = mimeType
up.Data = make([]byte, len(data), len(data))
copy(up.Data, data)
s.items[id] = up
s.size += int64(len(data))
for s.size > s.maxMem {
var oldest *File
for _, up := range s.items {
if oldest == nil {
oldest = &up
} else if up.CreatedAt.Before(oldest.CreatedAt) {
oldest = &up
}
}
if oldest != nil {
s.size -= int64(len(oldest.Data))
delete(s.items, oldest.ID)
}
}
if len(s.items) < 1 {
return up, ErrFileTooLarge
}
return up, nil
}

// Get the file with given id.
func (s *Store) Get(id string) (File, error) {
s.mu.Lock()
defer s.mu.Unlock()
up, ok := s.items[id]
if !ok {
return up, ErrFileNotFound
}
return up, nil
}

// ErrFileNotFound indicates that the requested file was not found.
var ErrFileNotFound = errors.New("file not found")

// ErrFileTooLarge indicates that the file was too large.
var ErrFileTooLarge = errors.New("file too large")
Loading