Source code for pypersonalassistant.secure

#!/usr/bin/env python3

import argparse
import os
import sys
import logging
import hashlib
import json
import getpass
import base64
import smtplib
import email

from Crypto.Cipher import AES
from twilio.rest import TwilioRestClient

# Helper functions
def confirm_input(prompt, method=input): # Takes string to use as prompt, and method used to get input (default is stdlib input, options include getpass.getpass)
    value_initial = value_check = None
    while (value_initial != value_check) or (value_initial == None):
            value_initial = method(prompt)
            value_check = method("Confirm: ")
            if value_initial != value_check:
                print('\nPlease try again:')
    return value_initial

# AES Encryption Implementation based on PyCrypto, modified from http://stackoverflow.com/questions/12524994/encrypt-decrypt-using-pycrypto-aes-256
class AESCipher(object):
    """
    Create and use a new AESCipher instance based on a secret key
    """
    def __init__(self, key): 
        self.bs = 32
        self.key = hashlib.sha256(key.encode()).digest()

    # Takes string, returns ascii-string
    def encrypt(self, raw):
        logging.debug('Encrypting')
        logging.debug('DECRYP Type: {0}'.format(type(raw)))
        raw = self._pad(raw)
        iv = os.urandom(AES.block_size)
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        enc = base64.b64encode(iv + cipher.encrypt(raw)).decode('ascii')
        logging.debug('ENCRYP Type: {0}, Value: {1}'.format(type(enc), enc))
        return enc
    
    # Takes string (decodes as ascii), returns string
    def decrypt(self, enc):
        logging.debug('Decrypting')
        logging.debug('ENCRYP Type: {0}, Value: {1}'.format(type(enc), enc))
        enc = base64.b64decode(enc.encode('ascii'))
        iv = enc[:AES.block_size]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        dec = self._unpad(cipher.decrypt(enc[AES.block_size:])).decode('utf-8')
        logging.debug('DECRYP Type: {0}'.format(type(dec)))
        return dec

    def _pad(self, s):
        return s + (self.bs - len(s) % self.bs) * chr(self.bs - len(s) % self.bs)

    @staticmethod
    def _unpad(s):
        return s[:-ord(s[len(s)-1:])]

# Class holding useful functions, that all use personal secure encrypted data
[docs]class secure(object): """ This class represents a user's consent to pypersonalassistant using their personal credentials in the future. .. note :: On creating an instance of this object, user input is required. Asks the user for the password used to previously encrypt their private credentials. If it is the first time the module has been used, the password will be used to encrypt credentials entered in future. :param string credentials_path: The file used to retrieve/store encrypted personal credentials. :returns: A :py:class:`pypersonalassistant.secure.secure` object """ _CREDENTIALS_REQUIRED = { 'TWILIO_ACCOUNT_SID', 'TWILIO_AUTH_TOKEN', 'PERSONAL_PHONE_NUMBER', 'TWILIO_PHONE_NUMBER', 'PERSONAL_EMAIL', 'GMAIL_EMAIL', 'GMAIL_EMAIL_PASSWORD' } _MASTER_KEY_HASH_NAME = 'MASTER_KEY_HASH' def __init__(self, credentials_path="credentials.json"): self.credentials_path = credentials_path master_key = getpass.getpass('Password for personal module (hidden): ') # sha512 for password checking self._master_key_hash = hashlib.sha512(master_key.encode()).digest().hex() # sha256 used for ciphering self._cipher = AESCipher(master_key) self._credentials_encrypted = {} self._credentials = {} logging.debug('Password saved, cipher generated') logging.debug('Looking for personal credentials at {0}'.format(self.credentials_path)) # try to _load_credential # if FileNotFoundError error, make new cred with edit_credentials try: self._load_credentials() except FileNotFoundError: print('\nStarting new credentials file: {0}'.format(self.credentials_path)) self.edit_credentials() def _save_credentials(self): logging.debug('Saving {0}'.format(self.credentials_path)) # Encrypt credentials for saving dict_out = {k: self._cipher.encrypt(v) for k, v in self._credentials.items()} # Add password hash dict_out.update({self._MASTER_KEY_HASH_NAME: self._master_key_hash}) with open(self.credentials_path, 'w') as outfile: json.dump(dict_out, outfile) print('Personal credentials saved') def _load_credentials(self): # check if file exist, else pass error up for handling # verify with password hash logging.debug('Loading {0}'.format(self.credentials_path)) credentials = {} with open(self.credentials_path) as data_file: credentials = json.load(data_file) if credentials[self._MASTER_KEY_HASH_NAME] != self._master_key_hash: logging.debug('Password incorrect') sys.exit('Password for personal module incorrect. Please try again or delete the current credentials.json file') else: logging.debug('Password correct') logging.debug('Loading credentials.json') # Remove password hash, store & decrypt and store other credentials credentials.pop(self._MASTER_KEY_HASH_NAME) self._credentials_encrypted = {k: v for k, v in credentials.items()} self._credentials = {k: self._cipher.decrypt(v) for k, v in credentials.items()} print('Personal credentials loaded')
[docs] def edit_credentials(self, credential=None, already_set=True): """ Edit the credentials used by the :py:class:`~pypersonalassistant.secure.secure` class. .. note :: User input is required. By default, unset credentials will be displayed. :param string credential: A specific credential to edit. :param bool already_set: If `False` will show all credentials, whether set or unset. """ # get user input of credentials # save and reload credentials.json credentials_required = self._CREDENTIALS_REQUIRED if credential: credentials_required = [credential] for cred in credentials_required: # If credential is already set, decide whether to continue if (cred in self._credentials) and ((already_set is False) and (credential is None)): continue else: current_encrypted_value = '' try: current_encrypted_value = self._credentials_encrypted[cred] except KeyError: None prompt = '\nCredential: {0}\nCurrent (encrypted) value: {1}\nEnter new value, or press enter to skip: '.format(cred, current_encrypted_value) input_value = confirm_input(prompt) if input_value == '': logging.info('{0} skipped'.format(cred)) else: self._credentials.update({cred: input_value}) logging.info('{0} updated: {1}'.format(cred, input_value)) self._save_credentials() self._load_credentials()
[docs] def credential(self, key): """ Returns the decrypted (plain-text) value of the credential specified. :param string key: The credential name. :returns: The credential value as a string. """ return self._credentials[key] ### SMS TEXTING
[docs] def twilio_SMS(self, from_, to, body): """ Send an SMS message from your `twilio`_ account. .. _twilio: https://www.twilio.com/ Login will be performed using stored credentials. * *stored credential name: TWILIO_ACCOUNT_SID* * *stored credential name: TWILIO_AUTH_TOKEN* :param string from_: The phone number in your twilio account to send the SMS message from. Full international format. :param string to: The phone number to send the SMS message to. Full international format. :param string body: The content of the SMS message. """ logging.debug('Texting from Twilio') client = TwilioRestClient(self._credentials['TWILIO_ACCOUNT_SID'], self._credentials['TWILIO_AUTH_TOKEN']) response = client.messages.create( to=to, from_=from_, body=body, ) logging.debug('Response from Twilio: {0}'.format(response)) return response
[docs] def SMS(self, to, body): """ Quickly send an SMS from a default number. Calls :py:meth:`twilio_SMS`. * *stored credential name: TWILIO_PHONE_NUMBER* :param string to: The phone number to send the SMS message to. Full international format. :param string body: The content of the SMS message. """ logging.debug('Texting someone') return self.twilio_SMS(self._credentials['TWILIO_PHONE_NUMBER'], to, body)
[docs] def SMS_me(self, body): """ Quickly send an SMS to yourself. Calls :py:meth:`SMS`. * *stored credential name: PERSONAL_PHONE_NUMBER* :param string body: The content of the SMS message. """ logging.debug('Texting myself') return self.text(self._credentials['PERSONAL_PHONE_NUMBER'], body) ### EMAILING # All email functions expect email.message.message; msg['From'] and msg['To'] are ignored
[docs] def gmail_email(self, from_, to, msg): """ Send an email from your `gmail`_ account. .. _gmail: https://mail.google.com/ msg can either be: * A string, in which case: * At the first newline (\\n) the string will be split into subject and body * If no newline is present, the entire string will be body. * An `email.message.Message`_ object .. _email.message.Message: https://docs.python.org/3/library/email.message.html Login will be performed using stored credentials. * *stored credential name: GMAIL_EMAIL* * *stored credential name: GMAIL_EMAIL_PASSWORD* :param string from_: The phone number in your twilio account to send the SMS message from. Full international format. :param string to: The email address to send the email to. :param body: The content of the email. See above. """ logging.debug('Emailing from Gmail') smtpConn = smtplib.SMTP('smtp.gmail.com', 587) smtpConn.ehlo() smtpConn.starttls() login_response = smtpConn.login(self._credentials['GMAIL_EMAIL'], self._credentials['GMAIL_EMAIL_PASSWORD']) # if email is of type email.message.Message, flatten and send # if anything else, convert to string and try and send if isinstance(msg, email.message.Message): logging.debug('Flattening MIME to string') # If From is already set, overwrite msg['From'] = from_ # If To is string, convert to list and add each to header if isinstance(to, str): to = [to] for x in to: msg['To'] = x msg = msg.as_string() else: msg = str(msg) logging.debug(msg.replace('\n', ' ')) response = smtpConn.sendmail(from_, to, msg) logging.info('Response from Gmail: {0}'.format(response)) smtpConn.quit() return response
[docs] def email(self, to, msg): """ Quickly send an email from a default address. Calls :py:meth:`gmail_email`. * *stored credential name: GMAIL_EMAIL* :param string to: The email address to send the email to. :param msg: The content of the email. See :py:meth:`gmail_email`. """ logging.debug('Emailing someone') return self.gmail_email(self._credentials['GMAIL_EMAIL'], to, msg)
[docs] def email_me(self, msg): """ Quickly send an email to yourself. Calls :py:meth:`email`. * *stored credential name: PERSONAL_EMAIL* :param msg: The content of the email. See :py:meth:`gmail_email`. """ logging.debug('Emailing myself') return self.email(self._credentials['PERSONAL_EMAIL'], msg) # If called directly, script allows editing of the credentials file # Setup will be performed automatically on first use of the personal module if not already done #? this currently results in setupx2 on first run, todo fix
def main(): # Command line argument parsing parser = argparse.ArgumentParser(description='Editing of credentials.json for personal.py. By default shows all unset credentials') group = parser.add_mutually_exclusive_group() group.add_argument('-c', '--credential', help='update a specific credential') group.add_argument('-a', '--already_set', action='store_true', help='cycle through all credentials, even if already set') args = parser.parse_args() # Initialise a personal instance and edit credentials ppa_sec = secure() ppa_sec.edit_credentials(credential=args.credential, already_set=args.already_set) if __name__ == '__main__': sys.exit(main())