Day 13: SQLAlchemy Join Operations and a Bastion Host
Published: 2019-06-12


I. Database Operations

1. Creating tables, inserting data, and one-to-many queries

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
# Note: in SQLAlchemy 1.4+ declarative_base can also be imported from sqlalchemy.orm.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123456@192.168.100.188:3306/s13", max_overflow=5)

Base = declarative_base()


# Single table
class Test(Base):
    __tablename__ = 'test'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))


# One-to-many
class Group(Base):
    __tablename__ = 'group'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    caption = Column(String(32))


class User(Base):
    __tablename__ = 'user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))
    # Foreign key
    group_id = Column(Integer, ForeignKey("group.nid"))
    # Used only for queries.
    # backref='uuu' adds a virtual column to Group that lists its users,
    # e.g. [1 wang, 3 xiaoming, 4 xiaoxiao]
    # relationship and ForeignKey are usually declared together
    group = relationship("Group", backref='uuu')

    # Only affects how the object is printed
    def __repr__(self):
        temp = "%s - %s - %s" % (self.nid, self.username, self.group_id)
        return temp


# Create the tables
def init_db():
    Base.metadata.create_all(engine)


def drop_db():
    Base.metadata.drop_all(engine)


init_db()

# Create the groups
Session = sessionmaker(bind=engine)
session = Session()
session.add(Group(caption='dba'))
session.add(Group(caption='dbd'))
session.commit()

# Query users only
ret = session.query(User).filter(User.username == 'wang').all()
print(ret)
ret = session.query(User).all()
if ret:  # the user table may still be empty
    obj = ret[0]
    print(obj)
    print(obj.nid)
    print(obj.username)
    print(obj.group_id)
ret = session.query(User.username).all()
print(ret)

# Left join: isouter=True
# Select from both tables at once
session.query(User, Group)
sql = session.query(User, Group).join(Group, isouter=True)
sql = session.query(User.username, Group.caption).join(Group, isouter=True)
print(sql)
ret = session.query(User.username, Group.caption).join(Group, isouter=True).all()
# select * from user left join group on user.group_id = group.nid
print(ret)

# New style (forward query): the relationship is declared on the table being queried
ret = session.query(User).all()
for obj in ret:
    # obj is one row of the user table; obj.group is the related Group object,
    # or None when the user has no group
    if obj.group is None:
        print(obj.nid, obj.username, obj.group_id, None)
        continue
    print(obj.nid, obj.username, obj.group_id, obj.group, obj.group.nid, obj.group.caption)

# List everyone in a group (old style)
# ret = session.query(User.username, Group.caption).join(Group, isouter=True).filter(Group.caption == 'dba').all()
# print(ret)

# New style (reverse query): the relationship is not declared on this table,
# so the backref is used to reach rows of the other table
obj = session.query(Group).filter(Group.caption == 'dba').first()
print(obj.nid, obj.caption)
print(obj.uuu)

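The left join used above can be tried without a MySQL server. The following is a minimal sketch that runs the equivalent SQL against an in-memory SQLite database from the standard library. It is not the author's code: the function name left_join_demo and the user "nogroup" are made up for illustration, and the table layout only mirrors the user and group models.

```python
import sqlite3


def left_join_demo():
    """Run the SQL behind query(User.username, Group.caption).join(Group, isouter=True)."""
    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    # "group" is a reserved word in SQL, so it has to be quoted.
    cur.execute('CREATE TABLE "group" (nid INTEGER PRIMARY KEY, caption TEXT)')
    cur.execute(
        'CREATE TABLE user (nid INTEGER PRIMARY KEY, username TEXT, '
        'group_id INTEGER REFERENCES "group"(nid))'
    )
    cur.executemany('INSERT INTO "group" (caption) VALUES (?)', [("dba",), ("dbd",)])
    # "nogroup" is a hypothetical user with no group, to show what the outer join keeps.
    cur.executemany(
        "INSERT INTO user (username, group_id) VALUES (?, ?)",
        [("wang", 1), ("xiaoming", 1), ("nogroup", None)],
    )
    rows = cur.execute(
        'SELECT user.username, "group".caption FROM user '
        'LEFT JOIN "group" ON user.group_id = "group".nid '
        "ORDER BY user.nid"
    ).fetchall()
    conn.close()
    return rows


rows = left_join_demo()
print(rows)
```

A user without a group still appears in the result, with None in place of the caption. This is why the forward query has to allow for obj.group being None.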
2. Many-to-many association

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123456@192.168.100.188:3306/s13", max_overflow=5)

Base = declarative_base()


# Many-to-many
class Host(Base):
    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))


class HostUser(Base):
    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


class HostToHostUser(Base):
    __tablename__ = 'host_to_hostuser'
    # A separate nid makes rows easier to delete later
    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))


# Create the tables
def init_db():
    Base.metadata.create_all(engine)


def drop_db():
    Base.metadata.drop_all(engine)


init_db()

Session = sessionmaker(bind=engine)
session = Session()

# Insert the rows in bulk
session.add_all([
    Host(hostname='c1', port='22', ip='1.2.1.2'),
    Host(hostname='c2', port='22', ip='1.2.1.3'),
    Host(hostname='c3', port='22', ip='1.2.1.1'),
    Host(hostname='c4', port='22', ip='1.2.1.4'),
    Host(hostname='c5', port='22', ip='1.2.1.5'),
])
session.commit()

session.add_all([
    HostUser(username='root'),
    HostUser(username='mysql'),
    HostUser(username='svn'),
    HostUser(username='git'),
    HostUser(username='oracle'),
])
session.commit()

# host_id is an Integer column, so integers are passed rather than strings
session.add_all([
    HostToHostUser(host_id=1, host_user_id=1),
    HostToHostUser(host_id=1, host_user_id=2),
    HostToHostUser(host_id=1, host_user_id=3),
    HostToHostUser(host_id=2, host_user_id=4),
    HostToHostUser(host_id=2, host_user_id=5),
    HostToHostUser(host_id=2, host_user_id=1),
    HostToHostUser(host_id=3, host_user_id=1),
    HostToHostUser(host_id=3, host_user_id=2),
    HostToHostUser(host_id=3, host_user_id=3),
    HostToHostUser(host_id=4, host_user_id=4),
    HostToHostUser(host_id=4, host_user_id=5),
    HostToHostUser(host_id=4, host_user_id=1),
    HostToHostUser(host_id=5, host_user_id=4),
    HostToHostUser(host_id=5, host_user_id=5),
    HostToHostUser(host_id=5, host_user_id=1),
])
session.commit()

# Working with many-to-many data
# Get all users on host c1
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
# host_obj.nid is the id of that host
host_2_host_user = session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == host_obj.nid).all()
print(host_2_host_user)
# [(1,), (2,), (3,)]
# zip(*rows) transposes the one-element tuples into a single tuple of ids
r = zip(*host_2_host_user)
# print(list(r)[0])
# (1, 2, 3); a zip object can only be consumed once, so this stays commented out
users = session.query(HostUser.username).filter(HostUser.nid.in_(list(r)[0])).all()
print(users)
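The zip(*rows) step is easy to misread, so here is a small sketch of what it does to the list of one-element tuples returned by the query. The helper first_column is a made-up name, not part of SQLAlchemy. It is shown as an alternative that also copes with an empty result, where list(zip(*rows))[0] would raise IndexError.

```python
def first_column(rows):
    """Return the first element of every row; an empty input gives an empty list."""
    return [row[0] for row in rows]


# Same shape as the result of query(HostToHostUser.host_user_id).all()
rows = [(1,), (2,), (3,)]

transposed = list(zip(*rows))
print(transposed)          # [(1, 2, 3)]
print(transposed[0])       # (1, 2, 3)
print(first_column(rows))  # [1, 2, 3]

# With no matching rows the transposed list is empty, so indexing [0] would fail.
print(list(zip(*[])))      # []
print(first_column([]))    # []
```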

3. Many-to-many query, first approach

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123456@192.168.100.188:3306/s13", max_overflow=5)

Base = declarative_base()


# Many-to-many
class Host(Base):
    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))


class HostUser(Base):
    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


class HostToHostUser(Base):
    __tablename__ = 'host_to_hostuser'
    # A separate nid makes rows easier to delete later
    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))

    host = relationship("Host", backref='h')
    host_user = relationship("HostUser", backref='u')


Session = sessionmaker(bind=engine)
session = Session()

# Get all users on host c1
# The host
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
# host_obj.h holds the matching rows of the host_to_hostuser table
# print(host_obj.h)
for item in host_obj.h:
    print(item.host_user, item.host_user.nid, item.host_user.username)

4. Many-to-many query, second approach

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
from sqlalchemy import create_engine, and_, or_, func, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, DateTime
from sqlalchemy.orm import sessionmaker, relationship

engine = create_engine("mysql+pymysql://root:2008bjAY@192.168.145.130:3306/s13", max_overflow=5)

Base = declarative_base()

# Many-to-many: the association table is a plain Table rather than a mapped class
HostToHostUser = Table('host_to_hostuser', Base.metadata,
    Column('host_id', ForeignKey('host.nid'), primary_key=True),
    Column('host_user_id', ForeignKey('host_user.nid'), primary_key=True),
)


class Host(Base):
    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))

    host_user = relationship('HostUser',
                             secondary=HostToHostUser,
                             backref='h')


class HostUser(Base):
    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


Session = sessionmaker(bind=engine)
session = Session()

# The host
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
for item in host_obj.host_user:
    print(item.username)
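With secondary set, reading host_obj.host_user goes through the association table to reach the users. As a rough illustration of the kind of SQL involved, the sketch below runs a hand-written two-join query against an in-memory SQLite database. It is an assumption-laden stand-in, not the exact statement SQLAlchemy emits: the function name users_on_host is made up, and only a subset of the sample rows is loaded.

```python
import sqlite3


def users_on_host(hostname):
    """Return the usernames linked to a host through the association table."""
    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    cur.executescript("""
        CREATE TABLE host (nid INTEGER PRIMARY KEY, hostname TEXT);
        CREATE TABLE host_user (nid INTEGER PRIMARY KEY, username TEXT);
        CREATE TABLE host_to_hostuser (
            host_id INTEGER REFERENCES host(nid),
            host_user_id INTEGER REFERENCES host_user(nid),
            PRIMARY KEY (host_id, host_user_id)
        );
        INSERT INTO host (hostname) VALUES ('c1'), ('c2');
        INSERT INTO host_user (username) VALUES ('root'), ('mysql'), ('svn'), ('git');
        INSERT INTO host_to_hostuser VALUES (1, 1), (1, 2), (1, 3), (2, 4), (2, 1);
    """)
    rows = cur.execute(
        "SELECT host_user.username FROM host "
        "JOIN host_to_hostuser ON host.nid = host_to_hostuser.host_id "
        "JOIN host_user ON host_user.nid = host_to_hostuser.host_user_id "
        "WHERE host.hostname = ? ORDER BY host_user.nid",
        (hostname,),
    ).fetchall()
    conn.close()
    return [name for (name,) in rows]


print(users_on_host("c1"))
print(users_on_host("c2"))
```

The composite primary key on (host_id, host_user_id) matches the Table definition in this approach and prevents the same host and user from being linked twice.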

5. Many-to-many query, third approach

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
from sqlalchemy import create_engine, and_, or_, func, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, DateTime
from sqlalchemy.orm import sessionmaker, relationship

engine = create_engine("mysql+pymysql://root:2008bjAY@192.168.145.130:3306/s13", max_overflow=5)

Base = declarative_base()


# Many-to-many
# HostToHostUser is defined first because Host refers to
# HostToHostUser.__table__ in its class body; defining it after Host
# would raise NameError.
class HostToHostUser(Base):
    __tablename__ = 'host_to_hostuser'
    # A separate nid makes rows easier to delete later
    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))


class Host(Base):
    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))

    host_user = relationship("HostUser", secondary=HostToHostUser.__table__, backref='u')


class HostUser(Base):
    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


Session = sessionmaker(bind=engine)
session = Session()

# The host
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
for item in host_obj.host_user:
    print(item.username)

II. Paramiko

1. Connecting with the built-in transport

import paramiko

ssh = paramiko.SSHClient()
# Accept host keys that are not yet in known_hosts
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname='192.168.145.130', port=22, username='scm', password='1q2w3e4R')
stdin, stdout, stderr = ssh.exec_command('ls')
# stdout.read() returns bytes
result = stdout.read()
print(result)
ssh.close()
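Because stdout.read() returns bytes, print(result) shows a b'...' literal with embedded \n characters. A small helper like the one sketched here turns that into text lines. The name decode_output and the sample bytes are made up for illustration, and UTF-8 is an assumption about the remote host's encoding.

```python
def decode_output(raw, encoding="utf-8"):
    """Decode command output bytes and split them into lines."""
    # errors="replace" keeps going if the remote side sends bytes
    # that are not valid in the assumed encoding.
    return raw.decode(encoding, errors="replace").splitlines()


# Made-up stand-in for the value returned by stdout.read()
sample = b"cn.txt\nhandle.log\n"
lines = decode_output(sample)
print(lines)  # ['cn.txt', 'handle.log']
```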

2. Using a custom transport

import paramiko

transport = paramiko.Transport(('192.168.145.130', 22))
transport.connect(username='scm', password='1q2w3e4R')

ssh = paramiko.SSHClient()
# Reuse the transport created above instead of calling ssh.connect()
ssh._transport = transport
stdin, stdout, stderr = ssh.exec_command('df')
result = stdout.read()
print(result)
transport.close()

3、SFTP

import paramiko

transport = paramiko.Transport(('192.168.145.130', 22))
transport.connect(username='scm', password='1q2w3e4R')

sftp = paramiko.SFTPClient.from_transport(transport)
# Upload: local path first, remote path second
sftp.put('D:\\Study\\cnblog.txt', '/tmp/cn.txt')
# Download: remote path first, local path second
sftp.get('remote_path', 'local_path')
transport.close()
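The upload above goes from a Windows path to a POSIX path, so the two sides use different separators. If the remote name should simply follow the local file name, the path can be built with the standard library's ntpath and posixpath modules, as in this sketch. The helper remote_path_for is a hypothetical name and is not part of Paramiko.

```python
import ntpath
import posixpath


def remote_path_for(local_windows_path, remote_dir):
    """Build a POSIX remote path that keeps the local file's name."""
    # ntpath understands backslashes and drive letters on any platform.
    filename = ntpath.basename(local_windows_path)
    # SFTP servers on Linux expect forward slashes.
    return posixpath.join(remote_dir, filename)


target = remote_path_for('D:\\Study\\cnblog.txt', '/tmp')
print(target)  # /tmp/cnblog.txt
```

The result could then be passed as the second argument of sftp.put.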

III. Implementing the Bastion Host

1. A simple example

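Note: the following configuration is required. Edit .bashrc for the login user on the bastion host and add the two lines shown after the vim command, so that bridge_server.py starts on login and the session logs out as soon as the script exits.

vim .bashrc
/usr/bin/python3 bridge_server.py
logout

The code for bridge_server.py is as follows: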
import paramiko
import sys
import os
import socket
import getpass

# windows does not have termios...
try:
    import termios
    import tty
    has_termios = True
except ImportError:
    has_termios = False


def interactive_shell(chan):
    if has_termios:
        posix_shell(chan)
    else:
        windows_shell(chan)


def posix_shell(chan):
    import select

    oldtty = termios.tcgetattr(sys.stdin)
    log = open('handle.log', 'a+', encoding='utf-8')
    try:
        tty.setraw(sys.stdin.fileno())
        tty.setcbreak(sys.stdin.fileno())
        chan.settimeout(0.0)
        flag = False      # True right after the user pressed Tab
        temp_list = []    # pieces of the command typed so far
        while True:
            r, w, e = select.select([chan, sys.stdin], [], [])
            if chan in r:
                try:
                    # Decode directly; paramiko.py3compat.u is not available
                    # in newer Paramiko releases.
                    x = chan.recv(1024).decode('utf-8', errors='replace')
                    if len(x) == 0:
                        sys.stdout.write('\r\n*** EOF\r\n')
                        break
                    if flag:
                        # Output that follows a Tab is the completion text,
                        # unless it starts with a newline (a candidate list)
                        if x.startswith('\r\n'):
                            pass
                        else:
                            temp_list.append(x)
                        flag = False
                    sys.stdout.write(x)
                    sys.stdout.flush()
                except socket.timeout:
                    pass
            if sys.stdin in r:
                x = sys.stdin.read(1)
                if len(x) == 0:
                    break
                if x == '\t':
                    flag = True
                else:
                    temp_list.append(x)
                if x == '\r':
                    # Enter pressed: write the finished command to the log
                    log.write(''.join(temp_list))
                    log.flush()
                    temp_list.clear()
                chan.send(x)
    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
        log.close()


def windows_shell(chan):
    import threading

    sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")

    def writeall(sock):
        while True:
            data = sock.recv(256)
            if not data:
                sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
                sys.stdout.flush()
                break
            # recv returns bytes; decode before writing to the text stream
            sys.stdout.write(data.decode('utf-8', errors='replace'))
            sys.stdout.flush()

    writer = threading.Thread(target=writeall, args=(chan,))
    writer.start()

    try:
        while True:
            d = sys.stdin.read(1)
            if not d:
                break
            chan.send(d)
    except EOFError:
        # user hit ^Z or F6
        pass


def run():
    default_username = getpass.getuser()
    username = input('Username [%s]: ' % default_username)
    if len(username) == 0:
        username = default_username

    hostname = input('Hostname: ')
    if len(hostname) == 0:
        print('*** Hostname required.')
        sys.exit(1)

    tran = paramiko.Transport((hostname, 22,))
    tran.start_client()

    default_auth = "p"
    auth = input('Auth by (p)assword or (r)sa key[%s] ' % default_auth)
    if len(auth) == 0:
        auth = default_auth

    if auth == 'r':
        default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa')
        path = input('RSA key [%s]: ' % default_path)
        if len(path) == 0:
            path = default_path
        try:
            key = paramiko.RSAKey.from_private_key_file(path)
        except paramiko.PasswordRequiredException:
            password = getpass.getpass('RSA key password: ')
            key = paramiko.RSAKey.from_private_key_file(path, password)
        tran.auth_publickey(username, key)
    else:
        pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))
        tran.auth_password(username, pw)

    # Open a channel
    chan = tran.open_session()
    # Request a pseudo-terminal
    chan.get_pty()
    # Start the shell
    chan.invoke_shell()

    interactive_shell(chan)

    chan.close()
    tran.close()


if __name__ == '__main__':
    run()
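The command logging inside posix_shell is interleaved with terminal handling, which makes it hard to follow. The sketch below pulls the same bookkeeping (the Tab flag and the buffer that is flushed on Enter) into a small class that runs without a terminal or an SSH channel. The class name CommandRecorder and its methods are made up for illustration, and it keeps commands in a list instead of writing to handle.log.

```python
class CommandRecorder:
    """Mirror of the logging logic in posix_shell, without any I/O."""

    def __init__(self):
        self.flag = False   # True right after a Tab keystroke
        self.buffer = []    # pieces of the command typed so far
        self.logged = []    # finished commands (the original writes these to a file)

    def on_key(self, ch):
        """Handle one character typed by the user."""
        if ch == '\t':
            self.flag = True
        else:
            self.buffer.append(ch)
        if ch == '\r':
            self.logged.append(''.join(self.buffer))
            self.buffer.clear()

    def on_output(self, text):
        """Handle a chunk of output received from the remote shell."""
        if self.flag:
            # After a Tab, the shell echoes the completed part of the word.
            # Output starting with a newline is a list of candidates, so skip it.
            if not text.startswith('\r\n'):
                self.buffer.append(text)
            self.flag = False


rec = CommandRecorder()
for ch in "ls /t":
    rec.on_key(ch)
rec.on_key('\t')
rec.on_output("mp/")   # the remote shell completes "/t" to "/tmp/"
rec.on_key('\r')
print(rec.logged)      # ['ls /tmp/\r']
```

As in the original, the logged entry ends with the carriage return that was typed, and keystrokes such as backspace are recorded as raw characters rather than applied to the buffer.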

  

Reposted from: https://www.cnblogs.com/icsnow/p/5858717.html
