Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build_test_deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ jobs:
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}
environment:
name: fly
url: https://invisiblethreads.fly.dev
url: https://invisiblethreads.jdav.dev
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ config :invisible_threads, InvisibleThreadsWeb.Endpoint,
#
# For production it's recommended to configure a different adapter
# at the `config/runtime.exs`.
config :invisible_threads, InvisibleThreads.Mailer, adapter: Swoosh.Adapters.Local
config :invisible_threads, InvisibleThreads.Mailer, adapter: InvisibleThreads.LocalSwooshAdapter

# Configure esbuild (the version is required)
config :esbuild,
Expand Down
2 changes: 1 addition & 1 deletion fly.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ kill_signal = 'SIGTERM'

[env]
DATA_DIR = '/data'
PHX_HOST = 'invisiblethreads.fly.dev'
PHX_HOST = 'invisiblethreads.jdav.dev'
PORT = '8080'

[http_service]
Expand Down
113 changes: 101 additions & 12 deletions lib/invisible_threads/conversations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ defmodule InvisibleThreads.Conversations do

alias InvisibleThreads.Accounts
alias InvisibleThreads.Accounts.Scope
alias InvisibleThreads.Accounts.User
alias InvisibleThreads.Conversations.EmailRecipient
alias InvisibleThreads.Conversations.EmailThread
alias InvisibleThreads.Conversations.ThreadNotifier

Expand Down Expand Up @@ -75,12 +77,20 @@ defmodule InvisibleThreads.Conversations do
"""
def create_email_thread(%Scope{} = scope, attrs) do
with {:ok, email_thread} <- EmailThread.new(scope, attrs),
{:ok, message_id} <-
{:ok, metadatas} <-
ThreadNotifier.deliver_introduction(email_thread, scope.user) do
updated_email_thread = struct!(email_thread, first_message_id: message_id)
ids_by_address = Map.new(metadatas, &{String.downcase(&1.to), &1.id})

updated_email_thread =
Map.update!(email_thread, :recipients, fn recipients ->
Enum.map(recipients, fn recipient ->
first_message_id = ids_by_address[String.downcase(recipient.address)]
Map.replace!(recipient, :first_message_id, first_message_id)
end)
end)

Accounts.update_user!(scope.user.id, fn user ->
struct!(user, email_threads: [updated_email_thread | user.email_threads])
Map.update!(user, :email_threads, &[updated_email_thread | &1])
end)

broadcast(scope, {:created, updated_email_thread})
Expand All @@ -99,10 +109,11 @@ defmodule InvisibleThreads.Conversations do

"""
def delete_email_thread(%Scope{} = scope, %EmailThread{} = email_thread) do
with {:ok, _message_id} <-
ThreadNotifier.deliver_closing(email_thread, scope.user) do
with {:ok, _metadatas} <- ThreadNotifier.deliver_closing(email_thread, scope.user) do
Accounts.update_user!(scope.user.id, fn user ->
struct!(user, email_threads: Enum.reject(user.email_threads, &(&1.id == email_thread.id)))
Map.update!(user, :email_threads, fn email_threads ->
Enum.reject(email_threads, &(&1.id == email_thread.id))
end)
end)

broadcast(scope, {:deleted, email_thread})
Expand All @@ -124,13 +135,91 @@ defmodule InvisibleThreads.Conversations do
EmailThread.changeset(email_thread, attrs, scope)
end

def forward_inbound_email(%Scope{} = scope, %{"MailboxHash" => email_thread_id} = params) do
case get_email_thread(scope, email_thread_id) do
%EmailThread{} = email_thread ->
ThreadNotifier.forward(email_thread, scope.user, params)
@doc """
Forward an inbound message to an email thread.
"""
def forward_inbound_email(%Scope{} = scope, %{"MailboxHash" => mailbox_hash} = params) do
with [email_thread_id, recipient_id] <- String.split(mailbox_hash, "_", parts: 2),
%EmailThread{} = email_thread <- get_email_thread(scope, email_thread_id),
%EmailRecipient{} = from_recipient <-
Enum.find(email_thread.recipients, &(&1.id == recipient_id)) do
ThreadNotifier.forward(email_thread, from_recipient, scope.user, params)
else
_other -> {:error, :unknown_thread}
end
end

@doc """
Remove a participant from an email thread.

nil ->
{:error, :unknown_thread}
If less than two participants remain, the thread is deleted.
"""
def unsubscribe!(user_id, email_thread_id, recipient_id) do
if original_user = Accounts.get_user(user_id) do
updated_user =
Accounts.update_user!(user_id, &reject_recipient(&1, email_thread_id, recipient_id))

updated_email_thread = Enum.find(updated_user.email_threads, &(&1.id == email_thread_id))

if length(updated_email_thread.recipients) < 2 do
updated_user
|> Scope.for_user()
|> delete_email_thread(updated_email_thread)
else
original_email_thread =
Enum.find(original_user.email_threads, &(&1.id == email_thread_id))

unsubscribed_recipient =
Enum.find(original_email_thread.recipients, &(&1.id == recipient_id))

{:ok, _metadatas} =
ThreadNotifier.deliver_unsubscribe(
updated_email_thread,
updated_user,
unsubscribed_recipient
)
end
end

:ok
end

defp reject_recipient(%User{} = user, email_thread_id, recipient_id) do
Map.update!(user, :email_threads, &reject_recipient(&1, email_thread_id, recipient_id))
end

defp reject_recipient(email_threads, email_thread_id, recipient_id) do
Enum.map(email_threads, fn
%EmailThread{id: ^email_thread_id} = email_recipient ->
reject_recipient(email_recipient, recipient_id)

email_thread ->
email_thread
end)
end

defp reject_recipient(email_thread, recipient_id) do
Map.update!(email_thread, :recipients, fn recipients ->
Enum.reject(recipients, &(&1.id == recipient_id))
end)
end

@doc """
Remove a participant from an email thread by recipient email address.

If less than two participants remain, the thread is deleted.
"""
def unsubscribe_by_address!(user_id, email_thread_id, recipient_address) do
recipient_address = String.downcase(recipient_address)

with %User{email_threads: email_threads} <- Accounts.get_user(user_id),
%EmailThread{recipients: recipients} <-
Enum.find(email_threads, &(&1.id == email_thread_id)),
%EmailRecipient{id: recipient_id} <-
Enum.find(recipients, &(String.downcase(&1.address) == recipient_address)) do
unsubscribe!(user_id, email_thread_id, recipient_id)
end

:ok
end
end
6 changes: 4 additions & 2 deletions lib/invisible_threads/conversations/email_recipient.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ defmodule InvisibleThreads.Conversations.EmailRecipient do
import Ecto.Changeset

@derive {Swoosh.Email.Recipient, name: :name, address: :address}
@primary_key false
@primary_key {:id, :binary_id, autogenerate: true}
embedded_schema do
field :name, :string
field :address, :string, primary_key: true, redact: true
field :address, :string, redact: true
field :first_message_id, :string
end

@doc false
Expand All @@ -22,5 +23,6 @@ defmodule InvisibleThreads.Conversations.EmailRecipient do
|> validate_length(:name, count: :codepoints, max: 255)
# Postmark limits at least some addresses to 255 UTF-16 code points
|> validate_length(:address, count: :codepoints, max: 255)
|> put_change(:id, Ecto.UUID.generate())
end
end
1 change: 0 additions & 1 deletion lib/invisible_threads/conversations/email_thread.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ defmodule InvisibleThreads.Conversations.EmailThread do
field :from, :string
field :subject, :string
embeds_many :recipients, InvisibleThreads.Conversations.EmailRecipient, on_replace: :delete
field :first_message_id, :string

timestamps type: :utc_datetime, updated_at: false
end
Expand Down
59 changes: 40 additions & 19 deletions lib/invisible_threads/conversations/thread_notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,53 @@ defmodule InvisibleThreads.Conversations.ThreadNotifier do
Deliver messages to email threads.
"""

use InvisibleThreadsWeb, :verified_routes

import Swoosh.Email

alias InvisibleThreads.Mailer

defp deliver(email, email_thread, user) do
[mailbox_name, domain] = String.split(user.inbound_address, "@", parts: 2)

email =
email
|> put_reply_to(email_thread, user.inbound_address)
|> bcc(email_thread.recipients)
|> subject(email_thread.subject)
|> put_headers(email_thread.first_message_id)
|> put_provider_option(:message_stream, email_thread.message_stream)
|> put_provider_option(:tag, email_thread.subject)

with {:ok, %{id: message_id}} <- Mailer.deliver(email, api_key: user.server_token) do
{:ok, message_id}
end
end
emails =
for recipient <- email_thread.recipients do
recipient_reply_to = "#{mailbox_name}+#{email_thread.id}_#{recipient.id}@#{domain}"

defp put_reply_to(email, email_thread, inbound_address) do
[mailbox_name, domain] = String.split(inbound_address, "@", parts: 2)
reply_to(email, "#{mailbox_name}+#{email_thread.id}@#{domain}")
email
|> to(recipient)
|> reply_to(recipient_reply_to)
|> put_in_reply_headers(recipient.first_message_id)
|> put_unsubscribe_headers(user, email_thread, recipient, recipient_reply_to)
end

Mailer.deliver_many(emails, api_key: user.server_token)
end

defp put_headers(email, in_reply_to) when is_binary(in_reply_to) do
defp put_in_reply_headers(email, in_reply_to) when is_binary(in_reply_to) do
email
|> header("In-Reply-To", in_reply_to)
|> header("References", in_reply_to)
end

defp put_headers(email, nil), do: email
defp put_in_reply_headers(email, nil), do: email

defp put_unsubscribe_headers(email, user, email_thread, recipient, recipient_reply_to) do
unsubscribe_url = url(~p"/api/postmark/unsubscribe/#{user}/#{email_thread}/#{recipient}")

email
|> header("List-Unsubscribe-Post", "List-Unsubscribe=One-Click")
|> header(
"List-Unsubscribe",
"<#{unsubscribe_url}>, <mailto:#{recipient_reply_to}?subject=unsubscribe>"
)
end

def deliver_introduction(email_thread, user) do
participants = Enum.map_join(email_thread.recipients, "\n- ", & &1.name)
Expand All @@ -54,6 +70,15 @@ defmodule InvisibleThreads.Conversations.ThreadNotifier do
|> deliver(email_thread, user)
end

def deliver_unsubscribe(email_thread, user, unsubscribed_recipient) do
new()
|> from({unsubscribed_recipient.name, email_thread.from})
|> text_body("""
#{unsubscribed_recipient.name} has unsubscribed from this thread.
""")
|> deliver(email_thread, user)
end

def deliver_closing(email_thread, user) do
new()
|> from({"Invisible Threads", email_thread.from})
Expand All @@ -63,18 +88,14 @@ defmodule InvisibleThreads.Conversations.ThreadNotifier do
|> deliver(email_thread, user)
end

def forward(email_thread, user, params) do
from_email = String.downcase(params["FromFull"]["Email"])

def forward(email_thread, from_recipient, user, params) do
email_thread =
Map.update!(email_thread, :recipients, fn recipients ->
Enum.reject(recipients, fn email_recipient ->
String.downcase(email_recipient.address) == from_email
end)
Enum.reject(recipients, &(&1.id == from_recipient.id))
end)

new()
|> from({params["FromFull"]["Name"], email_thread.from})
|> from({from_recipient.name, email_thread.from})
|> text_body(params["TextBody"])
|> html_body(params["HtmlBody"])
|> put_attachments(params["Attachments"])
Expand Down
28 changes: 28 additions & 0 deletions lib/invisible_threads/local_swoosh_adapter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
defmodule InvisibleThreads.LocalSwooshAdapter do
@moduledoc ~S"""
An adapter that stores the email locally, using the specified storage driver.

This is a wrapper of `Swoosh.Adapters.Local` that includes `to` in the metadata returned by
`deliver_many/2`. This is consistent with `Swoosh.Adapters.Postmark`.
"""

use Swoosh.Adapter

defdelegate deliver(email, config), to: Swoosh.Adapters.Local

def deliver_many(emails, config) when is_list(emails) do
driver = storage_driver(config)

sent_email_metadatas =
Enum.map(emails, fn email ->
%Swoosh.Email{to: to, headers: %{"Message-ID" => id}} = driver.push(email)
%{id: id, to: to |> List.first() |> elem(1)}
end)

{:ok, sent_email_metadatas}
end

defp storage_driver(config) do
config[:storage_driver] || Swoosh.Adapters.Local.Storage.Memory
end
end
11 changes: 8 additions & 3 deletions lib/invisible_threads/postmark.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,18 @@ defmodule InvisibleThreads.Postmark do
@doc """
[List message streams](https://postmarkapp.com/developer/api/message-streams-api#list-message-streams).
"""
def list_broadcast_streams(server_token) do
def list_message_streams(server_token) do
server_token
|> new_req()
|> Req.get(url: "/message-streams", params: %{"MessageStreamType" => "Broadcasts"})
|> Req.get(url: "/message-streams")
|> case do
{:ok, %Req.Response{status: 200, body: %{"MessageStreams" => message_streams}}} ->
options = for %{"Name" => name, "ID" => id} <- message_streams, do: {name, id}
options =
for %{"ID" => id, "Name" => name, "MessageStreamType" => type} <- message_streams,
type != "Inbound" do
{name, id}
end

{:ok, options}

error ->
Expand Down
16 changes: 14 additions & 2 deletions lib/invisible_threads_web/controllers/postmark_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,22 @@ defmodule InvisibleThreadsWeb.PostmarkController do

plug :auth

def inbound_webhook(conn, %{
"Subject" => "unsubscribe",
"user_id" => user_id,
"MailboxHash" => email_thread_id,
"FromFull" => %{
"Email" => recipient_address
}
}) do
Conversations.unsubscribe_by_address!(user_id, email_thread_id, recipient_address)
send_resp(conn, 200, "")
end

def inbound_webhook(conn, params) do
case Conversations.forward_inbound_email(conn.assigns.current_scope, params) do
{:ok, message_id} ->
json(conn, %{id: message_id})
{:ok, _metadatas} ->
send_resp(conn, 200, "")

{:error, :unknown_thread} ->
# 403 will stop Postmark from retrying
Expand Down
14 changes: 14 additions & 0 deletions lib/invisible_threads_web/controllers/unsubscribe_controller.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule InvisibleThreadsWeb.UnsubscribeController do
use InvisibleThreadsWeb, :controller

alias InvisibleThreads.Conversations

def unsubscribe(conn, %{
"user_id" => user_id,
"email_thread_id" => email_thread_id,
"recipient_id" => recipient_id
}) do
Conversations.unsubscribe!(user_id, email_thread_id, recipient_id)
send_resp(conn, 200, "")
end
end
Loading