容器云服务运维开发(15)
基于Kubernetes APIs与SDK,使用Python脚本,完成容器云服务运维任务
1.管理service资源(5分)
Kubernetes Python运维脚本开发,使用Restful APIs方式管理service服务。
使用已经部署完成的Kubernetes两节点云平台,在Master节点安装Python 3.7.3的运行环境与依赖库。
使用python request库和Kubernetes Restful APIs,在/root目录下,创建api_manager_service.py文件,要求编写python代码,代码实现以下任务:
①首先查询服务service,如果service名称“nginx-svc”已经存在,先删除。
②如果不存在“nginx-svc”,则使用service.yaml文件创建服务。
③创建完成后,查询该服务的信息,查询的body部分以json格式的文件输出到当前目录下的service_api_dev.json文件中。
④然后使用service_update.yaml更新服务端口。
⑤完成更新后,查询该服务的信息,信息通过控制台输出,并通过json格式追加到service_api_dev.json文件后。
# 上传k8s_Python_Packages解压
tar -zxf k8s_Python_Packages.tar.gz
# 解压python包
tar -zxf /root/Python_Packages/python-3.6.8.tar.gz
# 安装python
yum install -y /root/python-3.6.8/packages/*
# 验证是否安装成功
pip3 --version
# 进入Python_Packages
cd Python_Packages
# 安装依赖包
pip3 install pip-21.1.3-py3-none-any.whl
# 使用pip3命令安装开发环境依赖包
pip3 install --no-index --find-links=/root/Python_Packages/ -r /root/Python_Packages/requirements.txt
# 在Kubernetes中创建一个名为admin1的服务账号(ServiceAccount)在kube-system命名空间中
kubectl create serviceaccount admin1 -n kube-system
# 用于创建一个新的集群角色绑定,它将一个服务账号绑定到一个集群角色上
kubectl create clusterrolebinding admin1 --clusterrole=cluster-admin --serviceaccount=kube-system:admin1
# 获取账户token(用于替换api_server_token,做身份验证)
# Create a service-account-token Secret for the admin1 ServiceAccount.
# The YAML nesting below is significant: name/namespace/annotations belong to metadata.
cat<<EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
type: kubernetes.io/service-account-token
metadata:
  name: admin1
  namespace: kube-system
  annotations:
    kubernetes.io/service-account.name: "admin1"
EOF
# 查看账户资源
kubectl describe secret admin1 -n kube-system
# 编写service.yaml
vi service.yaml
# NodePort Service exposing pods labelled app=nginx on node port 30083.
apiVersion: v1
kind: Service
metadata:
  name: nginx-svc3
  namespace: default
spec:
  selector:
    app: nginx
  ports:
    - port: 80
      targetPort: 80
      nodePort: 30083
  type: NodePort
# 编写api_manager_service
vi api_manager_service.py
import requests,time
import logging
import os,yaml,json
# ----------- logger -----------
# Console handler that prefixes every message with a timestamp.
# NOTE: the module-level name `format` shadows the built-in format();
# it is kept unchanged so existing references to it keep working.
format = logging.Formatter('%(asctime)s %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(format)

# Module logger: emit everything from DEBUG upwards to the console handler.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)
# ----------- logger -----------
def get_api_server_token(api_server_token, node_url):
    """Return the ``Authorization`` header value for a service-account token.

    Args:
        api_server_token: Raw token string of the service account.
        node_url: API server URL. Not used by this function; kept so existing
            callers that pass it keep working.

    Returns:
        The token prefixed with ``"bearer "``.
    """
    # Bearer token
    bearer_token = "bearer " + api_server_token
    return bearer_token
class api_service_manager():
    """Manage Service objects through the Kubernetes REST API using ``requests``.

    Every method returns the API server's JSON response body decoded into a
    Python object and logs it at DEBUG level.

    NOTE(security): all requests are sent with ``verify=False``, i.e. the API
    server's TLS certificate is not checked. Confirm this is acceptable for
    the target environment before using it outside a lab.
    """

    def __init__(self, node_url: str, bearer_token: str):
        """Store the API server base URL and the ``Authorization`` header value."""
        self.node_url = node_url
        self.bearer_token = bearer_token

    def _collection_url(self, namespace: str) -> str:
        """Return the URL of the Service collection in ``namespace``."""
        return self.node_url + "/api/v1/namespaces/" + namespace + "/services"

    def _item_url(self, namespace: str, svc_name: str) -> str:
        """Return the URL of the single Service ``svc_name`` in ``namespace``."""
        return self._collection_url(namespace) + "/" + svc_name

    def _headers(self, content_type: str = "") -> dict:
        """Return request headers, adding ``Content-Type`` only when given."""
        headers = {"Authorization": self.bearer_token}
        if content_type:
            headers["Content-Type"] = content_type
        return headers

    def create_svc(self, yamlFile, namespace: str):
        """Create a Service in ``namespace`` from the manifest file ``yamlFile``.

        Returns:
            The decoded JSON response of the POST request.
        """
        with open(yamlFile, encoding="utf8") as f:
            body = json.dumps(yaml.safe_load(f))
        resp = requests.post(
            self._collection_url(namespace),
            data=body,
            headers=self._headers("application/json"),
            verify=False,
        )
        result = json.loads(resp.text)
        logger.debug("return_message:%s", result)
        return result

    def get_svc(self, svc_name: str, namespace: str):
        """Read the Service ``svc_name`` in ``namespace``.

        Returns:
            The decoded JSON response of the GET request.
        """
        # "pretty" is a query parameter of the API, not an HTTP header.
        resp = requests.get(
            self._item_url(namespace, svc_name),
            params={"pretty": "true"},
            headers=self._headers(),
            verify=False,
        )
        result = json.loads(resp.text)
        logger.debug("return_message:%s", result)
        return result

    def update_svc(self, svc_name: str, yamlFile, namespace: str):
        """Patch the Service ``svc_name`` with the content of ``yamlFile``.

        The file content is sent as a strategic merge patch, for example
        ``{"spec": {"ports": [{"port": 80, "targetPort": 8089}]}}``.

        Returns:
            The decoded JSON response of the PATCH request.
        """
        with open(yamlFile, encoding="utf8") as f:
            # Serialise exactly once; encoding the JSON string a second time
            # would send a quoted string instead of an object.
            body = json.dumps(yaml.safe_load(f))
        # The strategic-merge-patch content type belongs to the PATCH verb.
        resp = requests.patch(
            self._item_url(namespace, svc_name),
            data=body,
            headers=self._headers("application/strategic-merge-patch+json"),
            verify=False,
        )
        result = json.loads(resp.text)
        logger.debug("return_message:%s", result)
        return result

    def delete_svc(self, svc_name: str, namespace: str):
        """Delete the Service ``svc_name`` in ``namespace``.

        Returns:
            The decoded JSON response of the DELETE request.
        """
        resp = requests.delete(
            self._item_url(namespace, svc_name),
            headers=self._headers(),
            verify=False,
        )
        result = json.loads(resp.text)
        logger.debug("return_message:%s", result)
        return result
def get_api_service_manager(api_server_token="token",
                            cluster_server_url="https://10.26.7.60:6443"):
    """Build an ``api_service_manager`` for the given cluster.

    Args:
        api_server_token: Service-account token. The default ``"token"`` is a
            placeholder and must be replaced with a real token.
        cluster_server_url: Base URL of the Kubernetes API server.

    Returns:
        A ready-to-use ``api_service_manager`` instance.
    """
    bearer_token = get_api_server_token(api_server_token, cluster_server_url)
    svc_m = api_service_manager(cluster_server_url, bearer_token)
    return svc_m
if __name__ == "__main__":
    # Demo run: delete any existing Service, create it from service.yaml,
    # read it back, then delete it again.
    api_server_token = "token"  # placeholder: replace with the account token
    cluster_server_url = "https://10.26.7.60:6443"
    bearer_token = get_api_server_token(api_server_token, cluster_server_url)
    svc_m = api_service_manager(cluster_server_url, bearer_token)
    namespace = "default"
    svc_name = "nginx-svc3"
    yaml_file = "service.yaml"
    # 1. delete svc (clears a Service left over from an earlier run)
    svc_m.delete_svc(svc_name, namespace)
    # 2. create svc
    print("crate ----------------")
    svc_m.create_svc(yaml_file, namespace)
    # 3. get svc
    print("get ----------------")
    svc_m.get_svc(svc_name, namespace)
    # 4. delete svc
    svc_m.delete_svc(svc_name, namespace)
# 编写service_server
vi service_server.py
import requests
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
# Port the HTTP front end listens on.
PORT = 8888
# The REST client lives in api_manager_service.py, so the module must be
# imported under that name.
import api_manager_service
# Shared Service client used by every request handler.
svc_m = api_manager_service.get_api_service_manager()
class MyHandler(BaseHTTPRequestHandler):
    """HTTP handler that forwards ``/services/<arg>`` requests to ``svc_m``.

    * ``GET /services/<name>``     -> ``svc_m.get_svc(name, "default")``
    * ``POST /services/<file>``    -> ``svc_m.create_svc(file, "default")``
    * ``DELETE /services/<name>``  -> ``svc_m.delete_svc(name, "default")``

    Every request is answered with status 200 and a JSON content type.
    POST and DELETE requests to any other path get headers but no body.
    """

    # Path prefix that selects the Service operations.
    SERVICES_PREFIX = '/services/'

    # Header setting
    def _set_headers(self, content_type):
        self.send_response(200)  # 200 stands for request succeeded
        self.send_header("Content-type", content_type)  # informs requests of the Media type
        self.end_headers()

    def _service_arg(self):
        """Return the part of the path after ``/services/``, or None."""
        if self.path.startswith(self.SERVICES_PREFIX):
            return self.path[len(self.SERVICES_PREFIX):]
        return None

    def _write_json(self, payload):
        """Serialise ``payload`` to JSON and write it as the response body."""
        json_string = json.dumps(payload)
        self.wfile.write(json_string.encode(encoding='utf_8'))

    def do_GET(self):
        """Return the named Service, or a fixed greeting for any other path."""
        self._set_headers("application/json")
        print(self.path)
        # /services/name
        name = self._service_arg()
        if name is None:
            self._write_json({'path': 'home', 'received': 'ok'})
            return
        print(name)
        self._write_json(svc_m.get_svc(name, "default"))

    def do_POST(self):
        """Create a Service from the manifest file named in the path."""
        print("post")
        self._set_headers("application/json")
        print(self.path)
        filename = self._service_arg()
        if filename is None:
            return
        print("filename---", filename)
        # NOTE(security): the file name comes straight from the request URL
        # and is opened on the server; restrict it before exposing this
        # handler to untrusted clients.
        result = svc_m.create_svc(filename, "default")
        print("result---", result)
        self._write_json(result)

    def do_DELETE(self):
        """Delete the named Service and return the API server's answer."""
        self._set_headers("application/json")
        print(self.path)
        name = self._service_arg()
        if name is None:
            return
        print(name)
        result = svc_m.delete_svc(name, "default")
        # Send the result back; previously it was computed and then dropped,
        # leaving the client with an empty body.
        self._write_json(result)
def run(server_class=HTTPServer, handler_class=MyHandler, addr="localhost", port=PORT):
    """Start the HTTP server and serve requests until it is interrupted.

    Args:
        server_class: Server class to instantiate.
        handler_class: Request handler class passed to the server.
        addr: Address to bind to.
        port: TCP port to listen on.
    """
    server_address = (addr, port)
    httpd = server_class(server_address, handler_class)
    print(f"Starting httpd server on {addr}:{port}")  # f before string allows special formatting
    try:
        httpd.serve_forever()
    finally:
        # Release the listening socket when serving ends for any reason.
        httpd.server_close()
#start
if __name__ == "__main__":
    # Run the server in a separate thread; the "end" line is printed right
    # after the thread is started, while the server keeps running.
    print("---------start----------------")
    thread = threading.Thread(target=run)
    thread.start()
    print("---------end----------------")
# 执行py
python3 api_manager_service.py
python3 service_server.py
# 获取服务器信息
curl -X GET 127.0.0.1:8888/services/kubernetes
2.管理Pod服务(5分)
Kubernetes Python运维脚本开发-使用SDK方式,通过Deployment管理Pod服务。
使用已经部署完成的Kubernetes两节点云平台,在Master节点安装Python 3.7.3的运行环境与依赖库。
使用Kubernetes python SDK的“kubernetes”Python库,在/root目录下,创建sdk_manager_deployment.py文件,要求编写python代码,代码实现以下任务:
①首先使用nginx-deployment.yaml文件创建deployment资源。
②创建完成后,查询该服务的信息,查询的body部分通过控制台输出,并以json格式的文件输出到当前目录下的deployment_sdk_dev.json文件中。
# 复制kube配置文件到当前目录
cp /root/.kube/config .
# 导入image.tar
ctr -n k8s.io images import image.tar
# 编写sdk_manager_deployment.py
vi sdk_manager_deployment.py
import json

import yaml
from kubernetes import client, config
class DeploymentManager:
    """Create and inspect Deployments in the ``default`` namespace via the SDK."""

    def __init__(self, kubeconfig_path):
        """Load the kubeconfig at ``kubeconfig_path`` and build the apps/v1 client."""
        config.load_kube_config(config_file=kubeconfig_path)
        self.apps_v1 = client.AppsV1Api()

    def create_deployment(self, yaml_file):
        """Create a Deployment from the manifest file ``yaml_file``.

        Returns:
            The Deployment object returned by the API server.
        """
        with open(yaml_file, 'r') as file:
            deployment_spec = yaml.safe_load(file)
        response = self.apps_v1.create_namespaced_deployment(
            body=deployment_spec,
            namespace='default'
        )
        print(f"Deployment {response.metadata.name} created.")
        return response

    def get_deployment(self, deployment_name):
        """Print the Deployment and save it to ``deployment_sdk_dev.json``.

        Returns:
            The Deployment as a JSON-compatible dict.
        """
        response = self.apps_v1.read_namespaced_deployment(
            name=deployment_name,
            namespace='default'
        )
        # to_dict() keeps datetime objects (e.g. the creation timestamp),
        # which json cannot encode; let the API client convert the model
        # into plain JSON-compatible values instead.
        body = self.apps_v1.api_client.sanitize_for_serialization(response)
        print("Deployment info:")
        print(json.dumps(body, indent=4))
        # Save to file
        with open('deployment_sdk_dev.json', 'w') as f:
            json.dump(body, f, indent=4)
        return body
if __name__ == '__main__':
    manifest_path = "nginx-deployment.yaml"
    # get_deployment() needs the Deployment name; take it from the manifest
    # so it always matches what is created.
    with open(manifest_path, 'r') as manifest_file:
        deployment_name = yaml.safe_load(manifest_file)["metadata"]["name"]
    # Arguments are passed positionally: the parameters are named
    # kubeconfig_path / yaml_file, not config_file / yamlFile.
    manager = DeploymentManager("config")
    manager.create_deployment(manifest_path)
    manager.get_deployment(deployment_name)
# 执行脚本
python3 sdk_manager_deployment.py
3.Kubernetes CRD自定义资源的管理封装(5分)
在前面已建好的Kubernetes开发环境云平台上。Kubernetes容器云平台通过CRD机制进行自定义APIs资源拓展,将chinaskill-cloud-*.yaml共5个文件复制到root目录下。参考chinaskill-cloud-11.yaml文件,编写CRD文件“chinaskill-cloud-crd.yaml”,放在root目录下。
说明:Competition CRD命名要求如下:
Kind为 Competition
Plural为competitions
singular为competition
shortNames为cpt
session含义是赛程
content含义为竞赛内容。
使用已建好的Kubernetes Python运维开发环境,在/root目录下创建crd_manager.py脚本。crd_manager.py编写基于Kubernetes SDK 实现Competition CRD的创建、删除与事件变化监听。
crd_manager.py内部实现3个方法:
①实现方法create_crd(),实现对Competition CRD的创建。
②实现方法delete_crd(),实现对Competition CRD的删除。
③实现方法watch_crd_object(),实现对CRD资源的变化事件监听,将监听到Competition CRD被删除,将event信息输出到控制台,并停止监听。
# 编辑chinaskill-cloud-crd
vi chinaskill-cloud-crd.yaml
# CustomResourceDefinition for the namespaced "Competition" resource.
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: competitions.chinaskill.cloud
spec:
  group: chinaskill.cloud
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                # session: competition schedule
                session:
                  type: string
                # content: competition content
                content:
                  type: string
            status:
              type: object
  names:
    plural: competitions
    singular: competition
    kind: Competition
    shortNames:
      - cpt
  scope: Namespaced
# 编辑crd_manager
vi crd_manager.py
from kubernetes import client, config, watch
def create_crd(crd_yaml, config_file=None):
    """Create the Competition CRD from a manifest.

    Args:
        crd_yaml: Path of the CRD manifest file, or an already parsed
            manifest dict.
        config_file: Optional kubeconfig path; ``None`` uses the default
            kubeconfig location.

    API errors are reported on the console instead of being raised.
    """
    import yaml  # local import: the module only imports kubernetes names

    # Load kube-config
    config.load_kube_config(config_file=config_file)
    # The API expects the manifest content, not the file name.
    if isinstance(crd_yaml, dict):
        crd_body = crd_yaml
    else:
        with open(crd_yaml, encoding="utf8") as f:
            crd_body = yaml.safe_load(f)
    # CRDs are cluster-scoped objects of the apiextensions.k8s.io API, so
    # they are created through ApiextensionsV1Api rather than as a
    # namespaced custom object.
    apiextensions_v1 = client.ApiextensionsV1Api()
    try:
        apiextensions_v1.create_custom_resource_definition(body=crd_body)
        print("Competition CRD created successfully")
    except client.rest.ApiException as e:
        print(f"Exception when calling ApiextensionsV1Api->create_custom_resource_definition: {e}")
def delete_crd():
    """Delete the ``competitions.chinaskill.cloud`` CRD.

    Uses the default kubeconfig. API errors are reported on the console
    instead of being raised.
    """
    config.load_kube_config()
    apiextensions_v1 = client.ApiextensionsV1Api()
    try:
        apiextensions_v1.delete_custom_resource_definition("competitions.chinaskill.cloud", body=client.V1DeleteOptions())
        print("Competition CRD deleted successfully")
    except client.rest.ApiException as e:
        print(f"Exception when calling ApiextensionsV1Api->delete_custom_resource_definition: {e}")
def watch_crd_object():
    """Watch CRD change events and stop once the Competition CRD is deleted.

    Every received event is printed; when a DELETED event for the CRD whose
    kind is ``Competition`` arrives, a final message is printed and the
    watch is stopped. Uses the default kubeconfig.
    """
    config.load_kube_config()
    w = watch.Watch()
    # Watch the CustomResourceDefinition list itself: the deletion of a CRD
    # shows up there as a DELETED event.
    for event in w.stream(client.ApiextensionsV1Api().list_custom_resource_definition):
        crd = event['object']
        print(f"Event: {event['type']} {crd.kind} {crd.metadata.name}")
        # The watched object's own kind is CustomResourceDefinition; the
        # kind it defines is found under spec.names.
        if event['type'] == 'DELETED' and crd.spec.names.kind == 'Competition':
            print(f"Competition {crd.metadata.name} was deleted")
            w.stop()
if __name__ == "__main__":
    import sys  # needed for argv/exit; not imported at module level

    # Command-line dispatch: the first argument selects the action.
    if len(sys.argv) < 2:
        print("Usage: python3 crd_manager.py <action> [args]")
        print("Actions: create_crd, delete_crd, watch_crd_object")
        sys.exit(1)
    action = sys.argv[1]
    if action == 'create_crd':
        # Only pass the manifest: create_crd's required parameter is crd_yaml.
        create_crd(crd_yaml="chinaskill-cloud-crd.yaml")
    elif action == 'delete_crd':
        delete_crd()
    elif action == 'watch_crd_object':
        watch_crd_object()
    else:
        print(f"Unknown action: {action}")
# 调用
python3 crd_manager.py create_crd
python3 crd_manager.py delete_crd
python3 crd_manager.py watch_crd_object
本文由博客一文多发平台 OpenWrite 发布!
共同學習,寫下你的評論
評論加載中...
作者其他優質文章
100積分直接送
付費專欄免費學
大額優惠券免費領