Skip to content

Commit

Permalink
Send messages in AttachmentMultisend
Browse files Browse the repository at this point in the history
  • Loading branch information
harry-signal authored Apr 22, 2024
1 parent e3dce9b commit e34e019
Showing 1 changed file with 141 additions and 8 deletions.
149 changes: 141 additions & 8 deletions SignalUI/AttachmentMultisend/AttachmentMultisend.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,117 @@ public class AttachmentMultisend {
approvedMessageBody: MessageBody?,
approvedAttachments: [SignalAttachment]
) -> AttachmentMultisend.Result {
fatalError("TODO")
let (preparedPromise, preparedFuture) = Promise<[PreparedOutgoingMessage]>.pending()
let (enqueuedPromise, enqueuedFuture) = Promise<[TSThread]>.pending()

let sentPromise = Promise<[TSThread]>.wrapAsync {
let threads: [TSThread]
let preparedMessages: [PreparedOutgoingMessage]
let sendPromises: [Promise<Void>]
do {
let segmentedAttachments = try await segmentAttachmentsIfNecessary(
for: conversations,
approvedAttachments: approvedAttachments
)
(threads, preparedMessages, sendPromises) = try await deps.databaseStorage.awaitableWrite { tx in
let threads: [TSThread]
let preparedMessages: [PreparedOutgoingMessage]
(threads, preparedMessages) = try prepareForSending(
conversations: conversations,
approvedMessageBody: approvedMessageBody,
approvedAttachments: segmentedAttachments,
tx: tx
)

let sendPromises: [Promise<Void>] = preparedMessages.map {
deps.messageSenderJobQueue.add(
.promise,
message: $0,
transaction: tx
)
}
return (threads, preparedMessages, sendPromises)
}
} catch let error {
preparedFuture.reject(error)
enqueuedFuture.reject(error)
throw error
}
preparedFuture.resolve(preparedMessages)
enqueuedFuture.resolve(threads)

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
sendPromises.forEach { promise in
taskGroup.addTask {
try await promise.awaitable()
}
}
try await taskGroup.waitForAll()
}
return threads
}

return .init(
preparedPromise: preparedPromise,
enqueuedPromise: enqueuedPromise,
sentPromise: sentPromise
)
}

public class func sendTextAttachment(
_ textAttachment: UnsentTextAttachment,
to conversations: [ConversationItem]
) -> AttachmentMultisend.Result {
fatalError("TODO")
let (preparedPromise, preparedFuture) = Promise<[PreparedOutgoingMessage]>.pending()
let (enqueuedPromise, enqueuedFuture) = Promise<[TSThread]>.pending()

let sentPromise = Promise<[TSThread]>.wrapAsync {
let threads: [TSThread]
let preparedMessages: [PreparedOutgoingMessage]
let sendPromises: [Promise<Void>]
do {
(threads, preparedMessages, sendPromises) = try await deps.databaseStorage.awaitableWrite { tx in
let threads: [TSThread]
let preparedMessages: [PreparedOutgoingMessage]
(threads, preparedMessages) = try prepareForSending(
conversations: conversations,
textAttachment,
tx: tx
)

let sendPromises: [Promise<Void>] = preparedMessages.map {
deps.messageSenderJobQueue.add(
.promise,
message: $0,
transaction: tx
)
}
return (threads, preparedMessages, sendPromises)
}
} catch let error {
preparedFuture.reject(error)
enqueuedFuture.reject(error)
throw error
}
preparedFuture.resolve(preparedMessages)
enqueuedFuture.resolve(threads)

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
sendPromises.forEach { promise in
taskGroup.addTask {
try await promise.awaitable()
}
}
try await taskGroup.waitForAll()
}
return threads
}

return .init(
preparedPromise: preparedPromise,
enqueuedPromise: enqueuedPromise,
sentPromise: sentPromise
)
}

// MARK: - Dependencies
Expand All @@ -57,6 +160,41 @@ public class AttachmentMultisend {
tsAccountManager: DependenciesBridge.shared.tsAccountManager
)

// MARK: - Segmenting Attachments

private class func segmentAttachmentsIfNecessary(
for conversations: [ConversationItem],
approvedAttachments: [SignalAttachment]
) async throws -> [SignalAttachment.SegmentAttachmentResult] {
let maxSegmentDurations = conversations.compactMap(\.videoAttachmentDurationLimit)
guard !maxSegmentDurations.isEmpty, let requiredSegmentDuration = maxSegmentDurations.min() else {
// No need to segment!
return approvedAttachments.map { .init($0) }
}

let qualityLevel = deps.databaseStorage.read(block: deps.imageQualityLevel.resolvedQuality(tx:))

let segmentedResults = try await withThrowingTaskGroup(
of: (Int, SignalAttachment.SegmentAttachmentResult).self
) { taskGroup in
for (index, attachment) in approvedAttachments.enumerated() {
taskGroup.addTask(operation: {
let result = try await attachment.preparedForOutput(qualityLevel: qualityLevel)
.segmentedIfNecessary(on: ThreadUtil.enqueueSendQueue, segmentDuration: requiredSegmentDuration)
.awaitable()
return (index, result)
})
}
var segmentedResults = [SignalAttachment.SegmentAttachmentResult?].init(repeating: nil, count: approvedAttachments.count)
for try await result in taskGroup {
segmentedResults[result.0] = result.1
}
return segmentedResults.compacted()
}

return segmentedResults
}

// MARK: - Preparing messages

private class func prepareForSending(
Expand All @@ -66,7 +204,7 @@ public class AttachmentMultisend {
tx: SDSAnyWriteTransaction
) throws -> ([TSThread], [PreparedOutgoingMessage]) {
let segmentedAttachments = approvedAttachments.reduce([], { arr, segmented in
return arr + (segmented.segmented ?? [])
return arr + (segmented.segmented ?? [segmented.original])
})
let unsegmentedAttachments = approvedAttachments.map(\.original)

Expand Down Expand Up @@ -196,11 +334,6 @@ public class AttachmentMultisend {
thread: TSThread,
tx: SDSAnyWriteTransaction
) throws -> PreparedOutgoingMessage {
let messageBodyForContext = messageBody?.forForwarding(
to: thread,
transaction: tx.unwrapGrdbRead
).asMessageBodyForForwarding()

let unpreparedMessage = UnpreparedOutgoingMessage.build(
thread: thread,
messageBody: messageBody,
Expand Down

0 comments on commit e34e019

Please sign in to comment.