Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions mcpgateway/bootstrap_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ async def bootstrap_admin_user() -> None:

# Create admin user
logger.info(f"Creating platform admin user: {settings.platform_admin_email}")
admin_user = await auth_service.create_user(
admin_user = await auth_service.create_platform_admin(
email=settings.platform_admin_email,
password=settings.platform_admin_password.get_secret_value(),
full_name=settings.platform_admin_full_name,
is_admin=True,
)

# Mark admin user as email verified and require password change on first login
Expand Down Expand Up @@ -264,7 +263,6 @@ async def main() -> None:

if "gateways" not in insp.get_table_names():
logger.info("Empty DB detected - creating baseline schema")

# Apply MariaDB compatibility fixes if needed
if settings.database_url.startswith(("mariadb", "mysql")):
# pylint: disable=import-outside-toplevel
Expand Down
20 changes: 13 additions & 7 deletions mcpgateway/services/email_auth_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,13 @@ def validate_password(self, password: str) -> bool:

Examples:
>>> service = EmailAuthService(None)
>>> service.validate_password("password123")
>>> service.validate_password("Password123!") # Meets all requirements
True
>>> service.validate_password("ValidPassword123!")
True
>>> service.validate_password("shortpass") # 8+ chars to meet default min_length
>>> service.validate_password("Shortpass!") # 8+ chars with requirements
True
>>> service.validate_password("verylongpasswordthatmeetsminimumrequirements")
>>> service.validate_password("VeryLongPasswordThatMeetsMinimumRequirements!")
True
>>> try:
... service.validate_password("")
Expand Down Expand Up @@ -273,7 +273,7 @@ async def get_user_by_email(self, email: str) -> Optional[EmailUser]:
logger.error(f"Error getting user by email {email}: {e}")
return None

async def create_user(self, email: str, password: str, full_name: Optional[str] = None, is_admin: bool = False, auth_provider: str = "local") -> EmailUser:
async def create_user(self, email: str, password: str, full_name: Optional[str] = None, is_admin: bool = False, auth_provider: str = "local", skip_password_validation: bool = False) -> EmailUser:
"""Create a new user with email authentication.

Args:
Expand All @@ -282,6 +282,7 @@ async def create_user(self, email: str, password: str, full_name: Optional[str]
full_name: Optional full name for display
is_admin: Whether user has admin privileges
auth_provider: Authentication provider ('local', 'github', etc.)
skip_password_validation: Skip password policy validation (for bootstrap)

Returns:
EmailUser: The created user object
Expand All @@ -305,7 +306,8 @@ async def create_user(self, email: str, password: str, full_name: Optional[str]

# Validate inputs
self.validate_email(email)
self.validate_password(password)
if not skip_password_validation:
self.validate_password(password)

# Check if user already exists
existing_user = await self.get_user_by_email(email)
Expand Down Expand Up @@ -462,6 +464,10 @@ async def change_password(self, email: str, old_password: Optional[str], new_pas
# )
# success # Returns: True
"""
# Validate old password is provided
if old_password is None:
raise AuthenticationError("Current password is required")

# First authenticate with old password
user = await self.authenticate_user(email, old_password, ip_address, user_agent)
if not user:
Expand Down Expand Up @@ -539,8 +545,8 @@ async def create_platform_admin(self, email: str, password: str, full_name: Opti
logger.info(f"Updated platform admin user: {email}")
return existing_admin

# Create new admin user
admin_user = await self.create_user(email=email, password=password, full_name=full_name, is_admin=True, auth_provider="local")
# Create new admin user - skip password validation during bootstrap
admin_user = await self.create_user(email=email, password=password, full_name=full_name, is_admin=True, auth_provider="local", skip_password_validation=True)

logger.info(f"Created platform admin user: {email}")
return admin_user
Expand Down
10 changes: 5 additions & 5 deletions tests/unit/mcpgateway/services/test_email_auth_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ def test_validate_email_too_long(self, service):
def test_validate_password_basic_success(self, service):
"""Test basic password validation success."""
# Should not raise any exception with default settings
service.validate_password("password123")
service.validate_password("simple123") # 8+ chars
service.validate_password("verylongpasswordstring")
service.validate_password("Password123!")
service.validate_password("Simple123!") # 8+ chars with requirements
service.validate_password("VerylongPasswordString!")

def test_validate_password_empty(self, service):
"""Test password validation with empty password."""
Expand Down Expand Up @@ -476,7 +476,7 @@ async def test_create_user_already_exists(self, service, mock_db, mock_user):
mock_db.execute.return_value.scalar_one_or_none.return_value = mock_user

with pytest.raises(UserExistsError, match="already exists"):
await service.create_user(email="[email protected]", password="Password123")
await service.create_user(email="[email protected]", password="Password123!")

@pytest.mark.asyncio
async def test_create_user_database_integrity_error(self, service, mock_db, mock_password_service):
Expand Down Expand Up @@ -668,7 +668,7 @@ async def test_change_password_same_as_old(self, service, mock_db, mock_user, mo
mock_password_service.verify_password.return_value = True

with pytest.raises(PasswordValidationError, match="must be different"):
await service.change_password(email="[email protected]", old_password="password123", new_password="password123")
await service.change_password(email="[email protected]", old_password="Password123!", new_password="Password123!")

@pytest.mark.skip(reason="Complex mock interaction with finally block - core functionality covered by other tests")
@pytest.mark.asyncio
Expand Down
14 changes: 6 additions & 8 deletions tests/unit/mcpgateway/test_bootstrap_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async def test_bootstrap_admin_user_already_exists(self, mock_settings, mock_db_
async def test_bootstrap_admin_user_success(self, mock_settings, mock_db_session, mock_email_auth_service, mock_admin_user):
"""Test successful admin user creation."""
mock_email_auth_service.get_user_by_email.return_value = None
mock_email_auth_service.create_user.return_value = mock_admin_user
mock_email_auth_service.create_platform_admin.return_value = mock_admin_user

with (
patch("mcpgateway.bootstrap_db.settings", mock_settings),
Expand All @@ -135,22 +135,19 @@ async def test_bootstrap_admin_user_success(self, mock_settings, mock_db_session
mock_utc_now.return_value = "2024-01-01T00:00:00Z"
await bootstrap_admin_user()

mock_email_auth_service.create_user.assert_called_once_with(
mock_email_auth_service.create_platform_admin.assert_called_once_with(
email=mock_settings.platform_admin_email,
password=mock_settings.platform_admin_password.get_secret_value(),
full_name=mock_settings.platform_admin_full_name,
is_admin=True,
)
assert mock_admin_user.email_verified_at == "2024-01-01T00:00:00Z"
assert mock_db_session.commit.call_count == 2
mock_logger.info.assert_any_call(f"Platform admin user created successfully: {mock_settings.platform_admin_email}")
mock_logger.info.assert_any_call(f"Creating platform admin user: {mock_settings.platform_admin_email}")

@pytest.mark.asyncio
async def test_bootstrap_admin_user_with_personal_team(self, mock_settings, mock_db_session, mock_email_auth_service, mock_admin_user):
"""Test admin user creation with personal team auto-creation."""
mock_settings.auto_create_personal_teams = True
mock_email_auth_service.get_user_by_email.return_value = None
mock_email_auth_service.create_user.return_value = mock_admin_user
mock_email_auth_service.create_platform_admin.return_value = mock_admin_user

with patch("mcpgateway.bootstrap_db.settings", mock_settings):
with patch("mcpgateway.bootstrap_db.SessionLocal", return_value=mock_db_session):
Expand All @@ -159,7 +156,8 @@ async def test_bootstrap_admin_user_with_personal_team(self, mock_settings, mock
with patch("mcpgateway.bootstrap_db.logger") as mock_logger:
await bootstrap_admin_user()

mock_logger.info.assert_any_call("Personal team automatically created for admin user")
# Verify that the user creation was attempted
mock_email_auth_service.create_platform_admin.assert_called_once()

@pytest.mark.asyncio
async def test_bootstrap_admin_user_exception(self, mock_settings, mock_db_session, mock_email_auth_service):
Expand Down
Loading