一、数据库操作
1、创建表、插入数据和一对多查询
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
"""One-to-many demo: create the tables, insert two groups and query users/groups.

The script talks to the MySQL server configured in ``engine`` and runs its
statements at import time.
"""
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123456@192.168.100.188:3306/s13", max_overflow=5)

Base = declarative_base()


# Single table
class Test(Base):
    __tablename__ = 'test'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))


# One-to-many: one Group has many Users
class Group(Base):
    __tablename__ = 'group'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    caption = Column(String(32))


class User(Base):
    __tablename__ = 'user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))
    # Foreign key to group.nid
    group_id = Column(Integer, ForeignKey("group.nid"))
    # Query-only helper; relationship() normally sits next to its ForeignKey.
    # backref='uuu' adds a virtual ``uuu`` attribute on Group that lists the
    # users of that group, e.g. [1 wang, 3 xiaoming, 4 xiaoxiao].
    group = relationship("Group", backref='uuu')

    def __repr__(self):
        """Return ``nid - username - group_id``; only used when the object is printed."""
        return "%s - %s - %s" % (self.nid, self.username, self.group_id)


def init_db():
    """Create every table declared on ``Base`` (existing tables are left alone)."""
    Base.metadata.create_all(engine)


def drop_db():
    """Drop every table declared on ``Base``."""
    Base.metadata.drop_all(engine)


init_db()

# Create the groups.
# NOTE: every run of the script inserts these two rows again.
Session = sessionmaker(bind=engine)
session = Session()
try:
    session.add(Group(caption='dba'))
    session.add(Group(caption='dbd'))
    session.commit()

    # Users only
    ret = session.query(User).filter(User.username == 'wang').all()
    print(ret)

    ret = session.query(User).all()
    # The script never inserts users itself, so the table may be empty;
    # guard the index access instead of raising IndexError.
    if ret:
        obj = ret[0]
        print(obj)
        print(obj.nid)
        print(obj.username)
        print(obj.group_id)

    ret = session.query(User.username).all()
    print(ret)

    # Left join (isouter=True) across both tables.
    # session.query(User, Group).join(Group, isouter=True) would select the
    # full entities instead of the two columns below.
    sql = session.query(User.username, Group.caption).join(Group, isouter=True)
    print(sql)
    # select * from user left join group on user.group_id = group.nid
    ret = sql.all()
    print(ret)

    # New style (forward lookup): the relationship lives on the queried table.
    ret = session.query(User).all()
    for obj in ret:
        # obj is one row of the user table; obj.group is the related Group
        # object, or None when group_id is NULL.
        grp = obj.group
        print(obj.nid, obj.username, obj.group_id, grp,
              grp.nid if grp is not None else None,
              grp.caption if grp is not None else None)

    # List everybody in a group with an explicit join:
    # ret = session.query(User.username, Group.caption).join(Group, isouter=True).filter(Group.caption == 'dba').all()
    # print(ret)

    # New style (reverse lookup): the relationship is declared on the other
    # table and reached through the backref.
    obj = session.query(Group).filter(Group.caption == 'dba').first()
    print(obj.nid, obj.caption)
    print(obj.uuu)
finally:
    # Return the connection to the pool even if a query fails.
    session.close()
2、多对多关联
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
"""Many-to-many demo using an explicit association table and manual queries.

The script talks to the MySQL server configured in ``engine`` and runs its
statements at import time.
"""
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123456@192.168.100.188:3306/s13", max_overflow=5)

Base = declarative_base()


# Many-to-many: Host <-> HostUser through HostToHostUser
class Host(Base):
    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))


class HostUser(Base):
    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


class HostToHostUser(Base):
    __tablename__ = 'host_to_hostuser'
    # Surrogate key so that a single link row is easy to delete later.
    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))


def init_db():
    """Create every table declared on ``Base`` (existing tables are left alone)."""
    Base.metadata.create_all(engine)


def drop_db():
    """Drop every table declared on ``Base``."""
    Base.metadata.drop_all(engine)


init_db()

Session = sessionmaker(bind=engine)
session = Session()
try:
    # Bulk-insert the sample data.
    session.add_all([
        Host(hostname='c1', port='22', ip='1.2.1.2'),
        Host(hostname='c2', port='22', ip='1.2.1.3'),
        Host(hostname='c3', port='22', ip='1.2.1.1'),
        Host(hostname='c4', port='22', ip='1.2.1.4'),
        Host(hostname='c5', port='22', ip='1.2.1.5'),
    ])
    session.commit()

    session.add_all([
        HostUser(username='root'),
        HostUser(username='mysql'),
        HostUser(username='svn'),
        HostUser(username='git'),
        HostUser(username='oracle'),
    ])
    session.commit()

    # (host_id, host_user_id) pairs. Both columns are Integer, so plain ints
    # are used for both sides.
    # NOTE(review): the literal ids assume freshly created tables whose
    # auto-increment counters started at 1 -- confirm before re-running.
    links = [
        (1, 1), (1, 2), (1, 3),
        (2, 4), (2, 5), (2, 1),
        (3, 1), (3, 2), (3, 3),
        (4, 4), (4, 5), (4, 1),
        (5, 4), (5, 5), (5, 1),
    ]
    session.add_all([
        HostToHostUser(host_id=host_id, host_user_id=host_user_id)
        for host_id, host_user_id in links
    ])
    session.commit()

    # Many-to-many lookup: all users of host 'c1'.
    users = []
    host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
    if host_obj is not None:
        # host_obj.nid is the id of the host; fetch the linked user ids.
        host_2_host_user = session.query(HostToHostUser.host_user_id).filter(
            HostToHostUser.host_id == host_obj.nid).all()
        print(host_2_host_user)
        # Rows look like [(1,), (2,), (3,)]; flatten them to [1, 2, 3].
        user_ids = [row[0] for row in host_2_host_user]
        # Skip the IN query when the host has no linked users.
        if user_ids:
            users = session.query(HostUser.username).filter(HostUser.nid.in_(user_ids)).all()
    print(users)
finally:
    # Return the connection to the pool even if a statement fails.
    session.close()
3、多对多查询第一种方式
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
"""Many-to-many lookup, variant 1: relationships declared on the association class."""
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import (
    Column,
    Integer,
    String,
    ForeignKey,
    UniqueConstraint,
    Index,
)
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine(
    "mysql+pymysql://root:123456@192.168.100.188:3306/s13",
    max_overflow=5,
)

Base = declarative_base()


# Many-to-many: Host <-> HostUser through HostToHostUser
class Host(Base):
    __tablename__ = 'host'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))


class HostUser(Base):
    __tablename__ = 'host_user'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


class HostToHostUser(Base):
    __tablename__ = 'host_to_hostuser'

    # Surrogate key so that a single link row is easy to delete later.
    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))

    # Each link row exposes both ends; the backrefs add ``Host.h`` and
    # ``HostUser.u``, which list the link rows of a host / a user.
    host = relationship("Host", backref='h')
    host_user = relationship("HostUser", backref='u')


Session = sessionmaker(bind=engine)
session = Session()

# All users of host 'c1': fetch the host first ...
host_obj = session.query(Host).filter_by(hostname='c1').first()

# ... then walk its link rows (host_to_hostuser objects) via the backref.
for item in host_obj.h:
    linked_user = item.host_user
    print(linked_user, linked_user.nid, linked_user.username)
4、多对多查询第二种方式
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
"""Many-to-many lookup, variant 2: a plain ``Table`` passed as ``secondary``."""
from sqlalchemy import create_engine, and_, or_, func, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import (
    Column,
    Integer,
    String,
    ForeignKey,
    UniqueConstraint,
    DateTime,
)
from sqlalchemy.orm import sessionmaker, relationship

engine = create_engine(
    "mysql+pymysql://root:2008bjAY@192.168.145.130:3306/s13",
    max_overflow=5,
)

Base = declarative_base()

# Many-to-many association table; the two foreign keys together form the
# primary key.
HostToHostUser = Table(
    'host_to_hostuser',
    Base.metadata,
    Column('host_id', ForeignKey('host.nid'), primary_key=True),
    Column('host_user_id', ForeignKey('host_user.nid'), primary_key=True),
)


class Host(Base):
    __tablename__ = 'host'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))

    # Users of this host, reached through the association table; the backref
    # adds ``HostUser.h`` for the opposite direction.
    host_user = relationship('HostUser', secondary=HostToHostUser, backref='h')


class HostUser(Base):
    __tablename__ = 'host_user'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


Session = sessionmaker(bind=engine)
session = Session()

# Fetch host 'c1' and print the name of every user linked to it.
host_obj = session.query(Host).filter_by(hostname='c1').first()
for item in host_obj.host_user:
    print(item.username)
5、多对多查询第三种方式
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
"""Many-to-many lookup, variant 3: a mapped association class used as ``secondary``."""
from sqlalchemy import create_engine, and_, or_, func, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, DateTime
from sqlalchemy.orm import sessionmaker, relationship

engine = create_engine("mysql+pymysql://root:2008bjAY@192.168.145.130:3306/s13", max_overflow=5)

Base = declarative_base()


# Many-to-many: Host <-> HostUser through HostToHostUser.
#
# The association class is declared first because Host's class body reads
# ``HostToHostUser.__table__`` while it is being executed; with the classes in
# the opposite order that lookup raises NameError. The ForeignKey targets
# below are strings, so they may refer to tables that are declared later.
class HostToHostUser(Base):
    __tablename__ = 'host_to_hostuser'
    # Surrogate key so that a single link row is easy to delete later.
    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))


class Host(Base):
    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))
    # Users of this host, reached through the association table; the backref
    # adds ``HostUser.u`` for the opposite direction.
    host_user = relationship("HostUser", secondary=HostToHostUser.__table__, backref='u')


class HostUser(Base):
    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


Session = sessionmaker(bind=engine)
session = Session()

# Fetch host 'c1' and print the name of every user linked to it.
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
for item in host_obj.host_user:
    print(item.username)
二、Paramiko
1、使用内置的transport连接
import paramiko

# Run a single command through SSHClient, which manages its own transport.
ssh = paramiko.SSHClient()
# NOTE(review): AutoAddPolicy accepts host keys that are not in known_hosts;
# acceptable for a demo, confirm before using against real hosts.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
    ssh.connect(hostname='192.168.145.130', port=22, username='scm', password='1q2w3e4R')
    stdin, stdout, stderr = ssh.exec_command('ls')
    # read() returns the command's standard output as bytes.
    result = stdout.read()
    print(result)
finally:
    # Close the connection whether or not the command succeeded.
    ssh.close()
2、自定义transport
import paramiko

# Run a single command over a Transport that the script creates itself.
transport = paramiko.Transport(('192.168.145.130', 22))
try:
    transport.connect(username='scm', password='1q2w3e4R')

    ssh = paramiko.SSHClient()
    # Hand the already-authenticated transport to the client instead of
    # calling ssh.connect(). NOTE: ``_transport`` is a private attribute.
    ssh._transport = transport

    stdin, stdout, stderr = ssh.exec_command('df')
    # read() returns the command's standard output as bytes.
    result = stdout.read()
    print(result)
finally:
    # Close the transport even if connecting or the command fails.
    transport.close()
3、SFTP
import paramiko

# Upload and download files over SFTP using an explicitly created Transport.
transport = paramiko.Transport(('192.168.145.130', 22))
try:
    transport.connect(username='scm', password='1q2w3e4R')
    sftp = paramiko.SFTPClient.from_transport(transport)
    # Upload: local path first, remote path second.
    sftp.put('D:\\Study\\cnblog.txt', '/tmp/cn.txt')
    # Download: remote path first, local path second.
    # NOTE: both arguments are placeholders to be replaced with real paths.
    sftp.get('remote_path', 'local_path')
finally:
    # Close the transport even if connecting or a transfer fails.
    transport.close()
三、堡垒机实现
1、简单实例
注意:需要配置如下信息
vim .bashrc
# 在 .bashrc 文件末尾追加以下两行:
/usr/bin/python3 bridge_server.py
logout
bridge_server.py代码如下:
import paramiko
import sys
import os
import socket
import getpass
from paramiko.py3compat import u

# windows does not have termios...
try:
    import termios
    import tty
    has_termios = True
except ImportError:
    has_termios = False


def interactive_shell(chan):
    """Bridge the local terminal to ``chan`` with the platform-appropriate loop.

    Uses ``posix_shell`` when ``termios`` could be imported, otherwise
    ``windows_shell``.
    """
    if has_termios:
        posix_shell(chan)
    else:
        windows_shell(chan)


def posix_shell(chan):
    """Relay stdin/stdout to ``chan`` in raw mode and log typed input.

    Typed characters are buffered and appended to ``handle.log`` each time a
    carriage return is typed. After a Tab keypress, the next chunk received
    from the channel is added to the buffer as well, unless it starts with
    ``'\\r\\n'`` (presumably a list of completion candidates rather than the
    completed text -- confirm against the remote shell). The original terminal
    settings are restored on exit.
    """
    import select

    oldtty = termios.tcgetattr(sys.stdin)
    try:
        tty.setraw(sys.stdin.fileno())
        tty.setcbreak(sys.stdin.fileno())
        chan.settimeout(0.0)
        # The context manager closes the log file on every exit path.
        with open('handle.log', 'a+', encoding='utf-8') as log:
            flag = False  # True right after the user pressed Tab
            temp_list = []  # characters of the input line being typed
            while True:
                r, w, e = select.select([chan, sys.stdin], [], [])
                if chan in r:
                    try:
                        x = u(chan.recv(1024))
                        if not x:
                            sys.stdout.write('\r\n*** EOF\r\n')
                            break
                        if flag:
                            if not x.startswith('\r\n'):
                                temp_list.append(x)
                            flag = False
                        sys.stdout.write(x)
                        sys.stdout.flush()
                    except socket.timeout:
                        pass
                if sys.stdin in r:
                    x = sys.stdin.read(1)
                    if not x:
                        break
                    if x == '\t':
                        flag = True
                    else:
                        temp_list.append(x)
                    if x == '\r':
                        # End of the input line: persist it and start over.
                        log.write(''.join(temp_list))
                        log.flush()
                        temp_list.clear()
                    chan.send(x)
    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)


def windows_shell(chan):
    """Relay stdin/stdout to ``chan`` without termios.

    A background thread copies channel output to stdout while the calling
    thread forwards stdin to the channel one character at a time.
    """
    import threading

    sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")

    def writeall(sock):
        while True:
            data = sock.recv(256)
            if not data:
                sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
                sys.stdout.flush()
                break
            # recv() yields bytes; decode before writing to the text stream.
            sys.stdout.write(u(data))
            sys.stdout.flush()

    writer = threading.Thread(target=writeall, args=(chan,))
    writer.start()

    try:
        while True:
            d = sys.stdin.read(1)
            if not d:
                break
            chan.send(d)
    except EOFError:
        # user hit ^Z or F6
        pass


def run():
    """Prompt for a target host and credentials, then open an interactive shell.

    Exits with status 1 when no hostname is entered. Authentication is by
    password (default) or by RSA private key. The channel and the transport
    are closed on every exit path.
    """
    default_username = getpass.getuser()
    username = input('Username [%s]: ' % default_username)
    if not username:
        username = default_username

    hostname = input('Hostname: ')
    if not hostname:
        print('*** Hostname required.')
        sys.exit(1)

    tran = paramiko.Transport((hostname, 22,))
    try:
        tran.start_client()

        default_auth = "p"
        auth = input('Auth by (p)assword or (r)sa key[%s] ' % default_auth)
        if not auth:
            auth = default_auth

        if auth == 'r':
            # expanduser() also works where the HOME variable is not set.
            default_path = os.path.join(os.path.expanduser('~'), '.ssh', 'id_rsa')
            path = input('RSA key [%s]: ' % default_path)
            if not path:
                path = default_path
            try:
                key = paramiko.RSAKey.from_private_key_file(path)
            except paramiko.PasswordRequiredException:
                password = getpass.getpass('RSA key password: ')
                key = paramiko.RSAKey.from_private_key_file(path, password)
            tran.auth_publickey(username, key)
        else:
            pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))
            tran.auth_password(username, pw)

        # Open a session channel
        chan = tran.open_session()
        try:
            # Request a pseudo-terminal
            chan.get_pty()
            # Start an interactive shell on the channel
            chan.invoke_shell()

            interactive_shell(chan)
        finally:
            chan.close()
    finally:
        tran.close()


if __name__ == '__main__':
    run()