Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 36 additions & 23 deletions rsconnect/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1554,23 +1554,13 @@ def server_settings(self):
def verify_api_key(self, server: Optional[RSConnectServer] = None):
"""
Verify that an API Key may be used to authenticate with the given Posit Connect server.
If the API key verifies, we return the username of the associated user.
"""
if not server:
server = self.remote_server
if isinstance(server, ShinyappsServer):
raise RSConnectException("Shinnyapps server does not use an API key.")
with RSConnectClient(server) as client:
result = client.me()
if isinstance(result, HTTPResponse):
if (
result.json_data
and isinstance(result.json_data, dict)
and "code" in result.json_data
and result.json_data["code"] == 30
):
raise RSConnectException("The specified API key is not valid.")
raise RSConnectException("Could not verify the API key: %s %s" % (result.status, result.reason))
verify_api_key_response(client)
return self

@property
Expand Down Expand Up @@ -2053,27 +2043,50 @@ def verify_server(connect_server: RSConnectServer):
raise RSConnectException("There is an SSL/TLS configuration problem: %s" % ssl_error)


def verify_api_key_response(client: RSConnectClient) -> Optional[UserRecord]:
"""
Issue GET v1/user and interpret the response for the purpose of API key verification.

:param client: a client configured with the credential to verify.
:return: the user record on success, or None for a valid credential that has no
associated user (a service principal or machine identity, for example one used
for trusted publishing).
:raises RSConnectException: if the credential is invalid or the request otherwise fails.
"""
# Use the raw response rather than client.me(), which would raise a generic error
# and discard the error code we need to distinguish the verification-specific cases
# below. Everything else (success, connection errors, other HTTP errors) is left to
# the standard handle_bad_response handler.
result = client.get("v1/user")
if isinstance(result, HTTPResponse) and not result.exception:
json_data = result.json_data if isinstance(result.json_data, dict) else {}
code = json_data.get("code")
# A service principal or machine identity authenticates successfully but has no
# associated user, so the v1/user endpoint rejects it with a 403 and error code

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary that we request the v1/user endpoint to validate the API key, or the OIDC token? I noticed that nothing else uses the .me() endpoint other than to check that an API key is valid.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also found this confusing. I believe the original point of it was to surface more user-friendly error messages, but I agree that I would personally prefer we just fail later on when attempting an actual operation. That felt like a more invasive behaviour change to make, though.

# 22. That code is unambiguous on this endpoint -- a genuinely invalid credential
# is rejected at the auth layer with code 30 instead -- so the credential is valid
# and we treat it as verified. This distinction only holds for v1/user, which is
# why it lives here rather than in handle_bad_response.
if result.status == 403 and code == 22:
Comment thread
atheriel marked this conversation as resolved.
return None
if code == 30:
raise RSConnectException("The specified API key is not valid.")
return cast(UserRecord, client._server.handle_bad_response(result))


def verify_api_key(connect_server: RSConnectServer) -> str:
"""
Verify that an API Key may be used to authenticate with the given Posit Connect server.
If the API key verifies, we return the username of the associated user.

:param connect_server: the Connect server information, including the API key to test.
:return: the username of the user to whom the API key belongs.
:return: the username of the user to whom the API key belongs, or an empty string for a
valid credential with no associated user (a service principal or machine identity).
"""
warn("This method has been moved and will be deprecated.", DeprecationWarning, stacklevel=2)
with RSConnectClient(connect_server) as client:
result = client.me()
if isinstance(result, HTTPResponse):
if (
result.json_data
and isinstance(result.json_data, dict)
and "code" in result.json_data
and result.json_data["code"] == 30
):
raise RSConnectException("The specified API key is not valid.")
raise RSConnectException("Could not verify the API key: %s %s" % (result.status, result.reason))
return result["username"]
user = verify_api_key_response(client)
return user["username"] if user else ""


def get_python_info(connect_server: Union[RSConnectServer, SPCSConnectServer]):
Expand Down
95 changes: 95 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
ShinyappsServer,
ShinyappsService,
SPCSConnectServer,
verify_api_key,
)
from rsconnect.exception import DeploymentFailedException, RSConnectException

Expand Down Expand Up @@ -99,6 +100,100 @@ def test_client_system_caches_runtime_list(self):
result = ce.runtime_caches
self.assertDictEqual(result, mocked_response)

@httpretty.activate(verbose=True, allow_net_connect=False)
def test_verify_api_key_user(self):
ce = RSConnectExecutor(None, None, "http://test-server/", "api_key")
httpretty.register_uri(
httpretty.GET,
"http://test-server/__api__/v1/user",
body=json.dumps({"username": "alice"}),
status=200,
forcing_headers={"Content-Type": "application/json"},
)
# Returns the executor without raising for a regular user.
self.assertIs(ce.verify_api_key(), ce)

@httpretty.activate(verbose=True, allow_net_connect=False)
def test_verify_api_key_service_principal(self):
# A service principal (e.g. for trusted publishing) authenticates but is not a
# user, so v1/user returns 403 / code 22. The credential is still valid, so
# verification should succeed instead of raising.
ce = RSConnectExecutor(None, None, "http://test-server/", "api_key")
httpretty.register_uri(
httpretty.GET,
"http://test-server/__api__/v1/user",
body=json.dumps({"code": 22, "error": "You don't have permission to perform this operation."}),
status=403,
forcing_headers={"Content-Type": "application/json"},
)
self.assertIs(ce.verify_api_key(), ce)

@httpretty.activate(verbose=True, allow_net_connect=False)
def test_verify_api_key_invalid(self):
ce = RSConnectExecutor(None, None, "http://test-server/", "api_key")
httpretty.register_uri(
httpretty.GET,
"http://test-server/__api__/v1/user",
body=json.dumps({"code": 30, "error": "Invalid login."}),
status=401,
forcing_headers={"Content-Type": "application/json"},
)
with self.assertRaises(RSConnectException) as cm:
ce.verify_api_key()
self.assertIn("not valid", str(cm.exception))

def test_verify_api_key_connection_error(self):
# A transport-layer failure yields an HTTPResponse with no status/reason, only
# an exception. Verification should surface a clean RSConnectException rather
# than an AttributeError from reading the missing status.
from rsconnect.http_support import HTTPResponse

ce = RSConnectExecutor(None, None, "http://test-server/", "api_key")
failed_response = HTTPResponse("http://test-server/__api__/v1/user", exception=OSError("connection refused"))
with patch.object(RSConnectClient, "get", return_value=failed_response):
with self.assertRaises(RSConnectException) as cm:
ce.verify_api_key()
self.assertIn("connection refused", str(cm.exception))

# The deprecated module-level verify_api_key() is reached via actions.test_api_key()
# during `rsconnect add`, so it must accept the same credentials as the executor path.
@httpretty.activate(verbose=True, allow_net_connect=False)
def test_module_verify_api_key_user(self):
httpretty.register_uri(
httpretty.GET,
"http://test-server/__api__/v1/user",
body=json.dumps({"username": "alice"}),
status=200,
forcing_headers={"Content-Type": "application/json"},
)
self.assertEqual(verify_api_key(RSConnectServer("http://test-server", "api_key")), "alice")

@httpretty.activate(verbose=True, allow_net_connect=False)
def test_module_verify_api_key_service_principal(self):
# A service principal authenticates but is not a user (403 / code 22); the
# credential is valid, so this returns an empty username instead of raising.
httpretty.register_uri(
httpretty.GET,
"http://test-server/__api__/v1/user",
body=json.dumps({"code": 22, "error": "You don't have permission to perform this operation."}),
status=403,
forcing_headers={"Content-Type": "application/json"},
)
self.assertEqual(verify_api_key(RSConnectServer("http://test-server", "api_key")), "")

@httpretty.activate(verbose=True, allow_net_connect=False)
def test_module_verify_api_key_invalid(self):
httpretty.register_uri(
httpretty.GET,
"http://test-server/__api__/v1/user",
body=json.dumps({"code": 30, "error": "Invalid login."}),
status=401,
forcing_headers={"Content-Type": "application/json"},
)
with self.assertRaises(RSConnectException) as cm:
verify_api_key(RSConnectServer("http://test-server", "api_key"))
self.assertIn("not valid", str(cm.exception))

# RSConnectExecutor.delete_runtime_cache() dry run returns expected request
# RSConnectExecutor.delete_runtime_cache() dry run prints expected messages
@httpretty.activate(verbose=True, allow_net_connect=False)
Expand Down
Loading