mirror of https://github.com/CIRCL/AIL-framework
				
				
				
			
		
			
				
	
	
		
			160 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			160 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
| #!/usr/bin/env python3
 | |
| # -*-coding:UTF-8 -*
 | |
| 
 | |
| import os
 | |
| import subprocess
 | |
| import sys
 | |
| import re
 | |
| 
 | |
| all_screen_name = set()
 | |
| 
 | |
def is_screen_install():
    """Check whether the ``screen`` command can be run on this host.

    Runs ``screen -v`` and inspects the banner written to stdout.

    Returns:
        bool: True when stdout starts with ``Screen version``. False when the
        executable cannot be started, or when it prints anything else (the
        captured stderr is printed in that case).
    """
    cmd = ['screen', '-v']
    try:
        p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as e:
        # A missing (or non-executable) binary is exactly the "not installed"
        # answer this function exists to give, so report it instead of crashing.
        print(e)
        return False
    if p.stdout.startswith(b'Screen version'):
        return True
    print(p.stderr)
    return False
 | |
| 
 | |
def exist_screen(screen_name, with_sudoer=False):
    """Tell whether a screen session named exactly *screen_name* is listed.

    Args:
        screen_name (str): session name, without the leading ``<pid>.`` part.
        with_sudoer (bool): when True, list the sessions with
            ``sudo screen -ls`` instead of ``screen -ls``.

    Returns:
        bool: True when the listing contains a ``<pid>.<screen_name>`` entry.
    """
    cmd = ['sudo', 'screen', '-ls'] if with_sudoer else ['screen', '-ls']
    # The listing is matched in Python rather than piped into `egrep`:
    # subprocess.run() reaps the child (the former two-process pipeline never
    # waited on the first one) and no external grep binary is required.
    listing = subprocess.run(cmd, stdout=subprocess.PIPE).stdout
    # The name is escaped and the entry must be a whole whitespace-delimited
    # token, so that 'AIL' does not match '123.AIL_2' or '9.x1.AIL'.
    # NOTE(review): relies on `screen -ls` separating entries with whitespace
    # (tabs in the versions checked) - confirm for exotic builds.
    pattern = rb'(?<!\S)[0-9]+\.' + re.escape(screen_name.encode()) + rb'(?!\S)'
    return re.search(pattern, listing) is not None
 | |
| 
 | |
def get_screen_pid(screen_name,  with_sudoer=False):
    """Return the pids of every screen session named exactly *screen_name*.

    Args:
        screen_name (str): session name, without the leading ``<pid>.`` part.
        with_sudoer (bool): when True, list the sessions with
            ``sudo screen -ls`` instead of ``screen -ls``.

    Returns:
        list[str]: pids (as decimal strings) in listing order; empty when no
        session matches.
    """
    cmd = ['sudo', 'screen', '-ls'] if with_sudoer else ['screen', '-ls']
    # Matching is done in Python (no `egrep` pipeline): the child is reaped by
    # subprocess.run() and a single regex both filters and extracts the pid.
    listing = subprocess.run(cmd, stdout=subprocess.PIPE).stdout
    # Escaped name + whole-token match: an unanchored '[0-9]+.NAME' could pick
    # up '1' from '9.x1.NAME' or the pid of 'NAME_2', and these pids are the
    # ones callers go on to kill.
    # NOTE(review): relies on `screen -ls` separating entries with whitespace.
    pattern = rb'(?<!\S)([0-9]+)\.' + re.escape(screen_name.encode()) + rb'(?!\S)'
    return [pid.decode() for pid in re.findall(pattern, listing)]
 | |
| 
 | |
def detach_screen(screen_name):
    """Detach the screen session *screen_name* by running ``screen -d``.

    stdout of the command is captured and discarded; anything written to
    stderr is printed. Nothing is returned.
    """
    completed = subprocess.run(
        ['screen', '-d', screen_name],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    errors = completed.stderr
    if errors:
        print(errors)
 | |
| 
 | |
def create_screen(screen_name):
    """Start a detached screen session called *screen_name* if none exists.

    On success the name is recorded in the module-level ``all_screen_name``
    set.

    Returns:
        bool: True when the session was started without any stderr output;
        False when a session with that name already exists or when
        ``screen -dmS`` wrote to stderr (that output is printed).
    """
    if exist_screen(screen_name):
        return False

    result = subprocess.run(
        ['screen', '-dmS', screen_name],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if result.stderr:
        print(result.stderr)
        return False

    all_screen_name.add(screen_name)
    return True
 | |
| 
 | |
def kill_screen(screen_name, with_sudoer=False):
    """Send ``kill <pid>`` to every screen session named *screen_name*.

    Args:
        screen_name (str): session name to look up.
        with_sudoer (bool): forwarded to ``get_screen_pid`` (sessions are then
            listed through ``sudo``).

    Returns:
        bool: True when at least one matching session was found (whether or
        not each ``kill`` succeeded), False when none was found.
    """
    # Look the pids up once: a second lookup costs another subprocess and may
    # return a different list than the one the emptiness check was made on.
    pids = get_screen_pid(screen_name, with_sudoer=with_sudoer)
    if not pids:
        return False
    for pid in pids:
        # NOTE(review): `kill` itself is not run through sudo even when
        # with_sudoer=True - confirm this is intended.
        cmd = ['kill', pid]
        p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if p.stderr:
            print(p.stderr)
        else:
            print('{} killed'.format(pid))
    return True
 | |
| 
 | |
# TODO: window names longer than 20 characters are truncated by default.
# When len(window_name) == 20, fetch the full title with:
#     screen -S 'pid.screen_name' -p %window_id% -Q title
 | |
def get_screen_windows_list(screen_name, r_set=True):
    """Return the window names of the screen session *screen_name*.

    The session is detached first, then queried with
    ``screen -S <screen_name> -Q windows``. Anything the query writes to
    stderr is printed.

    Args:
        screen_name (str): session to query.
        r_set (bool): return a ``set`` (default) instead of a ``list``; the
            list keeps duplicates and the order of the query output.

    Returns:
        set[str] | list[str]: window names; empty when the query prints
        nothing.
    """
    # detach screen to avoid incomplete result
    detach_screen(screen_name)
    cmd = ['screen', '-S', screen_name, '-Q', 'windows']
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    windows_name = []
    # Rows are separated by two spaces; each row is "<id> <name>".
    for window_row in p.stdout.split(b'  '):
        # Split on the first whitespace run only, so a name containing a space
        # stays whole, and skip rows without both fields (empty chunks, missing
        # name) instead of failing on the two-value unpack.
        fields = window_row.decode().strip().split(None, 1)
        if len(fields) != 2:
            continue
        windows_name.append(fields[1])
    if p.stderr:
        print(p.stderr)
    return set(windows_name) if r_set else windows_name
 | |
| 
 | |
def get_screen_windows_id(screen_name):
    """Map each window name of session *screen_name* to its window ids.

    The session is detached first, then queried with
    ``screen -S <screen_name> -Q windows``. Anything the query writes to
    stderr is printed.

    Returns:
        dict[str, list[str]]: window name -> ids of every window carrying
        that name, in the order of the query output; empty when the query
        prints nothing.
    """
    # detach screen to avoid incomplete result
    detach_screen(screen_name)
    all_windows_id = {}
    cmd = ['screen', '-S', screen_name, '-Q', 'windows']
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Rows are separated by two spaces; each row is "<id> <name>".
    for window_row in p.stdout.split(b'  '):
        # Tolerate names containing a space and skip rows lacking a field
        # rather than failing on the two-value unpack.
        fields = window_row.decode().strip().split(None, 1)
        if len(fields) != 2:
            continue
        window_id, window_name = fields
        # NOTE(review): screen may append status flags to the window number
        # (e.g. '2*$'); keep only the leading digits so the id is usable with
        # `-p`. Confirm against the screen versions in use.
        number = re.match(r'[0-9]+', window_id)
        if number:
            window_id = number.group()
        all_windows_id.setdefault(window_name, []).append(window_id)
    if p.stderr:
        print(p.stderr)
    return all_windows_id
 | |
| 
 | |
| # script_location ${AIL_BIN}
 | |
def launch_windows_script(screen_name, window_name, dir_project, script_location, script_name, script_options=''):
    """Open a new window in a screen session and run a script inside it.

    The window runs ``bash -c 'cd <script_location> && ./<script_name>
    <script_options>; read x'``; the trailing ``read x`` keeps the window open
    after the script exits. The command and its captured stdout/stderr are
    printed.

    Args:
        screen_name (str): target screen session.
        window_name (str): title given to the new window (``-t``).
        dir_project: unused; kept so existing callers keep working.
        script_location (str): directory the script is started from.
        script_name (str): script file, executed as ``./<script_name>``.
        script_options (str): options appended verbatim to the command line.
    """
    # `&&` rather than `;`: when the cd fails, the script must not be started
    # from whatever directory the window happens to be in.
    bash_cmd = 'cd {} && ./{} {}; read x'.format(script_location, script_name, script_options)
    cmd = ['screen', '-S', screen_name, '-X', 'screen', '-t', window_name, 'bash', '-c', bash_cmd]
    print(cmd)
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(p.stdout)
    print(p.stderr)
 | |
| 
 | |
def launch_uniq_windows_script(screen_name, window_name, dir_project, script_location, script_name, script_options='', kill_previous_windows=False):
    """Launch a script in a window whose name is unique within the session.

    When the session already has window(s) named *window_name*: with
    ``kill_previous_windows=True`` they are all force-killed before the new
    window is launched; otherwise an error is printed and nothing is
    launched.

    Args:
        screen_name (str): target screen session.
        window_name (str): title of the window to create.
        dir_project, script_location, script_name, script_options: forwarded
            unchanged to ``launch_windows_script``.
        kill_previous_windows (bool): replace existing same-named windows
            instead of refusing to launch.

    Returns:
        None in every case.
    """
    # Named `windows_id` so the local no longer shadows the module-level
    # `all_screen_name` set.
    windows_id = get_screen_windows_id(screen_name)
    if window_name in windows_id:
        if not kill_previous_windows:
            print('Error: screen {} already contain a windows with this name {}'.format(screen_name, window_name))
            return None
        # Kill every same-named window, not just the first one, otherwise the
        # new window would not be the only one with this name.
        for window_id in windows_id[window_name]:
            kill_screen_window(screen_name, window_id, force=True)
    launch_windows_script(screen_name, window_name, dir_project, script_location, script_name, script_options=script_options)
 | |
| 
 | |
def kill_screen_window(screen_name, window_id, force=False):
    """Stop what runs in one window of a screen session.

    Args:
        screen_name (str): target screen session.
        window_id (str | int): window selector passed to ``-p``.
        force (bool): True -> issue screen's ``kill`` command on the window;
            False -> type Ctrl-C (ETX, 0x03) into it with ``stuff``.

    The captured stdout and stderr of the screen command are printed.
    """
    # subprocess only accepts string arguments; allow integer window numbers.
    target = ['screen', '-S', screen_name, '-p', str(window_id), '-X']
    if force:  # kill
        cmd = target + ['kill']
    else:  # send ctrl-C
        # Pass the bare ETX byte. No shell is involved here, so the bash
        # $'\003' quoting would be stuffed literally ($ and quotes included)
        # around the control character.
        cmd = target + ['stuff', '\x03']
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(p.stdout)
    print(p.stderr)
 | |
| 
 | |
if __name__ == '__main__':
    # Manual smoke test: print the window names of the 'Script_AIL' session.
    print(get_screen_windows_list('Script_AIL'))
 |