第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定

云計(jì)算-私有云-私有云運(yùn)維開(kāi)發(fā)

標(biāo)簽:
云計(jì)算

三、私有云运维开发(15)
​ 使用自动化运维工具 Ansible 完成系统的自动化部署与管理。
​ 基于 OpenStack APIs 与SDK,开发私有云运维程序
1.OpenStack Python运维开发:实现镜像管理(7分)
​ 编写Python代码,实现OpenStack镜像增删查改。
​ 在controller节点的/root目录下创建create_image.py文件,编写python代码对接OpenStack API,完成镜像的上传与查询。

​ ①创建镜像:要求在OpenStack私有云平台中上传镜像cirros-0.3.4-x86_64-disk.img,名字为pvm_image,disk_format为qcow2,container_format为bare。

​ ②查询镜像:查询pvm_image的详细信息,并控制台输出。

# 编写Python代码,实现OpenStack镜像增查
vi create_image.py
# encoding:utf-8
import requests,json,time
def get_auth_token(controller_ip,domain,name,password):
    try:
        url = f"http://{controller_ip}:5000/v3/auth/tokens"
        body = {
            "auth": {
                "identity": {
                    "methods": ['password'],
                    "password": {
                        "user": {
                            "domain": {"name": domain},
                            "name": name,
                            "password": password,
                        }
                    }
                },
                "scope": {
                    "project": {
                        "domain": {"name": domain},
                        "name": name
                    }
                }
            }
        }
        headers = {
            "Content-Type": "application/json"
        }
        token = requests.post(url,headers=headers,data=json.dumps(body)).headers['X-Subject-Token']
        headers = {
            "X-Auth-Token": token
        }
        print(f"token值为:{token}")
        return headers
    except Exception as e:
        print(f"token获取失败,{e}")

class image_manager:
    def __init__(self,handers:dict,resUrl):
        self.headers = handers
        self.resUrl = resUrl

    def create_image(self,image_name,disk_format,container_format):
        body = {
            "name": image_name,
            "disk_format": disk_format,
            "container_format": container_format,
        }

        req = requests.post(self.resUrl,headers=self.headers,data=json.dumps(body)).text
        print(f"创建镜像的信息为:{req}")
        return req

    def get_image_id(self,name):
        req = json.loads(requests.get(self.resUrl,headers=self.headers).text)
        for image in req['images']:
            if image['name'] == name:
                return image['id']
        return "NONE"

    def upload_image(self,id,file_path:str):
        url = self.resUrl + "/" + id + "/file"
        self.headers["Content-Type"] = "application/octet-stream"
        req = requests.put(url,headers=self.headers,data=open(file_path,'rb').read())
        if req.status_code == 204:
            print("上传镜像成功",req.status_code)
        else:
            print("上传镜像失败",req.status_code)

        print(f"镜像上传信息:{req}")
        return req

    def get_image(self,id):
        url = self.resUrl + "/" + id
        req = json.loads(requests.get(self.resUrl,headers=self.headers).text)
        print(f"获取到的镜像信息为:{req}")
        return req

    def delete_image(self,id):
        url = self.resUrl + "/" + id
        req = requests.delete(url,headers=self.headers)
        print(f"删除信息:{req}")
        return req

if __name__ == "__main__":
    controller_ip = "10.26.16.133"
    domain = "demo"
    name = "admin"
    password = "000000"
    headers = get_auth_token(controller_ip, domain, name, password)
    image_m = image_manager(headers,f"http://{controller_ip}:9292/v2/images")

    #create
    create_image = image_m.create_image("cirros001","qcow2","bare")

    #get id
    get_id = image_m.get_image_id("cirros001")
    print(f"cirros001镜像ID为:{get_id}")

    #upload
    upload_image = image_m.upload_image(get_id,"cirros-0.3.4-x86_64-disk.img")

    #get image
    get_image = image_m.get_image(get_id)
    with open("image_demo.json","w")as outfile:
        json.dump(get_image,outfile,indent=4)

2.OpenStack用户管理服务接口开发(8分)
​ 使用已建好的OpenStack Python运维开发环境,在/root目录下创建user_manager.py脚本,编写Python代码,端口为5043,IP地址为0.0.0.0,开发出OpenStack用户管理的接口,需要实现的接口如下:

​ ①GET /user/,自行调用查询接口,查询指定名称的用户;返回信息以json格式输出到控制台。

​ ②POST /user/create,自行调用创建接口,创建名为chinaskill的用户,密码为123456,返回信息以json格式输出到控制台。

​ ③DELETE /user/delete/,自行调用删除接口,删除指定名称的用户,若删除成功,返回信息输出到控制台。

# 编写api_user_manager.py
vi api_user_manager.py
# encoding:utf-8
import requests, json, time
import logging
# -----------logger-----------
# get logger
logger = logging.getLogger(__name__)
# level
logger.setLevel(logging.DEBUG)
# format
format = logging.Formatter('%(asctime)s %(message)s')
# to console
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(format)
logger.addHandler(stream_handler)
# -----------logger-----------
def get_auth_token(controller_ip, domain, user, password):
    '''
    :param controller_ip: openstack master ip address
    :param domain: current user's domain
    :param user: user name
    :param password: user password
    :return: keystoen auth Token for current user.
    '''
    try:
        url = "http://controller:5000/v3/auth/tokens"
        body = {
            "auth": {
                "identity": {
                    "methods": [
                        "password"
                    ],
                    "password": {
                        "user": {
                            "domain": {
                                "name": domain
                            },
                            "name": user,
                            "password": password
                        }
                    }
                },
                "scope": {
                    "project": {
                        "domain": {
                            "name": domain
                        },
                        "name": user
                    }
                }
            }
        }
        headers = {
            "Content-Type": "application/json",
        }
        print(body)
        Token = requests.post(url, data=json.dumps(body), headers=headers).headers['X-Subject-Token']

        headers = {
            "X-Auth-Token": Token
        }
        logger.debug(f"获取Token值:{str(Token)}")
        return headers
    except Exception as e:
        logger.error(f"获取Token值失败,请检查访问云主机控制节点IP是否正确?输出错误信息如下:{str(e)}")
        exit(0)
# 用户管理
# https://docs.openstack.org/api-ref/identity/v3/index.html#users
class user_manager:
    def __init__(self, handers: dict, resUrl: str):
        self.headers = handers
        self.resUrl = resUrl
    def create_users(self, user_name, password: str, desc: str):
        """
        create a user with name and password and description.
        """
        body = {
            "user": {
                "name": user_name,
                "password": password,
                "description": desc,
            }
        }
        status_code = requests.post(self.resUrl, data=json.dumps(body), headers=self.headers).text
        logger.debug(f"返回状态:{str(status_code)}")
        return status_code
    def get_users(self):
        """
        get user
        """
        status_code = requests.get(self.resUrl, headers=self.headers).text
        logger.debug(f"返回状态:{str(status_code)}")
        return status_code
    def get_user_id(self, user_name):
        """
        get user id by name.
        """
        result = json.loads(requests.get(self.resUrl, headers=self.headers).text)
        user_name = user_name
        for item in result['users']:
            if item['name'] == user_name:
                return item['id']
        return "NONE"
    def get_user(self, id: str):
        """
        get a flavor by id.
        """
        api_url = self.resUrl + "/" + id
        result = json.loads(requests.get(api_url, headers=self.headers).text)
        logger.debug(f"返回信息:{str(result)}")
        return result
    def delete_user(self, name: str):
        """
         delete a user by id.
         """
        id = self.get_user_id(name)
        api_url = self.resUrl + "/" + id
        response = requests.delete(api_url, headers=self.headers)
        if response.status_code == 204:
            return {"User itemDeletedSuccess": response.status_code}
        result = json.loads(response.text)
        logger.debug(f"返回信息:{str(result)}")
        return result
    def update_User_password(self, id: str, original_password: str, new_password: str):
        """
        update a flavor desc by id.
        """
        self.headers['Content-Type'] = "application/json"
        body = {
            "user": {
                "password": new_password,
                "original_password": original_password
            }
        }
        api_url = self.resUrl + "/" + id + "/password"
        response = requests.post(api_url, data=json.dumps(body), headers=self.headers)
        # Normal response codes: 204 without return text
        if response.status_code == 204:
            return {"item Update Password Success": response.status_code}
        result = json.loads(response.text)
        logger.debug(f"返回信息:{str(result)}")
        return result
if __name__ == '__main__':
    # 1. openstack allinone (controller ) credentials
    # host ip address
    # controller_ip = "10.24.2.22"
    controller_ip = "controller"
    # controller_ip = "10.24.2.22"
    # domain name
    domain = "demo"
    # user name
    user = "admin"
    # user password
    password = "000000"
    headers = get_auth_token(controller_ip, domain, user, password)
    print("headers:", headers)
    # get all user
    user_m = user_manager(headers, "http://controller:5000/v3/users")
    # 1 查询所有
    users = user_m.get_users()
    print("查询所有users:", users)
    
# 编写user_manager.py
vi user_manager.py
#encoding:utf-8
import argparse
import api_user_manager
import json
import csv
import yaml
controller_ip = "controller"
domain = "demo"
user = "admin"
password = "000000"
headers = api_user_manager.get_auth_token(controller_ip, domain, user, password)
print("headers:", headers)
user_m = api_user_manager.user_manager(headers, "http://controller:5000/v3/users")
print("-----------begin-----------------")
def define_args(parser):
    """
    定义程序支持的args
    :return:
    """
    # parser = argparse.ArgumentParser()
    #增加控制命令(postion 位置参数,必须)
    parser.add_argument('command',
                        help='Resource command name',
                        type=str)
    # parser.add_argument('delete',
    #                     help='delete a resource',
    #                     type=str)
    #可选参数(可有可无)
    parser.add_argument('-n', '--name',  # 可选参数,删除的名称
                        help='The Name of the resource',  # 输入-h展示
                        type=str)
    parser.add_argument('-i', '--input',  # 可选参数,删除的名称
                        help='The input json format text ',  # 输入-h展示
                        type=str)
    parser.add_argument('-o', '--output',  # 可选参数,删除的名称
                        help='The output file path ',  # 输入-h展示
                        type=str)
def parse_args(parser):
    args = parser.parse_args()
    if args.command:
        if args.command == "create":
            print("create some thing")
            create_user(args)
        elif args.command == "getall":
            print("getall some thing")
            getall_users(args)
        elif args.command == "get":
            print("get some thing")
            get_user(args)
        elif args.command == "delete":
            print("delete some thing")
            delete_user(args)
        else:
            print("Note support command name!")
def create_user(args):
    print('Provided command value is %r.' % args.command)
    print('Provided input value is %r.' % args.input)
    print('Provided output value is %r.' % args.output)
    output_file = args.output
    # user_name, password: str, desc: str):
    user_dict = json.loads(args.input)
    result = user_m.create_users(user_dict["name"],user_dict["password"],user_dict["description"])
    # 写出json文件
    print("--------write to json---------:", result)
    print(result)
def delete_user(args):
    print('Provided command value is %r.' % args.command)
    print('Provided input value is %r.' % args.input)
    print('Provided output value is %r.' % args.output)
    result = user_m.delete_user(args.name)
    print(result)
def getall_users(args):
    print('Provided command value is %r.' % args.command)
    print('Provided input value is %r.' % args.input)
    print('Provided output value is %r.' % args.output)
    print(type(args.input))
    result = user_m.get_users()
    output_file = args.output
    # 写出json文件
    print("--------result---------")
    print(result)
    configuration = json.loads(result)
    # 写出yaml (dict)
    with open(output_file, 'w') as yaml_file:
        yaml.dump(configuration, yaml_file)
    print(result)
def get_user(args):
    print('Provided command value is %r.' % args.command)
    print('Provided input value is %r.' % args.input)
    print('Provided output value is %r.' % args.output)
    id = user_m.get_user_id(args.name)
    result = user_m.get_user(id)
    output_file = args.output
    # 写出json文件
    with open(output_file, 'w') as jsonfile:
        json.dump(result, jsonfile, indent=4)
    print(result)
if __name__ == '__main__':
    import sys
    print(sys.argv)
    parser = argparse.ArgumentParser()
    define_args(parser)
parse_args(parser)

# 创建1个用户
python3 user_manager.py create --input '{ "name": "user01", "password": "000000", "description": "description" } '

# 查询给定具体名称的用户查询
python3 user_manager.py get --name user01 -o user.json

# 查询目前admin账号下所有的用户
python3 user_manager.py getall -o openstack_all_user.yaml

# 删除指定的名称的用户
python3 user_manager.py delete --name user01

本文由博客一文多发平台 OpenWrite 发布!

點(diǎn)擊查看更多內(nèi)容
TA 點(diǎn)贊

若覺(jué)得本文不錯(cuò),就分享一下吧!

評(píng)論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評(píng)論
  • 收藏
  • 共同學(xué)習(xí),寫下你的評(píng)論
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說(shuō)多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開(kāi)微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊(cè)有機(jī)會(huì)得

100積分直接送

付費(fèi)專欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會(huì)
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)

舉報(bào)

0/150
提交
取消