Skip to content

auth

Module with classes and functions used for authentication and password handling.

Classes:

Name Description
AuthCallback

Class to handle authorization callback.

AuthContext

Class to handle authentication context and related operations.

AuthUser

Class to handle user authentication and management.

DataLunchLoginHandler

Custom Panel login Handler.

DataLunchProvider

Custom Panel auth provider.

PasswordEncrypt

Class that store the encrypted value of a password.

PasswordHash

Class that store the hashed value of a password.

Attributes:

Name Type Description
log Logger

Module logger.

pwd_context CryptContext

Crypt context with configurations for passlib (selected algorithm, etc.).

log module-attribute

log: Logger = getLogger(__name__)

Module logger.

pwd_context module-attribute

pwd_context: CryptContext = CryptContext(
    schemes=["pbkdf2_sha256"], deprecated="auto"
)

Crypt context with configurations for passlib (selected algorithm, etc.).

AuthCallback

Class to handle authorization callback.

Parameters:

Name Type Description Default
config DictConfig

Hydra configuration dictionary.

required
authorize_guest_users bool

Set to True to enable the main page to guest users. Defaults to False.

False

Methods:

Name Description
__init__
authorize

Authorization callback: read config, user info and the target path of the

Attributes:

Name Type Description
authorize_guest_users
config
Source code in dlunch/auth.py
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
class AuthCallback:
    """Class to handle authorization callback.

    Args:
        config (DictConfig): Hydra configuration dictionary.
        authorize_guest_users (bool, optional): Set to `True` to enable the main page to guest users.
            Defaults to `False`.
    """

    def __init__(
        self, config: DictConfig, authorize_guest_users: bool = False
    ) -> None:
        self.config = config
        self.authorize_guest_users = authorize_guest_users

    def authorize(self, user_info: dict, target_path: str) -> bool:
        """Authorization callback: read config, user info and the target path of the
        requested resource.

        Return `True` (authorized) or `False` (not authorized) by checking current user
        and target path.

        Args:
            user_info (dict): dictionary with user info passed by Panel to the authorization handle.
            target_path (str): path of the requested resource.

        Returns:
            bool: authorization flag. `True` if authorized.
        """
        # Set authenticated user from panel state (authentication context is
        # instantiated automatically)
        auth_user = AuthUser(config=self.config)
        # If authorization is not active authorize every user
        if not auth_user.auth_context.is_auth_active():
            return True
        # Get privileged users
        privileged_users = auth_user.auth_context.list_privileged_users()
        log.debug(f"target path: {target_path}")
        # If user is not authenticated block it
        if not auth_user.name:
            log.debug("user not authenticated")
            return False
        # All privileged users can reach backend (but the backend will have
        # controls only for admins)
        if auth_user.name in privileged_users:
            return True
        # If the target is the mainpage always authorized (if authenticated)
        if self.authorize_guest_users and (target_path == "/"):
            return True

        # In all other cases, don't authorize and logout
        log.debug("not authorized")
        pn.state.location.pathname.split("/")[0] + "/logout"
        return False

authorize_guest_users instance-attribute

authorize_guest_users = authorize_guest_users

config instance-attribute

config = config

__init__

__init__(
    config: DictConfig, authorize_guest_users: bool = False
) -> None
Source code in dlunch/auth.py
986
987
988
989
990
def __init__(
    self, config: DictConfig, authorize_guest_users: bool = False
) -> None:
    self.config = config
    self.authorize_guest_users = authorize_guest_users

authorize

authorize(user_info: dict, target_path: str) -> bool

Authorization callback: read config, user info and the target path of the requested resource.

Return True (authorized) or False (not authorized) by checking current user and target path.

Parameters:

Name Type Description Default
user_info dict

dictionary with user info passed by Panel to the authorization handle.

required
target_path str

path of the requested resource.

required

Returns:

Type Description
bool

authorization flag. True if authorized.

Source code in dlunch/auth.py
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
def authorize(self, user_info: dict, target_path: str) -> bool:
    """Authorization callback: read config, user info and the target path of the
    requested resource.

    Return `True` (authorized) or `False` (not authorized) by checking current user
    and target path.

    Args:
        user_info (dict): dictionary with user info passed by Panel to the authorization handle.
        target_path (str): path of the requested resource.

    Returns:
        bool: authorization flag. `True` if authorized.
    """
    # Set authenticated user from panel state (authentication context is
    # instantiated automatically)
    auth_user = AuthUser(config=self.config)
    # If authorization is not active authorize every user
    if not auth_user.auth_context.is_auth_active():
        return True
    # Get privileged users
    privileged_users = auth_user.auth_context.list_privileged_users()
    log.debug(f"target path: {target_path}")
    # If user is not authenticated block it
    if not auth_user.name:
        log.debug("user not authenticated")
        return False
    # All privileged users can reach backend (but the backend will have
    # controls only for admins)
    if auth_user.name in privileged_users:
        return True
    # If the target is the mainpage always authorized (if authenticated)
    if self.authorize_guest_users and (target_path == "/"):
        return True

    # In all other cases, don't authorize and logout
    log.debug("not authorized")
    pn.state.location.pathname.split("/")[0] + "/logout"
    return False

AuthContext

Class to handle authentication context and related operations.

Parameters:

Name Type Description Default
config DictConfig

Hydra configuration dictionary.

required

Methods:

Name Description
__init__
auth_type

Check configuration object and return authentication type.

backend_submit_password

Submit password to database from backend but used also from frontend as part of submit_password function.

generate_password

Generate a random password.

is_auth_active

Check configuration object and return True if basic authentication or OAuth is active.

is_basic_auth_active

Check configuration object and return True if basic authentication is active.

list_privileged_users

List only privileged users (from privileged_users table).

list_users_guests_and_privileges

Join privileged_users and credentials tables to list normal users,

set_app_auth_and_encryption

Setup Panel authorization and encryption.

set_guest_user_password

If guest user is active return a password, otherwise return an empty string.

submit_password

Same as backend_submit_password with an additional check on old password.

Attributes:

Name Type Description
config

Hydra configuration dictionary.

database_connector DatabaseConnector

Object that handles database connection and operations

Source code in dlunch/auth.py
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
class AuthContext:
    """Class to handle authentication context and related operations.

    Args:
        config (DictConfig): Hydra configuration dictionary.
    """

    def __init__(self, config: DictConfig) -> None:
        self.config = config
        """Hydra configuration dictionary."""
        self.database_connector: models.DatabaseConnector = (
            models.DatabaseConnector(config=config)
        )
        """Object that handles database connection and operations"""

    def is_basic_auth_active(self) -> bool:
        """Check configuration object and return `True` if basic authentication is active.
        Return `False` otherwise.

        Returns:
            bool: `True` if basic authentication is active, `False` otherwise.
        """
        # Check if a valid basic_auth key exists
        auth_provider = self.config.get("basic_auth", None)
        return auth_provider is not None

    def is_auth_active(self) -> bool:
        """Check configuration object and return `True` if basic authentication or OAuth is active.
        Return `False` otherwise.

        Returns:
            bool: `True` if authentication (basic or OAuth) is active, `False` otherwise.
        """
        # Check if a valid oauth key exists
        auth_provider = self.is_basic_auth_active()
        oauth_provider = (
            self.config.server.get("oauth_provider", None) is not None
        )
        return auth_provider or oauth_provider

    def auth_type(self) -> str | None:
        """Check configuration object and return authentication type.

        Returns:
            str | None: authentication type. None if no authentication is active.
        """
        if self.is_basic_auth_active():
            auth_type = "basic"
        elif self.config.server.get("oauth_provider", None) is not None:
            auth_type = self.config.server.oauth_provider
        else:
            auth_type = None

        return auth_type

    def set_app_auth_and_encryption(self) -> None:
        """Setup Panel authorization and encryption.

        Namely:
            - Encryption key
            - Cookie expiry date

        Raises:
            ImportError: missing library (cryptography).
        """
        # Encryption key
        try:
            if self.config.auth.oauth_encryption_key:
                pn.config.oauth_encryption_key = (
                    self.config.auth.oauth_encryption_key.encode("ascii")
                )
                pn.state.encryption = Fernet(pn.config.oauth_encryption_key)
        except ConfigAttributeError:
            log.warning(
                "missing authentication encryption key, generate a key with the `panel oauth-secret` CLI command and then provide it to hydra using the DATA_LUNCH_OAUTH_ENC_KEY environment variable"
            )
        # Cookie expiry date
        try:
            if self.config.auth.oauth_expiry:
                pn.config.oauth_expiry = self.config.auth.oauth_expiry
        except ConfigAttributeError:
            log.warning(
                "missing explicit authentication expiry date for cookies, defaults to 1 day"
            )

    def list_privileged_users(self) -> list[str]:
        """List only privileged users (from `privileged_users` table).

        Returns:
            list[str]: list of usernames.
        """
        session = self.database_connector.create_session()

        with session:
            privileged_users = session.scalars(
                select(models.PrivilegedUsers)
            ).all()

        # Return users
        users_list = [u.user for u in privileged_users]
        users_list.sort()

        return users_list

    def list_users_guests_and_privileges(self) -> pd.DataFrame:
        """Join `privileged_users` and `credentials` tables to list normal users,
        admins and guests.

        Returns a dataframe.

        Returns:
            pd.DataFrame: dataframe with users and privileges.
        """

        # Query tables required to understand users and guests
        df_privileged_users = models.PrivilegedUsers.read_as_df(
            config=self.config,
            index_col="user",
        )
        # Leave credentials table empty if basic auth is not active
        if self.is_basic_auth_active():
            df_credentials = models.Credentials.read_as_df(
                config=self.config,
                index_col="user",
            )
        else:
            df_credentials = pd.DataFrame()

        # Change admin column to privileges (used after join)
        df_privileged_users["group"] = df_privileged_users.admin.map(
            {True: "admin", False: "user"}
        )
        df_user_guests_privileges = df_privileged_users.join(
            df_credentials, how="outer"
        )[["group"]]
        df_user_guests_privileges = df_user_guests_privileges.fillna("guest")

        return df_user_guests_privileges

    @staticmethod
    def generate_password(
        alphabet: str | None = None,
        special_chars: str | None = "",
        length: int = 12,
    ) -> str:
        """Generate a random password.

        Args:
            alphabet (str | None, optional): list of characters to use as alphabet to generate the password.
                Defaults to None.
            special_chars (str | None, optional): special characters to include inside the password string.
                Defaults to "".
            length (int, optional): length of the random password.
                Defaults to 12.

        Returns:
            str: random password.
        """
        # If alphabet is not avilable use a default one
        if alphabet is None:
            alphabet = string.ascii_letters + string.digits + special_chars
        # Infinite loop for finding a valid password
        while True:
            password = "".join(secrets.choice(alphabet) for i in range(length))
            # Create special chars condition only if special chars is non-empty
            if special_chars:
                special_chars_condition = any(
                    c in special_chars for c in password
                )
            else:
                special_chars_condition = True
            if (
                any(c.islower() for c in password)
                and any(c.isupper() for c in password)
                and any(c.isdigit() for c in password)
                and special_chars_condition
            ):
                break
        return password

    def set_guest_user_password(self) -> str:
        """If guest user is active return a password, otherwise return an empty string.

        This function always returns an empty string if basic authentication is not active.

        Guest user and basic authentication are handled through configuration files.

        If the flag `reset_guest_user_password` is set to `True` the password is created
        and uploaded to database. Otherwise the existing password is queried from database
        `credentials` table.

        Returns:
            str: guest user password or empty string if basic authentication is not active.
        """
        # Check if basic auth is active
        if self.is_basic_auth_active():
            # If active basic_auth.guest_user is true if guest user is active
            is_guest_user_active = self.config.basic_auth.guest_user
            log.debug(f"guest user flag is {is_guest_user_active}")
        else:
            # Otherwise the guest user feature is not applicable
            is_guest_user_active = False
            log.debug("guest user not applicable")

        # Set the guest password variable
        if is_guest_user_active:
            # If flag for resetting the password does not exist use the default
            # value
            if (
                self.database_connector.get_flag(
                    id="reset_guest_user_password"
                )
                is None
            ):
                self.database_connector.set_flag(
                    id="reset_guest_user_password",
                    value=self.config.basic_auth.default_reset_guest_user_password_flag,
                )
            # Generate a random password only if requested (check on flag)
            # otherwise load from database
            if self.database_connector.get_flag(
                id="reset_guest_user_password"
            ):
                # Turn off reset user password (in order to reset it only once)
                # This statement also acquire a lock on database (so it is
                # called first)
                self.database_connector.set_flag(
                    id="reset_guest_user_password",
                    value=False,
                )
                # Create password
                guest_password = self.generate_password(
                    special_chars=self.config.basic_auth.psw_special_chars,
                    length=self.config.basic_auth.generated_psw_length,
                )
                # Add hashed password to database
                AuthUser(
                    config=self.config, auth_context=self, name="guest"
                ).add_user_hashed_password(guest_password)
            else:
                # Load from database
                session = self.database_connector.create_session()
                with session:
                    try:
                        guest_password = session.get(
                            models.Credentials, "guest"
                        ).password_encrypted.decrypt()
                    except InvalidToken:
                        # Notify exception and suggest to reset guest user password
                        guest_password = ""
                        log.warning(
                            "Unable to decrypt 'guest' user password because an invalid token has been detected: reset password from backend"
                        )
                        pn.state.notifications.warning(
                            "Unable to decrypt 'guest' user password<br>Invalid token detected: reset password from backend",
                            duration=self.config.panel.notifications.duration,
                        )
        else:
            guest_password = ""

        return guest_password

    def submit_password(self, gi: gui.GraphicInterface) -> bool:
        """Same as backend_submit_password with an additional check on old password.

        Args:
            gi (gui.GraphicInterface): graphic interface object (used to interact with Panel widgets).

        Returns:
            bool: true if successful, false otherwise.
        """
        # Get authenticated user from Panel state
        auth_user = AuthUser(config=self.config, auth_context=self)
        # Get username, updated updated at each key press
        old_password_key_press = gi.password_widget._widgets[
            "old_password"
        ].value_input
        # Check if old password is correct
        if auth_user.password_hash == old_password_key_press:
            # Then run the same checks used for backend
            return self.backend_submit_password(
                gi=gi, user=auth_user.name, logout_on_success=True
            )
        else:
            pn.state.notifications.error(
                "Incorrect old password!",
                duration=self.config.panel.notifications.duration,
            )
        return False

    def backend_submit_password(
        self,
        gi: gui.GraphicInterface | gui.BackendInterface,
        user: str = None,
        user_is_guest: bool | None = None,
        user_is_admin: bool | None = None,
        logout_on_success: bool = False,
    ) -> bool:
        """Submit password to database from backend but used also from frontend as part of `submit_password` function.

        Args:
            gi (gui.GraphicInterface | gui.BackendInterface): graphic interface object (used to interact with Panel widgets).
            user (str, optional): username. Defaults to None.
            user_is_guest (bool | None, optional): guest flag (true if guest). Defaults to None.
            user_is_admin (bool | None, optional): admin flag (true if admin). Defaults to None.
            logout_on_success (bool, optional): set to true to force logout once the new password is set. Defaults to False.

        Returns:
            bool: true if successful, false otherwise.
        """
        # Check if user is passed, otherwise check if backend widget
        # (password_widget.object.user) is available
        if not user:
            username = gi.password_widget._widgets["user"].value_input
        else:
            username = user
        # Get all passwords, updated at each key press
        new_password_key_press = gi.password_widget._widgets[
            "new_password"
        ].value_input
        repeat_new_password_key_press = gi.password_widget._widgets[
            "repeat_new_password"
        ].value_input
        # Check if new password match repeat password
        if username:
            if new_password_key_press == repeat_new_password_key_press:
                # Check if new password is valid with regex
                if re.fullmatch(
                    self.config.basic_auth.psw_regex,
                    new_password_key_press,
                ):
                    auth_user = AuthUser(
                        config=self.config, auth_context=self, name=username
                    )
                    # If is_guest and is_admin are None (not passed) use the ones
                    # already set for the user
                    if user_is_guest is None:
                        user_is_guest = auth_user.is_guest(
                            allow_override=False
                        )
                    if user_is_admin is None:
                        user_is_admin = auth_user.is_admin()
                    # First remove user from both 'privileged_users' and
                    # 'credentials' tables.
                    deleted_data = auth_user.remove_user()
                    if (deleted_data["privileged_users_deleted"] > 0) or (
                        deleted_data["credentials_deleted"] > 0
                    ):
                        pn.state.notifications.success(
                            f"Removed old data for<br>'{username}'<br>auth: {deleted_data['privileged_users_deleted']}<br>cred: {deleted_data['credentials_deleted']}",
                            duration=self.config.panel.notifications.duration,
                        )
                    else:
                        pn.state.notifications.warning(
                            f"Creating new user<br>'{username}' does not exist",
                            duration=self.config.panel.notifications.duration,
                        )
                    # Add a privileged users only if guest option is not active
                    if not user_is_guest:
                        auth_user.add_privileged_user(is_admin=user_is_admin)
                    # Green light: update the password!
                    auth_user.add_user_hashed_password(
                        password=new_password_key_press
                    )

                    # Logout if requested
                    if logout_on_success:
                        pn.state.notifications.success(
                            "Password updated<br>Logging out",
                            duration=self.config.panel.notifications.duration,
                        )
                        sleep(4)
                        gi.force_logout()
                    else:
                        pn.state.notifications.success(
                            "Password updated",
                            duration=self.config.panel.notifications.duration,
                        )
                    return True
                else:
                    pn.state.notifications.error(
                        "Password requirements not satisfied<br>Check again!",
                        duration=self.config.panel.notifications.duration,
                    )
            else:
                pn.state.notifications.error(
                    "Passwords are different!",
                    duration=self.config.panel.notifications.duration,
                )
        else:
            pn.state.notifications.error(
                "Missing user!",
                duration=self.config.panel.notifications.duration,
            )

        return False

config instance-attribute

config = config

Hydra configuration dictionary.

database_connector instance-attribute

database_connector: DatabaseConnector = DatabaseConnector(
    config=config
)

Object that handles database connection and operations

__init__

__init__(config: DictConfig) -> None
Source code in dlunch/auth.py
406
407
408
409
410
411
412
def __init__(self, config: DictConfig) -> None:
    self.config = config
    """Hydra configuration dictionary."""
    self.database_connector: models.DatabaseConnector = (
        models.DatabaseConnector(config=config)
    )
    """Object that handles database connection and operations"""

auth_type

auth_type() -> str | None

Check configuration object and return authentication type.

Returns:

Type Description
str | None

authentication type. None if no authentication is active.

Source code in dlunch/auth.py
439
440
441
442
443
444
445
446
447
448
449
450
451
452
def auth_type(self) -> str | None:
    """Check configuration object and return authentication type.

    Returns:
        str | None: authentication type. None if no authentication is active.
    """
    if self.is_basic_auth_active():
        auth_type = "basic"
    elif self.config.server.get("oauth_provider", None) is not None:
        auth_type = self.config.server.oauth_provider
    else:
        auth_type = None

    return auth_type

backend_submit_password

backend_submit_password(
    gi: GraphicInterface | BackendInterface,
    user: str = None,
    user_is_guest: bool | None = None,
    user_is_admin: bool | None = None,
    logout_on_success: bool = False,
) -> bool

Submit password to database from backend but used also from frontend as part of submit_password function.

Parameters:

Name Type Description Default
gi GraphicInterface | BackendInterface

graphic interface object (used to interact with Panel widgets).

required
user str

username. Defaults to None.

None
user_is_guest bool | None

guest flag (true if guest). Defaults to None.

None
user_is_admin bool | None

admin flag (true if admin). Defaults to None.

None
logout_on_success bool

set to true to force logout once the new password is set. Defaults to False.

False

Returns:

Type Description
bool

true if successful, false otherwise.

Source code in dlunch/auth.py
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
def backend_submit_password(
    self,
    gi: gui.GraphicInterface | gui.BackendInterface,
    user: str = None,
    user_is_guest: bool | None = None,
    user_is_admin: bool | None = None,
    logout_on_success: bool = False,
) -> bool:
    """Submit password to database from backend but used also from frontend as part of `submit_password` function.

    Args:
        gi (gui.GraphicInterface | gui.BackendInterface): graphic interface object (used to interact with Panel widgets).
        user (str, optional): username. Defaults to None.
        user_is_guest (bool | None, optional): guest flag (true if guest). Defaults to None.
        user_is_admin (bool | None, optional): admin flag (true if admin). Defaults to None.
        logout_on_success (bool, optional): set to true to force logout once the new password is set. Defaults to False.

    Returns:
        bool: true if successful, false otherwise.
    """
    # Check if user is passed, otherwise check if backend widget
    # (password_widget.object.user) is available
    if not user:
        username = gi.password_widget._widgets["user"].value_input
    else:
        username = user
    # Get all passwords, updated at each key press
    new_password_key_press = gi.password_widget._widgets[
        "new_password"
    ].value_input
    repeat_new_password_key_press = gi.password_widget._widgets[
        "repeat_new_password"
    ].value_input
    # Check if new password match repeat password
    if username:
        if new_password_key_press == repeat_new_password_key_press:
            # Check if new password is valid with regex
            if re.fullmatch(
                self.config.basic_auth.psw_regex,
                new_password_key_press,
            ):
                auth_user = AuthUser(
                    config=self.config, auth_context=self, name=username
                )
                # If is_guest and is_admin are None (not passed) use the ones
                # already set for the user
                if user_is_guest is None:
                    user_is_guest = auth_user.is_guest(
                        allow_override=False
                    )
                if user_is_admin is None:
                    user_is_admin = auth_user.is_admin()
                # First remove user from both 'privileged_users' and
                # 'credentials' tables.
                deleted_data = auth_user.remove_user()
                if (deleted_data["privileged_users_deleted"] > 0) or (
                    deleted_data["credentials_deleted"] > 0
                ):
                    pn.state.notifications.success(
                        f"Removed old data for<br>'{username}'<br>auth: {deleted_data['privileged_users_deleted']}<br>cred: {deleted_data['credentials_deleted']}",
                        duration=self.config.panel.notifications.duration,
                    )
                else:
                    pn.state.notifications.warning(
                        f"Creating new user<br>'{username}' does not exist",
                        duration=self.config.panel.notifications.duration,
                    )
                # Add a privileged users only if guest option is not active
                if not user_is_guest:
                    auth_user.add_privileged_user(is_admin=user_is_admin)
                # Green light: update the password!
                auth_user.add_user_hashed_password(
                    password=new_password_key_press
                )

                # Logout if requested
                if logout_on_success:
                    pn.state.notifications.success(
                        "Password updated<br>Logging out",
                        duration=self.config.panel.notifications.duration,
                    )
                    sleep(4)
                    gi.force_logout()
                else:
                    pn.state.notifications.success(
                        "Password updated",
                        duration=self.config.panel.notifications.duration,
                    )
                return True
            else:
                pn.state.notifications.error(
                    "Password requirements not satisfied<br>Check again!",
                    duration=self.config.panel.notifications.duration,
                )
        else:
            pn.state.notifications.error(
                "Passwords are different!",
                duration=self.config.panel.notifications.duration,
            )
    else:
        pn.state.notifications.error(
            "Missing user!",
            duration=self.config.panel.notifications.duration,
        )

    return False

generate_password staticmethod

generate_password(
    alphabet: str | None = None,
    special_chars: str | None = "",
    length: int = 12,
) -> str

Generate a random password.

Parameters:

Name Type Description Default
alphabet str | None

list of characters to use as alphabet to generate the password. Defaults to None.

None
special_chars str | None

special characters to include inside the password string. Defaults to "".

''
length int

length of the random password. Defaults to 12.

12

Returns:

Type Description
str

random password.

Source code in dlunch/auth.py
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
@staticmethod
def generate_password(
    alphabet: str | None = None,
    special_chars: str | None = "",
    length: int = 12,
) -> str:
    """Generate a random password.

    Args:
        alphabet (str | None, optional): list of characters to use as alphabet to generate the password.
            Defaults to None.
        special_chars (str | None, optional): special characters to include inside the password string.
            Defaults to "".
        length (int, optional): length of the random password.
            Defaults to 12.

    Returns:
        str: random password.
    """
    # If alphabet is not avilable use a default one
    if alphabet is None:
        alphabet = string.ascii_letters + string.digits + special_chars
    # Infinite loop for finding a valid password
    while True:
        password = "".join(secrets.choice(alphabet) for i in range(length))
        # Create special chars condition only if special chars is non-empty
        if special_chars:
            special_chars_condition = any(
                c in special_chars for c in password
            )
        else:
            special_chars_condition = True
        if (
            any(c.islower() for c in password)
            and any(c.isupper() for c in password)
            and any(c.isdigit() for c in password)
            and special_chars_condition
        ):
            break
    return password

is_auth_active

is_auth_active() -> bool

Check configuration object and return True if basic authentication or OAuth is active. Return False otherwise.

Returns:

Type Description
bool

True if authentication (basic or OAuth) is active, False otherwise.

Source code in dlunch/auth.py
425
426
427
428
429
430
431
432
433
434
435
436
437
def is_auth_active(self) -> bool:
    """Check configuration object and return `True` if basic authentication or OAuth is active.
    Return `False` otherwise.

    Returns:
        bool: `True` if authentication (basic or OAuth) is active, `False` otherwise.
    """
    # Check if a valid oauth key exists
    auth_provider = self.is_basic_auth_active()
    oauth_provider = (
        self.config.server.get("oauth_provider", None) is not None
    )
    return auth_provider or oauth_provider

is_basic_auth_active

is_basic_auth_active() -> bool

Check configuration object and return True if basic authentication is active. Return False otherwise.

Returns:

Type Description
bool

True if basic authentication is active, False otherwise.

Source code in dlunch/auth.py
414
415
416
417
418
419
420
421
422
423
def is_basic_auth_active(self) -> bool:
    """Check configuration object and return `True` if basic authentication is active.
    Return `False` otherwise.

    Returns:
        bool: `True` if basic authentication is active, `False` otherwise.
    """
    # Check if a valid basic_auth key exists
    auth_provider = self.config.get("basic_auth", None)
    return auth_provider is not None

list_privileged_users

list_privileged_users() -> list[str]

List only privileged users (from privileged_users table).

Returns:

Type Description
list[str]

list of usernames.

Source code in dlunch/auth.py
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
def list_privileged_users(self) -> list[str]:
    """List only privileged users (from `privileged_users` table).

    Returns:
        list[str]: list of usernames.
    """
    session = self.database_connector.create_session()

    with session:
        privileged_users = session.scalars(
            select(models.PrivilegedUsers)
        ).all()

    # Return users
    users_list = [u.user for u in privileged_users]
    users_list.sort()

    return users_list

list_users_guests_and_privileges

list_users_guests_and_privileges() -> DataFrame

Join privileged_users and credentials tables to list normal users, admins and guests.

Returns a dataframe.

Returns:

Type Description
DataFrame

dataframe with users and privileges.

Source code in dlunch/auth.py
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
def list_users_guests_and_privileges(self) -> pd.DataFrame:
    """Join `privileged_users` and `credentials` tables to list normal users,
    admins and guests.

    Returns a dataframe.

    Returns:
        pd.DataFrame: dataframe with users and privileges.
    """

    # Query tables required to understand users and guests
    df_privileged_users = models.PrivilegedUsers.read_as_df(
        config=self.config,
        index_col="user",
    )
    # Leave credentials table empty if basic auth is not active
    if self.is_basic_auth_active():
        df_credentials = models.Credentials.read_as_df(
            config=self.config,
            index_col="user",
        )
    else:
        df_credentials = pd.DataFrame()

    # Change admin column to privileges (used after join)
    df_privileged_users["group"] = df_privileged_users.admin.map(
        {True: "admin", False: "user"}
    )
    df_user_guests_privileges = df_privileged_users.join(
        df_credentials, how="outer"
    )[["group"]]
    df_user_guests_privileges = df_user_guests_privileges.fillna("guest")

    return df_user_guests_privileges

set_app_auth_and_encryption

set_app_auth_and_encryption() -> None

Setup Panel authorization and encryption.

Namely
  • Encryption key
  • Cookie expiry date

Raises:

Type Description
ImportError

missing library (cryptography).

Source code in dlunch/auth.py
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
def set_app_auth_and_encryption(self) -> None:
    """Setup Panel authorization and encryption.

    Namely:
        - Encryption key
        - Cookie expiry date

    Raises:
        ImportError: missing library (cryptography).
    """
    # Encryption key
    try:
        if self.config.auth.oauth_encryption_key:
            pn.config.oauth_encryption_key = (
                self.config.auth.oauth_encryption_key.encode("ascii")
            )
            pn.state.encryption = Fernet(pn.config.oauth_encryption_key)
    except ConfigAttributeError:
        log.warning(
            "missing authentication encryption key, generate a key with the `panel oauth-secret` CLI command and then provide it to hydra using the DATA_LUNCH_OAUTH_ENC_KEY environment variable"
        )
    # Cookie expiry date
    try:
        if self.config.auth.oauth_expiry:
            pn.config.oauth_expiry = self.config.auth.oauth_expiry
    except ConfigAttributeError:
        log.warning(
            "missing explicit authentication expiry date for cookies, defaults to 1 day"
        )

set_guest_user_password

set_guest_user_password() -> str

If guest user is active return a password, otherwise return an empty string.

This function always returns an empty string if basic authentication is not active.

Guest user and basic authentication are handled through configuration files.

If the flag reset_guest_user_password is set to True the password is created and uploaded to database. Otherwise the existing password is queried from database credentials table.

Returns:

Type Description
str

guest user password or empty string if basic authentication is not active.

Source code in dlunch/auth.py
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
def set_guest_user_password(self) -> str:
    """If guest user is active return a password, otherwise return an empty string.

    This function always returns an empty string if basic authentication is not active.

    Guest user and basic authentication are handled through configuration files.

    If the flag `reset_guest_user_password` is set to `True` the password is created
    and uploaded to database. Otherwise the existing password is queried from database
    `credentials` table.

    Returns:
        str: guest user password or empty string if basic authentication is not active.
    """
    # Check if basic auth is active
    if self.is_basic_auth_active():
        # If active basic_auth.guest_user is true if guest user is active
        is_guest_user_active = self.config.basic_auth.guest_user
        log.debug(f"guest user flag is {is_guest_user_active}")
    else:
        # Otherwise the guest user feature is not applicable
        is_guest_user_active = False
        log.debug("guest user not applicable")

    # Set the guest password variable
    if is_guest_user_active:
        # If flag for resetting the password does not exist use the default
        # value
        if (
            self.database_connector.get_flag(
                id="reset_guest_user_password"
            )
            is None
        ):
            self.database_connector.set_flag(
                id="reset_guest_user_password",
                value=self.config.basic_auth.default_reset_guest_user_password_flag,
            )
        # Generate a random password only if requested (check on flag)
        # otherwise load from database
        if self.database_connector.get_flag(
            id="reset_guest_user_password"
        ):
            # Turn off reset user password (in order to reset it only once)
            # This statement also acquire a lock on database (so it is
            # called first)
            self.database_connector.set_flag(
                id="reset_guest_user_password",
                value=False,
            )
            # Create password
            guest_password = self.generate_password(
                special_chars=self.config.basic_auth.psw_special_chars,
                length=self.config.basic_auth.generated_psw_length,
            )
            # Add hashed password to database
            AuthUser(
                config=self.config, auth_context=self, name="guest"
            ).add_user_hashed_password(guest_password)
        else:
            # Load from database
            session = self.database_connector.create_session()
            with session:
                try:
                    guest_password = session.get(
                        models.Credentials, "guest"
                    ).password_encrypted.decrypt()
                except InvalidToken:
                    # Notify exception and suggest to reset guest user password
                    guest_password = ""
                    log.warning(
                        "Unable to decrypt 'guest' user password because an invalid token has been detected: reset password from backend"
                    )
                    pn.state.notifications.warning(
                        "Unable to decrypt 'guest' user password<br>Invalid token detected: reset password from backend",
                        duration=self.config.panel.notifications.duration,
                    )
    else:
        guest_password = ""

    return guest_password

submit_password

submit_password(gi: GraphicInterface) -> bool

Same as backend_submit_password with an additional check on old password.

Parameters:

Name Type Description Default
gi GraphicInterface

graphic interface object (used to interact with Panel widgets).

required

Returns:

Type Description
bool

true if successful, false otherwise.

Source code in dlunch/auth.py
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
def submit_password(self, gi: gui.GraphicInterface) -> bool:
    """Same as backend_submit_password with an additional check on old password.

    Args:
        gi (gui.GraphicInterface): graphic interface object (used to interact with Panel widgets).

    Returns:
        bool: true if successful, false otherwise.
    """
    # Get authenticated user from Panel state
    auth_user = AuthUser(config=self.config, auth_context=self)
    # Get username, updated updated at each key press
    old_password_key_press = gi.password_widget._widgets[
        "old_password"
    ].value_input
    # Check if old password is correct
    if auth_user.password_hash == old_password_key_press:
        # Then run the same checks used for backend
        return self.backend_submit_password(
            gi=gi, user=auth_user.name, logout_on_success=True
        )
    else:
        pn.state.notifications.error(
            "Incorrect old password!",
            duration=self.config.panel.notifications.duration,
        )
    return False

AuthUser

Class to handle user authentication and management.

Parameters:

Name Type Description Default
config DictConfig

Hydra configuration dictionary.

required
name str | None

username. Defaults to None.

None
auth_context AuthContext | None

authentication context. Defaults to None.

None

Methods:

Name Description
__init__
add_privileged_user

Add user id to privileged_users table.

add_user_hashed_password

Add user credentials to credentials table.

get_user_from_panel_state

Return the user from Panel state object.

is_admin

Check if a user is an admin by checking the privileged_users table.

is_guest

Check if a user is a guest by checking if it is listed inside the privileged_users table.

remove_user

Remove user from the database.

Attributes:

Name Type Description
auth_context
config
name
password_hash PasswordHash | None

Query the database to retrieve the hashed password for the user.

Source code in dlunch/auth.py
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
class AuthUser:
    """Class to handle user authentication and management.

    Args:
        config (DictConfig): Hydra configuration dictionary.
        name (str | None, optional): username. Defaults to None.
        auth_context (AuthContext | None, optional): authentication context. Defaults to None.
    """

    def __init__(
        self,
        config: DictConfig,
        name: str | None = None,
        auth_context: AuthContext | None = None,
    ) -> None:
        self.config = config
        self.auth_context = auth_context or AuthContext(config)
        # Take username from Panel state if not provided
        self.name = name or self.get_user_from_panel_state()

    def get_user_from_panel_state(self) -> str:
        """Return the user from Panel state object.

        If `config.auth.remove_email_domain` is `True`, remove the email domain from username.

        Returns:
            str: username.
        """
        user = pn.state.user
        # Check if username is an email
        if user and re.fullmatch(r"[^@]+@[^@]+\.[^@]+", user):
            # Remove domain from username
            if self.config.auth.remove_email_domain:
                user = user.split("@")[0]
        return user

    def is_guest(self, allow_override: bool = True) -> bool:
        """Check if a user is a guest by checking if it is listed inside the `privileged_users` table.

        Args:
            allow_override (bool, optional): override enablement flag. Defaults to True.

        Returns:
            bool: guest flag. `True` if the user is a guest.
        """
        # If authorization is not active always return false (user is not guest)
        if not self.auth_context.is_auth_active():
            return False

        # Load guest override from flag table (if the button is pressed its value
        # is True). If not available use False.
        guest_override = self.auth_context.database_connector.get_flag(
            id=f"{self.name}_guest_override",
            value_if_missing=False,
        )

        # If guest override is active always return true (user act like guest)
        if guest_override and allow_override:
            return True

        # Otherwise check if user is not included in privileged users
        privileged_users = self.auth_context.list_privileged_users()

        return self.name not in privileged_users

    def is_admin(self) -> bool:
        """Check if a user is an admin by checking the `privileged_users` table.

        Returns:
            bool: admin flag. `True` if the user is an admin.
        """
        # If authorization is not active always return false (ther is no admin)
        if not self.auth_context.is_auth_active():
            return False
        session = self.auth_context.database_connector.create_session()
        with session:
            admin_users = session.scalars(
                select(models.PrivilegedUsers).where(
                    models.PrivilegedUsers.admin == sql_true()
                )
            ).all()

        return self.name in [u.user for u in admin_users]

    @property
    def password_hash(self) -> PasswordHash | None:
        """Query the database to retrieve the hashed password for the user.

        Returns:
            PasswordHash | None: returns password object if the user exists, `None` otherwise.
        """
        session = self.auth_context.database_connector.create_session()
        # Get the hashed password if user exists
        with session:
            user_credential = session.get(models.Credentials, self.name)
        return user_credential.password_hash if user_credential else None

    def add_privileged_user(self, is_admin: bool) -> None:
        """Add user id to `privileged_users` table.

        Args:
            is_admin (bool): admin flag.
        """
        session = self.auth_context.database_connector.create_session()
        # New credentials
        new_privileged_user = models.PrivilegedUsers(
            user=self.name, admin=is_admin
        )
        # Update credentials
        # Use an upsert for postgresql, a simple session add otherwise
        models.DatabaseConnector.session_add_with_upsert(
            session=session,
            constraint="privileged_users_pkey",
            new_record=new_privileged_user,
        )
        session.commit()

    def add_user_hashed_password(self, password: str) -> None:
        """Add user credentials to `credentials` table.

        Args:
            password (str): plain password (not hashed).
        """
        session = self.auth_context.database_connector.create_session()
        # New credentials
        # For the user named "guest" add also the encrypted password so that panel
        # can show the decrypted guest password to logged users
        # Can't use is_guest to determine the user that need encription, because
        # only the user named guest is shown in the guest user password widget
        if self.name == "guest":
            new_user_credential = models.Credentials(
                user=self.name,
                password_hash=password,
                password_encrypted=password,
            )
        else:
            new_user_credential = models.Credentials(
                user=self.name, password_hash=password
            )
        # Update credentials
        # Use an upsert for postgresql, a simple session add otherwise
        models.DatabaseConnector.session_add_with_upsert(
            session=session,
            constraint="credentials_pkey",
            new_record=new_user_credential,
        )
        session.commit()

    def remove_user(self) -> dict:
        """Remove user from the database.

        Returns:
            dict: dictionary with `privileged_users_deleted` and `credentials_deleted`
                with deleted rows from each table.
        """
        session = self.auth_context.database_connector.create_session()

        with session:
            # Delete user from privileged_users table
            privileged_users_deleted = session.execute(
                delete(models.PrivilegedUsers).where(
                    models.PrivilegedUsers.user == self.name
                )
            )
            session.commit()

            # Delete user from credentials table
            credentials_deleted = session.execute(
                delete(models.Credentials).where(
                    models.Credentials.user == self.name
                )
            )
            session.commit()

        return {
            "privileged_users_deleted": privileged_users_deleted.rowcount,
            "credentials_deleted": credentials_deleted.rowcount,
        }

auth_context instance-attribute

auth_context = auth_context or AuthContext(config)

config instance-attribute

config = config

name instance-attribute

name = name or get_user_from_panel_state()

password_hash property

password_hash: PasswordHash | None

Query the database to retrieve the hashed password for the user.

Returns:

Type Description
PasswordHash | None

returns password object if the user exists, None otherwise.

__init__

__init__(
    config: DictConfig,
    name: str | None = None,
    auth_context: AuthContext | None = None,
) -> None
Source code in dlunch/auth.py
806
807
808
809
810
811
812
813
814
815
def __init__(
    self,
    config: DictConfig,
    name: str | None = None,
    auth_context: AuthContext | None = None,
) -> None:
    self.config = config
    self.auth_context = auth_context or AuthContext(config)
    # Take username from Panel state if not provided
    self.name = name or self.get_user_from_panel_state()

add_privileged_user

add_privileged_user(is_admin: bool) -> None

Add user id to privileged_users table.

Parameters:

Name Type Description Default
is_admin bool

admin flag.

required
Source code in dlunch/auth.py
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
def add_privileged_user(self, is_admin: bool) -> None:
    """Add user id to `privileged_users` table.

    Args:
        is_admin (bool): admin flag.
    """
    session = self.auth_context.database_connector.create_session()
    # New credentials
    new_privileged_user = models.PrivilegedUsers(
        user=self.name, admin=is_admin
    )
    # Update credentials
    # Use an upsert for postgresql, a simple session add otherwise
    models.DatabaseConnector.session_add_with_upsert(
        session=session,
        constraint="privileged_users_pkey",
        new_record=new_privileged_user,
    )
    session.commit()

add_user_hashed_password

add_user_hashed_password(password: str) -> None

Add user credentials to credentials table.

Parameters:

Name Type Description Default
password str

plain password (not hashed).

required
Source code in dlunch/auth.py
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
def add_user_hashed_password(self, password: str) -> None:
    """Add user credentials to `credentials` table.

    Args:
        password (str): plain password (not hashed).
    """
    session = self.auth_context.database_connector.create_session()
    # New credentials
    # For the user named "guest" add also the encrypted password so that panel
    # can show the decrypted guest password to logged users
    # Can't use is_guest to determine the user that need encription, because
    # only the user named guest is shown in the guest user password widget
    if self.name == "guest":
        new_user_credential = models.Credentials(
            user=self.name,
            password_hash=password,
            password_encrypted=password,
        )
    else:
        new_user_credential = models.Credentials(
            user=self.name, password_hash=password
        )
    # Update credentials
    # Use an upsert for postgresql, a simple session add otherwise
    models.DatabaseConnector.session_add_with_upsert(
        session=session,
        constraint="credentials_pkey",
        new_record=new_user_credential,
    )
    session.commit()

get_user_from_panel_state

get_user_from_panel_state() -> str

Return the user from Panel state object.

If config.auth.remove_email_domain is True, remove the email domain from username.

Returns:

Type Description
str

username.

Source code in dlunch/auth.py
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
def get_user_from_panel_state(self) -> str:
    """Return the user from Panel state object.

    If `config.auth.remove_email_domain` is `True`, remove the email domain from username.

    Returns:
        str: username.
    """
    user = pn.state.user
    # Check if username is an email
    if user and re.fullmatch(r"[^@]+@[^@]+\.[^@]+", user):
        # Remove domain from username
        if self.config.auth.remove_email_domain:
            user = user.split("@")[0]
    return user

is_admin

is_admin() -> bool

Check if a user is an admin by checking the privileged_users table.

Returns:

Type Description
bool

admin flag. True if the user is an admin.

Source code in dlunch/auth.py
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
def is_admin(self) -> bool:
    """Check if a user is an admin by checking the `privileged_users` table.

    Returns:
        bool: admin flag. `True` if the user is an admin.
    """
    # If authorization is not active always return false (ther is no admin)
    if not self.auth_context.is_auth_active():
        return False
    session = self.auth_context.database_connector.create_session()
    with session:
        admin_users = session.scalars(
            select(models.PrivilegedUsers).where(
                models.PrivilegedUsers.admin == sql_true()
            )
        ).all()

    return self.name in [u.user for u in admin_users]

is_guest

is_guest(allow_override: bool = True) -> bool

Check if a user is a guest by checking if it is listed inside the privileged_users table.

Parameters:

Name Type Description Default
allow_override bool

override enablement flag. Defaults to True.

True

Returns:

Type Description
bool

guest flag. True if the user is a guest.

Source code in dlunch/auth.py
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
def is_guest(self, allow_override: bool = True) -> bool:
    """Check if a user is a guest by checking if it is listed inside the `privileged_users` table.

    Args:
        allow_override (bool, optional): override enablement flag. Defaults to True.

    Returns:
        bool: guest flag. `True` if the user is a guest.
    """
    # If authorization is not active always return false (user is not guest)
    if not self.auth_context.is_auth_active():
        return False

    # Load guest override from flag table (if the button is pressed its value
    # is True). If not available use False.
    guest_override = self.auth_context.database_connector.get_flag(
        id=f"{self.name}_guest_override",
        value_if_missing=False,
    )

    # If guest override is active always return true (user act like guest)
    if guest_override and allow_override:
        return True

    # Otherwise check if user is not included in privileged users
    privileged_users = self.auth_context.list_privileged_users()

    return self.name not in privileged_users

remove_user

remove_user() -> dict

Remove user from the database.

Returns:

Type Description
dict

dictionary with privileged_users_deleted and credentials_deleted with deleted rows from each table.

Source code in dlunch/auth.py
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
def remove_user(self) -> dict:
    """Remove user from the database.

    Returns:
        dict: dictionary with `privileged_users_deleted` and `credentials_deleted`
            with deleted rows from each table.
    """
    session = self.auth_context.database_connector.create_session()

    with session:
        # Delete user from privileged_users table
        privileged_users_deleted = session.execute(
            delete(models.PrivilegedUsers).where(
                models.PrivilegedUsers.user == self.name
            )
        )
        session.commit()

        # Delete user from credentials table
        credentials_deleted = session.execute(
            delete(models.Credentials).where(
                models.Credentials.user == self.name
            )
        )
        session.commit()

    return {
        "privileged_users_deleted": privileged_users_deleted.rowcount,
        "credentials_deleted": credentials_deleted.rowcount,
    }

DataLunchLoginHandler

Bases: RequestHandler

Custom Panel login Handler.

This class run the user authentication process for Data-Lunch when basic authentication is selected in configuration options.

It is responsible of rendering the login page, validate the user (and update its password hash if the hashing protocol is superseeded) and set the current user once validated.

Methods:

Name Description
check_permission

Validate user.

get

Render the login template.

post

Validate user and set the current user if valid.

set_current_user

Set secure cookie for the selected user.

Source code in dlunch/auth.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class DataLunchLoginHandler(RequestHandler):
    """Custom Panel login Handler.

    This class run the user authentication process for Data-Lunch when basic authentication
    is selected in configuration options.

    It is responsible of rendering the login page, validate the user (and update its
    password hash if the hashing protocol is superseeded) and set the current user once validated.
    """

    def get(self) -> None:
        """Render the login template."""
        try:
            errormessage = self.get_argument("error")
        except Exception:
            errormessage = ""
        html = self._login_template.render(errormessage=errormessage)
        self.write(html)

    def check_permission(self, user: str, password: str) -> bool:
        """Validate user.

        Automatically update the password hash if it was generated by an old hashing protocol.

        Args:
            user (str): username.
            password (str): password (not hashed).

        Returns:
            bool: user authentication flag (`True` if authenticated)
        """
        auth_user = AuthUser(config=self.config, name=user)
        password_hash = auth_user.password_hash
        if auth_user.password_hash == password:
            # Check if hash needs update
            valid, new_hash = password_hash.verify_and_update(password)
            if valid and new_hash:
                # Update to new hash
                auth_user.add_user_hashed_password(password)
            # Return the OK value
            return True
        # Return the NOT OK value
        return False

    def post(self) -> None:
        """Validate user and set the current user if valid."""
        username = self.get_argument("username", "")
        password = self.get_argument("password", "")
        auth = self.check_permission(username, password)
        if auth:
            self.set_current_user(username)
            self.redirect("/")
        else:
            error_msg = "?error=" + tornado.escape.url_escape(
                "Login incorrect"
            )
            self.redirect("/login" + error_msg)

    def set_current_user(self, user: str):
        """Set secure cookie for the selected user.

        Args:
            user (str): username.
        """
        if not user:
            self.clear_cookie("user")
            return
        self.set_secure_cookie(
            "user",
            user,
            expires_days=pn.config.oauth_expiry,
            **self.config.auth.cookie_kwargs,
        )
        id_token = base64url_encode(json.dumps({"user": user}))
        if pn.state.encryption:
            id_token = pn.state.encryption.encrypt(id_token.encode("utf-8"))
        self.set_secure_cookie(
            "id_token",
            id_token,
            expires_days=pn.config.oauth_expiry,
            **self.config.auth.cookie_kwargs,
        )

check_permission

check_permission(user: str, password: str) -> bool

Validate user.

Automatically update the password hash if it was generated by an old hashing protocol.

Parameters:

Name Type Description Default
user str

username.

required
password str

password (not hashed).

required

Returns:

Type Description
bool

user authentication flag (True if authenticated)

Source code in dlunch/auth.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def check_permission(self, user: str, password: str) -> bool:
    """Validate user.

    Automatically update the password hash if it was generated by an old hashing protocol.

    Args:
        user (str): username.
        password (str): password (not hashed).

    Returns:
        bool: user authentication flag (`True` if authenticated)
    """
    auth_user = AuthUser(config=self.config, name=user)
    password_hash = auth_user.password_hash
    if auth_user.password_hash == password:
        # Check if hash needs update
        valid, new_hash = password_hash.verify_and_update(password)
        if valid and new_hash:
            # Update to new hash
            auth_user.add_user_hashed_password(password)
        # Return the OK value
        return True
    # Return the NOT OK value
    return False

get

get() -> None

Render the login template.

Source code in dlunch/auth.py
79
80
81
82
83
84
85
86
def get(self) -> None:
    """Render the login template."""
    try:
        errormessage = self.get_argument("error")
    except Exception:
        errormessage = ""
    html = self._login_template.render(errormessage=errormessage)
    self.write(html)

post

post() -> None

Validate user and set the current user if valid.

Source code in dlunch/auth.py
113
114
115
116
117
118
119
120
121
122
123
124
125
def post(self) -> None:
    """Validate user and set the current user if valid."""
    username = self.get_argument("username", "")
    password = self.get_argument("password", "")
    auth = self.check_permission(username, password)
    if auth:
        self.set_current_user(username)
        self.redirect("/")
    else:
        error_msg = "?error=" + tornado.escape.url_escape(
            "Login incorrect"
        )
        self.redirect("/login" + error_msg)

set_current_user

set_current_user(user: str)

Set secure cookie for the selected user.

Parameters:

Name Type Description Default
user str

username.

required
Source code in dlunch/auth.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def set_current_user(self, user: str):
    """Set secure cookie for the selected user.

    Args:
        user (str): username.
    """
    if not user:
        self.clear_cookie("user")
        return
    self.set_secure_cookie(
        "user",
        user,
        expires_days=pn.config.oauth_expiry,
        **self.config.auth.cookie_kwargs,
    )
    id_token = base64url_encode(json.dumps({"user": user}))
    if pn.state.encryption:
        id_token = pn.state.encryption.encrypt(id_token.encode("utf-8"))
    self.set_secure_cookie(
        "id_token",
        id_token,
        expires_days=pn.config.oauth_expiry,
        **self.config.auth.cookie_kwargs,
    )

DataLunchProvider

Bases: OAuthProvider

Custom Panel auth provider.

It's a simple login page with a form that interacts with authentication tables.

It is used only if basic authentication is selected in Data-Lunch configuration options.

Parameters:

Name Type Description Default
config DictConfig

Hydra configuration dictionary.

required
login_template str | None

path to login template. Defaults to None.

None
logout_template str | None

path to logout template. Defaults to None.

None

Methods:

Name Description
__init__

Attributes:

Name Type Description
config DictConfig

Hydra configuration dictionary.

login_handler DataLunchLoginHandler

Data-Lunch custom login handler.

login_url str

Login url (/login).

Source code in dlunch/auth.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
class DataLunchProvider(OAuthProvider):
    """Custom Panel auth provider.

    It's a simple login page with a form that interacts with authentication tables.

    It is used only if basic authentication is selected in Data-Lunch configuration options.

    Args:
        config (DictConfig): Hydra configuration dictionary.
        login_template (str | None, optional): path to login template. Defaults to None.
        logout_template (str | None, optional): path to logout template. Defaults to None.
    """

    def __init__(
        self,
        config: DictConfig,
        login_template: str | None = None,
        logout_template: str | None = None,
    ) -> None:
        # Set Hydra config info
        self.config: DictConfig = config
        """Hydra configuration dictionary."""

        super().__init__(
            login_template=login_template, logout_template=logout_template
        )

    @property
    def login_url(self) -> str:
        """Login url (`/login`)."""
        return "/login"

    @property
    def login_handler(self) -> DataLunchLoginHandler:
        """Data-Lunch custom login handler."""
        # Set basic template
        DataLunchLoginHandler._login_template = self._login_template
        # Set Hydra config info
        DataLunchLoginHandler.config = self.config

        return DataLunchLoginHandler

config instance-attribute

config: DictConfig = config

Hydra configuration dictionary.

login_handler property

login_handler: DataLunchLoginHandler

Data-Lunch custom login handler.

login_url property

login_url: str

Login url (/login).

__init__

__init__(
    config: DictConfig,
    login_template: str | None = None,
    logout_template: str | None = None,
) -> None
Source code in dlunch/auth.py
166
167
168
169
170
171
172
173
174
175
176
177
178
def __init__(
    self,
    config: DictConfig,
    login_template: str | None = None,
    logout_template: str | None = None,
) -> None:
    # Set Hydra config info
    self.config: DictConfig = config
    """Hydra configuration dictionary."""

    super().__init__(
        login_template=login_template, logout_template=logout_template
    )

PasswordEncrypt

Class that store the encrypted value of a password.

The encryption is based on Panel encryption system.

The class has methods to encrypt and decrypt a string.

The encrypted password may be passed to instantiate the new object. If the encrypted password is not aviailable use the class method PasswordEncrypt.from_str to create an istance with the string properly encrypted.

Parameters:

Name Type Description Default
encrypted_password str

encrypted password.

required

Methods:

Name Description
__eq__

Decrypt the candidate string and compares it to the stored encrypted value.

__init__
__repr__

Simple object representation.

decrypt

Return decrypted password.

encrypt

Return encrypted password.

from_str

Creates a PasswordEncrypt from the given string.

Attributes:

Name Type Description
encrypted_password str

Encrypted password.

Source code in dlunch/auth.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
class PasswordEncrypt:
    """Class that store the encrypted value of a password.

    The encryption is based on Panel encryption system.

    The class has methods to encrypt and decrypt a string.

    The encrypted password may be passed to instantiate the new object.
    If the encrypted password is not aviailable use the class method
    `PasswordEncrypt.from_str` to create an istance with the string properly
    encrypted.

    Args:
        encrypted_password (str): encrypted password.
    """

    def __init__(self, encrypted_password: str) -> None:
        # Consistency checks
        assert (
            len(encrypted_password) <= 150
        ), "encrypted string should have less than 150 chars."
        # Attributes
        self.encrypted_password: str = encrypted_password
        """Encrypted password."""

    def __eq__(self, candidate: str) -> bool:
        """Decrypt the candidate string and compares it to the stored encrypted value.

        Args:
            candidate (str): candidate string.

        Returns:
            bool: `True` if equal.
        """
        # If string check hash, otherwise return False
        if isinstance(candidate, str):
            # Replace hashed_password if the algorithm changes
            valid = self.decrypt() == candidate
        else:
            valid = False

        return valid

    def __repr__(self) -> str:
        """Simple object representation.

        Returns:
            str: string representation.
        """
        return f"<{type(self).__name__}>"

    @staticmethod
    def encrypt(password: str) -> str:
        """Return encrypted password.

        Args:
            password (str): plain password (not encrypted).

        Returns:
            str: encrypted password.
        """
        if pn.state.encryption:
            encrypted_password = pn.state.encryption.encrypt(
                password.encode("utf-8")
            ).decode("utf-8")
        else:
            encrypted_password = password
        return encrypted_password

    def decrypt(self) -> str:
        """Return decrypted password.

        Returns:
            str: plain password (not encrypted).
        """
        if pn.state.encryption:
            password = pn.state.encryption.decrypt(
                self.encrypted_password.encode("utf-8")
            ).decode("utf-8")
        else:
            password = self.encrypted_password
        return password

    @classmethod
    def from_str(cls, password: str) -> Self:
        """Creates a PasswordEncrypt from the given string.

        Args:
            password (str): plain password (not encrypted).

        Returns:
            PasswordEncrypt: new class instance with encrypted value already stored.
        """
        return cls(cls.encrypt(password))

encrypted_password instance-attribute

encrypted_password: str = encrypted_password

Encrypted password.

__eq__

__eq__(candidate: str) -> bool

Decrypt the candidate string and compares it to the stored encrypted value.

Parameters:

Name Type Description Default
candidate str

candidate string.

required

Returns:

Type Description
bool

True if equal.

Source code in dlunch/auth.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
def __eq__(self, candidate: str) -> bool:
    """Decrypt the candidate string and compares it to the stored encrypted value.

    Args:
        candidate (str): candidate string.

    Returns:
        bool: `True` if equal.
    """
    # If string check hash, otherwise return False
    if isinstance(candidate, str):
        # Replace hashed_password if the algorithm changes
        valid = self.decrypt() == candidate
    else:
        valid = False

    return valid

__init__

__init__(encrypted_password: str) -> None
Source code in dlunch/auth.py
319
320
321
322
323
324
325
326
def __init__(self, encrypted_password: str) -> None:
    # Consistency checks
    assert (
        len(encrypted_password) <= 150
    ), "encrypted string should have less than 150 chars."
    # Attributes
    self.encrypted_password: str = encrypted_password
    """Encrypted password."""

__repr__

__repr__() -> str

Simple object representation.

Returns:

Type Description
str

string representation.

Source code in dlunch/auth.py
346
347
348
349
350
351
352
def __repr__(self) -> str:
    """Simple object representation.

    Returns:
        str: string representation.
    """
    return f"<{type(self).__name__}>"

decrypt

decrypt() -> str

Return decrypted password.

Returns:

Type Description
str

plain password (not encrypted).

Source code in dlunch/auth.py
372
373
374
375
376
377
378
379
380
381
382
383
384
def decrypt(self) -> str:
    """Return decrypted password.

    Returns:
        str: plain password (not encrypted).
    """
    if pn.state.encryption:
        password = pn.state.encryption.decrypt(
            self.encrypted_password.encode("utf-8")
        ).decode("utf-8")
    else:
        password = self.encrypted_password
    return password

encrypt staticmethod

encrypt(password: str) -> str

Return encrypted password.

Parameters:

Name Type Description Default
password str

plain password (not encrypted).

required

Returns:

Type Description
str

encrypted password.

Source code in dlunch/auth.py
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
@staticmethod
def encrypt(password: str) -> str:
    """Return encrypted password.

    Args:
        password (str): plain password (not encrypted).

    Returns:
        str: encrypted password.
    """
    if pn.state.encryption:
        encrypted_password = pn.state.encryption.encrypt(
            password.encode("utf-8")
        ).decode("utf-8")
    else:
        encrypted_password = password
    return encrypted_password

from_str classmethod

from_str(password: str) -> Self

Creates a PasswordEncrypt from the given string.

Parameters:

Name Type Description Default
password str

plain password (not encrypted).

required

Returns:

Type Description
PasswordEncrypt

new class instance with encrypted value already stored.

Source code in dlunch/auth.py
386
387
388
389
390
391
392
393
394
395
396
@classmethod
def from_str(cls, password: str) -> Self:
    """Creates a PasswordEncrypt from the given string.

    Args:
        password (str): plain password (not encrypted).

    Returns:
        PasswordEncrypt: new class instance with encrypted value already stored.
    """
    return cls(cls.encrypt(password))

PasswordHash

Class that store the hashed value of a password.

The password hash may be passed to instantiate the new object. If the hash is not aviailable use the class method PasswordHash.from_str to create an istance with the string properly hashed.

Parameters:

Name Type Description Default
hashed_password str

password hash.

required

Methods:

Name Description
__eq__

Hashes the candidate string and compares it to the stored hash.

__init__
__repr__

Simple object representation.

from_str

Creates a PasswordHash from the given string.

hash

Return hash of the given password.

verify

Check a password against its hash and return True if check passes,

verify_and_update

Check a password against its hash and return True if check passes,

Attributes:

Name Type Description
hashed_password str

Password hash.

Source code in dlunch/auth.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
class PasswordHash:
    """Class that store the hashed value of a password.

    The password hash may be passed to instantiate the new object.
    If the hash is not aviailable use the class method
    `PasswordHash.from_str` to create an istance with the string properly
    hashed.

    Args:
        hashed_password (str): password hash.
    """

    def __init__(self, hashed_password: str) -> None:
        # Consistency checks
        assert (
            len(hashed_password) <= 150
        ), "hash should have less than 150 chars."
        # Attributes
        self.hashed_password: str = hashed_password
        """Password hash."""

    def __eq__(self, candidate: str) -> bool:
        """Hashes the candidate string and compares it to the stored hash.

        Args:
            candidate (str): candidate string.

        Returns:
            bool: `True` if equal.
        """
        # If string check hash, otherwise return False
        if isinstance(candidate, str):
            # Replace hashed_password if the algorithm changes
            valid = self.verify(candidate)
        else:
            valid = False

        return valid

    def __repr__(self) -> str:
        """Simple object representation.

        Returns:
            str: string representation.
        """
        return f"<{type(self).__name__}>"

    def verify(self, password: str) -> bool:
        """Check a password against its hash and return `True` if check passes,
        `False` otherwise.

        Args:
            password (str): plain password (not hashed).

        Returns:
            bool: `True` if password and hash match.
        """
        valid = pwd_context.verify(saslprep(password), self.hashed_password)

        return valid

    def verify_and_update(self, password: str) -> tuple[bool, str | None]:
        """Check a password against its hash and return `True` if check passes,
        `False` otherwise. Return also a new hash if the original hashing  method
        is superseeded

        Args:
            password (str): plain password (not hashed).

        Returns:
            tuple[bool, str | None]: return a tuple with two elements (valid, new_hash).
                valid: `True` if password and hash match.
                new_hash: new hash to replace the one generated with an old algorithm.
        """
        valid, new_hash = pwd_context.verify_and_update(
            saslprep(password), self.hashed_password
        )
        if valid and new_hash:
            self.hashed_password = new_hash

        return valid, new_hash

    @staticmethod
    def hash(password: str) -> str:
        """Return hash of the given password.

        Args:
            password (str): plain password (not hashed).

        Returns:
            str: hashed password.
        """
        return pwd_context.hash(saslprep(password))

    @classmethod
    def from_str(cls, password: str) -> Self:
        """Creates a PasswordHash from the given string.

        Args:
            password (str): plain password (not hashed).

        Returns:
            PasswordHash: new class instance with hashed value already stored.
        """
        return cls(cls.hash(password))

hashed_password instance-attribute

hashed_password: str = hashed_password

Password hash.

__eq__

__eq__(candidate: str) -> bool

Hashes the candidate string and compares it to the stored hash.

Parameters:

Name Type Description Default
candidate str

candidate string.

required

Returns:

Type Description
bool

True if equal.

Source code in dlunch/auth.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def __eq__(self, candidate: str) -> bool:
    """Hashes the candidate string and compares it to the stored hash.

    Args:
        candidate (str): candidate string.

    Returns:
        bool: `True` if equal.
    """
    # If string check hash, otherwise return False
    if isinstance(candidate, str):
        # Replace hashed_password if the algorithm changes
        valid = self.verify(candidate)
    else:
        valid = False

    return valid

__init__

__init__(hashed_password: str) -> None
Source code in dlunch/auth.py
208
209
210
211
212
213
214
215
def __init__(self, hashed_password: str) -> None:
    # Consistency checks
    assert (
        len(hashed_password) <= 150
    ), "hash should have less than 150 chars."
    # Attributes
    self.hashed_password: str = hashed_password
    """Password hash."""

__repr__

__repr__() -> str

Simple object representation.

Returns:

Type Description
str

string representation.

Source code in dlunch/auth.py
235
236
237
238
239
240
241
def __repr__(self) -> str:
    """Simple object representation.

    Returns:
        str: string representation.
    """
    return f"<{type(self).__name__}>"

from_str classmethod

from_str(password: str) -> Self

Creates a PasswordHash from the given string.

Parameters:

Name Type Description Default
password str

plain password (not hashed).

required

Returns:

Type Description
PasswordHash

new class instance with hashed value already stored.

Source code in dlunch/auth.py
290
291
292
293
294
295
296
297
298
299
300
@classmethod
def from_str(cls, password: str) -> Self:
    """Creates a PasswordHash from the given string.

    Args:
        password (str): plain password (not hashed).

    Returns:
        PasswordHash: new class instance with hashed value already stored.
    """
    return cls(cls.hash(password))

hash staticmethod

hash(password: str) -> str

Return hash of the given password.

Parameters:

Name Type Description Default
password str

plain password (not hashed).

required

Returns:

Type Description
str

hashed password.

Source code in dlunch/auth.py
278
279
280
281
282
283
284
285
286
287
288
@staticmethod
def hash(password: str) -> str:
    """Return hash of the given password.

    Args:
        password (str): plain password (not hashed).

    Returns:
        str: hashed password.
    """
    return pwd_context.hash(saslprep(password))

verify

verify(password: str) -> bool

Check a password against its hash and return True if check passes, False otherwise.

Parameters:

Name Type Description Default
password str

plain password (not hashed).

required

Returns:

Type Description
bool

True if password and hash match.

Source code in dlunch/auth.py
243
244
245
246
247
248
249
250
251
252
253
254
255
def verify(self, password: str) -> bool:
    """Check a password against its hash and return `True` if check passes,
    `False` otherwise.

    Args:
        password (str): plain password (not hashed).

    Returns:
        bool: `True` if password and hash match.
    """
    valid = pwd_context.verify(saslprep(password), self.hashed_password)

    return valid

verify_and_update

verify_and_update(password: str) -> tuple[bool, str | None]

Check a password against its hash and return True if check passes, False otherwise. Return also a new hash if the original hashing method is superseeded

Parameters:

Name Type Description Default
password str

plain password (not hashed).

required

Returns:

Type Description
tuple[bool, str | None]

return a tuple with two elements (valid, new_hash). valid: True if password and hash match. new_hash: new hash to replace the one generated with an old algorithm.

Source code in dlunch/auth.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def verify_and_update(self, password: str) -> tuple[bool, str | None]:
    """Check a password against its hash and return `True` if check passes,
    `False` otherwise. Return also a new hash if the original hashing  method
    is superseeded

    Args:
        password (str): plain password (not hashed).

    Returns:
        tuple[bool, str | None]: return a tuple with two elements (valid, new_hash).
            valid: `True` if password and hash match.
            new_hash: new hash to replace the one generated with an old algorithm.
    """
    valid, new_hash = pwd_context.verify_and_update(
        saslprep(password), self.hashed_password
    )
    if valid and new_hash:
        self.hashed_password = new_hash

    return valid, new_hash