欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

聚宽数据下载工具

程序员文章站 2022-06-03 15:57:40
...

聚宽是个非常棒的平台,回测用的数据洗的非常干净,没有未来信息的问题。最近听说他们发新产品了,祝贺!

如果经常用聚宽的话,会经常需要把聚宽研究环境中自己分析数据的结果下载下来,我做了个小工具,能批量自动下载聚宽上保存的文件。

用到了selenium模拟浏览器,不会用的可以搜一下。

github:https://github.com/QuantGin/Quant_For_All

from selenium.common.exceptions import * 
import requests
import time
import os
import traceback
class joinquant:
    def __init__(self,d):
        self.d=d

    def login(self,username,password):
        d=self.d
        d.get('https://www.joinquant.com/research')
        d1=d.find_element_by_class_name('phone')
        d1.clear()
        d1.send_keys(username)
        d1=d.find_element_by_class_name('jq-login__password')
        d1.clear()
        d1.send_keys(password)
        self.wait()
        d.find_element_by_class_name('login-submit').click()
        for i in range(60):
            try:
                d.switch_to.default_content()
                d.switch_to.frame("research")
                d.find_element_by_class_name('fa-folder')
                if d.current_url=='https://www.joinquant.com/default/research/index?target=self&url=/default/research/redirect':
                    self._get_cookie()
                    print('Login was successful!')
                    self.wait(3)
                    return 1
            except Exception as e:
                pass
            self.wait()
        return 0
    def wait(self,n=1):
        time.sleep(1*n)
    def _get_cookie(self):
        d=self.d
        cookie =[item["name"] + ":" + item["value"] for item in d.get_cookies()]
        cookiestr = ';'.join(item for item in cookie)
        cook_map = {}
        for item in cookie :
            str = item.split(':')
            cook_map[str[0]] = str[1]
        cookies = requests.utils.cookiejar_from_dict(cook_map, cookiejar=None, overwrite=True)
        self.cookies=cookies
    def _research(self):
        d=self.d
        d.switch_to.default_content()
        d.execute_script("document.getElementById('kk_nav').style.display='none'")
        d.switch_to.frame("research")
    def download(self,url,file_dir):
        file=file_dir+url.split('/')[-1]
        if os.path.exists(file):
            print('Download',file)
            return 1
        else:
            print('Downloading',file)
            self._get_cookie()
            r=requests.get(url,cookies=self.cookies)
            fout=open(file,'wb')
            fout.write(r.content)
            fout.close
            return 1
    #输出当前路径
    @property
    def pwd(self):
        d=self.d
        self._research()
        d1=d.find_element_by_class_name('breadcrumb')
        s=d1.find_elements_by_xpath(".//*")[-2].get_attribute('innerHTML')
        print(s.split('"')[1].split('/tree')[-1])
        return s.split('"')[1].split('/tree')[-1]
    @property
    def ls(self):
        d=self.d
        self._research()
        d1=d.find_element_by_id('notebook_list')
        l=[key.get_attribute('innerHTML') for key in d1.find_elements_by_class_name('item_name')]
        ll=[]
        for key in l:
            if key!='..':
                ll.append(key)
        print(ll[:3],ll[-3:])
        return ll
    def _clear(self):
        d=self.d
        self._research()
        for i in range(3):
            try:
                d.find_element_by_class_name('modal-content').find_element_by_class_name('btn,btn-default,btn-sm,btn-primary').click()
                break
            except:
                pass
    def click(self,name):
        d=self.d
        self._research()
        d1=d.find_element_by_id('notebook_list')
        d1.find_element_by_link_text(name).click()
        print(name,'clicked')
    def home(self):
        d=self.d
        self._research()
        for i in range(3):
            self._research()
            try:
                for ii in range(10):
                    self._research()
                    d.find_element_by_class_name('fa-folder').click()
                    self.wait()
                    if self.pwd=='':
                        print('cd /')
                        return 1
            except:
                d.get('https://www.joinquant.com/research')
                self.wait()
        return 0
    def cd(self,s_dir):
        d=self.d
        for ii in range(3):
            try:
                self.home()
                _dir=''
                if len(s_dir)>0:
                    for s in s_dir.split('/'):
                        if len(s)>0:
                            _dir+='/'+s
                            for i in range(10):
                                try:
                                    self.click(s)
                                except:pass
                                if self.pwd==_dir:
                                    break
                                self.wait()
                    if _dir==self.pwd:
                        print('cd',_dir)
                        return 1
            except:pass
            self.wait(3)
        return 0
    def get_url(self,name):
        d=self.d
        self._research()
        d2=d.find_element_by_id('notebook_list')
        for d1 in d2.find_elements_by_class_name('item_link'):
            _name=d1.find_element_by_class_name('item_name').get_attribute('innerHTML')
            if _name==name:
                url=d1.get_attribute('href')
                return url
    def select(self,name):
        d=self.d
        self._research()
        d2=d.find_element_by_id('notebook_list')
        for d1 in d2.find_elements_by_class_name('col-md-12')[:]:
            _name=d1.find_element_by_class_name('item_name').get_attribute('innerHTML')
            if _name==name:
                d1.find_element_by_class_name('item_icon,file_icon,icon-fixed-width').click()
                print(name,'selected')
                break
    def click_delete(self):
        d=self.d
        self._research()
        for i in range(10):
            try:
                d.find_element_by_class_name('fa-trash').click()
                break
            except:self.wait(1)
        for i in range(10):
            try:
                d.find_element_by_class_name('modal-content').find_element_by_class_name('btn-danger').click()
            except:self.wait(1)
        self._clear()
        self.wait(1)
    def _download_delete(self,s_dir,file_dir):
        d=self.d
        self.cd(s_dir)
        self.wait(10)
        l=self.ls
        while len(l)>0:
            for name in l[:20]:
                url=self.get_url(name).replace('/edit/','/files/')
                name0=url.split('.')[-1]
                if not name0 in ['ipynb','py'] and url.find('.')!=0 and url.find(s_dir)!=-1:
                    if self.download(url,file_dir):
                        self.select(name)
                        
            self.click_delete()
            l=self.ls
        return 1
    def download_delete(self,s_dir,file_dir):
        while 1:
            try:
                if self._download_delete(s_dir,file_dir):
                    return 1
            except ElementClickInterceptedException:pass
            except StaleElementReferenceException:pass
            except ElementNotInteractableException:pass
            except NoSuchElementException:pass
    def _stop_run(self):
        self._clear()
        #停止并运行
        d=self.d
        self._research()
        d.find_element_by_class_name('fa-stop,fa').click()
        self.wait(5)
        d.find_element_by_link_text('单元格').click()
        self.wait()
        d.find_element_by_link_text('运行所有').click()
        self.wait()
    def _click_run(self):
        self._clear()
        d=self.d
        self._research()
        self.wait()
        d.find_element_by_link_text('单元格').click()
        self.wait()
        d.find_element_by_link_text('运行所有').click()
        self.wait()
    def run(self,name):
        d=self.d
        self._research()
        self.click(name)
        self.wait()
        d.switch_to.window(d.window_handles[-1])
        self._research()
        self._stop_run()
        for i in range(60):
            self.wait()
            d1=d.find_elements_by_class_name('output_subarea,output_text,output_result')
            if len(d1)<1:
                continue
            d1=d1[0]
            html=d1.get_attribute('innerHTML')
            if  html.find('!!success!!')!=-1:
                d.close()
                self.wait()
                d.switch_to.window(d.window_handles[0])
                self.wait()
                return 1
            errors=d.find_elements_by_class_name('output_subarea,output_text,output_error')
            for error in errors:
                if error.get_attribute('innerHTML').find('Traceback')!=-1:
                    error=error.get_attribute('innerHTML')
                    raise(ValueError(error))
    def keep_running(self,name):
        d=self.d
        self._research()
        self.click(name)
        self.wait()
        d.switch_to.window(d.window_handles[-1])
        self._research()
        while 1:
            try:
                self._click_run()
            except:
                print(traceback.format_exc())
            print('click')
            self.wait(10)
if __name__ == "__main__":
    #配置虚拟浏览器
    from selenium import webdriver
    from selenium.common.exceptions import * 
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument("--disable-popup-blocking")
    # chrome_options.add_argument('--headless')
    # chrome_options.add_argument('--no-sandbox')
    d = webdriver.Chrome(options=chrome_options)
    jq=joinquant(d)
    # 登录
    jq.login('username','password')
    # 进入指定路径
    jq.cd('price_daily')
    # 列出路径下文件
    print(jq.ls)
    # 保持指定notebook运行
    self.keep_running('XXX.ipynb')
    # 下载并在网站上删除指定路径下的所有文件XXX
    jq.download_delete('XXX','../XXX/XXX/')

 

相关标签: 量化交易 python