Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.25.8

require (
firebase.google.com/go/v4 v4.20.0
github.com/android-sms-gateway/client-go v1.13.0
github.com/android-sms-gateway/client-go v1.13.1-0.20260617073313-8d7d78d0d762
github.com/ansrivas/fiberprometheus/v2 v2.17.0
github.com/capcom6/go-helpers v0.4.0
github.com/capcom6/go-infra-fx v0.5.7
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/MicahParks/keyfunc v1.9.0 h1:lhKd5xrFHLNOWrDc4Tyb/Q1AJ4LCzQ48GVJyVIID3+o=
github.com/MicahParks/keyfunc v1.9.0/go.mod h1:IdnCilugA0O/99dW+/MkvlyrsX8+L8+x95xuVNtM5jw=
github.com/android-sms-gateway/client-go v1.13.0 h1:EvKDi796R2ScCAiaekWYRekVjkjiJGK3d/LZcPKpfQQ=
github.com/android-sms-gateway/client-go v1.13.0/go.mod h1:DQsReciU1xcaVW3T5Z2bqslNdsAwCFCtghawmA6g6L4=
github.com/android-sms-gateway/client-go v1.13.1-0.20260617073313-8d7d78d0d762 h1:UEUGzQuuqp2ifPJVIoZgH7ZFms2MsAQR7ajUvJ5wk6s=
github.com/android-sms-gateway/client-go v1.13.1-0.20260617073313-8d7d78d0d762/go.mod h1:DQsReciU1xcaVW3T5Z2bqslNdsAwCFCtghawmA6g6L4=
github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eTWro=
github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/ansrivas/fiberprometheus/v2 v2.17.0 h1:p0gqs5LsSCWGoSFF44fCJkyU+XcE6TLRqEMu80b2iCo=
Expand Down
1 change: 1 addition & 0 deletions internal/sms-gateway/handlers/converters/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func MessageToMobileDTO(m messages.Message) smsgateway.MobileMessage {
ScheduleAt: m.ScheduleAt,
Priority: m.Priority,
},
State: smsgateway.ProcessingState(m.State),
CreatedAt: m.CreatedAt,
}
}
Expand Down
30 changes: 30 additions & 0 deletions internal/sms-gateway/handlers/messages/3rdparty.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,35 @@ func (h *ThirdPartyController) get(userID string, c *fiber.Ctx) error {
return c.JSON(converters.MessageStateToDTO(*state))
}

// @Summary Cancel message
// @Description Cancels a pending message by ID. The message must be in Pending state.
// @Security ApiAuth
// @Security JWTAuth
// @Tags User, Messages
// @Param id path string true "Message ID"
// @Success 200 {object} smsgateway.GetMessageResponse "Message state after cancellation"
// @Failure 400 {object} smsgateway.ErrorResponse "Invalid request"
// @Failure 401 {object} smsgateway.ErrorResponse "Unauthorized"
// @Failure 403 {object} smsgateway.ErrorResponse "Forbidden"
// @Failure 404 {object} smsgateway.ErrorResponse "Message not found"
// @Failure 500 {object} smsgateway.ErrorResponse "Internal server error"
// @Router /3rdparty/v1/messages/{id} [delete]
//
// Cancel message.
func (h *ThirdPartyController) delete(userID string, c *fiber.Ctx) error {
id := c.Params("id")

state, err := h.messagesSvc.CancelMessage(userID, id)
if err != nil {
if errors.Is(err, messages.ErrMessageNotPending) || errors.Is(err, messages.ErrMessageNotFound) {
return fiber.NewError(fiber.StatusBadRequest, err.Error())
}
return fmt.Errorf("failed to cancel message: %w", err)
}

return c.JSON(converters.MessageStateToDTO(*state))
}

// Export inbox.
//
// Deprecated: use /3rdparty/v1/inbox/refresh instead.
Expand Down Expand Up @@ -319,6 +348,7 @@ func (h *ThirdPartyController) Register(router fiber.Router) {
router.Get("", permissions.RequireScope(ScopeList), userauth.WithUserID(h.list))
router.Post("", permissions.RequireScope(ScopeSend), userauth.WithUserID(h.post))
router.Get(":id", permissions.RequireScope(ScopeRead), userauth.WithUserID(h.get)).Name(route3rdPartyGetMessage)
router.Delete(":id", permissions.RequireScope(ScopeCancel), userauth.WithUserID(h.delete))

router.Post("inbox/export", permissions.RequireScope(ScopeExport), userauth.WithUserID(h.postInboxExport))
}
2 changes: 1 addition & 1 deletion internal/sms-gateway/handlers/messages/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (p *thirdPartyGetQueryParams) ToFilter() messages.SelectFilter {
}

if p.State != "" {
filter.State = messages.ProcessingState(p.State)
filter.State = append(filter.State, messages.ProcessingState(p.State))
}

if p.DeviceID != "" {
Expand Down
2 changes: 2 additions & 0 deletions internal/sms-gateway/handlers/messages/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ const (
ScopeRead = smsgateway.ScopeMessagesRead
// ScopeList is the permission scope required for listing messages.
ScopeList = smsgateway.ScopeMessagesList
// ScopeCancel is the permission scope required for cancelling messages.
ScopeCancel = smsgateway.ScopeMessagesCancel
// ScopeExport is the permission scope required for exporting messages.
ScopeExport = smsgateway.ScopeMessagesExport
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE `messages`
MODIFY COLUMN `state` enum(
'Pending',
'Cancelling',
'Cancelled',
'Processed',
'Sent',
'Delivered',
'Failed'
) NOT NULL DEFAULT 'Pending';
-- +goose StatementEnd
-- +goose StatementBegin
ALTER TABLE `message_recipients`
MODIFY COLUMN `state` enum(
'Pending',
'Cancelling',
'Cancelled',
'Processed',
'Sent',
'Delivered',
'Failed'
) NOT NULL DEFAULT 'Pending';
-- +goose StatementEnd
-- +goose StatementBegin
ALTER TABLE `message_states`
MODIFY COLUMN `state` enum(
'Pending',
'Cancelling',
'Cancelled',
'Processed',
'Sent',
'Delivered',
'Failed'
) NOT NULL;
-- +goose StatementEnd
---
-- +goose Down
-- +goose StatementBegin
ALTER TABLE `messages`
MODIFY COLUMN `state` enum(
'Pending',
'Processed',
'Sent',
'Delivered',
'Failed'
) NOT NULL DEFAULT 'Pending';
-- +goose StatementEnd
-- +goose StatementBegin
ALTER TABLE `message_recipients`
MODIFY COLUMN `state` enum(
'Pending',
'Processed',
'Sent',
'Delivered',
'Failed'
) NOT NULL DEFAULT 'Pending';
-- +goose StatementEnd
-- +goose StatementBegin
ALTER TABLE `message_states`
MODIFY COLUMN `state` enum(
'Pending',
'Processed',
'Sent',
'Delivered',
'Failed'
) NOT NULL;
-- +goose StatementEnd
6 changes: 6 additions & 0 deletions internal/sms-gateway/modules/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ func NewMessagesExportRequestedEvent(
)
}

func NewMessageCancelledEvent(messageID string) Event {
return NewEvent(smsgateway.PushMessageCancelled, map[string]string{
"messageId": messageID,
})
}

func NewSettingsUpdatedEvent() Event {
return NewEvent(smsgateway.PushSettingsUpdated, nil)
}
1 change: 1 addition & 0 deletions internal/sms-gateway/modules/messages/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func messageToDomain(input messageModel) (Message, error) {
ScheduleAt: input.ScheduleAt,
Priority: smsgateway.MessagePriority(input.Priority),
},
State: input.State,
CreatedAt: input.CreatedAt,
}, nil
}
Expand Down
1 change: 1 addition & 0 deletions internal/sms-gateway/modules/messages/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type MessageInput struct {
type Message struct {
MessageInput

State ProcessingState
CreatedAt time.Time
}

Expand Down
1 change: 1 addition & 0 deletions internal/sms-gateway/modules/messages/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ var (
ErrMessageNotFound = errors.New("message not found")
ErrMultipleMessagesFound = errors.New("multiple messages found")
ErrNoContent = errors.New("no text or data content")
ErrMessageNotPending = errors.New("message is not pending")

ErrQueueLimitExceeded = errors.New("queue limits exceeded")
)
Expand Down
18 changes: 10 additions & 8 deletions internal/sms-gateway/modules/messages/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ type ProcessingState string
type MessageType string

const (
ProcessingStatePending ProcessingState = "Pending"
ProcessingStateProcessed ProcessingState = "Processed"
ProcessingStateSent ProcessingState = "Sent"
ProcessingStateDelivered ProcessingState = "Delivered"
ProcessingStateFailed ProcessingState = "Failed"
ProcessingStatePending ProcessingState = "Pending"
ProcessingStateCancelling ProcessingState = "Cancelling"
ProcessingStateCancelled ProcessingState = "Cancelled"
ProcessingStateProcessed ProcessingState = "Processed"
ProcessingStateSent ProcessingState = "Sent"
ProcessingStateDelivered ProcessingState = "Delivered"
ProcessingStateFailed ProcessingState = "Failed"

MessageTypeText MessageType = "Text"
MessageTypeData MessageType = "Data"
Expand All @@ -34,7 +36,7 @@ type messageModel struct {
ExtID string `gorm:"not null;type:varchar(36);uniqueIndex:unq_messages_id_device,priority:1"`
Type MessageType `gorm:"not null;type:enum('Text','Data');default:Text"`
Content string `gorm:"not null;type:text"`
State ProcessingState `gorm:"not null;type:enum('Pending','Sent','Processed','Delivered','Failed');default:Pending;index:idx_messages_device_state"`
State ProcessingState `gorm:"not null;type:enum('Pending','Cancelling','Cancelled','Sent','Processed','Delivered','Failed');default:Pending;index:idx_messages_device_state"`
ValidUntil *time.Time `gorm:"type:datetime"`
ScheduleAt *time.Time `gorm:"type:datetime"`
SimNumber *uint8 `gorm:"type:tinyint(1) unsigned"`
Expand Down Expand Up @@ -197,7 +199,7 @@ type messageRecipientModel struct {
ID uint64 `gorm:"primaryKey;type:BIGINT UNSIGNED;autoIncrement"`
MessageID uint64 `gorm:"uniqueIndex:unq_message_recipients_message_id_phone_number,priority:1;type:BIGINT UNSIGNED"`
PhoneNumber string `gorm:"uniqueIndex:unq_message_recipients_message_id_phone_number,priority:2;type:varchar(128)"`
State ProcessingState `gorm:"not null;type:enum('Pending','Sent','Processed','Delivered','Failed');default:Pending"`
State ProcessingState `gorm:"not null;type:enum('Pending','Cancelling','Cancelled','Sent','Processed','Delivered','Failed');default:Pending"`
Error *string `gorm:"type:varchar(256)"`
}

Expand Down Expand Up @@ -226,7 +228,7 @@ func (m *messageRecipientModel) toDomain() smsgateway.RecipientState {
type messageStateModel struct {
ID uint64 `gorm:"primaryKey;type:BIGINT UNSIGNED;autoIncrement"`
MessageID uint64 `gorm:"not null;type:BIGINT UNSIGNED;uniqueIndex:unq_message_states_message_id_state,priority:1"`
State ProcessingState `gorm:"not null;type:enum('Pending','Sent','Processed','Delivered','Failed');uniqueIndex:unq_message_states_message_id_state,priority:2"`
State ProcessingState `gorm:"not null;type:enum('Pending','Cancelling','Cancelled','Sent','Processed','Delivered','Failed');uniqueIndex:unq_message_states_message_id_state,priority:2"`
UpdatedAt time.Time `gorm:"<-:create;not null;autoupdatetime:false"`
}

Expand Down
37 changes: 32 additions & 5 deletions internal/sms-gateway/modules/messages/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func (r *Repository) list(filter SelectFilter, options SelectOptions) ([]message
}

// Apply state filter
if filter.State != "" {
query = query.Where("messages.state = ?", filter.State)
if len(filter.State) > 0 {
query = query.Where("messages.state IN ?", filter.State)
}

// Apply device filter
Expand Down Expand Up @@ -103,7 +103,7 @@ func (r *Repository) list(filter SelectFilter, options SelectOptions) ([]message

func (r *Repository) listPending(deviceID string, order Order) ([]messageModel, error) {
messages, _, err := r.list(
*new(SelectFilter).WithDeviceID(deviceID).WithState(ProcessingStatePending),
*new(SelectFilter).WithDeviceID(deviceID).WithState(ProcessingStatePending).WithState(ProcessingStateCancelling),
*new(SelectOptions).IncludeContent().IncludeRecipients().WithLimit(maxPendingBatch).WithOrderBy(order),
)

Expand Down Expand Up @@ -177,7 +177,7 @@ func (r *Repository) UpdateState(message *messageModel) error {
func (r *Repository) HashProcessed(ctx context.Context, ids []uint64) (int64, error) {
rawSQL := "UPDATE `messages` `m`, `message_recipients` `r`\n" +
"SET `m`.`is_hashed` = true, `m`.`content` = SHA2(COALESCE(JSON_VALUE(`content`, '$.text'), JSON_VALUE(`content`, '$.data')), 256), `r`.`phone_number` = LEFT(SHA2(phone_number, 256), 16)\n" +
"WHERE `m`.`id` = `r`.`message_id` AND `m`.`is_hashed` = false AND `m`.`is_encrypted` = false AND `m`.`state` <> 'Pending'"
"WHERE `m`.`id` = `r`.`message_id` AND `m`.`is_hashed` = false AND `m`.`is_encrypted` = false AND `m`.`state` NOT IN ('Pending', 'Cancelling')"
params := []any{}
if len(ids) > 0 {
rawSQL += " AND `m`.`id` IN (?)"
Expand All @@ -193,10 +193,37 @@ func (r *Repository) HashProcessed(ctx context.Context, ids []uint64) (int64, er
return res.RowsAffected, nil
}

func (r *Repository) CancelMessage(userID string, id string) error {
res := r.db.Model((*messageModel)(nil)).
Joins("JOIN devices ON messages.device_id = devices.id").
Where("messages.ext_id = ? AND devices.user_id = ? AND messages.state = ?", id, userID, ProcessingStatePending).
Update("messages.state", ProcessingStateCancelling)
if res.Error != nil {
return fmt.Errorf("failed to cancel message: %w", res.Error)
}
if res.RowsAffected == 0 {
return ErrMessageNotPending
}
return nil
}

func (r *Repository) ConfirmCancel(id string, deviceID string) error {
res := r.db.Model((*messageModel)(nil)).
Where("ext_id = ? AND device_id = ? AND state = ?", id, deviceID, ProcessingStateCancelling).
Update("state", ProcessingStateCancelled)
if res.Error != nil {
return fmt.Errorf("failed to confirm cancel: %w", res.Error)
}
if res.RowsAffected == 0 {
return ErrMessageNotFound
}
return nil
}

func (r *Repository) Cleanup(ctx context.Context, until time.Time) (int64, error) {
res := r.db.
WithContext(ctx).
Where("state <> ?", ProcessingStatePending).
Where("state NOT IN ?", []ProcessingState{ProcessingStatePending, ProcessingStateCancelling}).
Where("created_at < ?", until).
Delete(new(messageModel))
return res.RowsAffected, res.Error
Expand Down
4 changes: 2 additions & 2 deletions internal/sms-gateway/modules/messages/repository_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type SelectFilter struct {
DeviceID string
StartDate time.Time
EndDate time.Time
State ProcessingState
State []ProcessingState
}

func (f *SelectFilter) WithExtID(extID string) *SelectFilter {
Expand All @@ -44,7 +44,7 @@ func (f *SelectFilter) WithDateRange(start, end time.Time) *SelectFilter {
}

func (f *SelectFilter) WithState(state ProcessingState) *SelectFilter {
f.State = state
f.State = append(f.State, state)
return f
}

Expand Down
30 changes: 30 additions & 0 deletions internal/sms-gateway/modules/messages/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,36 @@ func (s *Service) GetState(userID string, id string) (*MessageState, error) {
return state, nil
}

// CancelMessage transitions a pending message to Cancelling state.
// Returns an error if the message is not in Pending state.
func (s *Service) CancelMessage(userID string, id string) (*MessageState, error) {
message, err := s.messages.get(
*new(SelectFilter).WithExtID(id).WithUserID(userID),
*new(SelectOptions).IncludeDevice(),
)
if err != nil {
return nil, err
}

if cancelErr := s.messages.CancelMessage(userID, id); cancelErr != nil {
return nil, cancelErr
}

// Notify device about cancellation
go func(userID, deviceID, messageID string) {
if ntfErr := s.eventsSvc.Notify(userID, &deviceID, events.NewMessageCancelledEvent(messageID)); ntfErr != nil {
s.logger.Error(
"failed to notify device about cancellation",
zap.Error(ntfErr),
zap.String("user_id", userID),
zap.String("device_id", deviceID),
)
}
}(userID, message.DeviceID, id)

return s.GetState(userID, id)
}

func (s *Service) Enqueue(
ctx context.Context,
device devices.Device,
Expand Down
Loading
Loading