e2e: add test for OAuth Enrollment -> OAuth Authentication
This commit is contained in:
		| @ -2,11 +2,11 @@ | ||||
| from os.path import abspath | ||||
| from time import sleep | ||||
|  | ||||
| from oauth2_provider.generators import generate_client_id, generate_client_secret | ||||
| from oauth2_provider.generators import generate_client_secret | ||||
| from selenium.webdriver.common.by import By | ||||
| from selenium.webdriver.common.keys import Keys | ||||
| from selenium.webdriver.support import expected_conditions as ec | ||||
| from yaml import safe_dump, safe_load | ||||
| from yaml import safe_dump | ||||
|  | ||||
| from docker import DockerClient, from_env | ||||
| from docker.models.containers import Container | ||||
| @ -79,12 +79,7 @@ class TestSourceOAuth(SeleniumTestCase): | ||||
|                 interval=5 * 100 * 1000000, | ||||
|                 start_period=1 * 100 * 1000000, | ||||
|             ), | ||||
|             volumes={ | ||||
|                 abspath(CONFIG_PATH): { | ||||
|                     "bind": "/config.yml", | ||||
|                     "mode": "ro", | ||||
|                 } | ||||
|             }, | ||||
|             volumes={abspath(CONFIG_PATH): {"bind": "/config.yml", "mode": "ro",}}, | ||||
|         ) | ||||
|         while True: | ||||
|             container.reload() | ||||
| @ -168,3 +163,50 @@ class TestSourceOAuth(SeleniumTestCase): | ||||
|             self.driver.find_element(By.ID, "id_email").get_attribute("value"), | ||||
|             "admin@example.com", | ||||
|         ) | ||||
|  | ||||
|     def test_oauth_enroll_auth(self): | ||||
|         """test OAuth Source With With OIDC (enroll and authenticate again)""" | ||||
|         self.test_oauth_enroll() | ||||
|         # We're logged in at the end of this, log out and re-login | ||||
|         self.driver.find_element(By.CSS_SELECTOR, "[aria-label=logout]").click() | ||||
|  | ||||
|         self.wait.until( | ||||
|             ec.presence_of_element_located( | ||||
|                 (By.CLASS_NAME, "pf-c-login__main-footer-links-item-link") | ||||
|             ) | ||||
|         ) | ||||
|         self.driver.find_element( | ||||
|             By.CLASS_NAME, "pf-c-login__main-footer-links-item-link" | ||||
|         ).click() | ||||
|  | ||||
|         # Now we should be at the IDP, wait for the login field | ||||
|         self.wait.until(ec.presence_of_element_located((By.ID, "login"))) | ||||
|         self.driver.find_element(By.ID, "login").send_keys("admin@example.com") | ||||
|         self.driver.find_element(By.ID, "password").send_keys("password") | ||||
|         self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER) | ||||
|  | ||||
|         # Wait until we're logged in | ||||
|         self.wait.until( | ||||
|             ec.presence_of_element_located((By.CSS_SELECTOR, "button[type=submit]")) | ||||
|         ) | ||||
|         self.driver.find_element(By.CSS_SELECTOR, "button[type=submit]").click() | ||||
|  | ||||
|         # Wait until we've loaded the user info page | ||||
|         self.wait.until(ec.presence_of_element_located((By.LINK_TEXT, "foo"))) | ||||
|         self.driver.find_element(By.LINK_TEXT, "foo").click() | ||||
|  | ||||
|         self.wait_for_url(self.url("passbook_core:user-settings")) | ||||
|         self.assertEqual( | ||||
|             self.driver.find_element(By.XPATH, "//a[contains(@href, '/-/user/')]").text, | ||||
|             "foo", | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             self.driver.find_element(By.ID, "id_username").get_attribute("value"), "foo" | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             self.driver.find_element(By.ID, "id_name").get_attribute("value"), "admin", | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             self.driver.find_element(By.ID, "id_email").get_attribute("value"), | ||||
|             "admin@example.com", | ||||
|         ) | ||||
|  | ||||
| @ -48,59 +48,55 @@ class OAuthCallback(OAuthClientMixin, View): | ||||
|             self.source = OAuthSource.objects.get(slug=slug) | ||||
|         except OAuthSource.DoesNotExist: | ||||
|             raise Http404(f"Unknown OAuth source '{slug}'.") | ||||
|         else: | ||||
|             if not self.source.enabled: | ||||
|                 raise Http404(f"Source {slug} is not enabled.") | ||||
|             client = self.get_client(self.source) | ||||
|             callback = self.get_callback_url(self.source) | ||||
|             # Fetch access token | ||||
|             token = client.get_access_token(self.request, callback=callback) | ||||
|             if token is None: | ||||
|                 return self.handle_login_failure( | ||||
|                     self.source, "Could not retrieve token." | ||||
|                 ) | ||||
|             if "error" in token: | ||||
|                 return self.handle_login_failure(self.source, token["error"]) | ||||
|             # Fetch profile info | ||||
|             info = client.get_profile_info(token) | ||||
|             if info is None: | ||||
|                 return self.handle_login_failure( | ||||
|                     self.source, "Could not retrieve profile." | ||||
|                 ) | ||||
|             identifier = self.get_user_id(self.source, info) | ||||
|             if identifier is None: | ||||
|                 return self.handle_login_failure(self.source, "Could not determine id.") | ||||
|             # Get or create access record | ||||
|             defaults = { | ||||
|                 "access_token": token.get("access_token"), | ||||
|             } | ||||
|             existing = UserOAuthSourceConnection.objects.filter( | ||||
|                 source=self.source, identifier=identifier | ||||
|             ) | ||||
|  | ||||
|             if existing.exists(): | ||||
|                 connection = existing.first() | ||||
|                 connection.access_token = token.get("access_token") | ||||
|                 UserOAuthSourceConnection.objects.filter(pk=connection.pk).update( | ||||
|                     **defaults | ||||
|                 ) | ||||
|             else: | ||||
|                 connection = UserOAuthSourceConnection( | ||||
|                     source=self.source, | ||||
|                     identifier=identifier, | ||||
|                     access_token=token.get("access_token"), | ||||
|                 ) | ||||
|             user = AuthorizedServiceBackend().authenticate( | ||||
|                 source=self.source, identifier=identifier, request=request | ||||
|         if not self.source.enabled: | ||||
|             raise Http404(f"Source {slug} is not enabled.") | ||||
|         client = self.get_client(self.source) | ||||
|         callback = self.get_callback_url(self.source) | ||||
|         # Fetch access token | ||||
|         token = client.get_access_token(self.request, callback=callback) | ||||
|         if token is None: | ||||
|             return self.handle_login_failure(self.source, "Could not retrieve token.") | ||||
|         if "error" in token: | ||||
|             return self.handle_login_failure(self.source, token["error"]) | ||||
|         # Fetch profile info | ||||
|         info = client.get_profile_info(token) | ||||
|         if info is None: | ||||
|             return self.handle_login_failure(self.source, "Could not retrieve profile.") | ||||
|         identifier = self.get_user_id(self.source, info) | ||||
|         if identifier is None: | ||||
|             return self.handle_login_failure(self.source, "Could not determine id.") | ||||
|         # Get or create access record | ||||
|         defaults = { | ||||
|             "access_token": token.get("access_token"), | ||||
|         } | ||||
|         existing = UserOAuthSourceConnection.objects.filter( | ||||
|             source=self.source, identifier=identifier | ||||
|         ) | ||||
|  | ||||
|         if existing.exists(): | ||||
|             connection = existing.first() | ||||
|             connection.access_token = token.get("access_token") | ||||
|             UserOAuthSourceConnection.objects.filter(pk=connection.pk).update( | ||||
|                 **defaults | ||||
|             ) | ||||
|             if user is None: | ||||
|                 if self.request.user.is_authenticated: | ||||
|                     LOGGER.debug("Linking existing user", source=self.source) | ||||
|                     return self.handle_existing_user_link(self.source, connection, info) | ||||
|                 LOGGER.debug("Handling enrollment of new user", source=self.source) | ||||
|                 return self.handle_enroll(self.source, connection, info) | ||||
|             LOGGER.debug("Handling existing user", source=self.source) | ||||
|             return self.handle_existing_user(self.source, user, connection, info) | ||||
|         else: | ||||
|             connection = UserOAuthSourceConnection( | ||||
|                 source=self.source, | ||||
|                 identifier=identifier, | ||||
|                 access_token=token.get("access_token"), | ||||
|             ) | ||||
|         user = AuthorizedServiceBackend().authenticate( | ||||
|             source=self.source, identifier=identifier, request=request | ||||
|         ) | ||||
|         if user is None: | ||||
|             if self.request.user.is_authenticated: | ||||
|                 LOGGER.debug("Linking existing user", source=self.source) | ||||
|                 return self.handle_existing_user_link(self.source, connection, info) | ||||
|             LOGGER.debug("Handling enrollment of new user", source=self.source) | ||||
|             return self.handle_enroll(self.source, connection, info) | ||||
|         LOGGER.debug("Handling existing user", source=self.source) | ||||
|         return self.handle_existing_user(self.source, user, connection, info) | ||||
|  | ||||
|     # pylint: disable=unused-argument | ||||
|     def get_callback_url(self, source: OAuthSource) -> str: | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 Jens Langhammer
					Jens Langhammer