三、私有云运维开发(15分)
使用自动化运维工具 Ansible 完成系统的自动化部署与管理。
基于 OpenStack APIs 与 SDK,开发私有云运维程序。
1.OpenStack Python运维开发:实现镜像管理(7分)
编写Python代码,实现OpenStack镜像增删查改。
在controller节点的/root目录下创建create_image.py文件,编写Python代码对接OpenStack API,完成镜像的上传与查询。
①创建镜像:要求在OpenStack私有云平台中上传镜像cirros-0.3.4-x86_64-disk.img,名字为pvm_image,disk_format为qcow2,container_format为bare。
②查询镜像:查询pvm_image的详细信息,并控制台输出。
# 编写Python代码,实现OpenStack镜像增查
vi create_image.py
# encoding:utf-8
import requests,json,time
def get_auth_token(controller_ip, domain, name, password, project=None):
    """Request a project-scoped token and wrap it in request headers.

    Sends a password-authentication request to
    ``http://<controller_ip>:5000/v3/auth/tokens`` and reads the token from
    the ``X-Subject-Token`` response header.

    Args:
        controller_ip: Host name or IP address of the identity endpoint.
        domain: Domain name used for both the user and the project scope.
        name: User name.
        password: User password.
        project: Project name to scope the token to. Defaults to ``name``,
            which is what this function always used before the parameter
            existed.

    Returns:
        ``{"X-Auth-Token": <token>}`` on success, or ``None`` when the token
        could not be obtained (the reason is printed).
    """
    url = f"http://{controller_ip}:5000/v3/auth/tokens"
    body = {
        "auth": {
            "identity": {
                "methods": ['password'],
                "password": {
                    "user": {
                        "domain": {"name": domain},
                        "name": name,
                        "password": password,
                    }
                }
            },
            "scope": {
                "project": {
                    "domain": {"name": domain},
                    "name": name if project is None else project,
                }
            }
        }
    }
    try:
        response = requests.post(
            url,
            headers={"Content-Type": "application/json"},
            data=json.dumps(body),
        )
        # Turn a rejected login (4xx/5xx) into a readable HTTP error instead
        # of a bare KeyError on the missing token header.
        response.raise_for_status()
        token = response.headers['X-Subject-Token']
    except Exception as e:
        print(f"token获取失败,{e}")
        return None
    print(f"token值为:{token}")
    return {"X-Auth-Token": token}
class image_manager:
    """Small client for an image service ``images`` REST endpoint.

    ``headers`` holds the request headers (expected to carry the auth token)
    and ``resUrl`` is the images collection URL, e.g.
    ``http://<controller>:9292/v2/images``. The headers dict passed in is
    stored as-is and is never modified by the methods of this class.
    """

    def __init__(self, handers: dict, resUrl):
        self.headers = handers
        self.resUrl = resUrl

    def create_image(self, image_name, disk_format, container_format):
        """Create an image record and return the raw response text."""
        body = {
            "name": image_name,
            "disk_format": disk_format,
            "container_format": container_format,
        }
        # Per-request copy so the JSON content type does not leak into
        # other calls that share self.headers.
        headers = {**self.headers, "Content-Type": "application/json"}
        req = requests.post(self.resUrl, headers=headers, data=json.dumps(body)).text
        print(f"创建镜像的信息为:{req}")
        return req

    def get_image_id(self, name):
        """Return the id of the first image named ``name``, or ``"NONE"``."""
        # Ask the server to filter by name so a match is not missed when the
        # full listing spans more than one page; the name is still verified
        # locally in case the filter is ignored.
        listing = requests.get(
            self.resUrl, headers=self.headers, params={"name": name}
        ).json()
        for image in listing['images']:
            if image['name'] == name:
                return image['id']
        return "NONE"

    def upload_image(self, id, file_path: str):
        """Upload the file at ``file_path`` as the data of image ``id``.

        Returns the ``requests`` response; status 204 is treated as success.
        """
        url = self.resUrl + "/" + id + "/file"
        # Copy instead of mutating self.headers: the octet-stream content
        # type must apply to this request only.
        headers = {**self.headers, "Content-Type": "application/octet-stream"}
        # Hand the open file to requests so it is streamed and always closed,
        # rather than read fully into memory and left open.
        with open(file_path, 'rb') as image_file:
            req = requests.put(url, headers=headers, data=image_file)
        if req.status_code == 204:
            print("上传镜像成功", req.status_code)
        else:
            print("上传镜像失败", req.status_code)
        print(f"镜像上传信息:{req}")
        return req

    def get_image(self, id):
        """Fetch and return the decoded JSON details of image ``id``."""
        url = self.resUrl + "/" + id
        # Query the single-image URL; requesting self.resUrl here would
        # return the whole image listing instead of this image.
        req = json.loads(requests.get(url, headers=self.headers).text)
        print(f"获取到的镜像信息为:{req}")
        return req

    def delete_image(self, id):
        """Delete image ``id`` and return the ``requests`` response."""
        url = self.resUrl + "/" + id
        req = requests.delete(url, headers=self.headers)
        print(f"删除信息:{req}")
        return req
if __name__ == "__main__":
    # Connection settings for the target OpenStack controller.
    controller_ip = "10.26.16.133"
    domain = "demo"
    name = "admin"
    password = "000000"
    # Image parameters taken from the task statement: the image must be
    # named pvm_image and built from the cirros 0.3.4 disk image.
    image_name = "pvm_image"
    image_file = "cirros-0.3.4-x86_64-disk.img"

    headers = get_auth_token(controller_ip, domain, name, password)
    if headers is None:
        # get_auth_token has already printed the reason; without a token
        # every following request would fail.
        raise SystemExit(1)
    image_m = image_manager(headers, f"http://{controller_ip}:9292/v2/images")

    # 1. Create the image record.
    image_m.create_image(image_name, "qcow2", "bare")

    # 2. Look up the id of the record just created.
    image_id = image_m.get_image_id(image_name)
    print(f"{image_name}镜像ID为:{image_id}")
    if image_id == "NONE":
        # "NONE" is the not-found marker returned by get_image_id.
        raise SystemExit(f"未找到镜像 {image_name},无法上传")

    # 3. Upload the image data.
    image_m.upload_image(image_id, image_file)

    # 4. Query the image details and keep a copy on disk.
    image_info = image_m.get_image(image_id)
    with open("image_demo.json", "w") as outfile:
        json.dump(image_info, outfile, indent=4)
2.OpenStack用户管理服务接口开发(8分)
使用已建好的OpenStack Python运维开发环境,在/root目录下创建user_manager.py脚本,编写Python代码,端口为5043,IP地址为0.0.0.0,开发出OpenStack用户管理的接口,需要实现的接口如下:
①GET /user/,自行调用查询接口,查询指定名称的用户;返回信息以json格式输出到控制台。
②POST /user/create,自行调用创建接口,创建名为chinaskill的用户,密码为123456,返回信息以json格式输出到控制台。
③DELETE /user/delete/,自行调用删除接口,删除指定名称的用户,若删除成功,返回信息输出到控制台。
# 编写api_user_manager.py
vi api_user_manager.py
# encoding:utf-8
import requests, json, time
import logging
# ----------- logger -----------
# Module-level logger: DEBUG and above, timestamped, written to the console.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Named log_formatter rather than `format` so the builtin format() is not
# shadowed for the rest of the module.
log_formatter = logging.Formatter('%(asctime)s %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(log_formatter)
logger.addHandler(stream_handler)
# ----------- logger -----------
def get_auth_token(controller_ip, domain, user, password, project=None):
    """Request a project-scoped auth token and wrap it in request headers.

    :param controller_ip: host name or IP address of the controller node;
        the request goes to ``http://<controller_ip>:5000/v3/auth/tokens``
    :param domain: domain name used for both the user and the project scope
    :param user: user name
    :param password: user password
    :param project: project name to scope the token to; defaults to ``user``,
        which is what this function always used before the parameter existed
    :return: ``{"X-Auth-Token": <token>}`` for the authenticated user
    :raises SystemExit: with exit status 1 when the token cannot be obtained
    """
    # Build the URL from the argument; it used to be hard-coded to the host
    # name "controller", which silently ignored controller_ip.
    url = f"http://{controller_ip}:5000/v3/auth/tokens"
    body = {
        "auth": {
            "identity": {
                "methods": [
                    "password"
                ],
                "password": {
                    "user": {
                        "domain": {
                            "name": domain
                        },
                        "name": user,
                        "password": password
                    }
                }
            },
            "scope": {
                "project": {
                    "domain": {
                        "name": domain
                    },
                    "name": user if project is None else project
                }
            }
        }
    }
    # The request body is deliberately not printed: it contains the
    # password in clear text.
    try:
        response = requests.post(
            url,
            data=json.dumps(body),
            headers={"Content-Type": "application/json"},
        )
        Token = response.headers['X-Subject-Token']
    except Exception as e:
        logger.error(f"获取Token值失败,请检查访问云主机控制节点IP是否正确?输出错误信息如下:{str(e)}")
        # Non-zero status so callers and shells can see the failure
        # (the previous exit(0) reported success).
        raise SystemExit(1)
    logger.debug(f"获取Token值:{str(Token)}")
    return {"X-Auth-Token": Token}
# 用户管理
# https://docs.openstack.org/api-ref/identity/v3/index.html#users
class user_manager:
    """Small client for an identity service ``users`` REST endpoint.

    ``headers`` holds the request headers (expected to carry the auth token)
    and ``resUrl`` is the users collection URL, e.g.
    ``http://controller:5000/v3/users``. The headers dict passed in is stored
    as-is and is never modified by the methods of this class.
    """

    def __init__(self, handers: dict, resUrl: str):
        self.headers = handers
        self.resUrl = resUrl

    def _json_headers(self):
        """Return a copy of the stored headers with a JSON content type."""
        return {**self.headers, "Content-Type": "application/json"}

    def create_users(self, user_name, password: str, desc: str):
        """Create a user with a name, password and description.

        Returns the raw response text.
        """
        body = {
            "user": {
                "name": user_name,
                "password": password,
                "description": desc,
            }
        }
        response_text = requests.post(
            self.resUrl, data=json.dumps(body), headers=self._json_headers()
        ).text
        logger.debug(f"返回状态:{str(response_text)}")
        return response_text

    def get_users(self):
        """List users and return the raw response text."""
        response_text = requests.get(self.resUrl, headers=self.headers).text
        logger.debug(f"返回状态:{str(response_text)}")
        return response_text

    def get_user_id(self, user_name):
        """Return the id of the user named ``user_name``, or ``"NONE"``."""
        result = json.loads(requests.get(self.resUrl, headers=self.headers).text)
        for item in result['users']:
            if item['name'] == user_name:
                return item['id']
        return "NONE"

    def get_user(self, id: str):
        """Fetch a user by id and return the decoded JSON response."""
        api_url = self.resUrl + "/" + id
        result = json.loads(requests.get(api_url, headers=self.headers).text)
        logger.debug(f"返回信息:{str(result)}")
        return result

    def delete_user(self, name: str):
        """Delete the user named ``name``.

        Returns ``{"User itemDeletedSuccess": 204}`` on success, the decoded
        error body otherwise. When no user has that name, an error dict is
        returned without sending a DELETE request.
        """
        user_id = self.get_user_id(name)
        if user_id == "NONE":
            # "NONE" is get_user_id's not-found marker, not a real id; do not
            # send DELETE <users>/NONE to the server.
            return {
                "error": {
                    "code": 404,
                    "message": f"Could not find user: {name}.",
                    "title": "Not Found",
                }
            }
        api_url = self.resUrl + "/" + user_id
        response = requests.delete(api_url, headers=self.headers)
        if response.status_code == 204:
            return {"User itemDeletedSuccess": response.status_code}
        result = json.loads(response.text)
        logger.debug(f"返回信息:{str(result)}")
        return result

    def update_User_password(self, id: str, original_password: str, new_password: str):
        """Change the password of user ``id``.

        Returns ``{"item Update Password Success": 204}`` on success, the
        decoded error body otherwise.
        """
        body = {
            "user": {
                "password": new_password,
                "original_password": original_password
            }
        }
        api_url = self.resUrl + "/" + id + "/password"
        # Per-request header copy: the stored headers stay untouched.
        response = requests.post(
            api_url, data=json.dumps(body), headers=self._json_headers()
        )
        # Normal response code is 204, which carries no response text.
        if response.status_code == 204:
            return {"item Update Password Success": response.status_code}
        result = json.loads(response.text)
        logger.debug(f"返回信息:{str(result)}")
        return result
if __name__ == '__main__':
    # Credentials for the all-in-one OpenStack controller node.
    controller_ip, domain = "controller", "demo"
    user, password = "admin", "000000"

    # Authenticate once; the returned headers carry the token.
    headers = get_auth_token(controller_ip, domain, user, password)
    print("headers:", headers)

    # Query all users through the users endpoint and show the raw reply.
    users_endpoint = "http://controller:5000/v3/users"
    user_m = user_manager(headers, users_endpoint)
    users = user_m.get_users()
    print("查询所有users:", users)
# 编写user_manager.py
vi user_manager.py
#encoding:utf-8
import argparse
import api_user_manager
import json
import csv
import yaml
# Connection settings for the controller node.
controller_ip, domain, user, password = "controller", "demo", "admin", "000000"

# Authenticate as soon as the module is loaded and build the shared client
# for the users endpoint.
headers = api_user_manager.get_auth_token(controller_ip, domain, user, password)
print("headers:", headers)
user_m = api_user_manager.user_manager(headers, "http://controller:5000/v3/users")
print("-----------begin-----------------")
def define_args(parser):
    """Register the command-line arguments this program understands.

    Adds one required positional argument, ``command``, and three optional
    string options: ``-n/--name``, ``-i/--input`` and ``-o/--output``.

    :param parser: the ``argparse.ArgumentParser`` to extend in place
    :return: None
    """
    # Required positional argument: which command to run.
    parser.add_argument('command', type=str, help='Resource command name')

    # Optional arguments, as (short flag, long flag, help text shown by -h).
    optional_args = (
        ('-n', '--name', 'The Name of the resource'),
        ('-i', '--input', 'The input json format text '),
        ('-o', '--output', 'The output file path '),
    )
    for short_flag, long_flag, help_text in optional_args:
        parser.add_argument(short_flag, long_flag, type=str, help=help_text)
def parse_args(parser):
    """Parse the command line and run the handler for the given command.

    Recognised commands are ``create``, ``getall``, ``get`` and ``delete``.
    Any other value, including an empty string, is reported on the console
    and nothing is run.

    :param parser: an ``argparse.ArgumentParser`` that defines ``command``
    :return: None
    """
    args = parser.parse_args()
    command = args.command
    if command == "create":
        print("create some thing")
        create_user(args)
    elif command == "getall":
        print("getall some thing")
        getall_users(args)
    elif command == "get":
        print("get some thing")
        get_user(args)
    elif command == "delete":
        print("delete some thing")
        delete_user(args)
    else:
        # Corrected wording of the old "Note support command name!" message,
        # and show what was actually typed.
        print(f"Not supported command name: {command!r} "
              "(expected create, getall, get or delete)")
def create_user(args):
    """Create a user from the JSON text given with ``--input``.

    The JSON object must contain ``name`` and ``password``; ``description``
    is optional and defaults to an empty string. The service response is
    printed and, when ``--output`` is given, also written to that file.
    A missing or invalid ``--input`` is reported and no request is sent.

    :param args: parsed arguments with ``command``, ``input`` and ``output``
    :return: None
    """
    print('Provided command value is %r.' % args.command)
    print('Provided input value is %r.' % args.input)
    print('Provided output value is %r.' % args.output)
    if not args.input:
        # json.loads(None) would raise TypeError; fail with a clear message.
        print("create requires --input with a JSON object containing "
              "name and password")
        return
    try:
        user_dict = json.loads(args.input)
        user_name = user_dict["name"]
        user_password = user_dict["password"]
        description = user_dict.get("description", "")
    except (ValueError, KeyError, TypeError, AttributeError) as err:
        print(f"Invalid --input value: {err!r}")
        return
    result = user_m.create_users(user_name, user_password, description)
    print("--------write to json---------:", result)
    if args.output:
        # Actually write the response that the line above announces.
        with open(args.output, 'w', encoding='utf-8') as json_file:
            json_file.write(result)
    print(result)
def delete_user(args):
    """Delete the user named by ``--name`` and print the result.

    A missing ``--name`` is reported and no request is sent.

    :param args: parsed arguments with ``command``, ``input``, ``output``
        and ``name``
    :return: None
    """
    print('Provided command value is %r.' % args.command)
    print('Provided input value is %r.' % args.input)
    print('Provided output value is %r.' % args.output)
    if not args.name:
        # Without a name the lookup cannot match any user.
        print("delete requires --name <user name>")
        return
    result = user_m.delete_user(args.name)
    print(result)
def getall_users(args):
    """List all users, print the response and optionally save it as YAML.

    The response text is parsed as JSON and, when ``--output`` is given,
    dumped to that path in YAML form. Without ``--output`` the listing is
    only printed.

    :param args: parsed arguments with ``command``, ``input`` and ``output``
    :return: None
    """
    print('Provided command value is %r.' % args.command)
    print('Provided input value is %r.' % args.input)
    print('Provided output value is %r.' % args.output)
    result = user_m.get_users()
    print("--------result---------")
    print(result)
    if args.output:
        # open(None) raises TypeError, so only write when a path was given.
        configuration = json.loads(result)
        with open(args.output, 'w') as yaml_file:
            yaml.dump(configuration, yaml_file)
    print(result)
def get_user(args):
    """Look up the user named by ``--name`` and print its details.

    When ``--output`` is given the details are also written to that path as
    indented JSON. A missing ``--name`` or an unknown user is reported and
    nothing is written.

    :param args: parsed arguments with ``command``, ``input``, ``output``
        and ``name``
    :return: None
    """
    print('Provided command value is %r.' % args.command)
    print('Provided input value is %r.' % args.input)
    print('Provided output value is %r.' % args.output)
    if not args.name:
        print("get requires --name <user name>")
        return
    user_id = user_m.get_user_id(args.name)
    if user_id == "NONE":
        # "NONE" is the not-found marker of get_user_id, not a real id.
        print(f"User not found: {args.name}")
        return
    result = user_m.get_user(user_id)
    if args.output:
        # open(None) raises TypeError, so only write when a path was given.
        with open(args.output, 'w') as jsonfile:
            json.dump(result, jsonfile, indent=4)
    print(result)
if __name__ == '__main__':
    import sys

    # Echo the raw command line before argparse interprets it.
    print(sys.argv)

    # Build the parser, register the arguments, then parse and dispatch.
    cli_parser = argparse.ArgumentParser()
    define_args(cli_parser)
    parse_args(cli_parser)
# 创建1个用户
python3 user_manager.py create --input '{ "name": "user01", "password": "000000", "description": "description" } '
# 查询指定名称的用户
python3 user_manager.py get --name user01 -o user.json
# 查询目前admin账号下所有的用户
python3 user_manager.py getall -o openstack_all_user.yaml
# 删除指定名称的用户
python3 user_manager.py delete --name user01
本文由博客一文多发平台 OpenWrite 发布!
共同學習,寫下你的評論
評論加載中...
作者其他優質文章